2016年5月16日月曜日

RabbitMQ の 冗長化を検証する / Consumer 編

はじめに

前回、『RabbitMQ の 冗長化を考える』でRabbitMQ の冗長化の設定を行いました。
今回は、その冗長化した RabbitMQ サーバーを実際、Failover させて、その様子を観察したいと思います。
冗長化を考える理由に
何かしらの障害が発生したとしても Publisher が送信したメッセージを
Consumer によって確実に処理させたいということが挙げられると思います。
全体を通してでは、
Publisher が送信したメッセージは、Consumer で確実に処理されるということになると思います。
これを Publisher と Consumer の間に Queue を置いて考えると
Publisher は、何かしらの障害が発生したとしてもメッセージを確実に Queue に届けるところまでが
責任範囲で、その後、Queue のメッセージが Consumer でどう処理されるかは、責任の範囲外になると思います。
一方、Consumer は、Queue にメッセージがどのような方法で届けられたかはということは、関係なく
Queue にあるメッセージを確実に処理することが責任範囲になると思います。
このように Publisher と Consumer の責任範囲は、それぞれ被らないものだと思います。
そう考えると Failover をそれぞれ別々の視点で観察できるとのではないかと思いました。
(RPC 方式のメッセージ処理では、また違ってくると思うのですが今回は、考えません。
そんな訳で今回は、Consumer側 の Failover の様子を観察してみたと思います。
JavaClient では、Cluster の Node を複数、ConnectionFactory に設定することができ、
ConnectionFactory は、その中から接続可能な Node を選択して Connection を生成しているようです。
このことが接続している Node が落ちてしまった場合に ConnectionFactory に設定されている
別の接続可能な Node で自動的に接続が復帰することを可能にしているようです。
Node に仮想IP などを設定する必要がないようです。
厳密に負荷分散を考える場合は、仮想IP などが必要かもしれません。
(接続可能な IP の選択方法が Collections.shuffle() になってそうなので...
検証には、2種類の Consumer を考えています。
1つは、適当にスリープさせながらメッセージ処理をする HappyPath での Consumer と
もう1つは、スリープさせずに連続的にメッセージ処理をする UnhappyPath での Consumer になります。
以下、3点を Failover 時に検証します。

  1. 接続可能な Node に切り替わること。
  2. メッセージが消えたしまわないこと。
  3. メッセージが2重に処理されないこと。
始める前に...
RabbitMQ がインストール済であること
RabbitMQ サーバーにログイン可能でリソースにアクセスできるユーザが作成済であること
(ユーザとパスワードを "user01" と "passwd" としています。
ManagementPlugin が有効になっていて CommandLineTool が使えること
ホスト名が設定されていること
以上が前提になります。
このあたりはわたしも『yum の Local Repository に RabbitMQ をインストール』と
『RabbitMQ の 冗長化を考える』にまとめていますので
よろしければ、こちらも合わせて、ご参考に。
前置きが長くなってしまいましたが
ではでは!Getting Started!!


環境

検証機

開発言語 Java 1.8.0
フレームワーク rabbitmq-java-client 3.6.1
ビルドツール Gradle 2.12
※gradle コマンドが実行できるように設定しておきます。
OS Windows 8.1


前回と同様に以下、2台のサーバー(vm-1st と vm-2nd)で冗長化します。
RabbitMQ Server (vm-1st)

ホスト名 vm-1st
IP 192.168.10.110
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16


RabbitMQ Server (vm-2nd)

ホスト名 vm-2nd
IP 192.168.10.120
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16



プロジェクト構成

C ドライブ直下に「studying-rabbitmq-failover」フォルダを作成して各ファイルを以下のように配置します。
ファイルは、UTF-8 で保存します。
  • C:\studying-rabbitmq-failover
    • src
      • main
        • java
          • studying
            • MyAbstractConsumer.java
            • MyCancelConsumer.java
            • MyHappyConsumer.java
            • MyUnhappyConsumer.java
    • build.gradle
    • publisher.gradle


準備

前回のおさらいも兼ねて Clustering の設定から始めたいと思います。
vm-2nd を vm-1st の Cluster に参加させます。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@vm-1st
rabbitmqctl start_app

Cluster の設定を確認します。
vm-1st と vm-2nd の両方のターミナルで以下のコマンドを実行します。
vm-1st

rabbitmqctl cluster_status

Cluster status of node 'rabbit@vm-1st' ...
[{nodes,[{disc,['rabbit@vm-1st','rabbit@vm-2nd']}]},
 {running_nodes,['rabbit@vm-2nd','rabbit@vm-1st']},
 {cluster_name,<<"rabbit@vm-1st">>},
...

vm-2nd

rabbitmqctl cluster_status

[{nodes,[{disc,['rabbit@vm-1st','rabbit@vm-2nd']}]},
 {running_nodes,['rabbit@vm-1st','rabbit@vm-2nd']},
 {cluster_name,<<"rabbit@vm-1st">>},
...
vm-1st と vm-2nd のどちらも "rabbit@vm-1st" という Cluster に参加していて起動中である。
問題ないようですね。

次に Queue と directタイプの Exchange と宣言します。
Queue名と Exchange名は、それぞれ "myQueue" と "myExchange" にします。
RoutingKey を "myKey" にして "myQueue" と "myExchange" を Bind します。
vm-1st が HomeNode になるようにします。
また、今回使用する Queue名と Exchange名で Queue と Exchange がすでに存在していると
ややこしくなるので念のため削除してから作成します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin delete exchange name=myExchange
rabbitmqadmin delete queue name=myQueue

rabbitmqadmin declare queue name=myQueue
rabbitmqadmin declare exchange name=myExchange type=direct
rabbitmqadmin declare binding source=myExchange destination=myQueue routing_key=myKey

宣言した Queue と Exchange そして、Bind を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqadmin list exchanges name type durable

+--------------------+---------+---------+
|        name        |  type   | durable |
+--------------------+---------+---------+
|                    | direct  | True    |
...
| myExchange         | direct  | True    |
+--------------------+---------+---------+

rabbitmqadmin list bindings

+------------+-------------+-------------+
|   source   | destination | routing_key |
+------------+-------------+-------------+
|            | myQueue     | myQueue     |
| myExchange | myQueue     | myKey       |
+------------+-------------+-------------+
問題なく宣言されているのが確認できると思います。

最後に QueueMirroring の設定をします。
先ほど宣言した Queue を Cluster 内のすべての Node で Mirroring するようにします。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqctl set_policy myPolicy "^myQueue$" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'

追加でもう1つ
検証機が Windows なので Windows の hosts ファイルにも vm-1st と vm-2nd を追加しておきます。
C:\Windows\System32\drivers\etc\hosts
...
192.168.10.110 vm-1st
192.168.10.120 vm-2nd

準備は、これで終わりです。


ビルドスクリプトの作成

以下のような内容でビルドスクリプトを作成します。
build.gradle
apply from: 'publisher.gradle'

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'application'

sourceCompatibility = '1.8'
targetCompatibility = '1.8'
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

mainClassName = (project.hasProperty('mainClass')) ? "$mainClass" : ''

repositories {
  jcenter()
}

dependencies {
  compile 'com.rabbitmq:amqp-client:3.6.1'
}

Consumer側の冗長化の検証に RabbitMQ サーバーにメッセージを送信するスクリプトがあった方が
やりやすいと思ったので Java でメッセージ送信する Publisher側のプロジェクトも考えたのですが
今回は、以下のような gradle の task で済ませようと思います。
publisher.gradle
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

def messageCount = project.hasProperty('messageCount') ? "$messageCount" : 10000

// (1)
task happyPublisher << {
  println 'start'
  Connection connection = newConnection();
  Channel channel = connection.createChannel();

  def tasks = [
    'Maurice', 'Verdine', 'Philip', 'Jessica', 'Larry', 
    'Johnny', 'Ralph', 'Al', 'Andrew', 'XXX'
  ]

  tasks.each {
    byte[] body = it.getBytes();
    channel.basicPublish("myExchange", "myKey", MessageProperties.PERSISTENT_TEXT_PLAIN, body);
  }

  connection.close();
  println 'done'
}

// (2)
task unhappyPublisher << {
  println 'start'
  
  Connection connection = newConnection();
  Channel channel = connection.createChannel();
      
  for (int ct = 0; ct < messageCount; ct++) {
    byte[] body = String.format("test-%d", ct + 1).getBytes();
    channel.basicPublish("myExchange", "myKey", MessageProperties.PERSISTENT_TEXT_PLAIN, body);
  }

  connection.close();
  println 'done'
}

// RabbitMQ Server に接続して connection を生成する。
Connection newConnection() {  
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("vm-1st");
  factory.setUsername("user01");
  factory.setPassword("passwd");
  factory.newConnection();
}

buildscript {
  repositories { jcenter() }
  dependencies { classpath 'com.rabbitmq:amqp-client:3.6.1' }
}
スクリプトの内容は、以下のようになります。
(1) happyPublisher タスク
HappyPath の検証ケースで使用するメッセージを RabbitMQサーバーに送信します。
メッセージを10通送信します。
(2) unhappyPublisher タスク
UnhappyPath の検証ケースで使用するメッセージを RabbitMQサーバーに送信します。
メッセージを10,000通送信します。


RabbitMQ サーバー接続クラスを作成する

RabbitMQ サーバーに接続する処理などを共通化した以下のようなクラスを作成します。
検証で作成する Consumerクラスはこのクラスを継承する形にします。
src/main/java/studying/rabbitmq/MyAbstractConsumer.java
package studying.rabbitmq;

import java.io.IOException;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;

public abstract class MyAbstractConsumer {
  
  public void doAction() throws Exception {
 
    // RabbitMQ Server 接続情報
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("user01");
    factory.setPassword("passwd");


    // (1) 冗長化の設定をして RabbitMQ Server に接続する。
    Address[] addrs = new Address[]{new Address("vm-1st"), new Address("vm-2nd")};
    factory.setAutomaticRecoveryEnabled(true);
    AutorecoveringConnection connection = (AutorecoveringConnection)factory.newConnection(addrs);

    System.out.printf("listening... %s %n", connection.getAddress().getHostName());
    Thread.sleep(10 * 1000);

    
    // (2) Queue と Exchange の宣言と Bind をする。
    boolean durable = true;
    String queueName    = "myQueue";
    String exchangeType = "direct";
    String exchangeName = "myExchange";
    String routingKey   = "myKey";

    AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel();
    channel.exchangeDeclare(exchangeName, exchangeType, durable);
    channel.queueDeclare(queueName, durable, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    channel.basicQos(1);

    
    // (3) Connection と Channel に ShutdownListener を設定する。
    connection.addShutdownListener(cause -> System.out.printf("Connection Shutdown... %n"));
    channel.addShutdownListener(cause -> System.out.printf("Channel    Shutdown... %n"));


    // (4) Connection と Channel に RecoveryListener を設定する。
    connection.addRecoveryListener((Recoverable recoverable) -> {
      AutorecoveringConnection conn = (AutorecoveringConnection)recoverable;
      String hostName = conn.getAddress().getHostName();
      System.out.printf("Connection Recovered !!! %s %n", hostName);
    });
    channel.addRecoveryListener((Recoverable recoverable) -> {
      AutorecoveringChannel chan = (AutorecoveringChannel)recoverable;
      String hostName = chan.getConnection().getAddress().getHostName();
      System.out.printf("Channel    Recovered !!! %s %n", hostName);
    });

 
    // Consumer を設定する。
    this.assignConsumerFor(channel, queueName);
  }

  // (5)
  protected abstract void assignConsumerFor(Channel channel, String queue) throws IOException;
}
処理内容は、以下のようになります。
(1) 冗長化の設定をして RabbitMQ Server に接続する。
ClusterNode のホスト名(または、IP)を複数指定して Connection を作成します。
AutomaticRecovery も忘れずに有効化しておきます。
(2) Queue と Exchange の宣言と Bind をする。
準備の時に行っているので不要だと思いますが
ここでも Queue と Exchange の宣言とその Bind をしています。
念のため basicQos でメッセージの取り出しを1回につき1つにしています。
(3) Connection と Channel に ShutdownListener を設定する。
Connection と Channel の Shutdown時、コンソールにログを出力します。
(4) Connection と Channel に RecoveryListener を設定する。
Connection と Channel の Recovery時、コンソールにログを出力します。
(5) assignConsumerFor() メソッド
サブクラスはこのメソッドを実装して Consumer を Channel に割り当てます。


Happy Path を検証する

初めは、おおよそ想定通りにいってくれるであろうパターンから検証してみたいと思います。
適当にスリープさせながら10通のメッセージを処理します。
以下のような内容で Consumerクラスを作成します。
src/main/java/studying/rabbitmq/MyHappyConsumer.java
package studying.rabbitmq;

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyHappyConsumer extends MyAbstractConsumer {

  @Override
  protected void assignConsumerFor(Channel channel, String queue) throws IOException {

    Consumer consumer = new DefaultConsumer(channel) {

      protected int ack;
      
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, 
          AMQP.BasicProperties properties, byte[] body) throws IOException {
        
        String message = new String(body, "UTF-8");
        String hostName = channel.getConnection().getAddress().getHostName();
        
        System.out.printf(" Received '%-7s'", message);
        System.out.printf(" - %s", hostName);
        System.out.printf(" deliveryTag:%s",  envelope.getDeliveryTag());
        System.out.printf(" redeliver:%s %n", envelope.isRedeliver());

        try {
          // 3秒 sleep する。
          Thread.sleep(3 * 1000);
          
          // ack を返す。
          channel.basicAck(envelope.getDeliveryTag(), false);
          this.ack++;
          System.out.printf("  Done '%s' ack:%d %n", message, this.ack);

        } catch (InterruptedException e) {
          System.out.printf("  Interrupted '%s' %n", message);
          e.printStackTrace();
        } catch (Exception e) {
          System.out.printf("  Error '%s' %n", message);
          throw e;
        }
      }
    };

    boolean autoAck = false;
    channel.basicConsume(queue, autoAck, consumer);
  }

  
  public static void main(String[] args) {
    try {
      new MyHappyConsumer().doAction();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージ処理をしています。
処理内容としては、

  1. 受信したメッセージをコンソールに表示する。
  2. 3秒スリープさせてタイミングを整える。
  3. RabbitMQ サーバーに ack を返す。
     RabbitMQ サーバーに返した ack をカウントする。
     ただし Exception が発生した場合は、ack を返さないようにします。
  4. 最後にもう一度メッセージをコンソールに表示する。

のようになります。

想定されるケースとしては、
HomeNode に接続している状態で Failover が発生する場合と
HomeNode 以外に接続している状態で Failover が発生する場合があると思います。
とりあえず、HomeNode に接続しているケースから見ていきたいと思います。
検証の前に Queue のメッセージをクリアしてから
HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.3.297.0>]       [<rabbit@vm-2nd.3.297.0>]
問題がなければ、検証を始めます。

Consumer を HomeNode の vm-1st に接続させてメッセージを送信します。
適当なところで vm-1st の RabbitMQ サーバーを停止させます。
その時、どうように Failover するか観察したいと思います。
コマンドプロンプトを開いて gradle から Consumer を起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q

listening... vm-1st
と表示され vm-1st に接続できたらメッセージを送信します。
vm-2nd に接続されたら
どの ClusterNode に接続されるかは、ランダムのようです。
(よく分かりませんが...
今回の場合、vm-1st と vm-2nd の2台なので
確率論的にみて3回くらいやり直せば希望する方に接続される筈なんですが...
新たにコマンドプロンプトを開いて gradle から 以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle happyPublisher -q

start
done

しばらくすると Consumer のコマンドプロンプトからメッセージが処理されていく様子が確認できると思います。
Consumer

...
listening... vm-1st
 Received 'Maurice' - vm-1st deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-1st deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-1st deliveryTag:3 redeliver:false

ここで vm-1st の RabbitMQ サーバーを停止させます。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service

そうすると Consumer のコマンドプロンプトから Failover する様子が確認できると思います。
Consumer

...
listening... vm-1st
 Received 'Maurice' - vm-1st deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-1st deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-1st deliveryTag:3 redeliver:false
  Done 'Philip' ack:3
 Received 'Jessica' - vm-1st deliveryTag:4 redeliver:false
Channel    Shutdown...
  Error 'Jessica'
com.rabbitmq.client.impl.DefaultExceptionHandler: ...
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; ...
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChan ...
        ...
Channel    Recovered !!! vm-2nd
Connection Recovered !!! vm-2nd
Connection Shutdown...
 Received 'Jessica' - vm-2nd deliveryTag:5 redeliver:true
  Done 'Jessica' ack:4
 Received 'Larry  ' - vm-2nd deliveryTag:6 redeliver:true
  Done 'Larry' ack:5
 Received 'Johnny ' - vm-2nd deliveryTag:7 redeliver:true
  Done 'Johnny' ack:6
 Received 'Ralph  ' - vm-2nd deliveryTag:8 redeliver:true
  Done 'Ralph' ack:7
 Received 'Al     ' - vm-2nd deliveryTag:9 redeliver:true
  Done 'Al' ack:8
 Received 'Andrew ' - vm-2nd deliveryTag:10 redeliver:true
  Done 'Andrew' ack:9
 Received 'XXX    ' - vm-2nd deliveryTag:11 redeliver:true
  Done 'XXX' ack:10
検証してみると

  1. "Jessica" のメッセージ処理をしている時に vm-1st の停止が発生していて
     HomeNode を vm-2nd に Failover できているようです。
  2. 最後のメッセージ "XXX" まで処理できているようです。
  3. Queue にあったメッセージの数と ack の数が一致しているので
     2重にメッセージが処理されていることもないようです。

概ね想定通りうまくいっているのではないかと思います。
Failover のタイミングを拾えているのでここに Rollback 処理などを挟み込めると思います。
また、Failover 時に Queue に残っていたメッセージは、redeliver フラグが "true" になっているので
Re-Queue 扱いになっているのも分かると思います。


次に HomeNode でない Node に接続しているケースを見てみたいと思います。
恐らく今、HomeNode は、vm-2nd になっていると思います。
vm-1st の RabbitMQ サーバーを再起動させて確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl start rabbitmq-server.service

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 0        |
+---------+---------+---------------+----------+

HomeNode を vm-1st に戻したいので vm-2nd の RabbitMQ サーバーを再起動させます。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

systemctl stop rabbitmq-server.service
systemctl start rabbitmq-server.service

先ほどと同様に Queue のメッセージをクリアしてから
HomeNode が vm-1st に戻ってきているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.3.295.0>]       [<rabbit@vm-2nd.3.295.0>]
問題がなければ、検証を始めます。

Consumer を vm-2nd に接続させてメッセージを送信します。
適当なところで vm-1st の RabbitMQ サーバーを停止させて Failover する様子を観察したいと思います。
コマンドプロンプトを開いて gradle から Consumer を起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q

listening... vm-2nd

vm-2nd に接続されたらメッセージを送信します。
新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle happyPublisher -q

start
done

Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたら
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false

vm-2nd の RabbitMQ サーバーを停止して Failover させます。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

systemctl stop rabbitmq-server.service

Consumer のコマンドプロンプトを見ると Failover する様子が確認できると思います。
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
  Done 'Philip' ack:3
 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false
Channel    Shutdown...
  Error 'Jessica'
com.rabbitmq.client.impl.DefaultExceptionHandler: ...
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; ...
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChan ...
        ...
Channel    Recovered !!! vm-1st
Connection Recovered !!! vm-1st
Connection Shutdown...
 Received 'Jessica' - vm-1st deliveryTag:5 redeliver:true
  Done 'Jessica' ack:4
 Received 'Larry  ' - vm-1st deliveryTag:6 redeliver:false
  Done 'Larry' ack:5
 Received 'Johnny ' - vm-1st deliveryTag:7 redeliver:false
  Done 'Johnny' ack:6
 Received 'Ralph  ' - vm-1st deliveryTag:8 redeliver:false
  Done 'Ralph' ack:7
 Received 'Al     ' - vm-1st deliveryTag:9 redeliver:false
  Done 'Al' ack:8
 Received 'Andrew ' - vm-1st deliveryTag:10 redeliver:false
  Done 'Andrew' ack:9
 Received 'XXX    ' - vm-1st deliveryTag:11 redeliver:false
  Done 'XXX' ack:10
検証してみると

  1. "Jessica" のメッセージ処理をしている時に vm-1st の停止が発生していて
     HomeNode を vm-2nd に Failover できているようです。
  2. 最後のメッセージ "XXX" まで処理できているようです。
  3. Queue にあったメッセージの数と ack の数が一致しているので
     2重にメッセージが処理されていることもないようです。

こちらもうまくいっていると思います。
HomeNode に接続しているケースとの違いとして
Failover 時に処理していたメッセージは、redeliver フラグが "true" なので
Re-Queue 扱いになっているようですが、
それ以外の Queue に残っていたメッセージの redeliver フラグは "false" のままのようです。


Unhappy Path を検証する

不幸なタイミングで処理される可能性が高いパターンを検証してみたいと思います。
内容的には、先ほどの HappyPath とほぼ同じなのですが
今度は、スリープさせずに連続してメッセージ処理をするとどうなるか試してみたいと思います。
メッセージは、10,000通処理します。
以下のような内容で Consumerクラスを作成します。
src/main/java/studying/rabbitmq/MyUnhappyConsumer.java
package studying.rabbitmq;

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyUnhappyConsumer extends MyAbstractConsumer {

  public final static int MESSAGE_COUNT = 10000;
  
  @Override
  protected void assignConsumerFor(Channel channel, String queue) throws IOException {
    
    int count = MyUnhappyConsumer.MESSAGE_COUNT;

    Consumer consumer = new DefaultConsumer(channel) {
      
      protected int ack;
      
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, 
          AMQP.BasicProperties properties, byte[] body) throws IOException {
        
        String message = new String(body, "UTF-8");
        long deliveryTag = envelope.getDeliveryTag();

        try {
          // ack を返す。
          boolean multiple = false;
          channel.basicAck(deliveryTag, multiple);
          this.ack++;
          
          if ((deliveryTag == 1) || (deliveryTag > (count - 2)) || 
              ((deliveryTag % (count / 5)) == 0)) {

            System.out.printf(" Done '%s'", message);
            System.out.printf(" - %s", channel.getConnection().getAddress().getHostName());
            System.out.printf(" deliveryTag:%s", deliveryTag);
            System.out.printf(" ack:%d %n", this.ack);
          }
        } catch (Exception e) {
          System.out.printf(" Error '%s' - deliveryTag:%s %n", message, deliveryTag);
          throw e;
        }
      }
    };

    boolean autoAck = false;
    channel.basicConsume(queue, autoAck, consumer);
  }

  
  public static void main(String[] args) {
    try {      
      new MyUnhappyConsumer().doAction();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージ処理をしています。
処理内容としては、

  1. 受信したメッセージをコンソールに表示する。
     今回は、メッセージを10,000通処理するので最初の1通目と最後の3通、そして途中5回だけにします。
  2. RabbitMQ サーバーに ack を返す。
     RabbitMQ サーバーに返した ack をカウントする。
     ただし Exception が発生した場合は、ack を返さないようにします。

のようになります。


こちらも HomeNode に接続している状態で Failover が発生する場合と
HomeNode 以外に接続している状態で Failover が発生する場合が考えられるのですが
HomeNode に接続している状態で Failover が発生する場合で検証したいと思います。
検証の前に Queue のメッセージをクリアしてから
HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.1.295.0>]       [<rabbit@vm-2nd.1.295.0>]
問題がなければ、検証を始めます。

今回は、先にメッセージを送信してしまいます。
コマンドプロンプトを開いて gradle から 以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle unhappyPublisher -q

start
done

送信したメッセージが Queue に届いているか確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 10000    |
+---------+---------+---------------+----------+

Queue にメッセージが届いているのが確認できたら
コマンドプロンプトを開いて gradle から以下のように Consumer を vm-1st に接続させて起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyUnhappyConsumer -q

listening... vm-1st
vm-2nd に接続されたら
メッセージが処理されるまで10秒空けていますので
その間に「CTRL」+「C」キーで処理を止めてもう一度、Consumer を起動し直します。

しばらくして Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたら
Consumer

...
listening... vm-1st
 Done 'test-1' - vm-1st deliveryTag:1 ack:1
 Done 'test-2000' - vm-1st deliveryTag:2000 ack:2000
 Done 'test-4000' - vm-1st deliveryTag:4000 ack:4000

vm-1st の RabbitMQ サーバーを停止して Failover させます。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service

Consumer のコマンドプロンプトを見ると
以下のように Failover する様子が確認できると思います。
Consumer

...
listening... vm-1st
 Done 'test-1' - vm-1st deliveryTag:1 ack:1
 Done 'test-2000' - vm-1st deliveryTag:2000 ack:2000
 Done 'test-4000' - vm-1st deliveryTag:4000 ack:4000
Channel    Shutdown...
Channel    Recovered !!! vm-2nd
Connection Recovered !!! vm-2nd
Connection Shutdown...
 Done 'test-5999' - vm-2nd deliveryTag:6000 ack:6000
 Done 'test-7999' - vm-2nd deliveryTag:8000 ack:8000
 Done 'test-9998' - vm-2nd deliveryTag:9999 ack:9999
 Done 'test-9999' - vm-2nd deliveryTag:10000 ack:10000
 Done 'test-10000' - vm-2nd deliveryTag:10001 ack:10001
検証してみると

  1. 4,000通目から6,000通目のメッセージ処理をしている時に vm-1st の停止が発生していて
     HomeNode を vm-2nd に Failover できているようです。
  2. 最後のメッセージ "test-10000" まで処理できているようです。
  3. Queue にあったメッセージの数より ack の数が1つ多くなっています。
     2重にメッセージが処理されてしまっているようです。
メッセージの2重処理が発生しているのに加えて Failover のタイミングも拾えていないようです。
Exception が catch できていないということは、channel.basicAck() メソッドで
ack を返すところまでは成功しているけれども ack が 届く前に RabbitMQ サーバーが
停止したのだろうと考えられます。
HomeNode 以外に接続している場合に Failover したら
こちらの場合も同様にメッセージの2重処理が発生するようです。
Consumer

listening... vm-2nd
 Done 'test-1' - vm-2nd deliveryTag:1 ack:1
 Done 'test-2000' - vm-2nd deliveryTag:2000 ack:2000
 Done 'test-4000' - vm-2nd deliveryTag:4000 ack:4000
Channel    Shutdown...
Channel    Recovered !!! vm-1st
Connection Recovered !!! vm-1st
Connection Shutdown...
 Done 'test-5999' - vm-1st deliveryTag:6000 ack:6000
 Done 'test-7999' - vm-1st deliveryTag:8000 ack:8000
 Done 'test-9998' - vm-1st deliveryTag:9999 ack:9999
 Done 'test-9999' - vm-1st deliveryTag:10000 ack:10000
 Done 'test-10000' - vm-1st deliveryTag:10001 ack:10001


Consumer Cancel Notification

Consumer に割り当てられている Queue が削除されたり、その他の理由で Queue にアクセスできなくなった時に
RabbitMQ サーバー から Consumer Cancel Notification というものが送られる仕組みがあるようです。
このキャンセル通知を Consumer が受け取ると以降のメッセージ処理を中止するようです。
Failover 時も、このキャンセル通知が送信されるようなので、ちょっと試してみようと思います。
わたしが確認できたところでは、
Failover 時、以下の条件をすべて満たす場合に、このキャンセル通知を受け取れるようです。

   QueueMirroring している。
   HomeNode 以外に接続している。
   HomeNode が原因で Failover が発生した。
   Consumer 引数の x-cancel-on-ha-failover が "true" になっている。
上記のケースで x-cancel-on-ha-failover を設定していないとどうなるか
先ほどの HappyPath の検証を使って見てみます。

検証の前に Queue のメッセージをクリアして
HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.3.297.0>]       [<rabbit@vm-2nd.3.297.0>]

Consumer を HomeNode でない vm-2nd に接続します。
コマンドプロンプトを開いて gradle から Consumer を起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q

listening... vm-2nd

vm-2nd に接続できたのを確認したらメッセージを送信します。
新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle happyPublisher -q

start
done

Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたら
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false

HomeNode の vm-1st の RabbitMQ サーバーを停止して Failover させます。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service

Consumer のコマンドプロンプトでは以下のようにメッセージ処理が続いていると思います。
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
  Done 'Philip' ack:3
 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false
  Done 'Jessica' ack:4
 Received 'Jessica' - vm-2nd deliveryTag:5 redeliver:true
  Done 'Jessica' ack:5
 Received 'Larry  ' - vm-2nd deliveryTag:6 redeliver:true
  Done 'Larry' ack:6
 Received 'Johnny ' - vm-2nd deliveryTag:7 redeliver:true
  Done 'Johnny' ack:7
 Received 'Ralph  ' - vm-2nd deliveryTag:8 redeliver:true
  Done 'Ralph' ack:8
 Received 'Al     ' - vm-2nd deliveryTag:9 redeliver:true
  Done 'Al' ack:9
 Received 'Andrew ' - vm-2nd deliveryTag:10 redeliver:true
  Done 'Andrew' ack:10
 Received 'XXX    ' - vm-2nd deliveryTag:11 redeliver:true
  Done 'XXX' ack:11
見てみると Queue のメッセージの数より ack の数が1つ多くなっています。
メッセージの2重処理が発生してしまっているようです。
もう少し見てみると "Jessica" のメッセージ処理が2回実行されているようです。
1回目の redeliver は、"false" ですが2回目の "true" になっています。
"Jessica" の2回目のメッセージ処理以降、メッセージの redeliver が "true" になっています。
恐らく vm-2nd で "Jessica" のメッセージを処理している間に vm-1st が Failover したのだと思います。
vm-1st は、"Jessica" のメッセージ処理の ack を受け取っていないので Re-Queue したのだと思います。
これがメッセージの2重処理の原因ではないでしょうか?
HappyPath と思っていましたが、実は、うまくいかないパターンもあるようです。
このような場合に Consumer Cancel Notification を利用して Consumer のメッセージ処理を止めることが
できるようです。
先ほどの HappyPath の検証で使用した MyHappyConsumerクラスを変更して
以下のような MyCancelConsumerクラスを作成します。
src/main/java/studying/rabbitmq/MyCancelConsumer.java
package studying.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyCancelConsumer extends MyAbstractConsumer {
  
  @Override
  protected void assignConsumerFor(Channel channel, String queue) throws IOException {

    Consumer consumer = new DefaultConsumer(channel) {
 
      protected int ack;
       
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, 
          AMQP.BasicProperties properties, byte[] body) throws IOException {
         
        String message = new String(body, "UTF-8");
        String hostName = channel.getConnection().getAddress().getHostName();
         
        System.out.printf(" Received '%-7s'", message);
        System.out.printf(" - %s", hostName);
        System.out.printf(" deliveryTag:%s",  envelope.getDeliveryTag());
        System.out.printf(" redeliver:%s %n", envelope.isRedeliver());
 
        try {
          // 3秒 sleep する。
          Thread.sleep(3 * 1000);
           
          // ack を返す。
          channel.basicAck(envelope.getDeliveryTag(), false);
          this.ack++;
          System.out.printf("  Done '%s' ack:%d %n", message, this.ack);
 
        } catch (InterruptedException e) {
          System.out.printf("  Interrupted '%s' %n", message);
          e.printStackTrace();
        } catch (Exception e) {
          System.out.printf("  Error '%s' %n", message);
          throw e;
        }
      }
        
      @Override
      public void handleCancel(String consumerTag) throws IOException {
        System.out.printf(" Canceled !!!!! %n");
      }
    };
 
    // x-cancel-on-ha-failover を true にして Consumer を Channel に割り当てる。
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-cancel-on-ha-failover", true);
    boolean autoAck = false;
    channel.basicConsume(queue, autoAck, args, consumer);
  }


  public static void main(String[] args) {
    try {      
      new MyCancelConsumer().doAction();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
MyHappyConsumerクラスからの変更点としては、

  1. DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージを処理しています。
  2. Channel に Consumer を割り当てる時に x-cancel-on-ha-failover引数を "true" にしています。

になります。

再度、検証したいと思います。
検証の前に Queue のメッセージをクリアして
HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.1.297.0>]       [<rabbit@vm-2nd.1.297.0>]

Consumer を HomeNode でない vm-2nd に接続します。
コマンドプロンプトを開いて gradle から Consumer を起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyCancelConsumer -q

listening... vm-2nd

vm-2nd に接続できたのを確認したらメッセージを送信します。
新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle happyPublisher -q

start
done

Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたら
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false

vm-1st の RabbitMQ サーバーを停止させて Consumer Cancel Notification が受け取れるか確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service

以下のように Consumer のコマンドプロンプトから Consumer Cancel Notification を受け取って
Consumer が停止しているのが確認できると思います。
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
  Done 'Philip' ack:3
 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false
  Done 'Jessica' ack:4
 Canceled !!!!!

vm-1st の RabbitMQ サーバーを再起動させて Queue のメッセージを見ています。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl start rabbitmq-server.service

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 7        |
+---------+---------+---------------+----------+
見てみると Consumer は、ack を 4つカウントしているのですが
Queue のメッセージは、7通あります。
恐らく Failover が発生したため Consumer が最後に処理していたメッセージの ack が受け取れずに
Re-Queue されたのだと思います。
Consumer が止まらずにメッセージ処理を続けてたらメッセージの2重処理が発生していると思います。
それをどう回避するか、どう対応するかは、アプリの実装に委ねられるのだと思います。


おわりに

Failover は、ライブラリーやミドルウェアの設定でなんとかなりそうですが、
メッセージの2重処理については、アプリ側での対応になるようです。
このあたりは、アプリの要件によって変わってくるので仕方ないと思います。
(2重にメッセージが処理されてもサービス的に影響がない場合もあると思います。
いづれにせよ Consumer は、メッセージが2重に処理されることも
考慮した作りにしておく必要がありそうです。
また、2重処理されたメッセージを捕捉できるようなログも出力しておきたいところです。
参考URL
http://www.rabbitmq.com/clustering.html
http://www.rabbitmq.com/ha.html
http://www.rabbitmq.com/consumer-cancel.html
http://www.rabbitmq.com/api-guide.html

2016年5月12日木曜日

RabbitMQ の 冗長化を考える

はじめに

RabbitMQ の冗長化の機能として
Clustering(クラスタリング)と QueueMirroring(Queueミラーリング)が提供されているようです。
今回は、これらを使って冗長化を考えてみたいと思います。
Cluster の Node では、Queue、Exchange、Binding、User などが共有できるようです。
しかし Queue だけは、他と扱いが異なるということです。
Queue は、Cluster 内のどこか1つの Node が HomeNode となり保持される仕組みになっているようです。
HomeNode 以外の Node は、Queue を参照する形になるようです。
そのため HomeNode が落ちてしまうと他の Node は、Queue の参照できるけれども
Queue にあるメッセージが参照できなくなるという事態に陥ってしまうようです。
さらにメッセージが Persistence でなければメッセージ自体もなくなってしまう。
これをカバーするものが QueueMirroring となるようです。
HomeNode のメッセージを他の Node でも共有しようという仕組みのようです。
始める前に...
RabbitMQ がインストール済であること
RabbitMQ サーバーにログイン可能でリソースにアクセスできるユーザが作成済であること
(ユーザとパスワードを "user01" と "passwd" としています。
ManagementPlugin が有効になっていて CommandLineTool がインストールていること
以上が前提になります。
このあたりはわたしも『yum の Local Repository に RabbitMQ をインストール』にまとめていますので
よろしければ、こちらも合わせて、ご参考に。
ではでは!Getting Started!


環境

今回は、以下の2台のサーバー(vm-1st と vm-2nd)で冗長化構成を考えたいと思います。
RabbitMQ Server (vm-1st)

ホスト名 vm-1st
IP 192.168.10.110
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16


RabbitMQ Server (vm-2nd)

ホスト名 vm-2nd
IP 192.168.10.120
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16



準備

Clustering と QueueMirroring の設定をするにあたりホスト名とその名前解決が必要になるようです。
( ! ) 注意 RabbitMQ のリソース
ホスト名を変更すると User や Queue などが消えてしまう?
Queue、Exchange、Binding、User などは、ホスト名に関連付けされて
/var/lib/rabbitmq/mnesia ディレクトリ以下に保存されるようです。
(Mnesia という分散データベースが使われているようです。
実際、消えてはないのですが、変更前のホスト名で保存されているので
アクセスできなくなってしまうようです。
それぞれ以下のように設定します。
ホスト名を設定した後は、一度、再起動しておきます。
vm-1st

/etc/hostname
vm-1st
/etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.110 vm-1st
192.168.10.120 vm-2nd
reboot


vm-2nd

/etc/hostname
vm-2nd
/etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.110 vm-1st
192.168.10.120 vm-2nd
reboot

ホスト名を変更した場合
guest 以外のユーザが存在するか確認します。
rabbitmqadmin list users name tags

+--------+---------------+
|  name  |     tags      |
+--------+---------------+
| guest  | administrator |
+--------+---------------+
guest しか存在しない場合、恐らく User が消えてしまっていると思います。
ユーザとパスワードを "user01" と "passwd" にして再作成します。
リソースへのアクセス権限と Web の管理コンソールを使用するための Role も設定します。
rabbitmqctl add_user user01 passwd
rabbitmqctl set_permissions -p / user01 ".*" ".*" ".*"
rabbitmqctl set_user_tags user01 administrator
もう一度ユーザを確認します。
rabbitmqadmin list users name tags

+--------+---------------+
|  name  |     tags      |
+--------+---------------+
| guest  | administrator |
| user01 | administrator |
+--------+---------------+
user01 が追加されていれば OK です。
ユーザ作成は、Main の ClusterNode だけで OK です。
Sub の ClusterNode は、Main の ClusterNode の情報がレプリケートされます。

Clustering の設定方法には、いくつかあるのですが、
今回は、ManagementPlugin の CommandLineTool を使おうと思います。
そのためには、Cluster に参加する Node は、同じ ErlangCookie を持っていないといけないようです。
(コマンド実行時の認証に必要なようです。
ErlangCookie に特別な制限は、なくアルファベットの文字列であればなんでもよいようです。
とりあえず、vm-1st と vm-2nd に適当な同じ ErlangCookie 文字列を設定します。
手順としては、

  1. RabbitMQ サーバーを停止してから ErlangCookie を変更する。
  2. ErlangCookie を変更した後、RabbitMQ サーバーを再起動する。

とします。
vm-1st と vm-2nd のターミナルからそれぞれ以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service
echo MYERLANGCOOKIESTR > /var/lib/rabbitmq/.erlang.cookie
systemctl start rabbitmq-server.service

vm-2nd

systemctl stop rabbitmq-server.service
echo MYERLANGCOOKIESTR > /var/lib/rabbitmq/.erlang.cookie
systemctl start rabbitmq-server.service

ErlangCookie
vm-1st の ErlangCookie を vm-2nd にコピーするのもよいと思います。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

mv /var/lib/rabbitmq/.erlang.cookie /var/lib/rabbitmq/.erlang.cookie_bk
scp root@vm-1st:/var/lib/rabbitmq/.erlang.cookie /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq /var/lib/rabbitmq/.erlang.cookie
準備は、これで終わりです。


Clustering の設定

今、vm-1st と vm-2nd の Clustering がどうなっているか確認してみましょう。
まず、vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqctl cluster_status

Cluster status of node 'rabbit@vm-1st' ...
[{nodes,[{disc,['rabbit@vm-1st']}]},
 {running_nodes,['rabbit@vm-1st']},
 {cluster_name,<<"rabbit@vm-1st">>},
...
見てみると "rabbit@vm-1st" という Cluster があり
このサーバー(vm-1st) だけが参加していて起動している。
こんな感じでしょうか?

次に、vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

rabbitmqctl cluster_status

Cluster status of node 'rabbit@vm-2nd' ...
[{nodes,[{disc,['rabbit@vm-2nd']}]},
 {running_nodes,['rabbit@vm-2nd']},
 {cluster_name,<<"rabbit@vm-2nd">>},
...
こちらも "rabbit@vm-2nd" という Cluster があり
このサーバー(vm-2nd) だけが参加していて起動している。

整理してみると
vm-1st と vm-2nd のどちらも
それぞれが Cluster を持っていて自サーバーのみ参加していて起動している。
Disk Node と RAM Node
Node の保存先に Disk と RAM があるようだ。
保存先に RAM を選ぶとパフォーマンスを稼げるということらしいが
一般的には、Disk を選択するのが無難なようだ。

ではでは、vm-2nd を vm-1st の Cluster に参加させてみましょう。
手順としては、

  1. vm-2nd を offline にする。
  2. vm-2nd を vm-1st の Cluster に参加させる。
  3. vm-2nd を online にする。

のようになります。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@vm-1st
rabbitmqctl start_app

ちゃんと Cluster に参加できたか確認します。
vm-1st と vm-2nd の両方のターミナルで以下のコマンドを実行します。
vm-1st

rabbitmqctl cluster_status

Cluster status of node 'rabbit@vm-1st' ...
[{nodes,[{disc,['rabbit@vm-1st','rabbit@vm-2nd']}]},
 {running_nodes,['rabbit@vm-2nd','rabbit@vm-1st']},
 {cluster_name,<<"rabbit@vm-1st">>},
...

vm-2nd

rabbitmqctl cluster_status

[{nodes,[{disc,['rabbit@vm-1st','rabbit@vm-2nd']}]},
 {running_nodes,['rabbit@vm-1st','rabbit@vm-2nd']},
 {cluster_name,<<"rabbit@vm-1st">>},
...
どうなったでしょう?
ちょっと見てみると
vm-1st と vm-2nd のどちらにも
"rabbit@vm-1st" という Cluster があり
そこにどちらも参加していてどちらも起動している。
いけてそうな感じですね!
Clustering の解除
vm-2nd を vm-1st の Cluster から解除する手順としては、

  1. vm-2nd を offline にする。
  2. vm-2nd を vm-1st の Cluster から解除する。
  3. vm-2nd を online にする。

のようになります。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
RabbitMQ と Erlang の version
RabbitMQ の version が違う Node は、Cluster に参加できないようです。
同様に Erlang の version が違う Node も Cluster に参加できないようです。
Clustering された RabbitMQ の version up は、なかなか大変?!


Queue を宣言する

とりあえず、Queue がないと始まらないので宣言をします。
Queue名を "myQueue" とします。
vm-1st と vm-2nd のどちらでも構わないのですが
ここでは、vm-2nd を HomeNode にして Queue を宣言したいと思います。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

rabbitmqadmin declare queue name=myQueue

宣言できたか確認します。
デフォルト(無名)の Exchange に Bind されているかも確認します。
vm-2nd

rabbitmqadmin list queues name durable node messages 

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 0        |
+---------+---------+---------------+----------+
Queue名が "myQueue"、durable が "True"、HomeNode が "vm-2nd" で宣言されているようです。
rabbitmqadmin list bindings

+------------+-------------+-------------+
|   source   | destination | routing_key |
+------------+-------------+-------------+
|            | myQueue     | myQueue     |
+------------+-------------+-------------+
Queue名を RoutingKey にしてデフォルトの Exchange に Bind されているようです。
Queue の HomeNode
デフォルトでは、Queue を宣言した Node が HomeNode になるようです。
以下で HomeNode の決定条件を変更できるようです。
  Queue 作成時の x-queue-master-locator 引数
  queue-master-locator policy
  configuration ファイル
HomeNode の決定条件として以下の値が設定可能なようです。
  min-masters
  client-local (デフォルト)
  random
Queue の削除
Queue を指定して以下のコマンドを実行します。
rabbitmqadmin delete queue name=myQueue

HomeNode でない vm-1st からはどう見えているか確認したいと思います。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin list queues name durable node messages 

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 0        |
+---------+---------+---------------+----------+
vm-2nd と同じように見えているのが分かると思います。

HomeNode でない vm-1st からメッセージを送信するとどうなるか試してみたいと思います。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin publish routing_key=myQueue payload="Hello?"
送信されたメッセージが Queue に届いたか確認します。
vm-1st と vm-2nd の両方のターミナルで以下のコマンドを実行します。
vm-1st

rabbitmqadmin list queues name durable node messages 

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 1        |
+---------+---------+---------------+----------+

vm-2nd

rabbitmqadmin list queues name durable node messages 

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 1        |
+---------+---------+---------------+----------+
ちゃんと届いているようです。
Queue には、HomeNode があり Master / Slave のような構成になっているが
どこの Node からメッセージを送信しても Cluster 内のすべての Queue にレプリケートされるようです。
ある意味、MultiMaster的な動きになっていると思います。
メッセージの取り出しと削除
メッセージを取り出す場合は、Queue を指定して以下のコマンドを実行します。
rabbitmqadmin get queue=myQueue requeue=false
requeue を省略、または "true" にすると取り出したメッセージがもう一度、Queue に戻されます。

メッセージを削除する場合は、Queue を指定して以下のコマンドを実行します。
rabbitmqadmin purge queue name=myQueue

QueueMirroring していない状態で HomeNode になっている vm-2nd が落ちるとどうなるか試してみます。
vm-2nd の RabbitMQ サーバーを停止させた後、vm-1st から Queue がどう見えているか確認します。
vm-1st と vm-2nd のターミナルでそれぞれ以下のコマンドを実行します。
vm-2nd

systemctl stop rabbitmq-server.service

vm-1st

rabbitmqadmin list queues name durable node messages 

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd |          |
+---------+---------+---------------+----------+
Queue は見えているもののメッセージが見えなくなりました。

vm-2nd が復帰した時、メッセージがどうなっているか確認したいと思います。
vm-2nd の RabbitMQ サーバーを起動させた後、vm-1st と vm-2nd から Queue が
どう見えているか確認します。
vm-1st と vm-2nd のターミナルでそれぞれ以下のコマンドを実行します。
vm-2nd

systemctl start rabbitmq-server.service

rabbitmqadmin list queues name durable node messages 

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 0        |
+---------+---------+---------------+----------+

vm-1st

rabbitmqadmin list queues name durable node messages 

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 0        |
+---------+---------+---------------+----------+
メッセージが消えてしまっているのが分かると思います。
HomeNode が落ちてもメッセージが消えないように QueueMirroring の設定をすることにします。
DeliveryMode
メッセージ送信時に DeliveryMode を persistent にすると
送信したメッセージがディスク領域に保存されるようになるため
HomeNode が落ちてもメッセージが消えてしまうことはないようです。
ただし HomeNode が落ちている間、メッセージが見えなくなるので取り出しはできないようです。

DeliveryMode を persistent にしてメッセージを送信する。
rabbitmqadmin publish routing_key=myQueue payload="Hello?" properties='{"delivery_mode":2}'
HomeNode を停止する。
systemctl stop rabbitmq-server.service
メッセージを取り出してみる。
rabbitmqadmin get queue=myQueue requeue=false

*** Not found: /api/queues/%2F/myQueue/get
Not found になりメッセージを取り出せない。
HomeNode を起動させて Queue を確認する。
systemctl start rabbitmq-server.service

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 1        |
+---------+---------+---------------+----------+
メッセージは、残っている。
メッセージを取り出してみる。
rabbitmqadmin get queue=myQueue requeue=false

+-------------+----------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+-------------+----------+---------------+---------+---------------+------------------+-------------+
| myQueue     |          | 0             | Hello?  | 6             | string           | False       |
+-------------+----------+---------------+---------+---------------+------------------+-------------+

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 0        |
+---------+---------+---------------+----------+
メッセージが取り出せた。


QueueMirroring の設定

Clustering の設定と Queue の宣言に続いて QueueMirroring の設定に入りたいと思います。
policy を使って設定することになるようです。
設定する policy は、ha-mode とその値の ha-params になるようです。
先ほど宣言した Queue を Cluster 内のすべての Node で Mirroring するようにします。
policy名を "myPolicy" とします。
vm-1st と vm-2nd のどちらでも構わないのですが
とりあえず、vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqctl set_policy myPolicy "^myQueue$" '{"ha-mode":"all"}'
rabbitmqctl set_policy コマンドの書式

rabbitmqctl set_policy    policy名    パターン    policy設定
  • policy名

    policy名を指定する。

    パターン

    policy を適用する Queue を正規表現を使って指定する。

    policy設定

    設定する policy をJSON 形式で指定する。

ha-mode と ha-params
ha-mode は、3つあり ha-params と合わせて以下のように設定できるようです。
  • all

    すべての Node で mirroring する。(ha-params はない)

    例)  '{"ha-mode":"all"}'

    exactly

    指定した数の Node で mirroring する。

    ha-params には、Node数を指定する。

    例)  '{"ha-mode":"exactly","ha-params":2}'

    nodes

    指定した Node で mirroring する。

    ha-params には、Node名を指定する。

    例)  '{"ha-mode":"nodes","ha-params":["rabbit@vm-1st", "rabbit@vm-2nd"]}'


policy が作成できたか確認します。
vm-1st のターミナルから以下のコマンドを実行します。
rabbitmqctl list_policies

Listing policies ...
/       myPolicy        all     ^myQueue$       {"ha-mode":"all"}       0
作成できているようです。
policy の削除
policy を指定して以下のコマンドを実行します。
rabbitmqctl clear_policy myPolicy

Cluster 内の Node で QueueMirroring が同期しているか確認します。
vm-1st のターミナルから以下のコマンドを実行します。
rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-1st.3.24159.1>]     [<rabbit@vm-1st.3.24159.1>]
"myQueue" の slave に vm-1st があり、同期している slave にも vm-1st があるようなので
問題ないと思います。

では、メッセージを送信してみて HomeNode が落ちてもメッセージが消えてしまわないか確認したいと思います。
vm-1st からメッセージを送信します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin publish routing_key=myQueue payload="Hello?"

HomeNode になっている vm-2nd の RabbitMQ サーバーを停止します。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

systemctl stop rabbitmq-server.service

この状態で HomeNode でない vm-1st で Queue がどういう状態になっているか確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 1        |
+---------+---------+---------------+----------+
メッセージが消えずに残っているのが確認できると思います。
もう一点、HomeNode が vm-2nd から vm-1st に変わっているのも確認できると思います。

ここで vm-2nd の RabbitMQ サーバーが起動すると
vm-1st と vm-2nd から Queue がどう見えているか確認します。
vm-1st と vm-2nd のターミナルでそれぞれ以下のコマンドを実行します。
vm-2nd

systemctl start rabbitmq-server.service

rabbitmqadmin list queues name durable node messages 

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 1        |
+---------+---------+---------------+----------+

vm-1st

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 1        |
+---------+---------+---------------+----------+
HomeNode が vm-1st に変更されたままになっているのが確認できると思います。
HomeNode が落ちたタイミングで別の Node が HomeNode になり
メッセージを引き継ぐ形になっているようです。
そのため HomeNode が落ちてもメッセージが消えることは、ないようです。

HomeNode が変更されたので QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.3.293.0>]       []
slave として、vm-2nd が追加されているのは、確認できますが同期がうまくいっていないようです。
vm-1st のターミナルから以下のコマンドを実行して同期の設定をします。
vm-1st

rabbitmqctl sync_queue myQueue

もう一度、QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.3.293.0>]       [<rabbit@vm-2nd.3.293.0>]
今度は、問題なく同期できているのが確認できると思います。
QueueMirroring の同期
デフォルトの QueueMirroring の同期設定は、手動になっているようです。
policy 作成時に ha-sync-mode を "automatic" にすると自動同期にすることができるようです。
rabbitmqctl set_policy myPolicy "^myQueue$" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'


おわりに

なかなかいい感じに MultiMaster のような形で冗長化できたのではと思います。
後は、Cluster 内の Node に仮想の IP を設定して、アプリからアクセスする感じでしょうか?
参考URL
http://www.rabbitmq.com/clustering.html
http://www.rabbitmq.com/ha.html

2016年5月4日水曜日

RabbitMQ で JSON RPC

はじめに

前回、『RabbitMQ で同時処理実行数を制限する』で送信処理と受信処理を見てきましたが
今回は、それに加えてレスポンスを取得できるようにしたいと思います。
やり方として RabbitMQ Tutorials の『6 Remote procedure call (RPC)』が
参考になるのではないかと思います。
tutorial では、送信用の Queue とは別に 返信用の Queue を用意して
レスポンスを処理する方法になっていたかと思います。
非同期的にレスポンス処理するのであれば、このやり方になるのだろうと思います。
でも、なにか、こう、もうちょっと、そこまででなくていいのでお手軽にできる方法はないかと
JavaDoc を眺めていると JsonRpcServer と JsonRpcClient なるものを見つけました!
名前的にみてメッセージを JSON で RPC風に送受信処理と返信処理ができそうな雰囲気です!
これは!と思い試してみることにしました。
始める前に...
RabbitMQ がインストール済であること
RabbitMQ サーバーにログイン可能でリソースにアクセスできるユーザが作成済であること
(ユーザとパスワードを "user01" と "passwd" としています。
以上が前提になります。
このあたりはわたしも『yum の Local Repository に RabbitMQ をインストール』にまとめていますので
よろしければ、こちらも合わせて、ご参考に。
ではでは!Getting Started!

環境

開発機

開発言語 Java 1.8.0
フレームワーク rabbitmq-java-client 3.6.1
ビルドツール Gradle 2.12
※gradle コマンドが実行できるように設定しておきます。
OS Windows 8.1


RabbitMQ Server

IP 192.168.10.110
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16


プロジェクト構成

今回は、RPC ということでプロジェクトを Client と Service そして Gateway の3つに分けました。
というのも Client、Service、Gateway の3つそれぞれ別の jar ファイルになっていた方が
RPC では、何が何に依存していて、どこで何が必要なのかなど
その雰囲気がより伝わるだろうと思ったからです。
もちろん、1つのプロジェクトにするこもできます。
Client と Service のインターフェイス部分と両者で使用するデータクラスを Gateway に切り出しました。
Service は、Gateway のインターフェイスを実装してサービスとして公開します。
Client は、Gateway のインターフェイスからスタブを作成してサービスにアクセスします。
Service と Client は、Gateway のデータクラスを使って通信します。
おおよそ、こんな感じになると思います。
C ドライブ直下に「studying-rabbitmq-rpc」フォルダを作成して各ファイルを以下のように配置します。
ファイルは、UTF-8 で保存します。
  • C:\studying-rabbitmq-rpc
    • client
      • src
        • main
          • java
            • studying
              • client
                • MyRpcClient.java
    • gateway
      • src
        • main
          • java
            • studying
              • gateway
                • MyRequest.java
                • MyResponse.java
                • MyService.java
    • service
      • src
        • main
          • java
            • studying
              • service
                • MyRpcServer.java
                • MyServiceImpl.java
    • build.gradle
    • settings.gradle


準備

ビルドスクリプトを作成します。
プロジェクトは、Client、Service、Gateway と3つあるのですがビルドスクリプトは1つにしています。
build.gradle
subprojects {
  apply plugin: 'java'
  apply plugin: 'eclipse'
  apply plugin: 'application'

  sourceCompatibility = '1.8'
  targetCompatibility = '1.8'
  [compileJava, compileTestJava]*.options*.encoding = 'UTF-8'
  
  mainClassName = (project.hasProperty('mainClass')) ? "$mainClass" : ''

  repositories {
    jcenter()
  }

  dependencies {
    compile 'com.rabbitmq:amqp-client:3.6.1'
    compile 'org.slf4j:slf4j-api:1.7.18'
    testCompile 'junit:junit:4.12'
  }
}

project(':client') {
  dependencies {
    compile project(':gateway')
  }
}

project(':service') {
  dependencies {
    compile project(':gateway')
  }
}
client と service の compile 時の依存関係に gateway の project/jar ファイルを追加しています。
settings.gradle
rootProject.name = 'studying-rabbitmq-rpc'
include 'client', 'gateway', 'service'

RabbitMQ のユーザが未作成の場合は、以下のようにして作成します。
作成したユーザがリソースにアクセスできるように権限も設定します。
rabbitmqctl add_user user01 passwd
rabbitmqctl set_permissions -p / user01 ".*" ".*" ".*"
ユーザとパスワードは、"user01" と "passwd" にしています。


Gateway の作成

まず、Client側と Service側の両方で使う Gateway から作成します。
送信と返信に使うメッセージデータクラスを作成します。
gateway/src/main/java/studying/gateway/MyRequest.java
package studying.gateway;

public class MyRequest {
  private String message;

  public String getMessage() {
    return message;
  }

  public void setMessage(String message) {
    this.message = message;
  }
}
gateway/src/main/java/studying/gateway/MyResponse.java
package studying.gateway;

public class MyResponse {
  private String result;

  public String getResult() {
    return result;
  }

  public void setResult(String result) {
    this.result = result;
  }
}

次にインターフェイスを作成します。
メソッドは2つ定義します。
gateway/src/main/java/studying/gateway/MyService.java
package studying.gateway;

public interface MyService {
  
  // (1)
  public String echo(String message);

  // (2)
  public Object print(Object param);
}
メソッドは、以下のようになります。
(1) echo()メソッド

文字列を引数に受けて、また文字列を返します。

(2) print()メソッド
Object型の引数で Object型の戻り値になっていますが、
実装的には、MyRequestクラスを Object型の引数で受けて、
MyResponseクラスを Object型で返すようになります。
(MyRequestクラスと MyResponseクラスを直接、引数と戻り値にできない事情が...


Service の作成

Gateway のインターフェイスの実装クラスを作成します。
service/src/main/java/studying/service/MyServiceImpl.java
package studying.service;

import java.util.Map;
import com.rabbitmq.tools.json.JSONUtil;
import studying.gateway.MyRequest;
import studying.gateway.MyResponse;
import studying.gateway.MyService;

public class MyServiceImpl implements MyService {

  // (1)
  @Override
  public String echo(String message) {
    System.out.printf("Received '%s' %n", message);
    
    return "Hello? " + message;
  }

  // (2)
  @SuppressWarnings("unchecked")
  @Override
  public Object print(Object param) {
    System.out.printf("Received '%s' %n", param);

    MyRequest request = new MyRequest();
    JSONUtil.tryFill(request, (Map<String, Object>) param);
    
    MyResponse response = new MyResponse();
    response.setResult("Hello! " + request.getMessage());
    
    return response;
  }
}
実装内容は、以下のようになります。
(1) echo()メソッド

受信したメッセージをコンソールに出力します。
メッセージに文字列 "Hello?" を付けて返します。

(2) print()メソッド
受信したメッセージをコンソールに出力します。
メッセージは、一旦、Object型で受けて JSONUtil を使って MyRequestクラスに出力します。
MyRequestクラスからメッセージを取り出して文字列 "Hello!" を付けて返信データにします。
返信データを MyResponseクラスに設定して返します。
    
引数と戻り値
JsonRpcServer/JsonRpcClient というだけあってメッセージデータは、JSON になっているようです。
String型や int型など primitive型は、直接 JSON に変換できるようなので
そのまま引数や戻り値にしても問題ないようです。
シンプルな JavaBean の形式をとっているクラスであえば、JSON に変換可能なようで
同じように引数や戻り値に使えるみたいです。
ただし、この場合、注意が必要で Proxy 越しに呼び出されるのが原因だと思うのですが
一旦 Object型にしないとうまく動かないようです。
RPC といっても今回は、Java クラスをシリアライズしたメッセージデータを使っている訳ではないので
Service側と Client側で同じ Javaクラスを使っていないと通信できないことはないと思います。
飽くまでも今回のメッセージデータは、JSON なので Service側と Client側がメッセージデータに
それぞれの Javaクラスを使っていたとしても JSON をそれぞれのクラスに変換(出力)できれば問題ないと思います。
今回は、Gateway という形でメッセージデータに同じクラスを使っていますが
Service側と Client側がそれぞれでメッセージデータ用の Javaクラスを用意してもよいと思います。
(おおよそ、フィールド名を合わせておけばクラス名が違っていてもいけそうです。

次にサービスを公開してメッセージの受付を開始するクラスを作成します。
service/src/main/java/studying/service/MyRpcServer.java
package studying.service;

import java.io.IOException;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.tools.jsonrpc.JsonRpcServer;
import studying.gateway.MyService;

public class MyRpcServer {

  public static void main(String[] args) {
    System.out.printf("listening... %n");
    
    // RabbitMQ Server 接続情報
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.10.110");
    factory.setUsername("user01");
    factory.setPassword("passwd");
    Connection connection = null;
    
    try {
      // RabbitMQ Server に接続して Channel を生成する。
      connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      
      // Queue と Exchange を宣言して Bind をする。
      String queueName    = "myRpcQueue";
      String exchangeType = "direct";
      String exchangeName = "myRpcExchange";
      String routingKey   = "myRpcKey";
   
      boolean durable = true;
      boolean exclusive = false;
      boolean autoDelete = false;
      Map<String, Object> arguments = null;
      
      channel.exchangeDeclare(exchangeName, exchangeType, durable);
      channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);
      channel.queueBind(queueName, exchangeName, routingKey);
      
      
      // メッセージの受付を開始する。
      JsonRpcServer server = new JsonRpcServer(channel, queueName, MyService.class, new MyServiceImpl());
      server.mainloop();
      
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (connection != null && connection.isOpen()) {
        try {
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}
処理内容としては、
送受信処理に使う Queue と Exchange (direct タイプ) を宣言しています。
Queue名と Exchange名は、"myRpcQueue" と "myRpcExchange" になります。
RoutingKey を "myRpcKey" にして "myRpcQueue" と "myRpcExchange" を Bind しています。
MyServiceインターフェイスとその実装クラス MyServiceImpl を使って
メッセージの受信処理を開始しています。


Client の作成

サービスの呼び出しクラスを作成します。
echo()メソッドと print()メソッドを呼び出して結果をコンソールに出力します。
以下の内容で作成します。
client/src/main/java/studying/client/MyRpcClient.java
package studying.client;

import java.io.IOException;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.tools.json.JSONUtil;
import com.rabbitmq.tools.jsonrpc.JsonRpcClient;
import studying.gateway.MyRequest;
import studying.gateway.MyResponse;
import studying.gateway.MyService;

public class MyRpcClient {
  
  @SuppressWarnings("unchecked")
  public void doAction() {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.10.110");
    factory.setUsername("user01");
    factory.setPassword("passwd");
    Connection connection = null;
    
    try {
      connection = factory.newConnection();
      Channel channel = connection.createChannel();

      String exchangeName = "myRpcExchange";
      String routingKey   = "myRpcKey";
      int timeout = 1000 * 5;

      // Stub を作成する。
      JsonRpcClient client = new JsonRpcClient(channel, exchangeName, routingKey, timeout);
      MyService serivce = (MyService)client.createProxy(MyService.class);

      
      // (1) echo()メソッド呼び出し
      String message = serivce.echo("dolphin");
      System.out.printf("echo: %s %n", message);
      
      
      // (2) print()メソッド呼び出し
      MyRequest request = new MyRequest();
      request.setMessage("revolution");
      Object source = serivce.print(request);
      
      MyResponse response = new MyResponse();
      JSONUtil.fill(response, (Map<String, Object>)source);
      System.out.printf("print: %s %n", response.getResult());
      
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (connection != null && connection.isOpen()) {
        try {
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  public static void main(String[] args) {
    new MyRpcClient().doAction();
  }
}
Exchange と Queue の宣言と Bind は、Service側で行うので
Client側では、送信時に Exchange と RoutingKey を指定するだけにします。
送信する Exchange と RoutingKey は、"myRpcExchange" と "myRpcKey" にしています。
送信するまでに最低限 Exchange が存在していないとうまくいかないので
先に Service側を起動することになります。
処理内容は、以下のようになります。
(1) echo()メソッド呼び出し

引数と戻り値とも文字 型なのでそのまま送信して、その結果をコンソールに出力します。

(2) print()メソッド呼び出し
メッセージを MyRequest クラスで送信します。
(インターフェイス的には、Object 型ですが実装は、MyRequest クラスになっています。
結果メッセージを一旦、Object 型で受けて JSONUtil を使って MyResponse クラスに出力します。
MyResponse クラスから結果メッセージを取り出してコンソールに出力します。
返信用の Queue
RabbitMQ の Tutorial 的には、返信メッセージの受付に Queue が宣言されているのですが
JsonRpcServer/JsonRpcClient では、Direct reply-to (amq.rabbitmq.reply-to.XXX) という
疑似的な Queue が暗黙的に返信用の Queue の代わりに使われるため
明示的に返信用の Queue を宣言する必要はないようです。


実行する

Queue と Exchange の設定をしている Service側を先に起動します。
コマンドプロンプトを開いて、gradle から Service側を起動します。
Service

cd /d C:\studying-rabbitmq-rpc
gradle :service:clean :service:run -PmainClass=studying.service.MyRpcServer -q

listening...
Service側が起動したのが確認できたら Client側を実行します。
Service側のコマンドプロンプトはそのままにしておいて
新たにコマンドプロンプトを開いて以下のコマンドを実行します。
以下のように表示されると思います。
Client

cd /d C:\studying-rabbitmq-rpc
gradle :client:clean :client:run -PmainClass=studying.client.MyRpcClient -q

echo: Hello? dolphin
print: Hello! revolution

Service側のコマンドプロンプトには、以下のように表示されると思います。
Service

...
listening...

Received 'dolphin'
Received '{message=revolution}'

また、以下のように jar ファイルを作って実行してみるのもいいと思います。
cd /d C:\studying-rabbitmq-rpc
gradle :service:clean :service:distZip -PmainClass=studying.service.MyRpcServer -q
gradle :client:clean :client:distZip -PmainClass=studying.client.MyRpcClient -q


おわりに

同期処理ではありますがレスポンスを取得できるようになったと思います。
(レスポンスを非同期処理で取得できるようにするのは、なかなか手間だと思います...
今回は、お手軽に、というのも裏テーマにあったので JsonRpcServer と JsonRpcClient を使いました。
使ってみたところ、Client側の実装が Java に限定されてしまいそうな懸念を持ちました。
しかも JsonRpcClientクラスを使わないとうまく通信できないかもしれません。
(RPC は、こんな感じ?
JsonRpcServerクラスの doCall() メソッド、JsonRpcClientクラスの call()メソッドあたりを
読み解いてみると Java 以外の Client を使って今回のサービスにアクセスできるかもしれません。
それもどうかな?
Service側と Client側がどちらも Java なら JMS でいいような気がします。
(RabbitMQ の JMS は有償だったような気が...
わたしは、Service側の実装が Java に依存するのには、なんら抵抗ないのですが
Client側が特定のプログラム言語(Java であっても)に依存してしまうのはイマイチだったりします。
ここは、なんとか考えないと。
参考URL
http://www.rabbitmq.com/getstarted.html
http://www.rabbitmq.com/api-guide.html

2016年5月1日日曜日

RabbitMQ で同時処理実行数を制限する

はじめに

長い時間かかる処理を実行している間、ユーザー(またはクライアント)の対応をどう考えるか?
処理が終わるまで待たせるか?
待たせないようにするために処理を切り出して非同期的に実行できるようにするか?
どっちにしても長い時間かかる処理だけあってサーバー負荷は高い。
サーバー負荷の高い処理が連打されたりすると大変なことになってしまう。
処理が終わるまで待たせないようにする場合はもちろん、待たせる場合であっても
複数ユーザが同時に実行することを考えると何かしらの対策は必要になる。
理想としては、同時に実行できる処理数を制限して超えた分は Queue のようなものを使って処理待ちさせる。
処理に空きができたところで処理待ちしている処理を順に処理していく。
なにか簡単にできる方法はないかと考えていたところ MessageQueue に至った訳です。
MessageQueue サーバーとして RabbitMQ を使って検証してみようと思います。
始める前に...
RabbitMQ がインストール済であること
RabbitMQ サーバーにログイン可能でリソースにアクセスできるユーザが作成済であること
(ユーザとパスワードを "user01" と "passwd" としています。
以上が前提になります。
このあたりはわたしも『yum の Local Repository に RabbitMQ をインストール 』にまとめていますので
よろしければ、こちらも合わせて、ご参考に。
ではでは!Getting Started!!

ちょっと整理

主な Class と interface を整理したいと思います。
(com.rabbitmq.client パッケージにあるものです。
ConnectionFactory
  • hostName、userName、password などを元に RabbitMQサーバーと接続する。
    接続情報を Connectionオブジェクトにして返す。
    
Connection
  • 接続情報を保持する。
    接続の Abort、Close を行う。
    接続中に発生するイベントのハンドリングを行う。
    Channelオブジェクトを生成する。
    など。
    
Channel
  • Exchange と Queue の宣言を行う。
    宣言した Exchange と Queue を Bind する。
    メッセージの送信処理と受信処理を行う。
    メッセージの受信処理に使う Consumer の割り当てを行う。
    その他 AMQP の各種操作メソッドを提供する。
    
Exchange
  • メッセージの送信先になる。
    メッセージ送信者(sender)はメッセージを直接 Queue に送信することはできない。
    送られてきたメッセージを RoutingKey を元に格納する Queue を決定する。
    
    以下4つのタイプがある。
    
    fanout
    無条件に Queue へメッセージを格納する。
    ※ ルーティングなし。(RoutingKey は無効)
    direct
    RoutingKey にマッチする Queue へ格納する。
    topic
    RoutingKey にパターンマッチする Queue へ格納する。
    RoutingKey は「.」ドット区切りの単語で指定する。
      例) "kern.critical"  "auth.info"  "www.google.com"
    ワイルドカード("*" "#")を含めたマッチング条件を指定できる。
      "*" は、1の単語にマッチする。
      "#" は、複数の単語にマッチする。
      例) "kern.*"  "*.info"  "#.com"
    headers
    メッセージ Header にマッチする Queue へ格納する。
    direct タイプで無名の Exchange がデフォルトの Exchange として必ず提供されている。
    Queue は、宣言と同時に暗黙的にこのデフォルトの Exchange と Queue名を RoutingKey にして
    Bind される。
    
Queue
  • メッセージを格納する。
    
Consumer
  • 割り当てられた Queue からメッセージを取り出して処理する。
    

環境

開発機

開発言語 Java 1.8.0
フレームワーク rabbitmq-java-client 3.6.1
ビルドツール Gradle 2.12
※gradle コマンドが実行できるように設定しておきます。
OS Windows 8.1


RabbitMQ Server

IP 192.168.10.110
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16


プロジェクト構成

作成するファイルは少ないのですがプロジェクトをApp と Worker に分けました。
というのも別の jar ファイルにしてそれぞれ実行した方が
プロセス間でメッセージのやりとりをしている様子をより実感できるかなと思ったからです。
C ドライブ直下に「studying-rabbitmq」フォルダを作成して各ファイルを以下のように配置します。
ファイルは、UTF-8 で保存します。
  • C:\studying-rabbitmq
    • app
      • src
        • main
          • java
            • studying
              • sender
                • App.java
    • worker
      • src
        • main
          • java
            • studying
              • receiver
                • MyConsumer.java
                • Worker.java
    • build.gradle
    • settings.gradle


準備

ビルドスクリプトを作成します。
プロジェクトは、App と Worker と2つあるのですがビルドスクリプトは1つにしています。
build.gradle
subprojects {
  apply plugin: 'java'
  apply plugin: 'eclipse'
  apply plugin: 'application'

  sourceCompatibility = '1.8'
  targetCompatibility = '1.8'
  [compileJava, compileTestJava]*.options*.encoding = 'UTF-8'
  
  mainClassName = (project.hasProperty('mainClass')) ? "$mainClass" : ''

  repositories {
    jcenter()
  }

  dependencies {
    compile 'com.rabbitmq:amqp-client:3.6.1'
    compile 'org.slf4j:slf4j-api:1.7.18'
    testCompile 'junit:junit:4.12'
  }
}
settings.gradle
rootProject.name = 'studying-rabbitmq'
include 'app', 'worker'
RabbitMQ のユーザが未作成の場合は、以下のようにして作成します。
作成したユーザがリソースにアクセスできるように権限も設定します。
rabbitmqctl add_user user01 passwd
rabbitmqctl set_permissions -p / user01 ".*" ".*" ".*"
ユーザとパスワードは user01 と passwd にしています。

Worker の作成

Worker側から作成していきます。
全体的な内容としては Queue に届いたメッセージを取り出してコンソールに出力するとった感じになります。
メッセージを出力した後は、3秒間スリープします。
(これを長い時間かかる処理と想定しています。
Connection が使用できるスレッドを3つに制限します。
Channel を3つ作成してそれぞれに Consumer を1つ割り当てます。
3つまで同時にメッセージを処理してそれを超えた分は Queue で待たされるといった感じになると思います。
まず、Consumer クラスを以下の内容で作成します。
worker/src/main/java/studying/receiver/MyConsumer.java
package studying.receiver;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {
  
  private final String name;

  public MyConsumer(Channel channel, String name) {
    super(channel);
    
    this.name = name;
  }
  
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, 
      AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    String message = new String(body, "UTF-8");
    long threadId = Thread.currentThread().getId();
    System.out.printf(" [thread-id:%02d] - %s Received '%s' %n", threadId, name, message);
    
    try {
      Thread.sleep(3 * 1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      System.out.printf("  [thread-id:%02d] - %s Done '%s' %n", threadId, name, message);
      this.getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
  }
}
処理内容としては、
受信したメッセージと共にスレッドID、Consumer名をコンソールに出力します。
3秒後(長い時間かかる処理を想定)にもう一度スレッドID、Consumer名、メッセージをコンソールに出力します。

次にメッセージの受付を開始するクラスを作成します。
worker/src/main/java/studying/receiver/Worker.java
package studying.receiver;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Worker {

  public static void main(String[] args) {
    try {
      new Worker().start();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  
  public void start() throws Exception {
    
    final long threadId = Thread.currentThread().getId();
    System.out.printf("[thread-id:%02d] start %n", threadId);
    
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.10.110");
    factory.setUsername("user01");
    factory.setPassword("passwd");

    // Worker の数を3つにする。
    ExecutorService es = Executors.newFixedThreadPool(3);
    Connection connection = factory.newConnection(es);

    boolean durable = true;
    boolean autoAck = false;
    String queueName    = "myQueue";
    String exchangeType = "direct";
    String exchangeName = "myExchange";
    String routingKey   = "myKey";


    // Worker A
    Channel channelA = connection.createChannel();
    channelA.exchangeDeclare(exchangeName, exchangeType, durable);
    channelA.queueDeclare(queueName, durable, false, false, null);
    channelA.queueBind(queueName, exchangeName, routingKey);
    
    MyConsumer myConsumerA = new MyConsumer(channelA, "ConsumerA");
    channelA.basicConsume(queueName, autoAck, myConsumerA);
    

    // Worker B
    Channel channelB = connection.createChannel();
    channelB.exchangeDeclare(exchangeName, exchangeType, durable);
    channelB.queueDeclare(queueName, durable, false, false, null);
    channelB.queueBind(queueName, exchangeName, routingKey);
    
    MyConsumer myConsumerB = new MyConsumer(channelB, "ConsumerB");
    channelB.basicConsume(queueName, autoAck, myConsumerB);
    

    // Worker C
    Channel channelC = connection.createChannel();
    channelC.exchangeDeclare(exchangeName, exchangeType, durable);
    channelC.queueDeclare(queueName, durable, false, false, null);
    channelC.queueBind(queueName, exchangeName, routingKey);

    MyConsumer myConsumerC = new MyConsumer(channelC, "ConsumerC");
    channelC.basicConsume(queueName, autoAck, myConsumerC);
  }  
}
処理内容としては、
送受信処理に使う Queue と Exchange (direct タイプ) を宣言しています。
Queue名と Exchange名は、"myQueue" と "myExchange" になります。
RoutingKey を "myKey" にして "myQueue" と "myExchange" を Bind しています。
1つの Queue に対して3つの Consumer を割り当ててメッセージの受信処理を開始しています。

Connection が使用できるスレッドの数
Workerクラスの以下の部分でスレッドの数を設定していますが
...
// Worker の数を3つにする。
ExecutorService es = Executors.newFixedThreadPool(3);
Connection connection = factory.newConnection(es);
...
以下のように設定しなかった場合、どうなるか?
...
Connection connection = factory.newConnection();
...
JavaVM が使用できるプロセッサの数 × 2 になるようです。
(com.rabbitmq.client.impl.ConsumerWorkService 参照)
Consumer の実装
Consumerインターフェイスのメソッドの実装コードを1つ1つ書いていくのは、なかなかなので
今回は DefaultConsumerクラスを継承して必要なメソッドをオーバーライドする方法をとりました。
別のやり方として以下のように QueueingConsumerクラスを使ってメッセージの処理をする方法もあるようです。
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  String message = new String(delivery.getBody());
  
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

Worker側は、だいたいこんな感じです。


App の作成

メッセージを送信するクラスをを作成します。
以下の内容で作成します。
app/src/main/java/studying/sender/App.java
package studying.sender;

import java.util.ArrayList;
import java.util.List;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class App {

  public static void main(String[] args) {    
    try {
      List<String> tasks = new ArrayList<String>();
      tasks.add("Maurice");
      tasks.add("Verdine");
      tasks.add("Philip");
      
      tasks.add("Jessica");
      tasks.add("Larry");
      tasks.add("Johnny");
      
      tasks.add("Ralph");
      tasks.add("Al");
      tasks.add("Andrew");

      tasks.add("XXX");
      
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("192.168.10.110");
      factory.setUsername("user01");
      factory.setPassword("passwd");
      Connection connection = factory.newConnection();

      System.out.printf("send start %n");
      System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
      tasks.parallelStream().forEach((String message) -> {        
        try {
          String exchangeName = "myExchange";
          String routingKey = "myKey";
          
          Channel channel = connection.createChannel();
          AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
          channel.basicPublish(exchangeName, routingKey, props, message.getBytes());

          System.out.printf("[thread-id:%02d] '%s' %n", Thread.currentThread().getId(), message);
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
      System.out.printf("send end %n");

      connection.close();
      
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
Exchange と Queue の宣言と Bind は、Worker側で行うので
App側では、送信時に Exchange と RoutingKey を指定するだけにします。
送信する Exchange と RoutingKey は、"myRpcExchange" と "myRpcKey" にしています。
送信するまでに最低限 Exchange が存在していないとうまくいかないので
先に Worker側を起動することになります。

デフォルトの Exchange
今回、Exchange を宣言しましたが direct タイプの Exchange で問題なければ
デフォルトの Exchange を使ってもいいと思います。
Queue を宣言すると自動的にデフォルトの Exchange に Bind されるようです。
以下のようなコードが暗黙的に実行されているようなイメージだと思います。
...
String queueName    = "myQueue";
String exchangeName = "";
String routingKey   = queueName;

...
// 以下の Bind が暗黙的に行われていると思います。
channel.queueBind(queueName, exchangeName, routingKey);
...
rabbitmqctl list_bindings コマンドを使って Bind を確認できると思います。
rabbitmqctl list_bindings -p / source_name source_kind destination_name destination_kind routing_key
送信処理にデフォルトの Exchange を使うと以下のようになると思います。
...
String queueName    = "myQueue";
String exchangeName = "";
String routingKey   = queueName;
 
Channel channel = connection.createChannel();
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish(exchangeName, routingKey, props, message.getBytes());
...
受信処理にデフォルトの Exchange を使うと以下のようになると思います。
...
boolean durable = true;
boolean autoAck = false;
String queueName    = "myQueue";

Channel channel = connection.createChannel();
channel.queueDeclare(queueName, durable, false, false, null);
 
MyConsumer myConsumer = new MyConsumer(channel, "Consumer");
channel.basicConsume(queueName, autoAck, myConsumer);
...

実行する

本来、以下のようにして jar ファイルを作って実行したいところなのですが
cd /d C:\studying-rabbitmq
gradle :worker:clean :worker:distZip -PmainClass=studying.receiver.Worker -q
gradle :app:clean :app:distZip -PmainClass=studying.sender.App -q
gradle から実行してしまいます。
コマンドプロンプトを開いて、まず Worker側から起動します。
Worker

cd /d C:\studying-rabbitmq
gradle :worker:clean :worker:run -PmainClass=studying.receiver.Worker -q

[thread-id:01] start
Worker側が起動したのが確認できたら App側を実行します。
Worker側のコマンドプロンプトはそのままにしておいて
新たにコマンドプロンプトを開いて以下のコマンドで実行します。
App

cd /d C:\studying-rabbitmq
gradle :app:clean :app:run -PmainClass=studying.sender.App -q

send start
[thread-id:11] 'XXX'
...
しばらくすると Worker側のコマンドプロンプトに3秒間隔で以下のように表示されると思います。
Worker

[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
3秒間後...
[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Maurice'
  [thread-id:12] - ConsumerB Done 'Larry'
 [thread-id:12] - ConsumerB Received 'Verdine'
  [thread-id:13] - ConsumerC Done 'Jessica'
 [thread-id:13] - ConsumerC Received 'Andrew'
3秒間後...
[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Maurice'
  [thread-id:12] - ConsumerB Done 'Larry'
 [thread-id:12] - ConsumerB Received 'Verdine'
  [thread-id:13] - ConsumerC Done 'Jessica'
 [thread-id:13] - ConsumerC Received 'Andrew'
  [thread-id:11] - ConsumerA Done 'Maurice'
 [thread-id:11] - ConsumerA Received 'Johnny'
  [thread-id:12] - ConsumerB Done 'Verdine'
 [thread-id:12] - ConsumerB Received 'Ralph'
  [thread-id:13] - ConsumerC Done 'Andrew'
 [thread-id:13] - ConsumerC Received 'Al'
3秒間後...
[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Maurice'
  [thread-id:12] - ConsumerB Done 'Larry'
 [thread-id:12] - ConsumerB Received 'Verdine'
  [thread-id:13] - ConsumerC Done 'Jessica'
 [thread-id:13] - ConsumerC Received 'Andrew'
  [thread-id:11] - ConsumerA Done 'Maurice'
 [thread-id:11] - ConsumerA Received 'Johnny'
  [thread-id:12] - ConsumerB Done 'Verdine'
 [thread-id:12] - ConsumerB Received 'Ralph'
  [thread-id:13] - ConsumerC Done 'Andrew'
 [thread-id:13] - ConsumerC Received 'Al'
  [thread-id:11] - ConsumerA Done 'Johnny'
 [thread-id:11] - ConsumerA Received 'Philip'
  [thread-id:12] - ConsumerB Done 'Ralph'
  [thread-id:13] - ConsumerC Done 'Al'
3秒間後...
[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Maurice'
  [thread-id:12] - ConsumerB Done 'Larry'
 [thread-id:12] - ConsumerB Received 'Verdine'
  [thread-id:13] - ConsumerC Done 'Jessica'
 [thread-id:13] - ConsumerC Received 'Andrew'
  [thread-id:11] - ConsumerA Done 'Maurice'
 [thread-id:11] - ConsumerA Received 'Johnny'
  [thread-id:12] - ConsumerB Done 'Verdine'
 [thread-id:12] - ConsumerB Received 'Ralph'
  [thread-id:13] - ConsumerC Done 'Andrew'
 [thread-id:13] - ConsumerC Received 'Al'
  [thread-id:11] - ConsumerA Done 'Johnny'
 [thread-id:11] - ConsumerA Received 'Philip'
  [thread-id:12] - ConsumerB Done 'Ralph'
  [thread-id:13] - ConsumerC Done 'Al'
  [thread-id:11] - ConsumerA Done 'Philip'
同時に3つずつメッセージが処理されているのが分かると思います。
また3つのスレッドが使い回されているのも確認できると思います。
さらに Consumer の処理が終わるまでスレッドが独占されている様子も分かると思います。 
この意味でスレッドセーフであるといえると思います。

おまけ

Connection が使用できるスレッドを1つに制限したらどうなるか?
ちょっと試してみます。
Worker側の処理で Connection を作成しているところを以下のように変更します。
worker/src/main/java/studying/receiver/Worker.java
...
// Worker の数を3つにする。
// ExecutorService es = Executors.newFixedThreadPool(3);
// Connection connection = factory.newConnection(es);

ExecutorService es = Executors.newFixedThreadPool(1);
Connection connection = factory.newConnection(es);
...
Worker側を再起動してから App側を実行すると
Worker側のコマンドプロンプトに3秒間隔で1つずつメッセージが以下のように表示されると思います。
Worker

[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'Andrew'
  [thread-id:11] - ConsumerA Done 'Andrew'
 [thread-id:11] - ConsumerB Received 'Maurice'
  [thread-id:11] - ConsumerB Done 'Maurice'
 [thread-id:11] - ConsumerB Received 'Ralph'
  [thread-id:11] - ConsumerB Done 'Ralph'
 [thread-id:11] - ConsumerB Received 'Larry'
  [thread-id:11] - ConsumerB Done 'Larry'
 [thread-id:11] - ConsumerC Received 'Al'
  [thread-id:11] - ConsumerC Done 'Al'
 [thread-id:11] - ConsumerC Received 'Verdine'
  [thread-id:11] - ConsumerC Done 'Verdine'
 [thread-id:11] - ConsumerC Received 'Johnny'
  [thread-id:11] - ConsumerC Done 'Johnny'
 [thread-id:11] - ConsumerA Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'Jessica'
 [thread-id:11] - ConsumerA Received 'XXX'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Philip'
  [thread-id:11] - ConsumerA Done 'Philip'
3の Consumer が1つのスレッドを使い回して順番に処理してく様子が分かると思います。

今回のテーマから逸れるのですが
1つのメッセージを3つある Consumer のうちのどれか1つで処理するのでなく
1つのメッセージを3つある Consumer 3つで処理する方法を考えてみたいと思います。
"Publish/Subscribe" 方式でのメッセージ処理になると思います。
1つの Queue に3つの Consumer を割り当てていたのでラウンドロビンする形で処理されていました。
今度は、Consumer ごとにそれぞれ Queue を割り当てるようにします。
Worker側の処理で Queue の宣言しているところと
Consumer に割り当てているところを以下のように変更します。
また Connection が使用できるスレッド数を3つに戻しています。
worker/src/main/java/studying/receiver/Worker.java
...
// Worker の数を3つにする。
ExecutorService es = Executors.newFixedThreadPool(3);
Connection connection = factory.newConnection(es);
...

// Worker A
Channel channelA = connection.createChannel();
channelA.exchangeDeclare(exchangeName, exchangeType, durable);
channelA.queueDeclare(queueName + "A", durable, false, false, null);
channelA.queueBind(queueName + "A", exchangeName, routingKey);

MyConsumer myConsumerA = new MyConsumer(channelA, "ConsumerA");
channelA.basicConsume(queueName + "A", autoAck, myConsumerA);


// Worker B
Channel channelB = connection.createChannel();
channelB.exchangeDeclare(exchangeName, exchangeType, durable);
channelB.queueDeclare(queueName + "B", durable, false, false, null);
channelB.queueBind(queueName + "B", exchangeName, routingKey);

MyConsumer myConsumerB = new MyConsumer(channelB, "ConsumerB");
channelB.basicConsume(queueName + "B", autoAck, myConsumerB);


// Worker C
Channel channelC = connection.createChannel();
channelC.exchangeDeclare(exchangeName, exchangeType, durable);
channelC.queueDeclare(queueName + "C", durable, false, false, null);
channelC.queueBind(queueName + "C", exchangeName, routingKey);

MyConsumer myConsumerC = new MyConsumer(channelC, "ConsumerC");
channelC.basicConsume(queueName + "C", autoAck, myConsumerC);
...
Worker側を再起動してから App側を実行すると
Worker側のコマンドプロンプトに3秒間隔でメッセージが以下のように表示されると思います。
Worker

[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:13] - ConsumerC Received 'XXX'
 [thread-id:12] - ConsumerB Received 'XXX'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Al'
  [thread-id:13] - ConsumerC Done 'XXX'
  [thread-id:12] - ConsumerB Done 'XXX'
 [thread-id:13] - ConsumerC Received 'Al'
 [thread-id:12] - ConsumerB Received 'Al'
  [thread-id:11] - ConsumerA Done 'Al'
 [thread-id:11] - ConsumerA Received 'Johnny'
  [thread-id:13] - ConsumerC Done 'Al'
 [thread-id:13] - ConsumerC Received 'Johnny'
  [thread-id:12] - ConsumerB Done 'Al'
 [thread-id:12] - ConsumerB Received 'Johnny'
 ...
今度は3つの Consumer が送信されたメッセージをそれぞれ処理するのでログが3倍出力されると思います。

おわりに

MessageQueue サーバーを使って同時実行数を制御して処理できることが一応確認できたと思います。
しかし実際これでシステム開発しようとした場合、
冗長化であったり、トランザクションであったり、認証であったりと
考慮しないといけないことがまだまだあります。
今回は、3つの Worker を1つのプロセスで動かしていたので
プロセスが落ちると全部の Worker が止まっていまします。
簡単な冗長化であれば1プロセス1 Worker にして3つプロセスを起動すれば
同じように Worker 3つで処理できます。
この場合、プロセスが1つ落ちても Worker 全体が止まってしまうことはありません。
しかし Worker は動いていても MessageQueueサーバー自体が止まってしまうと
システム全体に影響が出ます。
ミドルウェアを導入することで開発は軽減されるものの管理が増える。
このあたりがミドルウェア導入の考えどころか?
参考URL
http://www.rabbitmq.com/getstarted.html
http://www.rabbitmq.com/api-guide.html