2016年5月1日日曜日

RabbitMQ で同時処理実行数を制限する

はじめに

長い時間かかる処理を実行している間、ユーザー(またはクライアント)の対応をどう考えるか?
処理が終わるまで待たせるか?
待たせないようにするために処理を切り出して非同期的に実行できるようにするか?
どっちにしても長い時間かかる処理だけあってサーバー負荷は高い。
サーバー負荷の高い処理が連打されたりすると大変なことになってしまう。
処理が終わるまで待たせないようにする場合はもちろん、待たせる場合であっても
複数ユーザが同時に実行することを考えると何かしらの対策は必要になる。
理想としては、同時に実行できる処理数を制限して超えた分は Queue のようなものを使って処理待ちさせる。
処理に空きができたところで処理待ちしている処理を順に処理していく。
なにか簡単にできる方法はないかと考えていたところ MessageQueue に至った訳です。
MessageQueue サーバーとして RabbitMQ を使って検証してみようと思います。
始める前に...
RabbitMQ がインストール済であること
RabbitMQ サーバーにログイン可能でリソースにアクセスできるユーザが作成済であること
(ユーザとパスワードを "user01" と "passwd" としています。
以上が前提になります。
このあたりはわたしも『yum の Local Repository に RabbitMQ をインストール 』にまとめていますので
よろしければ、こちらも合わせて、ご参考に。
ではでは!Getting Started!!

ちょっと整理

主な Class と interface を整理したいと思います。
(com.rabbitmq.client パッケージにあるものです。
ConnectionFactory
  • hostName、userName、password などを元に RabbitMQサーバーと接続する。
    接続情報を Connectionオブジェクトにして返す。
    
Connection
  • 接続情報を保持する。
    接続の Abort、Close を行う。
    接続中に発生するイベントのハンドリングを行う。
    Channelオブジェクトを生成する。
    など。
    
Channel
  • Exchange と Queue の宣言を行う。
    宣言した Exchange と Queue を Bind する。
    メッセージの送信処理と受信処理を行う。
    メッセージの受信処理に使う Consumer の割り当てを行う。
    その他 AMQP の各種操作メソッドを提供する。
    
Exchange
  • メッセージの送信先になる。
    メッセージ送信者(sender)はメッセージを直接 Queue に送信することはできない。
    送られてきたメッセージを RoutingKey を元に格納する Queue を決定する。
    
    以下4つのタイプがある。
    
    fanout
    無条件に Queue へメッセージを格納する。
    ※ ルーティングなし。(RoutingKey は無効)
    direct
    RoutingKey にマッチする Queue へ格納する。
    topic
    RoutingKey にパターンマッチする Queue へ格納する。
    RoutingKey は「.」ドット区切りの単語で指定する。
      例) "kern.critical"  "auth.info"  "www.google.com"
    ワイルドカード("*" "#")を含めたマッチング条件を指定できる。
      "*" は、1の単語にマッチする。
      "#" は、複数の単語にマッチする。
      例) "kern.*"  "*.info"  "#.com"
    headers
    メッセージ Header にマッチする Queue へ格納する。
    direct タイプで無名の Exchange がデフォルトの Exchange として必ず提供されている。
    Queue は、宣言と同時に暗黙的にこのデフォルトの Exchange と Queue名を RoutingKey にして
    Bind される。
    
Queue
  • メッセージを格納する。
    
Consumer
  • 割り当てられた Queue からメッセージを取り出して処理する。
    

環境

開発機

開発言語 Java 1.8.0
フレームワーク rabbitmq-java-client 3.6.1
ビルドツール Gradle 2.12
※gradle コマンドが実行できるように設定しておきます。
OS Windows 8.1


RabbitMQ Server

IP 192.168.10.110
ミドルウェア RabbitMQ server 3.6.1
  ユーザ  user01   パスワード  passwd
OS CentOS-7-x86_64-Minimal-1511.iso
VirtualBox 5.0.16


プロジェクト構成

作成するファイルは少ないのですがプロジェクトをApp と Worker に分けました。
というのも別の jar ファイルにしてそれぞれ実行した方が
プロセス間でメッセージのやりとりをしている様子をより実感できるかなと思ったからです。
C ドライブ直下に「studying-rabbitmq」フォルダを作成して各ファイルを以下のように配置します。
ファイルは、UTF-8 で保存します。
  • C:\studying-rabbitmq
    • app
      • src
        • main
          • java
            • studying
              • sender
                • App.java
    • worker
      • src
        • main
          • java
            • studying
              • receiver
                • MyConsumer.java
                • Worker.java
    • build.gradle
    • settings.gradle


準備

ビルドスクリプトを作成します。
プロジェクトは、App と Worker と2つあるのですがビルドスクリプトは1つにしています。
build.gradle
subprojects {
  apply plugin: 'java'
  apply plugin: 'eclipse'
  apply plugin: 'application'

  sourceCompatibility = '1.8'
  targetCompatibility = '1.8'
  [compileJava, compileTestJava]*.options*.encoding = 'UTF-8'
  
  mainClassName = (project.hasProperty('mainClass')) ? "$mainClass" : ''

  repositories {
    jcenter()
  }

  dependencies {
    compile 'com.rabbitmq:amqp-client:3.6.1'
    compile 'org.slf4j:slf4j-api:1.7.18'
    testCompile 'junit:junit:4.12'
  }
}
settings.gradle
rootProject.name = 'studying-rabbitmq'
include 'app', 'worker'
RabbitMQ のユーザが未作成の場合は、以下のようにして作成します。
作成したユーザがリソースにアクセスできるように権限も設定します。
rabbitmqctl add_user user01 passwd
rabbitmqctl set_permissions -p / user01 ".*" ".*" ".*"
ユーザとパスワードは user01 と passwd にしています。

Worker の作成

Worker側から作成していきます。
全体的な内容としては Queue に届いたメッセージを取り出してコンソールに出力するとった感じになります。
メッセージを出力した後は、3秒間スリープします。
(これを長い時間かかる処理と想定しています。
Connection が使用できるスレッドを3つに制限します。
Channel を3つ作成してそれぞれに Consumer を1つ割り当てます。
3つまで同時にメッセージを処理してそれを超えた分は Queue で待たされるといった感じになると思います。
まず、Consumer クラスを以下の内容で作成します。
worker/src/main/java/studying/receiver/MyConsumer.java
package studying.receiver;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {
  
  private final String name;

  public MyConsumer(Channel channel, String name) {
    super(channel);
    
    this.name = name;
  }
  
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, 
      AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    String message = new String(body, "UTF-8");
    long threadId = Thread.currentThread().getId();
    System.out.printf(" [thread-id:%02d] - %s Received '%s' %n", threadId, name, message);
    
    try {
      Thread.sleep(3 * 1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      System.out.printf("  [thread-id:%02d] - %s Done '%s' %n", threadId, name, message);
      this.getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
  }
}
処理内容としては、
受信したメッセージと共にスレッドID、Consumer名をコンソールに出力します。
3秒後(長い時間かかる処理を想定)にもう一度スレッドID、Consumer名、メッセージをコンソールに出力します。

次にメッセージの受付を開始するクラスを作成します。
worker/src/main/java/studying/receiver/Worker.java
package studying.receiver;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Worker {

  public static void main(String[] args) {
    try {
      new Worker().start();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  
  public void start() throws Exception {
    
    final long threadId = Thread.currentThread().getId();
    System.out.printf("[thread-id:%02d] start %n", threadId);
    
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.10.110");
    factory.setUsername("user01");
    factory.setPassword("passwd");

    // Worker の数を3つにする。
    ExecutorService es = Executors.newFixedThreadPool(3);
    Connection connection = factory.newConnection(es);

    boolean durable = true;
    boolean autoAck = false;
    String queueName    = "myQueue";
    String exchangeType = "direct";
    String exchangeName = "myExchange";
    String routingKey   = "myKey";


    // Worker A
    Channel channelA = connection.createChannel();
    channelA.exchangeDeclare(exchangeName, exchangeType, durable);
    channelA.queueDeclare(queueName, durable, false, false, null);
    channelA.queueBind(queueName, exchangeName, routingKey);
    
    MyConsumer myConsumerA = new MyConsumer(channelA, "ConsumerA");
    channelA.basicConsume(queueName, autoAck, myConsumerA);
    

    // Worker B
    Channel channelB = connection.createChannel();
    channelB.exchangeDeclare(exchangeName, exchangeType, durable);
    channelB.queueDeclare(queueName, durable, false, false, null);
    channelB.queueBind(queueName, exchangeName, routingKey);
    
    MyConsumer myConsumerB = new MyConsumer(channelB, "ConsumerB");
    channelB.basicConsume(queueName, autoAck, myConsumerB);
    

    // Worker C
    Channel channelC = connection.createChannel();
    channelC.exchangeDeclare(exchangeName, exchangeType, durable);
    channelC.queueDeclare(queueName, durable, false, false, null);
    channelC.queueBind(queueName, exchangeName, routingKey);

    MyConsumer myConsumerC = new MyConsumer(channelC, "ConsumerC");
    channelC.basicConsume(queueName, autoAck, myConsumerC);
  }  
}
処理内容としては、
送受信処理に使う Queue と Exchange (direct タイプ) を宣言しています。
Queue名と Exchange名は、"myQueue" と "myExchange" になります。
RoutingKey を "myKey" にして "myQueue" と "myExchange" を Bind しています。
1つの Queue に対して3つの Consumer を割り当ててメッセージの受信処理を開始しています。

Connection が使用できるスレッドの数
Workerクラスの以下の部分でスレッドの数を設定していますが
...
// Worker の数を3つにする。
ExecutorService es = Executors.newFixedThreadPool(3);
Connection connection = factory.newConnection(es);
...
以下のように設定しなかった場合、どうなるか?
...
Connection connection = factory.newConnection();
...
JavaVM が使用できるプロセッサの数 × 2 になるようです。
(com.rabbitmq.client.impl.ConsumerWorkService 参照)
Consumer の実装
Consumerインターフェイスのメソッドの実装コードを1つ1つ書いていくのは、なかなかなので
今回は DefaultConsumerクラスを継承して必要なメソッドをオーバーライドする方法をとりました。
別のやり方として以下のように QueueingConsumerクラスを使ってメッセージの処理をする方法もあるようです。
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  String message = new String(delivery.getBody());
  
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

Worker側は、だいたいこんな感じです。


App の作成

メッセージを送信するクラスをを作成します。
以下の内容で作成します。
app/src/main/java/studying/sender/App.java
package studying.sender;

import java.util.ArrayList;
import java.util.List;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class App {

  public static void main(String[] args) {    
    try {
      List<String> tasks = new ArrayList<String>();
      tasks.add("Maurice");
      tasks.add("Verdine");
      tasks.add("Philip");
      
      tasks.add("Jessica");
      tasks.add("Larry");
      tasks.add("Johnny");
      
      tasks.add("Ralph");
      tasks.add("Al");
      tasks.add("Andrew");

      tasks.add("XXX");
      
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("192.168.10.110");
      factory.setUsername("user01");
      factory.setPassword("passwd");
      Connection connection = factory.newConnection();

      System.out.printf("send start %n");
      System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
      tasks.parallelStream().forEach((String message) -> {        
        try {
          String exchangeName = "myExchange";
          String routingKey = "myKey";
          
          Channel channel = connection.createChannel();
          AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
          channel.basicPublish(exchangeName, routingKey, props, message.getBytes());

          System.out.printf("[thread-id:%02d] '%s' %n", Thread.currentThread().getId(), message);
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
      System.out.printf("send end %n");

      connection.close();
      
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
Exchange と Queue の宣言と Bind は、Worker側で行うので
App側では、送信時に Exchange と RoutingKey を指定するだけにします。
送信する Exchange と RoutingKey は、"myRpcExchange" と "myRpcKey" にしています。
送信するまでに最低限 Exchange が存在していないとうまくいかないので
先に Worker側を起動することになります。

デフォルトの Exchange
今回、Exchange を宣言しましたが direct タイプの Exchange で問題なければ
デフォルトの Exchange を使ってもいいと思います。
Queue を宣言すると自動的にデフォルトの Exchange に Bind されるようです。
以下のようなコードが暗黙的に実行されているようなイメージだと思います。
...
String queueName    = "myQueue";
String exchangeName = "";
String routingKey   = queueName;

...
// 以下の Bind が暗黙的に行われていると思います。
channel.queueBind(queueName, exchangeName, routingKey);
...
rabbitmqctl list_bindings コマンドを使って Bind を確認できると思います。
rabbitmqctl list_bindings -p / source_name source_kind destination_name destination_kind routing_key
送信処理にデフォルトの Exchange を使うと以下のようになると思います。
...
String queueName    = "myQueue";
String exchangeName = "";
String routingKey   = queueName;
 
Channel channel = connection.createChannel();
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish(exchangeName, routingKey, props, message.getBytes());
...
受信処理にデフォルトの Exchange を使うと以下のようになると思います。
...
boolean durable = true;
boolean autoAck = false;
String queueName    = "myQueue";

Channel channel = connection.createChannel();
channel.queueDeclare(queueName, durable, false, false, null);
 
MyConsumer myConsumer = new MyConsumer(channel, "Consumer");
channel.basicConsume(queueName, autoAck, myConsumer);
...

実行する

本来、以下のようにして jar ファイルを作って実行したいところなのですが
cd /d C:\studying-rabbitmq
gradle :worker:clean :worker:distZip -PmainClass=studying.receiver.Worker -q
gradle :app:clean :app:distZip -PmainClass=studying.sender.App -q
gradle から実行してしまいます。
コマンドプロンプトを開いて、まず Worker側から起動します。
Worker

cd /d C:\studying-rabbitmq
gradle :worker:clean :worker:run -PmainClass=studying.receiver.Worker -q

[thread-id:01] start
Worker側が起動したのが確認できたら App側を実行します。
Worker側のコマンドプロンプトはそのままにしておいて
新たにコマンドプロンプトを開いて以下のコマンドで実行します。
App

cd /d C:\studying-rabbitmq
gradle :app:clean :app:run -PmainClass=studying.sender.App -q

send start
[thread-id:11] 'XXX'
...
しばらくすると Worker側のコマンドプロンプトに3秒間隔で以下のように表示されると思います。
Worker

[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
3秒間後...
[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Maurice'
  [thread-id:12] - ConsumerB Done 'Larry'
 [thread-id:12] - ConsumerB Received 'Verdine'
  [thread-id:13] - ConsumerC Done 'Jessica'
 [thread-id:13] - ConsumerC Received 'Andrew'
3秒間後...
[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Maurice'
  [thread-id:12] - ConsumerB Done 'Larry'
 [thread-id:12] - ConsumerB Received 'Verdine'
  [thread-id:13] - ConsumerC Done 'Jessica'
 [thread-id:13] - ConsumerC Received 'Andrew'
  [thread-id:11] - ConsumerA Done 'Maurice'
 [thread-id:11] - ConsumerA Received 'Johnny'
  [thread-id:12] - ConsumerB Done 'Verdine'
 [thread-id:12] - ConsumerB Received 'Ralph'
  [thread-id:13] - ConsumerC Done 'Andrew'
 [thread-id:13] - ConsumerC Received 'Al'
3秒間後...
[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Maurice'
  [thread-id:12] - ConsumerB Done 'Larry'
 [thread-id:12] - ConsumerB Received 'Verdine'
  [thread-id:13] - ConsumerC Done 'Jessica'
 [thread-id:13] - ConsumerC Received 'Andrew'
  [thread-id:11] - ConsumerA Done 'Maurice'
 [thread-id:11] - ConsumerA Received 'Johnny'
  [thread-id:12] - ConsumerB Done 'Verdine'
 [thread-id:12] - ConsumerB Received 'Ralph'
  [thread-id:13] - ConsumerC Done 'Andrew'
 [thread-id:13] - ConsumerC Received 'Al'
  [thread-id:11] - ConsumerA Done 'Johnny'
 [thread-id:11] - ConsumerA Received 'Philip'
  [thread-id:12] - ConsumerB Done 'Ralph'
  [thread-id:13] - ConsumerC Done 'Al'
3秒間後...
[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:12] - ConsumerB Received 'Larry'
 [thread-id:13] - ConsumerC Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Maurice'
  [thread-id:12] - ConsumerB Done 'Larry'
 [thread-id:12] - ConsumerB Received 'Verdine'
  [thread-id:13] - ConsumerC Done 'Jessica'
 [thread-id:13] - ConsumerC Received 'Andrew'
  [thread-id:11] - ConsumerA Done 'Maurice'
 [thread-id:11] - ConsumerA Received 'Johnny'
  [thread-id:12] - ConsumerB Done 'Verdine'
 [thread-id:12] - ConsumerB Received 'Ralph'
  [thread-id:13] - ConsumerC Done 'Andrew'
 [thread-id:13] - ConsumerC Received 'Al'
  [thread-id:11] - ConsumerA Done 'Johnny'
 [thread-id:11] - ConsumerA Received 'Philip'
  [thread-id:12] - ConsumerB Done 'Ralph'
  [thread-id:13] - ConsumerC Done 'Al'
  [thread-id:11] - ConsumerA Done 'Philip'
同時に3つずつメッセージが処理されているのが分かると思います。
また3つのスレッドが使い回されているのも確認できると思います。
さらに Consumer の処理が終わるまでスレッドが独占されている様子も分かると思います。 
この意味でスレッドセーフであるといえると思います。

おまけ

Connection が使用できるスレッドを1つに制限したらどうなるか?
ちょっと試してみます。
Worker側の処理で Connection を作成しているところを以下のように変更します。
worker/src/main/java/studying/receiver/Worker.java
...
// Worker の数を3つにする。
// ExecutorService es = Executors.newFixedThreadPool(3);
// Connection connection = factory.newConnection(es);

ExecutorService es = Executors.newFixedThreadPool(1);
Connection connection = factory.newConnection(es);
...
Worker側を再起動してから App側を実行すると
Worker側のコマンドプロンプトに3秒間隔で1つずつメッセージが以下のように表示されると思います。
Worker

[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'Andrew'
  [thread-id:11] - ConsumerA Done 'Andrew'
 [thread-id:11] - ConsumerB Received 'Maurice'
  [thread-id:11] - ConsumerB Done 'Maurice'
 [thread-id:11] - ConsumerB Received 'Ralph'
  [thread-id:11] - ConsumerB Done 'Ralph'
 [thread-id:11] - ConsumerB Received 'Larry'
  [thread-id:11] - ConsumerB Done 'Larry'
 [thread-id:11] - ConsumerC Received 'Al'
  [thread-id:11] - ConsumerC Done 'Al'
 [thread-id:11] - ConsumerC Received 'Verdine'
  [thread-id:11] - ConsumerC Done 'Verdine'
 [thread-id:11] - ConsumerC Received 'Johnny'
  [thread-id:11] - ConsumerC Done 'Johnny'
 [thread-id:11] - ConsumerA Received 'Jessica'
  [thread-id:11] - ConsumerA Done 'Jessica'
 [thread-id:11] - ConsumerA Received 'XXX'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Philip'
  [thread-id:11] - ConsumerA Done 'Philip'
3の Consumer が1つのスレッドを使い回して順番に処理してく様子が分かると思います。

今回のテーマから逸れるのですが
1つのメッセージを3つある Consumer のうちのどれか1つで処理するのでなく
1つのメッセージを3つある Consumer 3つで処理する方法を考えてみたいと思います。
"Publish/Subscribe" 方式でのメッセージ処理になると思います。
1つの Queue に3つの Consumer を割り当てていたのでラウンドロビンする形で処理されていました。
今度は、Consumer ごとにそれぞれ Queue を割り当てるようにします。
Worker側の処理で Queue の宣言しているところと
Consumer に割り当てているところを以下のように変更します。
また Connection が使用できるスレッド数を3つに戻しています。
worker/src/main/java/studying/receiver/Worker.java
...
// Worker の数を3つにする。
ExecutorService es = Executors.newFixedThreadPool(3);
Connection connection = factory.newConnection(es);
...

// Worker A
Channel channelA = connection.createChannel();
channelA.exchangeDeclare(exchangeName, exchangeType, durable);
channelA.queueDeclare(queueName + "A", durable, false, false, null);
channelA.queueBind(queueName + "A", exchangeName, routingKey);

MyConsumer myConsumerA = new MyConsumer(channelA, "ConsumerA");
channelA.basicConsume(queueName + "A", autoAck, myConsumerA);


// Worker B
Channel channelB = connection.createChannel();
channelB.exchangeDeclare(exchangeName, exchangeType, durable);
channelB.queueDeclare(queueName + "B", durable, false, false, null);
channelB.queueBind(queueName + "B", exchangeName, routingKey);

MyConsumer myConsumerB = new MyConsumer(channelB, "ConsumerB");
channelB.basicConsume(queueName + "B", autoAck, myConsumerB);


// Worker C
Channel channelC = connection.createChannel();
channelC.exchangeDeclare(exchangeName, exchangeType, durable);
channelC.queueDeclare(queueName + "C", durable, false, false, null);
channelC.queueBind(queueName + "C", exchangeName, routingKey);

MyConsumer myConsumerC = new MyConsumer(channelC, "ConsumerC");
channelC.basicConsume(queueName + "C", autoAck, myConsumerC);
...
Worker側を再起動してから App側を実行すると
Worker側のコマンドプロンプトに3秒間隔でメッセージが以下のように表示されると思います。
Worker

[thread-id:01] start
 [thread-id:11] - ConsumerA Received 'XXX'
 [thread-id:13] - ConsumerC Received 'XXX'
 [thread-id:12] - ConsumerB Received 'XXX'
  [thread-id:11] - ConsumerA Done 'XXX'
 [thread-id:11] - ConsumerA Received 'Al'
  [thread-id:13] - ConsumerC Done 'XXX'
  [thread-id:12] - ConsumerB Done 'XXX'
 [thread-id:13] - ConsumerC Received 'Al'
 [thread-id:12] - ConsumerB Received 'Al'
  [thread-id:11] - ConsumerA Done 'Al'
 [thread-id:11] - ConsumerA Received 'Johnny'
  [thread-id:13] - ConsumerC Done 'Al'
 [thread-id:13] - ConsumerC Received 'Johnny'
  [thread-id:12] - ConsumerB Done 'Al'
 [thread-id:12] - ConsumerB Received 'Johnny'
 ...
今度は3つの Consumer が送信されたメッセージをそれぞれ処理するのでログが3倍出力されると思います。

おわりに

MessageQueue サーバーを使って同時実行数を制御して処理できることが一応確認できたと思います。
しかし実際これでシステム開発しようとした場合、
冗長化であったり、トランザクションであったり、認証であったりと
考慮しないといけないことがまだまだあります。
今回は、3つの Worker を1つのプロセスで動かしていたので
プロセスが落ちると全部の Worker が止まっていまします。
簡単な冗長化であれば1プロセス1 Worker にして3つプロセスを起動すれば
同じように Worker 3つで処理できます。
この場合、プロセスが1つ落ちても Worker 全体が止まってしまうことはありません。
しかし Worker は動いていても MessageQueueサーバー自体が止まってしまうと
システム全体に影響が出ます。
ミドルウェアを導入することで開発は軽減されるものの管理が増える。
このあたりがミドルウェア導入の考えどころか?
参考URL
http://www.rabbitmq.com/getstarted.html
http://www.rabbitmq.com/api-guide.html