2016年5月16日月曜日

RabbitMQ の 冗長化を検証する / Consumer 編

はじめに

前回、『RabbitMQ の 冗長化を考える』でRabbitMQ の冗長化の設定を行いました。
今回は、その冗長化した RabbitMQ サーバーを実際、Failover させて、その様子を観察したいと思います。
冗長化を考える理由に
何かしらの障害が発生したとしても Publisher が送信したメッセージを
Consumer によって確実に処理させたいということが挙げられると思います。
全体を通してでは、
Publisher が送信したメッセージは、Consumer で確実に処理されるということになると思います。
これを Publisher と Consumer の間に Queue を置いて考えると
Publisher は、何かしらの障害が発生したとしてもメッセージを確実に Queue に届けるところまでが
責任範囲で、その後、Queue のメッセージが Consumer でどう処理されるかは、責任の範囲外になると思います。
一方、Consumer は、Queue にメッセージがどのような方法で届けられたかはということは、関係なく
Queue にあるメッセージを確実に処理することが責任範囲になると思います。
このように Publisher と Consumer の責任範囲は、それぞれ被らないものだと思います。
そう考えると Failover をそれぞれ別々の視点で観察できるとのではないかと思いました。
(RPC 方式のメッセージ処理では、また違ってくると思うのですが今回は、考えません。
そんな訳で今回は、Consumer側 の Failover の様子を観察してみたと思います。
JavaClient では、Cluster の Node を複数、ConnectionFactory に設定することができ、
ConnectionFactory は、その中から接続可能な Node を選択して Connection を生成しているようです。
このことが接続している Node が落ちてしまった場合に ConnectionFactory に設定されている
別の接続可能な Node で自動的に接続が復帰することを可能にしているようです。
Node に仮想IP などを設定する必要がないようです。
厳密に負荷分散を考える場合は、仮想IP などが必要かもしれません。
(接続可能な IP の選択方法が Collections.shuffle() になってそうなので...
検証には、2種類の Consumer を考えています。
1つは、適当にスリープさせながらメッセージ処理をする HappyPath での Consumer と
もう1つは、スリープさせずに連続的にメッセージ処理をする UnhappyPath での Consumer になります。
以下、3点を Failover 時に検証します。

  1. 接続可能な Node に切り替わること。
  2. メッセージが消えたしまわないこと。
  3. メッセージが2重に処理されないこと。
始める前に...
RabbitMQ がインストール済であること
RabbitMQ サーバーにログイン可能でリソースにアクセスできるユーザが作成済であること
(ユーザとパスワードを "user01" と "passwd" としています。
ManagementPlugin が有効になっていて CommandLineTool が使えること
ホスト名が設定されていること
以上が前提になります。
このあたりはわたしも『yum の Local Repository に RabbitMQ をインストール』と
『RabbitMQ の 冗長化を考える』にまとめていますので
よろしければ、こちらも合わせて、ご参考に。
前置きが長くなってしまいましたが
ではでは!Getting Started!!


環境

検証機

開発言語 Java 1.8.0
フレームワーク rabbitmq-java-client 3.6.1
ビルドツール Gradle 2.12
※gradle コマンドが実行できるように設定しておきます。
OS Windows 8.1


前回と同様に以下、2台のサーバー(vm-1st と vm-2nd)で冗長化します。
RabbitMQ Server (vm-1st)

ホスト名 vm-1st
IP 192.168.10.110
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16


RabbitMQ Server (vm-2nd)

ホスト名 vm-2nd
IP 192.168.10.120
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16



プロジェクト構成

C ドライブ直下に「studying-rabbitmq-failover」フォルダを作成して各ファイルを以下のように配置します。
ファイルは、UTF-8 で保存します。
  • C:\studying-rabbitmq-failover
    • src
      • main
        • java
          • studying
            • MyAbstractConsumer.java
            • MyCancelConsumer.java
            • MyHappyConsumer.java
            • MyUnhappyConsumer.java
    • build.gradle
    • publisher.gradle


準備

前回のおさらいも兼ねて Clustering の設定から始めたいと思います。
vm-2nd を vm-1st の Cluster に参加させます。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@vm-1st
rabbitmqctl start_app

Cluster の設定を確認します。
vm-1st と vm-2nd の両方のターミナルで以下のコマンドを実行します。
vm-1st

rabbitmqctl cluster_status

Cluster status of node 'rabbit@vm-1st' ...
[{nodes,[{disc,['rabbit@vm-1st','rabbit@vm-2nd']}]},
 {running_nodes,['rabbit@vm-2nd','rabbit@vm-1st']},
 {cluster_name,<<"rabbit@vm-1st">>},
...

vm-2nd

rabbitmqctl cluster_status

[{nodes,[{disc,['rabbit@vm-1st','rabbit@vm-2nd']}]},
 {running_nodes,['rabbit@vm-1st','rabbit@vm-2nd']},
 {cluster_name,<<"rabbit@vm-1st">>},
...
vm-1st と vm-2nd のどちらも "rabbit@vm-1st" という Cluster に参加していて起動中である。
問題ないようですね。

次に Queue と directタイプの Exchange と宣言します。
Queue名と Exchange名は、それぞれ "myQueue" と "myExchange" にします。
RoutingKey を "myKey" にして "myQueue" と "myExchange" を Bind します。
vm-1st が HomeNode になるようにします。
また、今回使用する Queue名と Exchange名で Queue と Exchange がすでに存在していると
ややこしくなるので念のため削除してから作成します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin delete exchange name=myExchange
rabbitmqadmin delete queue name=myQueue

rabbitmqadmin declare queue name=myQueue
rabbitmqadmin declare exchange name=myExchange type=direct
rabbitmqadmin declare binding source=myExchange destination=myQueue routing_key=myKey

宣言した Queue と Exchange そして、Bind を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqadmin list exchanges name type durable

+--------------------+---------+---------+
|        name        |  type   | durable |
+--------------------+---------+---------+
|                    | direct  | True    |
...
| myExchange         | direct  | True    |
+--------------------+---------+---------+

rabbitmqadmin list bindings

+------------+-------------+-------------+
|   source   | destination | routing_key |
+------------+-------------+-------------+
|            | myQueue     | myQueue     |
| myExchange | myQueue     | myKey       |
+------------+-------------+-------------+
問題なく宣言されているのが確認できると思います。

最後に QueueMirroring の設定をします。
先ほど宣言した Queue を Cluster 内のすべての Node で Mirroring するようにします。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqctl set_policy myPolicy "^myQueue$" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'

追加でもう1つ
検証機が Windows なので Windows の hosts ファイルにも vm-1st と vm-2nd を追加しておきます。
C:\Windows\System32\drivers\etc\hosts
...
192.168.10.110 vm-1st
192.168.10.120 vm-2nd

準備は、これで終わりです。


ビルドスクリプトの作成

以下のような内容でビルドスクリプトを作成します。
build.gradle
apply from: 'publisher.gradle'

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'application'

sourceCompatibility = '1.8'
targetCompatibility = '1.8'
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

mainClassName = (project.hasProperty('mainClass')) ? "$mainClass" : ''

repositories {
  jcenter()
}

dependencies {
  compile 'com.rabbitmq:amqp-client:3.6.1'
}

Consumer側の冗長化の検証に RabbitMQ サーバーにメッセージを送信するスクリプトがあった方が
やりやすいと思ったので Java でメッセージ送信する Publisher側のプロジェクトも考えたのですが
今回は、以下のような gradle の task で済ませようと思います。
publisher.gradle
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

def messageCount = project.hasProperty('messageCount') ? "$messageCount" : 10000

// (1)
task happyPublisher << {
  println 'start'
  Connection connection = newConnection();
  Channel channel = connection.createChannel();

  def tasks = [
    'Maurice', 'Verdine', 'Philip', 'Jessica', 'Larry', 
    'Johnny', 'Ralph', 'Al', 'Andrew', 'XXX'
  ]

  tasks.each {
    byte[] body = it.getBytes();
    channel.basicPublish("myExchange", "myKey", MessageProperties.PERSISTENT_TEXT_PLAIN, body);
  }

  connection.close();
  println 'done'
}

// (2)
task unhappyPublisher << {
  println 'start'
  
  Connection connection = newConnection();
  Channel channel = connection.createChannel();
      
  for (int ct = 0; ct < messageCount; ct++) {
    byte[] body = String.format("test-%d", ct + 1).getBytes();
    channel.basicPublish("myExchange", "myKey", MessageProperties.PERSISTENT_TEXT_PLAIN, body);
  }

  connection.close();
  println 'done'
}

// RabbitMQ Server に接続して connection を生成する。
Connection newConnection() {  
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("vm-1st");
  factory.setUsername("user01");
  factory.setPassword("passwd");
  factory.newConnection();
}

buildscript {
  repositories { jcenter() }
  dependencies { classpath 'com.rabbitmq:amqp-client:3.6.1' }
}
スクリプトの内容は、以下のようになります。
(1) happyPublisher タスク
HappyPath の検証ケースで使用するメッセージを RabbitMQサーバーに送信します。
メッセージを10通送信します。
(2) unhappyPublisher タスク
UnhappyPath の検証ケースで使用するメッセージを RabbitMQサーバーに送信します。
メッセージを10,000通送信します。


RabbitMQ サーバー接続クラスを作成する

RabbitMQ サーバーに接続する処理などを共通化した以下のようなクラスを作成します。
検証で作成する Consumerクラスはこのクラスを継承する形にします。
src/main/java/studying/rabbitmq/MyAbstractConsumer.java
package studying.rabbitmq;

import java.io.IOException;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;

public abstract class MyAbstractConsumer {
  
  public void doAction() throws Exception {
 
    // RabbitMQ Server 接続情報
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("user01");
    factory.setPassword("passwd");


    // (1) 冗長化の設定をして RabbitMQ Server に接続する。
    Address[] addrs = new Address[]{new Address("vm-1st"), new Address("vm-2nd")};
    factory.setAutomaticRecoveryEnabled(true);
    AutorecoveringConnection connection = (AutorecoveringConnection)factory.newConnection(addrs);

    System.out.printf("listening... %s %n", connection.getAddress().getHostName());
    Thread.sleep(10 * 1000);

    
    // (2) Queue と Exchange の宣言と Bind をする。
    boolean durable = true;
    String queueName    = "myQueue";
    String exchangeType = "direct";
    String exchangeName = "myExchange";
    String routingKey   = "myKey";

    AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel();
    channel.exchangeDeclare(exchangeName, exchangeType, durable);
    channel.queueDeclare(queueName, durable, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    channel.basicQos(1);

    
    // (3) Connection と Channel に ShutdownListener を設定する。
    connection.addShutdownListener(cause -> System.out.printf("Connection Shutdown... %n"));
    channel.addShutdownListener(cause -> System.out.printf("Channel    Shutdown... %n"));


    // (4) Connection と Channel に RecoveryListener を設定する。
    connection.addRecoveryListener((Recoverable recoverable) -> {
      AutorecoveringConnection conn = (AutorecoveringConnection)recoverable;
      String hostName = conn.getAddress().getHostName();
      System.out.printf("Connection Recovered !!! %s %n", hostName);
    });
    channel.addRecoveryListener((Recoverable recoverable) -> {
      AutorecoveringChannel chan = (AutorecoveringChannel)recoverable;
      String hostName = chan.getConnection().getAddress().getHostName();
      System.out.printf("Channel    Recovered !!! %s %n", hostName);
    });

 
    // Consumer を設定する。
    this.assignConsumerFor(channel, queueName);
  }

  // (5)
  protected abstract void assignConsumerFor(Channel channel, String queue) throws IOException;
}
処理内容は、以下のようになります。
(1) 冗長化の設定をして RabbitMQ Server に接続する。
ClusterNode のホスト名(または、IP)を複数指定して Connection を作成します。
AutomaticRecovery も忘れずに有効化しておきます。
(2) Queue と Exchange の宣言と Bind をする。
準備の時に行っているので不要だと思いますが
ここでも Queue と Exchange の宣言とその Bind をしています。
念のため basicQos でメッセージの取り出しを1回につき1つにしています。
(3) Connection と Channel に ShutdownListener を設定する。
Connection と Channel の Shutdown時、コンソールにログを出力します。
(4) Connection と Channel に RecoveryListener を設定する。
Connection と Channel の Recovery時、コンソールにログを出力します。
(5) assignConsumerFor() メソッド
サブクラスはこのメソッドを実装して Consumer を Channel に割り当てます。


Happy Path を検証する

初めは、おおよそ想定通りにいってくれるであろうパターンから検証してみたいと思います。
適当にスリープさせながら10通のメッセージを処理します。
以下のような内容で Consumerクラスを作成します。
src/main/java/studying/rabbitmq/MyHappyConsumer.java
package studying.rabbitmq;

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyHappyConsumer extends MyAbstractConsumer {

  @Override
  protected void assignConsumerFor(Channel channel, String queue) throws IOException {

    Consumer consumer = new DefaultConsumer(channel) {

      protected int ack;
      
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, 
          AMQP.BasicProperties properties, byte[] body) throws IOException {
        
        String message = new String(body, "UTF-8");
        String hostName = channel.getConnection().getAddress().getHostName();
        
        System.out.printf(" Received '%-7s'", message);
        System.out.printf(" - %s", hostName);
        System.out.printf(" deliveryTag:%s",  envelope.getDeliveryTag());
        System.out.printf(" redeliver:%s %n", envelope.isRedeliver());

        try {
          // 3秒 sleep する。
          Thread.sleep(3 * 1000);
          
          // ack を返す。
          channel.basicAck(envelope.getDeliveryTag(), false);
          this.ack++;
          System.out.printf("  Done '%s' ack:%d %n", message, this.ack);

        } catch (InterruptedException e) {
          System.out.printf("  Interrupted '%s' %n", message);
          e.printStackTrace();
        } catch (Exception e) {
          System.out.printf("  Error '%s' %n", message);
          throw e;
        }
      }
    };

    boolean autoAck = false;
    channel.basicConsume(queue, autoAck, consumer);
  }

  
  public static void main(String[] args) {
    try {
      new MyHappyConsumer().doAction();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージ処理をしています。
処理内容としては、

  1. 受信したメッセージをコンソールに表示する。
  2. 3秒スリープさせてタイミングを整える。
  3. RabbitMQ サーバーに ack を返す。
     RabbitMQ サーバーに返した ack をカウントする。
     ただし Exception が発生した場合は、ack を返さないようにします。
  4. 最後にもう一度メッセージをコンソールに表示する。

のようになります。

想定されるケースとしては、
HomeNode に接続している状態で Failover が発生する場合と
HomeNode 以外に接続している状態で Failover が発生する場合があると思います。
とりあえず、HomeNode に接続しているケースから見ていきたいと思います。
検証の前に Queue のメッセージをクリアしてから
HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.3.297.0>]       [<rabbit@vm-2nd.3.297.0>]
問題がなければ、検証を始めます。

Consumer を HomeNode の vm-1st に接続させてメッセージを送信します。
適当なところで vm-1st の RabbitMQ サーバーを停止させます。
その時、どうように Failover するか観察したいと思います。
コマンドプロンプトを開いて gradle から Consumer を起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q

listening... vm-1st
と表示され vm-1st に接続できたらメッセージを送信します。
vm-2nd に接続されたら
どの ClusterNode に接続されるかは、ランダムのようです。
(よく分かりませんが...
今回の場合、vm-1st と vm-2nd の2台なので
確率論的にみて3回くらいやり直せば希望する方に接続される筈なんですが...
新たにコマンドプロンプトを開いて gradle から 以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle happyPublisher -q

start
done

しばらくすると Consumer のコマンドプロンプトからメッセージが処理されていく様子が確認できると思います。
Consumer

...
listening... vm-1st
 Received 'Maurice' - vm-1st deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-1st deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-1st deliveryTag:3 redeliver:false

ここで vm-1st の RabbitMQ サーバーを停止させます。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service

そうすると Consumer のコマンドプロンプトから Failover する様子が確認できると思います。
Consumer

...
listening... vm-1st
 Received 'Maurice' - vm-1st deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-1st deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-1st deliveryTag:3 redeliver:false
  Done 'Philip' ack:3
 Received 'Jessica' - vm-1st deliveryTag:4 redeliver:false
Channel    Shutdown...
  Error 'Jessica'
com.rabbitmq.client.impl.DefaultExceptionHandler: ...
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; ...
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChan ...
        ...
Channel    Recovered !!! vm-2nd
Connection Recovered !!! vm-2nd
Connection Shutdown...
 Received 'Jessica' - vm-2nd deliveryTag:5 redeliver:true
  Done 'Jessica' ack:4
 Received 'Larry  ' - vm-2nd deliveryTag:6 redeliver:true
  Done 'Larry' ack:5
 Received 'Johnny ' - vm-2nd deliveryTag:7 redeliver:true
  Done 'Johnny' ack:6
 Received 'Ralph  ' - vm-2nd deliveryTag:8 redeliver:true
  Done 'Ralph' ack:7
 Received 'Al     ' - vm-2nd deliveryTag:9 redeliver:true
  Done 'Al' ack:8
 Received 'Andrew ' - vm-2nd deliveryTag:10 redeliver:true
  Done 'Andrew' ack:9
 Received 'XXX    ' - vm-2nd deliveryTag:11 redeliver:true
  Done 'XXX' ack:10
検証してみると

  1. "Jessica" のメッセージ処理をしている時に vm-1st の停止が発生していて
     HomeNode を vm-2nd に Failover できているようです。
  2. 最後のメッセージ "XXX" まで処理できているようです。
  3. Queue にあったメッセージの数と ack の数が一致しているので
     2重にメッセージが処理されていることもないようです。

概ね想定通りうまくいっているのではないかと思います。
Failover のタイミングを拾えているのでここに Rollback 処理などを挟み込めると思います。
また、Failover 時に Queue に残っていたメッセージは、redeliver フラグが "true" になっているので
Re-Queue 扱いになっているのも分かると思います。


次に HomeNode でない Node に接続しているケースを見てみたいと思います。
恐らく今、HomeNode は、vm-2nd になっていると思います。
vm-1st の RabbitMQ サーバーを再起動させて確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl start rabbitmq-server.service

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 0        |
+---------+---------+---------------+----------+

HomeNode を vm-1st に戻したいので vm-2nd の RabbitMQ サーバーを再起動させます。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

systemctl stop rabbitmq-server.service
systemctl start rabbitmq-server.service

先ほどと同様に Queue のメッセージをクリアしてから
HomeNode が vm-1st に戻ってきているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.3.295.0>]       [<rabbit@vm-2nd.3.295.0>]
問題がなければ、検証を始めます。

Consumer を vm-2nd に接続させてメッセージを送信します。
適当なところで vm-1st の RabbitMQ サーバーを停止させて Failover する様子を観察したいと思います。
コマンドプロンプトを開いて gradle から Consumer を起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q

listening... vm-2nd

vm-2nd に接続されたらメッセージを送信します。
新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle happyPublisher -q

start
done

Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたら
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false

vm-2nd の RabbitMQ サーバーを停止して Failover させます。
vm-2nd のターミナルから以下のコマンドを実行します。
vm-2nd

systemctl stop rabbitmq-server.service

Consumer のコマンドプロンプトを見ると Failover する様子が確認できると思います。
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
  Done 'Philip' ack:3
 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false
Channel    Shutdown...
  Error 'Jessica'
com.rabbitmq.client.impl.DefaultExceptionHandler: ...
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; ...
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChan ...
        ...
Channel    Recovered !!! vm-1st
Connection Recovered !!! vm-1st
Connection Shutdown...
 Received 'Jessica' - vm-1st deliveryTag:5 redeliver:true
  Done 'Jessica' ack:4
 Received 'Larry  ' - vm-1st deliveryTag:6 redeliver:false
  Done 'Larry' ack:5
 Received 'Johnny ' - vm-1st deliveryTag:7 redeliver:false
  Done 'Johnny' ack:6
 Received 'Ralph  ' - vm-1st deliveryTag:8 redeliver:false
  Done 'Ralph' ack:7
 Received 'Al     ' - vm-1st deliveryTag:9 redeliver:false
  Done 'Al' ack:8
 Received 'Andrew ' - vm-1st deliveryTag:10 redeliver:false
  Done 'Andrew' ack:9
 Received 'XXX    ' - vm-1st deliveryTag:11 redeliver:false
  Done 'XXX' ack:10
検証してみると

  1. "Jessica" のメッセージ処理をしている時に vm-1st の停止が発生していて
     HomeNode を vm-2nd に Failover できているようです。
  2. 最後のメッセージ "XXX" まで処理できているようです。
  3. Queue にあったメッセージの数と ack の数が一致しているので
     2重にメッセージが処理されていることもないようです。

こちらもうまくいっていると思います。
HomeNode に接続しているケースとの違いとして
Failover 時に処理していたメッセージは、redeliver フラグが "true" なので
Re-Queue 扱いになっているようですが、
それ以外の Queue に残っていたメッセージの redeliver フラグは "false" のままのようです。


Unhappy Path を検証する

不幸なタイミングで処理される可能性が高いパターンを検証してみたいと思います。
内容的には、先ほどの HappyPath とほぼ同じなのですが
今度は、スリープさせずに連続してメッセージ処理をするとどうなるか試してみたいと思います。
メッセージは、10,000通処理します。
以下のような内容で Consumerクラスを作成します。
src/main/java/studying/rabbitmq/MyUnhappyConsumer.java
package studying.rabbitmq;

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyUnhappyConsumer extends MyAbstractConsumer {

  public final static int MESSAGE_COUNT = 10000;
  
  @Override
  protected void assignConsumerFor(Channel channel, String queue) throws IOException {
    
    int count = MyUnhappyConsumer.MESSAGE_COUNT;

    Consumer consumer = new DefaultConsumer(channel) {
      
      protected int ack;
      
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, 
          AMQP.BasicProperties properties, byte[] body) throws IOException {
        
        String message = new String(body, "UTF-8");
        long deliveryTag = envelope.getDeliveryTag();

        try {
          // ack を返す。
          boolean multiple = false;
          channel.basicAck(deliveryTag, multiple);
          this.ack++;
          
          if ((deliveryTag == 1) || (deliveryTag > (count - 2)) || 
              ((deliveryTag % (count / 5)) == 0)) {

            System.out.printf(" Done '%s'", message);
            System.out.printf(" - %s", channel.getConnection().getAddress().getHostName());
            System.out.printf(" deliveryTag:%s", deliveryTag);
            System.out.printf(" ack:%d %n", this.ack);
          }
        } catch (Exception e) {
          System.out.printf(" Error '%s' - deliveryTag:%s %n", message, deliveryTag);
          throw e;
        }
      }
    };

    boolean autoAck = false;
    channel.basicConsume(queue, autoAck, consumer);
  }

  
  public static void main(String[] args) {
    try {      
      new MyUnhappyConsumer().doAction();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージ処理をしています。
処理内容としては、

  1. 受信したメッセージをコンソールに表示する。
     今回は、メッセージを10,000通処理するので最初の1通目と最後の3通、そして途中5回だけにします。
  2. RabbitMQ サーバーに ack を返す。
     RabbitMQ サーバーに返した ack をカウントする。
     ただし Exception が発生した場合は、ack を返さないようにします。

のようになります。


こちらも HomeNode に接続している状態で Failover が発生する場合と
HomeNode 以外に接続している状態で Failover が発生する場合が考えられるのですが
HomeNode に接続している状態で Failover が発生する場合で検証したいと思います。
検証の前に Queue のメッセージをクリアしてから
HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.1.295.0>]       [<rabbit@vm-2nd.1.295.0>]
問題がなければ、検証を始めます。

今回は、先にメッセージを送信してしまいます。
コマンドプロンプトを開いて gradle から 以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle unhappyPublisher -q

start
done

送信したメッセージが Queue に届いているか確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 10000    |
+---------+---------+---------------+----------+

Queue にメッセージが届いているのが確認できたら
コマンドプロンプトを開いて gradle から以下のように Consumer を vm-1st に接続させて起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyUnhappyConsumer -q

listening... vm-1st
vm-2nd に接続されたら
メッセージが処理されるまで10秒空けていますので
その間に「CTRL」+「C」キーで処理を止めてもう一度、Consumer を起動し直します。

しばらくして Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたら
Consumer

...
listening... vm-1st
 Done 'test-1' - vm-1st deliveryTag:1 ack:1
 Done 'test-2000' - vm-1st deliveryTag:2000 ack:2000
 Done 'test-4000' - vm-1st deliveryTag:4000 ack:4000

vm-1st の RabbitMQ サーバーを停止して Failover させます。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service

Consumer のコマンドプロンプトを見ると
以下のように Failover する様子が確認できると思います。
Consumer

...
listening... vm-1st
 Done 'test-1' - vm-1st deliveryTag:1 ack:1
 Done 'test-2000' - vm-1st deliveryTag:2000 ack:2000
 Done 'test-4000' - vm-1st deliveryTag:4000 ack:4000
Channel    Shutdown...
Channel    Recovered !!! vm-2nd
Connection Recovered !!! vm-2nd
Connection Shutdown...
 Done 'test-5999' - vm-2nd deliveryTag:6000 ack:6000
 Done 'test-7999' - vm-2nd deliveryTag:8000 ack:8000
 Done 'test-9998' - vm-2nd deliveryTag:9999 ack:9999
 Done 'test-9999' - vm-2nd deliveryTag:10000 ack:10000
 Done 'test-10000' - vm-2nd deliveryTag:10001 ack:10001
検証してみると

  1. 4,000通目から6,000通目のメッセージ処理をしている時に vm-1st の停止が発生していて
     HomeNode を vm-2nd に Failover できているようです。
  2. 最後のメッセージ "test-10000" まで処理できているようです。
  3. Queue にあったメッセージの数より ack の数が1つ多くなっています。
     2重にメッセージが処理されてしまっているようです。
メッセージの2重処理が発生しているのに加えて Failover のタイミングも拾えていないようです。
Exception が catch できていないということは、channel.basicAck() メソッドで
ack を返すところまでは成功しているけれども ack が 届く前に RabbitMQ サーバーが
停止したのだろうと考えられます。
HomeNode 以外に接続している場合に Failover したら
こちらの場合も同様にメッセージの2重処理が発生するようです。
Consumer

listening... vm-2nd
 Done 'test-1' - vm-2nd deliveryTag:1 ack:1
 Done 'test-2000' - vm-2nd deliveryTag:2000 ack:2000
 Done 'test-4000' - vm-2nd deliveryTag:4000 ack:4000
Channel    Shutdown...
Channel    Recovered !!! vm-1st
Connection Recovered !!! vm-1st
Connection Shutdown...
 Done 'test-5999' - vm-1st deliveryTag:6000 ack:6000
 Done 'test-7999' - vm-1st deliveryTag:8000 ack:8000
 Done 'test-9998' - vm-1st deliveryTag:9999 ack:9999
 Done 'test-9999' - vm-1st deliveryTag:10000 ack:10000
 Done 'test-10000' - vm-1st deliveryTag:10001 ack:10001


Consumer Cancel Notification

Consumer に割り当てられている Queue が削除されたり、その他の理由で Queue にアクセスできなくなった時に
RabbitMQ サーバー から Consumer Cancel Notification というものが送られる仕組みがあるようです。
このキャンセル通知を Consumer が受け取ると以降のメッセージ処理を中止するようです。
Failover 時も、このキャンセル通知が送信されるようなので、ちょっと試してみようと思います。
わたしが確認できたところでは、
Failover 時、以下の条件をすべて満たす場合に、このキャンセル通知を受け取れるようです。

   QueueMirroring している。
   HomeNode 以外に接続している。
   HomeNode が原因で Failover が発生した。
   Consumer 引数の x-cancel-on-ha-failover が "true" になっている。
上記のケースで x-cancel-on-ha-failover を設定していないとどうなるか
先ほどの HappyPath の検証を使って見てみます。

検証の前に Queue のメッセージをクリアして
HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.3.297.0>]       [<rabbit@vm-2nd.3.297.0>]

Consumer を HomeNode でない vm-2nd に接続します。
コマンドプロンプトを開いて gradle から Consumer を起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q

listening... vm-2nd

vm-2nd に接続できたのを確認したらメッセージを送信します。
新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle happyPublisher -q

start
done

Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたら
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false

HomeNode の vm-1st の RabbitMQ サーバーを停止して Failover させます。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service

Consumer のコマンドプロンプトでは以下のようにメッセージ処理が続いていると思います。
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
  Done 'Philip' ack:3
 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false
  Done 'Jessica' ack:4
 Received 'Jessica' - vm-2nd deliveryTag:5 redeliver:true
  Done 'Jessica' ack:5
 Received 'Larry  ' - vm-2nd deliveryTag:6 redeliver:true
  Done 'Larry' ack:6
 Received 'Johnny ' - vm-2nd deliveryTag:7 redeliver:true
  Done 'Johnny' ack:7
 Received 'Ralph  ' - vm-2nd deliveryTag:8 redeliver:true
  Done 'Ralph' ack:8
 Received 'Al     ' - vm-2nd deliveryTag:9 redeliver:true
  Done 'Al' ack:9
 Received 'Andrew ' - vm-2nd deliveryTag:10 redeliver:true
  Done 'Andrew' ack:10
 Received 'XXX    ' - vm-2nd deliveryTag:11 redeliver:true
  Done 'XXX' ack:11
見てみると Queue のメッセージの数より ack の数が1つ多くなっています。
メッセージの2重処理が発生してしまっているようです。
もう少し見てみると "Jessica" のメッセージ処理が2回実行されているようです。
1回目の redeliver は、"false" ですが2回目の "true" になっています。
"Jessica" の2回目のメッセージ処理以降、メッセージの redeliver が "true" になっています。
恐らく vm-2nd で "Jessica" のメッセージを処理している間に vm-1st が Failover したのだと思います。
vm-1st は、"Jessica" のメッセージ処理の ack を受け取っていないので Re-Queue したのだと思います。
これがメッセージの2重処理の原因ではないでしょうか?
HappyPath と思っていましたが、実は、うまくいかないパターンもあるようです。
このような場合に Consumer Cancel Notification を利用して Consumer のメッセージ処理を止めることが
できるようです。
先ほどの HappyPath の検証で使用した MyHappyConsumerクラスを変更して
以下のような MyCancelConsumerクラスを作成します。
src/main/java/studying/rabbitmq/MyCancelConsumer.java
package studying.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyCancelConsumer extends MyAbstractConsumer {
  
  @Override
  protected void assignConsumerFor(Channel channel, String queue) throws IOException {

    Consumer consumer = new DefaultConsumer(channel) {
 
      protected int ack;
       
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, 
          AMQP.BasicProperties properties, byte[] body) throws IOException {
         
        String message = new String(body, "UTF-8");
        String hostName = channel.getConnection().getAddress().getHostName();
         
        System.out.printf(" Received '%-7s'", message);
        System.out.printf(" - %s", hostName);
        System.out.printf(" deliveryTag:%s",  envelope.getDeliveryTag());
        System.out.printf(" redeliver:%s %n", envelope.isRedeliver());
 
        try {
          // 3秒 sleep する。
          Thread.sleep(3 * 1000);
           
          // ack を返す。
          channel.basicAck(envelope.getDeliveryTag(), false);
          this.ack++;
          System.out.printf("  Done '%s' ack:%d %n", message, this.ack);
 
        } catch (InterruptedException e) {
          System.out.printf("  Interrupted '%s' %n", message);
          e.printStackTrace();
        } catch (Exception e) {
          System.out.printf("  Error '%s' %n", message);
          throw e;
        }
      }
        
      @Override
      public void handleCancel(String consumerTag) throws IOException {
        System.out.printf(" Canceled !!!!! %n");
      }
    };
 
    // x-cancel-on-ha-failover を true にして Consumer を Channel に割り当てる。
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-cancel-on-ha-failover", true);
    boolean autoAck = false;
    channel.basicConsume(queue, autoAck, args, consumer);
  }


  public static void main(String[] args) {
    try {      
      new MyCancelConsumer().doAction();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
MyHappyConsumerクラスからの変更点としては、

  1. DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージを処理しています。
  2. Channel に Consumer を割り当てる時に x-cancel-on-ha-failover引数を "true" にしています。

になります。

再度、検証したいと思います。
検証の前に Queue のメッセージをクリアして
HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

rabbitmqadmin purge queue name=myQueue

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-1st | 0        |
+---------+---------+---------------+----------+

rabbitmqctl sync_queue myQueue

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

Listing queues ...
myQueue [<rabbit@vm-2nd.1.297.0>]       [<rabbit@vm-2nd.1.297.0>]

Consumer を HomeNode でない vm-2nd に接続します。
コマンドプロンプトを開いて gradle から Consumer を起動します。
Consumer

cd /d C:\studying-rabbitmq-failover
gradle clean run -PmainClass=studying.rabbitmq.MyCancelConsumer -q

listening... vm-2nd

vm-2nd に接続できたのを確認したらメッセージを送信します。
新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。
Publisher

cd /d C:\studying-rabbitmq-failover
gradle happyPublisher -q

start
done

Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたら
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false

vm-1st の RabbitMQ サーバーを停止させて Consumer Cancel Notification が受け取れるか確認します。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl stop rabbitmq-server.service

以下のように Consumer のコマンドプロンプトから Consumer Cancel Notification を受け取って
Consumer が停止しているのが確認できると思います。
Consumer

...
listening... vm-2nd
 Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false
  Done 'Maurice' ack:1
 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false
  Done 'Verdine' ack:2
 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
  Done 'Philip' ack:3
 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false
  Done 'Jessica' ack:4
 Canceled !!!!!

vm-1st の RabbitMQ サーバーを再起動させて Queue のメッセージを見ています。
vm-1st のターミナルから以下のコマンドを実行します。
vm-1st

systemctl start rabbitmq-server.service

rabbitmqadmin list queues name durable node messages

+---------+---------+---------------+----------+
|  name   | durable |     node      | messages |
+---------+---------+---------------+----------+
| myQueue | True    | rabbit@vm-2nd | 7        |
+---------+---------+---------------+----------+
見てみると Consumer は、ack を 4つカウントしているのですが
Queue のメッセージは、7通あります。
恐らく Failover が発生したため Consumer が最後に処理していたメッセージの ack が受け取れずに
Re-Queue されたのだと思います。
Consumer が止まらずにメッセージ処理を続けてたらメッセージの2重処理が発生していると思います。
それをどう回避するか、どう対応するかは、アプリの実装に委ねられるのだと思います。


おわりに

Failover は、ライブラリーやミドルウェアの設定でなんとかなりそうですが、
メッセージの2重処理については、アプリ側での対応になるようです。
このあたりは、アプリの要件によって変わってくるので仕方ないと思います。
(2重にメッセージが処理されてもサービス的に影響がない場合もあると思います。
いづれにせよ Consumer は、メッセージが2重に処理されることも
考慮した作りにしておく必要がありそうです。
また、2重処理されたメッセージを捕捉できるようなログも出力しておきたいところです。
参考URL
http://www.rabbitmq.com/clustering.html
http://www.rabbitmq.com/ha.html
http://www.rabbitmq.com/consumer-cancel.html
http://www.rabbitmq.com/api-guide.html