はじめに
前回、『RabbitMQ の 冗長化を考える』でRabbitMQ の冗長化の設定を行いました。 今回は、その冗長化した RabbitMQ サーバーを実際、Failover させて、その様子を観察したいと思います。
冗長化を考える理由に 何かしらの障害が発生したとしても Publisher が送信したメッセージを Consumer によって確実に処理させたいということが挙げられると思います。
全体を通してでは、 Publisher が送信したメッセージは、Consumer で確実に処理されるということになると思います。
これを Publisher と Consumer の間に Queue を置いて考えると Publisher は、何かしらの障害が発生したとしてもメッセージを確実に Queue に届けるところまでが 責任範囲で、その後、Queue のメッセージが Consumer でどう処理されるかは、責任の範囲外になると思います。 一方、Consumer は、Queue にメッセージがどのような方法で届けられたかはということは、関係なく Queue にあるメッセージを確実に処理することが責任範囲になると思います。
このように Publisher と Consumer の責任範囲は、それぞれ被らないものだと思います。 そう考えると Failover をそれぞれ別々の視点で観察できるとのではないかと思いました。 (RPC 方式のメッセージ処理では、また違ってくると思うのですが今回は、考えません。 そんな訳で今回は、Consumer側 の Failover の様子を観察してみたと思います。
JavaClient では、Cluster の Node を複数、ConnectionFactory に設定することができ、 ConnectionFactory は、その中から接続可能な Node を選択して Connection を生成しているようです。 このことが接続している Node が落ちてしまった場合に ConnectionFactory に設定されている 別の接続可能な Node で自動的に接続が復帰することを可能にしているようです。 Node に仮想IP などを設定する必要がないようです。 厳密に負荷分散を考える場合は、仮想IP などが必要かもしれません。 (接続可能な IP の選択方法が Collections.shuffle() になってそうなので...
検証には、2種類の Consumer を考えています。 1つは、適当にスリープさせながらメッセージ処理をする HappyPath での Consumer と もう1つは、スリープさせずに連続的にメッセージ処理をする UnhappyPath での Consumer になります。
以下、3点を Failover 時に検証します。 1. 接続可能な Node に切り替わること。 2. メッセージが消えたしまわないこと。 3. メッセージが2重に処理されないこと。
始める前に... RabbitMQ がインストール済であること RabbitMQ サーバーにログイン可能でリソースにアクセスできるユーザが作成済であること (ユーザとパスワードを "user01" と "passwd" としています。 ManagementPlugin が有効になっていて CommandLineTool が使えること ホスト名が設定されていること 以上が前提になります。 このあたりはわたしも『yum の Local Repository に RabbitMQ をインストール』と 『RabbitMQ の 冗長化を考える』にまとめていますので よろしければ、こちらも合わせて、ご参考に。
前置きが長くなってしまいましたが ではでは!Getting Started!!
環境
検証機開発言語 | Java 1.8.0 |
---|---|
フレームワーク | rabbitmq-java-client 3.6.1 |
ビルドツール | Gradle 2.12 ※gradle コマンドが実行できるように設定しておきます。 |
OS | Windows 8.1 |
前回と同様に以下、2台のサーバー(vm-1st と vm-2nd)で冗長化します。RabbitMQ Server (vm-1st)
ホスト名 | vm-1st |
---|---|
IP | 192.168.10.110 |
ミドルウェア |
RabbitMQ server 3.6.1
ユーザ : user01 パスワード : passwd |
OS | CentOS-7-x86_64-Minimal-1511.iso VirtualBox 5.0.16 |
RabbitMQ Server (vm-2nd)
ホスト名 | vm-2nd |
---|---|
IP | 192.168.10.120 |
ミドルウェア |
RabbitMQ server 3.6.1
ユーザ : user01 パスワード : passwd |
OS | CentOS-7-x86_64-Minimal-1511.iso VirtualBox 5.0.16 |
プロジェクト構成
C ドライブ直下に「studying-rabbitmq-failover」フォルダを作成して各ファイルを以下のように配置します。 ファイルは、UTF-8 で保存します。
準備
前回のおさらいも兼ねて Clustering の設定から始めたいと思います。 vm-2nd を vm-1st の Cluster に参加させます。
vm-2nd のターミナルから以下のコマンドを実行します。vm-2nd
rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@vm-1st rabbitmqctl start_app
Cluster の設定を確認します。 vm-1st と vm-2nd の両方のターミナルで以下のコマンドを実行します。vm-1st
rabbitmqctl cluster_status Cluster status of node 'rabbit@vm-1st' ... [{nodes,[{disc,['rabbit@vm-1st','rabbit@vm-2nd']}]}, {running_nodes,['rabbit@vm-2nd','rabbit@vm-1st']}, {cluster_name,<<"rabbit@vm-1st">>}, ...
vm-2nd
rabbitmqctl cluster_status [{nodes,[{disc,['rabbit@vm-1st','rabbit@vm-2nd']}]}, {running_nodes,['rabbit@vm-1st','rabbit@vm-2nd']}, {cluster_name,<<"rabbit@vm-1st">>}, ...
vm-1st と vm-2nd のどちらも "rabbit@vm-1st" という Cluster に参加していて起動中である。 問題ないようですね。
次に Queue と directタイプの Exchange と宣言します。 Queue名と Exchange名は、それぞれ "myQueue" と "myExchange" にします。 RoutingKey を "myKey" にして "myQueue" と "myExchange" を Bind します。 vm-1st が HomeNode になるようにします。
また、今回使用する Queue名と Exchange名で Queue と Exchange がすでに存在していると ややこしくなるので念のため削除してから作成します。
vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqadmin delete exchange name=myExchange rabbitmqadmin delete queue name=myQueue
rabbitmqadmin declare queue name=myQueue rabbitmqadmin declare exchange name=myExchange type=direct rabbitmqadmin declare binding source=myExchange destination=myQueue routing_key=myKey
宣言した Queue と Exchange そして、Bind を確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-1st | 0 | +---------+---------+---------------+----------+
rabbitmqadmin list exchanges name type durable +--------------------+---------+---------+ | name | type | durable | +--------------------+---------+---------+ | | direct | True | ... | myExchange | direct | True | +--------------------+---------+---------+
rabbitmqadmin list bindings +------------+-------------+-------------+ | source | destination | routing_key | +------------+-------------+-------------+ | | myQueue | myQueue | | myExchange | myQueue | myKey | +------------+-------------+-------------+
問題なく宣言されているのが確認できると思います。
最後に QueueMirroring の設定をします。 先ほど宣言した Queue を Cluster 内のすべての Node で Mirroring するようにします。
vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqctl set_policy myPolicy "^myQueue$" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'
追加でもう1つ 検証機が Windows なので Windows の hosts ファイルにも vm-1st と vm-2nd を追加しておきます。C:\Windows\System32\drivers\etc\hosts
... 192.168.10.110 vm-1st 192.168.10.120 vm-2nd
準備は、これで終わりです。
ビルドスクリプトの作成
以下のような内容でビルドスクリプトを作成します。build.gradle
apply from: 'publisher.gradle' apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'application' sourceCompatibility = '1.8' targetCompatibility = '1.8' [compileJava, compileTestJava]*.options*.encoding = 'UTF-8' mainClassName = (project.hasProperty('mainClass')) ? "$mainClass" : '' repositories { jcenter() } dependencies { compile 'com.rabbitmq:amqp-client:3.6.1' }
Consumer側の冗長化の検証に RabbitMQ サーバーにメッセージを送信するスクリプトがあった方が やりやすいと思ったので Java でメッセージ送信する Publisher側のプロジェクトも考えたのですが 今回は、以下のような gradle の task で済ませようと思います。publisher.gradle
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; def messageCount = project.hasProperty('messageCount') ? "$messageCount" : 10000 // (1) task happyPublisher << { println 'start' Connection connection = newConnection(); Channel channel = connection.createChannel(); def tasks = [ 'Maurice', 'Verdine', 'Philip', 'Jessica', 'Larry', 'Johnny', 'Ralph', 'Al', 'Andrew', 'XXX' ] tasks.each { byte[] body = it.getBytes(); channel.basicPublish("myExchange", "myKey", MessageProperties.PERSISTENT_TEXT_PLAIN, body); } connection.close(); println 'done' } // (2) task unhappyPublisher << { println 'start' Connection connection = newConnection(); Channel channel = connection.createChannel(); for (int ct = 0; ct < messageCount; ct++) { byte[] body = String.format("test-%d", ct + 1).getBytes(); channel.basicPublish("myExchange", "myKey", MessageProperties.PERSISTENT_TEXT_PLAIN, body); } connection.close(); println 'done' } // RabbitMQ Server に接続して connection を生成する。 Connection newConnection() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("vm-1st"); factory.setUsername("user01"); factory.setPassword("passwd"); factory.newConnection(); } buildscript { repositories { jcenter() } dependencies { classpath 'com.rabbitmq:amqp-client:3.6.1' } }
スクリプトの内容は、以下のようになります。
- (1) happyPublisher タスク
-
HappyPath の検証ケースで使用するメッセージを RabbitMQサーバーに送信します。 メッセージを10通送信します。
- (2) unhappyPublisher タスク
-
UnhappyPath の検証ケースで使用するメッセージを RabbitMQサーバーに送信します。 メッセージを10,000通送信します。
RabbitMQ サーバー接続クラスを作成する
RabbitMQ サーバーに接続する処理などを共通化した以下のようなクラスを作成します。 検証で作成する Consumerクラスはこのクラスを継承する形にします。src/main/java/studying/rabbitmq/MyAbstractConsumer.java
package studying.rabbitmq; import java.io.IOException; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.impl.recovery.AutorecoveringChannel; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; public abstract class MyAbstractConsumer { public void doAction() throws Exception { // RabbitMQ Server 接続情報 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("user01"); factory.setPassword("passwd"); // (1) 冗長化の設定をして RabbitMQ Server に接続する。 Address[] addrs = new Address[]{new Address("vm-1st"), new Address("vm-2nd")}; factory.setAutomaticRecoveryEnabled(true); AutorecoveringConnection connection = (AutorecoveringConnection)factory.newConnection(addrs); System.out.printf("listening... %s %n", connection.getAddress().getHostName()); Thread.sleep(10 * 1000); // (2) Queue と Exchange の宣言と Bind をする。 boolean durable = true; String queueName = "myQueue"; String exchangeType = "direct"; String exchangeName = "myExchange"; String routingKey = "myKey"; AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel(); channel.exchangeDeclare(exchangeName, exchangeType, durable); channel.queueDeclare(queueName, durable, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); channel.basicQos(1); // (3) Connection と Channel に ShutdownListener を設定する。 connection.addShutdownListener(cause -> System.out.printf("Connection Shutdown... %n")); channel.addShutdownListener(cause -> System.out.printf("Channel Shutdown... %n")); // (4) Connection と Channel に RecoveryListener を設定する。 connection.addRecoveryListener((Recoverable recoverable) -> { AutorecoveringConnection conn = (AutorecoveringConnection)recoverable; String hostName = conn.getAddress().getHostName(); System.out.printf("Connection Recovered !!! %s %n", hostName); }); channel.addRecoveryListener((Recoverable recoverable) -> { AutorecoveringChannel chan = (AutorecoveringChannel)recoverable; String hostName = chan.getConnection().getAddress().getHostName(); System.out.printf("Channel Recovered !!! %s %n", hostName); }); // Consumer を設定する。 this.assignConsumerFor(channel, queueName); } // (5) protected abstract void assignConsumerFor(Channel channel, String queue) throws IOException; }
処理内容は、以下のようになります。
- (1) 冗長化の設定をして RabbitMQ Server に接続する。
ClusterNode のホスト名(または、IP)を複数指定して Connection を作成します。 AutomaticRecovery も忘れずに有効化しておきます。
- (2) Queue と Exchange の宣言と Bind をする。
準備の時に行っているので不要だと思いますが ここでも Queue と Exchange の宣言とその Bind をしています。 念のため basicQos でメッセージの取り出しを1回につき1つにしています。
- (3) Connection と Channel に ShutdownListener を設定する。
Connection と Channel の Shutdown時、コンソールにログを出力します。
- (4) Connection と Channel に RecoveryListener を設定する。
Connection と Channel の Recovery時、コンソールにログを出力します。
- (5) assignConsumerFor() メソッド
サブクラスはこのメソッドを実装して Consumer を Channel に割り当てます。
Happy Path を検証する
初めは、おおよそ想定通りにいってくれるであろうパターンから検証してみたいと思います。 適当にスリープさせながら10通のメッセージを処理します。 以下のような内容で Consumerクラスを作成します。src/main/java/studying/rabbitmq/MyHappyConsumer.java
package studying.rabbitmq; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MyHappyConsumer extends MyAbstractConsumer { @Override protected void assignConsumerFor(Channel channel, String queue) throws IOException { Consumer consumer = new DefaultConsumer(channel) { protected int ack; @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); String hostName = channel.getConnection().getAddress().getHostName(); System.out.printf(" Received '%-7s'", message); System.out.printf(" - %s", hostName); System.out.printf(" deliveryTag:%s", envelope.getDeliveryTag()); System.out.printf(" redeliver:%s %n", envelope.isRedeliver()); try { // 3秒 sleep する。 Thread.sleep(3 * 1000); // ack を返す。 channel.basicAck(envelope.getDeliveryTag(), false); this.ack++; System.out.printf(" Done '%s' ack:%d %n", message, this.ack); } catch (InterruptedException e) { System.out.printf(" Interrupted '%s' %n", message); e.printStackTrace(); } catch (Exception e) { System.out.printf(" Error '%s' %n", message); throw e; } } }; boolean autoAck = false; channel.basicConsume(queue, autoAck, consumer); } public static void main(String[] args) { try { new MyHappyConsumer().doAction(); } catch (Exception e) { e.printStackTrace(); } } }
DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージ処理をしています。
処理内容としては、 1. 受信したメッセージをコンソールに表示する。 2. 3秒スリープさせてタイミングを整える。 3. RabbitMQ サーバーに ack を返す。 RabbitMQ サーバーに返した ack をカウントする。 ただし Exception が発生した場合は、ack を返さないようにします。 4. 最後にもう一度メッセージをコンソールに表示する。 のようになります。
想定されるケースとしては、 HomeNode に接続している状態で Failover が発生する場合と HomeNode 以外に接続している状態で Failover が発生する場合があると思います。 とりあえず、HomeNode に接続しているケースから見ていきたいと思います。
検証の前に Queue のメッセージをクリアしてから HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqadmin purge queue name=myQueue
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-1st | 0 | +---------+---------+---------------+----------+
rabbitmqctl sync_queue myQueue
rabbitmqctl list_queues name slave_pids synchronised_slave_pids Listing queues ... myQueue [<rabbit@vm-2nd.3.297.0>] [<rabbit@vm-2nd.3.297.0>]
問題がなければ、検証を始めます。
Consumer を HomeNode の vm-1st に接続させてメッセージを送信します。 適当なところで vm-1st の RabbitMQ サーバーを停止させます。 その時、どうように Failover するか観察したいと思います。
コマンドプロンプトを開いて gradle から Consumer を起動します。Consumer
cd /d C:\studying-rabbitmq-failover gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q listening... vm-1st
と表示され vm-1st に接続できたらメッセージを送信します。
vm-2nd に接続されたら
どの ClusterNode に接続されるかは、ランダムのようです。 (よく分かりませんが... 今回の場合、vm-1st と vm-2nd の2台なので 確率論的にみて3回くらいやり直せば希望する方に接続される筈なんですが...
新たにコマンドプロンプトを開いて gradle から 以下のようにメッセージを送信します。Publisher
cd /d C:\studying-rabbitmq-failover gradle happyPublisher -q start done
しばらくすると Consumer のコマンドプロンプトからメッセージが処理されていく様子が確認できると思います。Consumer
... listening... vm-1st Received 'Maurice' - vm-1st deliveryTag:1 redeliver:false Done 'Maurice' ack:1 Received 'Verdine' - vm-1st deliveryTag:2 redeliver:false Done 'Verdine' ack:2 Received 'Philip ' - vm-1st deliveryTag:3 redeliver:false
ここで vm-1st の RabbitMQ サーバーを停止させます。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
systemctl stop rabbitmq-server.service
そうすると Consumer のコマンドプロンプトから Failover する様子が確認できると思います。Consumer
... listening... vm-1st Received 'Maurice' - vm-1st deliveryTag:1 redeliver:false Done 'Maurice' ack:1 Received 'Verdine' - vm-1st deliveryTag:2 redeliver:false Done 'Verdine' ack:2 Received 'Philip ' - vm-1st deliveryTag:3 redeliver:false Done 'Philip' ack:3 Received 'Jessica' - vm-1st deliveryTag:4 redeliver:false Channel Shutdown... Error 'Jessica' com.rabbitmq.client.impl.DefaultExceptionHandler: ... com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; ... at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChan ... ... Channel Recovered !!! vm-2nd Connection Recovered !!! vm-2nd Connection Shutdown... Received 'Jessica' - vm-2nd deliveryTag:5 redeliver:true Done 'Jessica' ack:4 Received 'Larry ' - vm-2nd deliveryTag:6 redeliver:true Done 'Larry' ack:5 Received 'Johnny ' - vm-2nd deliveryTag:7 redeliver:true Done 'Johnny' ack:6 Received 'Ralph ' - vm-2nd deliveryTag:8 redeliver:true Done 'Ralph' ack:7 Received 'Al ' - vm-2nd deliveryTag:9 redeliver:true Done 'Al' ack:8 Received 'Andrew ' - vm-2nd deliveryTag:10 redeliver:true Done 'Andrew' ack:9 Received 'XXX ' - vm-2nd deliveryTag:11 redeliver:true Done 'XXX' ack:10
検証してみると 1. "Jessica" のメッセージ処理をしている時に vm-1st の停止が発生していて HomeNode を vm-2nd に Failover できているようです。 2. 最後のメッセージ "XXX" まで処理できているようです。 3. Queue にあったメッセージの数と ack の数が一致しているので 2重にメッセージが処理されていることもないようです。 概ね想定通りうまくいっているのではないかと思います。
Failover のタイミングを拾えているのでここに Rollback 処理などを挟み込めると思います。 また、Failover 時に Queue に残っていたメッセージは、redeliver フラグが "true" になっているので Re-Queue 扱いになっているのも分かると思います。
次に HomeNode でない Node に接続しているケースを見てみたいと思います。 恐らく今、HomeNode は、vm-2nd になっていると思います。 vm-1st の RabbitMQ サーバーを再起動させて確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
systemctl start rabbitmq-server.service
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-2nd | 0 | +---------+---------+---------------+----------+
HomeNode を vm-1st に戻したいので vm-2nd の RabbitMQ サーバーを再起動させます。 vm-2nd のターミナルから以下のコマンドを実行します。vm-2nd
systemctl stop rabbitmq-server.service systemctl start rabbitmq-server.service
先ほどと同様に Queue のメッセージをクリアしてから HomeNode が vm-1st に戻ってきているのと QueueMirroring の同期を確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqadmin purge queue name=myQueue
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-1st | 0 | +---------+---------+---------------+----------+
rabbitmqctl sync_queue myQueue
rabbitmqctl list_queues name slave_pids synchronised_slave_pids Listing queues ... myQueue [<rabbit@vm-2nd.3.295.0>] [<rabbit@vm-2nd.3.295.0>]
問題がなければ、検証を始めます。
Consumer を vm-2nd に接続させてメッセージを送信します。 適当なところで vm-1st の RabbitMQ サーバーを停止させて Failover する様子を観察したいと思います。
コマンドプロンプトを開いて gradle から Consumer を起動します。Consumer
cd /d C:\studying-rabbitmq-failover gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q listening... vm-2nd
vm-2nd に接続されたらメッセージを送信します。 新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。Publisher
cd /d C:\studying-rabbitmq-failover gradle happyPublisher -q start done
Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたらConsumer
... listening... vm-2nd Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false Done 'Maurice' ack:1 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false Done 'Verdine' ack:2 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
vm-2nd の RabbitMQ サーバーを停止して Failover させます。 vm-2nd のターミナルから以下のコマンドを実行します。vm-2nd
systemctl stop rabbitmq-server.service
Consumer のコマンドプロンプトを見ると Failover する様子が確認できると思います。Consumer
... listening... vm-2nd Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false Done 'Maurice' ack:1 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false Done 'Verdine' ack:2 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false Done 'Philip' ack:3 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false Channel Shutdown... Error 'Jessica' com.rabbitmq.client.impl.DefaultExceptionHandler: ... com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; ... at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChan ... ... Channel Recovered !!! vm-1st Connection Recovered !!! vm-1st Connection Shutdown... Received 'Jessica' - vm-1st deliveryTag:5 redeliver:true Done 'Jessica' ack:4 Received 'Larry ' - vm-1st deliveryTag:6 redeliver:false Done 'Larry' ack:5 Received 'Johnny ' - vm-1st deliveryTag:7 redeliver:false Done 'Johnny' ack:6 Received 'Ralph ' - vm-1st deliveryTag:8 redeliver:false Done 'Ralph' ack:7 Received 'Al ' - vm-1st deliveryTag:9 redeliver:false Done 'Al' ack:8 Received 'Andrew ' - vm-1st deliveryTag:10 redeliver:false Done 'Andrew' ack:9 Received 'XXX ' - vm-1st deliveryTag:11 redeliver:false Done 'XXX' ack:10
検証してみると 1. "Jessica" のメッセージ処理をしている時に vm-1st の停止が発生していて HomeNode を vm-2nd に Failover できているようです。 2. 最後のメッセージ "XXX" まで処理できているようです。 3. Queue にあったメッセージの数と ack の数が一致しているので 2重にメッセージが処理されていることもないようです。 こちらもうまくいっていると思います。
HomeNode に接続しているケースとの違いとして Failover 時に処理していたメッセージは、redeliver フラグが "true" なので Re-Queue 扱いになっているようですが、 それ以外の Queue に残っていたメッセージの redeliver フラグは "false" のままのようです。
Unhappy Path を検証する
不幸なタイミングで処理される可能性が高いパターンを検証してみたいと思います。 内容的には、先ほどの HappyPath とほぼ同じなのですが 今度は、スリープさせずに連続してメッセージ処理をするとどうなるか試してみたいと思います。 メッセージは、10,000通処理します。 以下のような内容で Consumerクラスを作成します。src/main/java/studying/rabbitmq/MyUnhappyConsumer.java
package studying.rabbitmq; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MyUnhappyConsumer extends MyAbstractConsumer { public final static int MESSAGE_COUNT = 10000; @Override protected void assignConsumerFor(Channel channel, String queue) throws IOException { int count = MyUnhappyConsumer.MESSAGE_COUNT; Consumer consumer = new DefaultConsumer(channel) { protected int ack; @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); long deliveryTag = envelope.getDeliveryTag(); try { // ack を返す。 boolean multiple = false; channel.basicAck(deliveryTag, multiple); this.ack++; if ((deliveryTag == 1) || (deliveryTag > (count - 2)) || ((deliveryTag % (count / 5)) == 0)) { System.out.printf(" Done '%s'", message); System.out.printf(" - %s", channel.getConnection().getAddress().getHostName()); System.out.printf(" deliveryTag:%s", deliveryTag); System.out.printf(" ack:%d %n", this.ack); } } catch (Exception e) { System.out.printf(" Error '%s' - deliveryTag:%s %n", message, deliveryTag); throw e; } } }; boolean autoAck = false; channel.basicConsume(queue, autoAck, consumer); } public static void main(String[] args) { try { new MyUnhappyConsumer().doAction(); } catch (Exception e) { e.printStackTrace(); } } }
DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージ処理をしています。
処理内容としては、 1. 受信したメッセージをコンソールに表示する。 今回は、メッセージを10,000通処理するので最初の1通目と最後の3通、そして途中5回だけにします。 2. RabbitMQ サーバーに ack を返す。 RabbitMQ サーバーに返した ack をカウントする。 ただし Exception が発生した場合は、ack を返さないようにします。 のようになります。
こちらも HomeNode に接続している状態で Failover が発生する場合と HomeNode 以外に接続している状態で Failover が発生する場合が考えられるのですが HomeNode に接続している状態で Failover が発生する場合で検証したいと思います。
検証の前に Queue のメッセージをクリアしてから HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqadmin purge queue name=myQueue
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-1st | 0 | +---------+---------+---------------+----------+
rabbitmqctl sync_queue myQueue
rabbitmqctl list_queues name slave_pids synchronised_slave_pids Listing queues ... myQueue [<rabbit@vm-2nd.1.295.0>] [<rabbit@vm-2nd.1.295.0>]
問題がなければ、検証を始めます。
今回は、先にメッセージを送信してしまいます。 コマンドプロンプトを開いて gradle から 以下のようにメッセージを送信します。Publisher
cd /d C:\studying-rabbitmq-failover gradle unhappyPublisher -q start done
送信したメッセージが Queue に届いているか確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-1st | 10000 | +---------+---------+---------------+----------+
Queue にメッセージが届いているのが確認できたら コマンドプロンプトを開いて gradle から以下のように Consumer を vm-1st に接続させて起動します。Consumer
cd /d C:\studying-rabbitmq-failover gradle clean run -PmainClass=studying.rabbitmq.MyUnhappyConsumer -q listening... vm-1st
vm-2nd に接続されたら
メッセージが処理されるまで10秒空けていますので その間に「CTRL」+「C」キーで処理を止めてもう一度、Consumer を起動し直します。
しばらくして Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたらConsumer
... listening... vm-1st Done 'test-1' - vm-1st deliveryTag:1 ack:1 Done 'test-2000' - vm-1st deliveryTag:2000 ack:2000 Done 'test-4000' - vm-1st deliveryTag:4000 ack:4000
vm-1st の RabbitMQ サーバーを停止して Failover させます。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
systemctl stop rabbitmq-server.service
Consumer のコマンドプロンプトを見ると 以下のように Failover する様子が確認できると思います。Consumer
... listening... vm-1st Done 'test-1' - vm-1st deliveryTag:1 ack:1 Done 'test-2000' - vm-1st deliveryTag:2000 ack:2000 Done 'test-4000' - vm-1st deliveryTag:4000 ack:4000 Channel Shutdown... Channel Recovered !!! vm-2nd Connection Recovered !!! vm-2nd Connection Shutdown... Done 'test-5999' - vm-2nd deliveryTag:6000 ack:6000 Done 'test-7999' - vm-2nd deliveryTag:8000 ack:8000 Done 'test-9998' - vm-2nd deliveryTag:9999 ack:9999 Done 'test-9999' - vm-2nd deliveryTag:10000 ack:10000 Done 'test-10000' - vm-2nd deliveryTag:10001 ack:10001
検証してみると 1. 4,000通目から6,000通目のメッセージ処理をしている時に vm-1st の停止が発生していて HomeNode を vm-2nd に Failover できているようです。 2. 最後のメッセージ "test-10000" まで処理できているようです。 3. Queue にあったメッセージの数より ack の数が1つ多くなっています。 2重にメッセージが処理されてしまっているようです。
メッセージの2重処理が発生しているのに加えて Failover のタイミングも拾えていないようです。 Exception が catch できていないということは、channel.basicAck() メソッドで ack を返すところまでは成功しているけれども ack が 届く前に RabbitMQ サーバーが 停止したのだろうと考えられます。
HomeNode 以外に接続している場合に Failover したら
こちらの場合も同様にメッセージの2重処理が発生するようです。Consumer
listening... vm-2nd Done 'test-1' - vm-2nd deliveryTag:1 ack:1 Done 'test-2000' - vm-2nd deliveryTag:2000 ack:2000 Done 'test-4000' - vm-2nd deliveryTag:4000 ack:4000 Channel Shutdown... Channel Recovered !!! vm-1st Connection Recovered !!! vm-1st Connection Shutdown... Done 'test-5999' - vm-1st deliveryTag:6000 ack:6000 Done 'test-7999' - vm-1st deliveryTag:8000 ack:8000 Done 'test-9998' - vm-1st deliveryTag:9999 ack:9999 Done 'test-9999' - vm-1st deliveryTag:10000 ack:10000 Done 'test-10000' - vm-1st deliveryTag:10001 ack:10001
Consumer Cancel Notification
Consumer に割り当てられている Queue が削除されたり、その他の理由で Queue にアクセスできなくなった時に RabbitMQ サーバー から Consumer Cancel Notification というものが送られる仕組みがあるようです。 このキャンセル通知を Consumer が受け取ると以降のメッセージ処理を中止するようです。 Failover 時も、このキャンセル通知が送信されるようなので、ちょっと試してみようと思います。
わたしが確認できたところでは、 Failover 時、以下の条件をすべて満たす場合に、このキャンセル通知を受け取れるようです。 ・ QueueMirroring している。 ・ HomeNode 以外に接続している。 ・ HomeNode が原因で Failover が発生した。 ・ Consumer 引数の x-cancel-on-ha-failover が "true" になっている。
上記のケースで x-cancel-on-ha-failover を設定していないとどうなるか 先ほどの HappyPath の検証を使って見てみます。
検証の前に Queue のメッセージをクリアして HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqadmin purge queue name=myQueue
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-1st | 0 | +---------+---------+---------------+----------+
rabbitmqctl sync_queue myQueue
rabbitmqctl list_queues name slave_pids synchronised_slave_pids Listing queues ... myQueue [<rabbit@vm-2nd.3.297.0>] [<rabbit@vm-2nd.3.297.0>]
Consumer を HomeNode でない vm-2nd に接続します。 コマンドプロンプトを開いて gradle から Consumer を起動します。Consumer
cd /d C:\studying-rabbitmq-failover gradle clean run -PmainClass=studying.rabbitmq.MyHappyConsumer -q listening... vm-2nd
vm-2nd に接続できたのを確認したらメッセージを送信します。 新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。Publisher
cd /d C:\studying-rabbitmq-failover gradle happyPublisher -q start done
Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたらConsumer
... listening... vm-2nd Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false Done 'Maurice' ack:1 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false Done 'Verdine' ack:2 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
HomeNode の vm-1st の RabbitMQ サーバーを停止して Failover させます。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
systemctl stop rabbitmq-server.service
Consumer のコマンドプロンプトでは以下のようにメッセージ処理が続いていると思います。Consumer
... listening... vm-2nd Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false Done 'Maurice' ack:1 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false Done 'Verdine' ack:2 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false Done 'Philip' ack:3 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false Done 'Jessica' ack:4 Received 'Jessica' - vm-2nd deliveryTag:5 redeliver:true Done 'Jessica' ack:5 Received 'Larry ' - vm-2nd deliveryTag:6 redeliver:true Done 'Larry' ack:6 Received 'Johnny ' - vm-2nd deliveryTag:7 redeliver:true Done 'Johnny' ack:7 Received 'Ralph ' - vm-2nd deliveryTag:8 redeliver:true Done 'Ralph' ack:8 Received 'Al ' - vm-2nd deliveryTag:9 redeliver:true Done 'Al' ack:9 Received 'Andrew ' - vm-2nd deliveryTag:10 redeliver:true Done 'Andrew' ack:10 Received 'XXX ' - vm-2nd deliveryTag:11 redeliver:true Done 'XXX' ack:11
見てみると Queue のメッセージの数より ack の数が1つ多くなっています。 メッセージの2重処理が発生してしまっているようです。
もう少し見てみると "Jessica" のメッセージ処理が2回実行されているようです。 1回目の redeliver は、"false" ですが2回目の "true" になっています。 "Jessica" の2回目のメッセージ処理以降、メッセージの redeliver が "true" になっています。 恐らく vm-2nd で "Jessica" のメッセージを処理している間に vm-1st が Failover したのだと思います。 vm-1st は、"Jessica" のメッセージ処理の ack を受け取っていないので Re-Queue したのだと思います。 これがメッセージの2重処理の原因ではないでしょうか? HappyPath と思っていましたが、実は、うまくいかないパターンもあるようです。
このような場合に Consumer Cancel Notification を利用して Consumer のメッセージ処理を止めることが できるようです。 先ほどの HappyPath の検証で使用した MyHappyConsumerクラスを変更して 以下のような MyCancelConsumerクラスを作成します。src/main/java/studying/rabbitmq/MyCancelConsumer.java
package studying.rabbitmq; import java.io.IOException; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MyCancelConsumer extends MyAbstractConsumer { @Override protected void assignConsumerFor(Channel channel, String queue) throws IOException { Consumer consumer = new DefaultConsumer(channel) { protected int ack; @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); String hostName = channel.getConnection().getAddress().getHostName(); System.out.printf(" Received '%-7s'", message); System.out.printf(" - %s", hostName); System.out.printf(" deliveryTag:%s", envelope.getDeliveryTag()); System.out.printf(" redeliver:%s %n", envelope.isRedeliver()); try { // 3秒 sleep する。 Thread.sleep(3 * 1000); // ack を返す。 channel.basicAck(envelope.getDeliveryTag(), false); this.ack++; System.out.printf(" Done '%s' ack:%d %n", message, this.ack); } catch (InterruptedException e) { System.out.printf(" Interrupted '%s' %n", message); e.printStackTrace(); } catch (Exception e) { System.out.printf(" Error '%s' %n", message); throw e; } } @Override public void handleCancel(String consumerTag) throws IOException { System.out.printf(" Canceled !!!!! %n"); } }; // x-cancel-on-ha-failover を true にして Consumer を Channel に割り当てる。 Map<String, Object> args = new HashMap<String, Object>(); args.put("x-cancel-on-ha-failover", true); boolean autoAck = false; channel.basicConsume(queue, autoAck, args, consumer); } public static void main(String[] args) { try { new MyCancelConsumer().doAction(); } catch (Exception e) { e.printStackTrace(); } } }
MyHappyConsumerクラスからの変更点としては、 1. DefaultConsumerクラスの handleDelivery() メソッドをオーバーライドしてメッセージを処理しています。 2. Channel に Consumer を割り当てる時に x-cancel-on-ha-failover引数を "true" にしています。 になります。
再度、検証したいと思います。 検証の前に Queue のメッセージをクリアして HomeNode が vm-1st になっているのと QueueMirroring の同期を確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
rabbitmqadmin purge queue name=myQueue
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-1st | 0 | +---------+---------+---------------+----------+
rabbitmqctl sync_queue myQueue
rabbitmqctl list_queues name slave_pids synchronised_slave_pids Listing queues ... myQueue [<rabbit@vm-2nd.1.297.0>] [<rabbit@vm-2nd.1.297.0>]
Consumer を HomeNode でない vm-2nd に接続します。 コマンドプロンプトを開いて gradle から Consumer を起動します。Consumer
cd /d C:\studying-rabbitmq-failover gradle clean run -PmainClass=studying.rabbitmq.MyCancelConsumer -q listening... vm-2nd
vm-2nd に接続できたのを確認したらメッセージを送信します。 新たにコマンドプロンプトを開いて gradle から以下のようにメッセージを送信します。Publisher
cd /d C:\studying-rabbitmq-failover gradle happyPublisher -q start done
Consumer のコマンドプロンプトからメッセージ処理が始まるのが確認できたらConsumer
... listening... vm-2nd Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false Done 'Maurice' ack:1 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false Done 'Verdine' ack:2 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false
vm-1st の RabbitMQ サーバーを停止させて Consumer Cancel Notification が受け取れるか確認します。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
systemctl stop rabbitmq-server.service
以下のように Consumer のコマンドプロンプトから Consumer Cancel Notification を受け取って Consumer が停止しているのが確認できると思います。Consumer
... listening... vm-2nd Received 'Maurice' - vm-2nd deliveryTag:1 redeliver:false Done 'Maurice' ack:1 Received 'Verdine' - vm-2nd deliveryTag:2 redeliver:false Done 'Verdine' ack:2 Received 'Philip ' - vm-2nd deliveryTag:3 redeliver:false Done 'Philip' ack:3 Received 'Jessica' - vm-2nd deliveryTag:4 redeliver:false Done 'Jessica' ack:4 Canceled !!!!!
vm-1st の RabbitMQ サーバーを再起動させて Queue のメッセージを見ています。 vm-1st のターミナルから以下のコマンドを実行します。vm-1st
systemctl start rabbitmq-server.service
rabbitmqadmin list queues name durable node messages +---------+---------+---------------+----------+ | name | durable | node | messages | +---------+---------+---------------+----------+ | myQueue | True | rabbit@vm-2nd | 7 | +---------+---------+---------------+----------+
見てみると Consumer は、ack を 4つカウントしているのですが Queue のメッセージは、7通あります。 恐らく Failover が発生したため Consumer が最後に処理していたメッセージの ack が受け取れずに Re-Queue されたのだと思います。 Consumer が止まらずにメッセージ処理を続けてたらメッセージの2重処理が発生していると思います。 それをどう回避するか、どう対応するかは、アプリの実装に委ねられるのだと思います。
おわりに
Failover は、ライブラリーやミドルウェアの設定でなんとかなりそうですが、 メッセージの2重処理については、アプリ側での対応になるようです。 このあたりは、アプリの要件によって変わってくるので仕方ないと思います。 (2重にメッセージが処理されてもサービス的に影響がない場合もあると思います。 いづれにせよ Consumer は、メッセージが2重に処理されることも 考慮した作りにしておく必要がありそうです。 また、2重処理されたメッセージを捕捉できるようなログも出力しておきたいところです。