はじめに
前回、『RabbitMQ で同時処理実行数を制限する』で送信処理と受信処理を見てきましたが 今回は、それに加えてレスポンスを取得できるようにしたいと思います。 やり方として RabbitMQ Tutorials の『6 Remote procedure call (RPC)』が 参考になるのではないかと思います。
tutorial では、送信用の Queue とは別に 返信用の Queue を用意して レスポンスを処理する方法になっていたかと思います。 非同期的にレスポンス処理するのであれば、このやり方になるのだろうと思います。 でも、なにか、こう、もうちょっと、そこまででなくていいのでお手軽にできる方法はないかと JavaDoc を眺めていると JsonRpcServer と JsonRpcClient なるものを見つけました! 名前的にみてメッセージを JSON で RPC風に送受信処理と返信処理ができそうな雰囲気です! これは!と思い試してみることにしました。
始める前に... RabbitMQ がインストール済であること RabbitMQ サーバーにログイン可能でリソースにアクセスできるユーザが作成済であること (ユーザとパスワードを "user01" と "passwd" としています。 以上が前提になります。 このあたりはわたしも『yum の Local Repository に RabbitMQ をインストール』にまとめていますので よろしければ、こちらも合わせて、ご参考に。
ではでは!Getting Started!
環境
開発機開発言語 | Java 1.8.0 |
---|---|
フレームワーク | rabbitmq-java-client 3.6.1 |
ビルドツール | Gradle 2.12 ※gradle コマンドが実行できるように設定しておきます。 |
OS | Windows 8.1 |
RabbitMQ Server
IP | 192.168.10.110 |
---|---|
ミドルウェア |
RabbitMQ server 3.6.1
ユーザ : user01 パスワード : passwd |
OS | CentOS-7-x86_64-Minimal-1511.iso VirtualBox 5.0.16 |
プロジェクト構成
今回は、RPC ということでプロジェクトを Client と Service そして Gateway の3つに分けました。 というのも Client、Service、Gateway の3つそれぞれ別の jar ファイルになっていた方が RPC では、何が何に依存していて、どこで何が必要なのかなど その雰囲気がより伝わるだろうと思ったからです。 もちろん、1つのプロジェクトにするこもできます。
Client と Service のインターフェイス部分と両者で使用するデータクラスを Gateway に切り出しました。 Service は、Gateway のインターフェイスを実装してサービスとして公開します。 Client は、Gateway のインターフェイスからスタブを作成してサービスにアクセスします。 Service と Client は、Gateway のデータクラスを使って通信します。 おおよそ、こんな感じになると思います。
C ドライブ直下に「studying-rabbitmq-rpc」フォルダを作成して各ファイルを以下のように配置します。 ファイルは、UTF-8 で保存します。
準備
ビルドスクリプトを作成します。 プロジェクトは、Client、Service、Gateway と3つあるのですがビルドスクリプトは1つにしています。build.gradle
subprojects { apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'application' sourceCompatibility = '1.8' targetCompatibility = '1.8' [compileJava, compileTestJava]*.options*.encoding = 'UTF-8' mainClassName = (project.hasProperty('mainClass')) ? "$mainClass" : '' repositories { jcenter() } dependencies { compile 'com.rabbitmq:amqp-client:3.6.1' compile 'org.slf4j:slf4j-api:1.7.18' testCompile 'junit:junit:4.12' } } project(':client') { dependencies { compile project(':gateway') } } project(':service') { dependencies { compile project(':gateway') } }
client と service の compile 時の依存関係に gateway の project/jar ファイルを追加しています。settings.gradle
rootProject.name = 'studying-rabbitmq-rpc' include 'client', 'gateway', 'service'
RabbitMQ のユーザが未作成の場合は、以下のようにして作成します。 作成したユーザがリソースにアクセスできるように権限も設定します。
rabbitmqctl add_user user01 passwd rabbitmqctl set_permissions -p / user01 ".*" ".*" ".*"
ユーザとパスワードは、"user01" と "passwd" にしています。
Gateway の作成
まず、Client側と Service側の両方で使う Gateway から作成します。
送信と返信に使うメッセージデータクラスを作成します。gateway/src/main/java/studying/gateway/MyRequest.java
package studying.gateway; public class MyRequest { private String message; public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }gateway/src/main/java/studying/gateway/MyResponse.java
package studying.gateway; public class MyResponse { private String result; public String getResult() { return result; } public void setResult(String result) { this.result = result; } }
次にインターフェイスを作成します。 メソッドは2つ定義します。gateway/src/main/java/studying/gateway/MyService.java
package studying.gateway; public interface MyService { // (1) public String echo(String message); // (2) public Object print(Object param); }
メソッドは、以下のようになります。
- (1) echo()メソッド
-
文字列を引数に受けて、また文字列を返します。
- (2) print()メソッド
-
Object型の引数で Object型の戻り値になっていますが、 実装的には、MyRequestクラスを Object型の引数で受けて、 MyResponseクラスを Object型で返すようになります。 (MyRequestクラスと MyResponseクラスを直接、引数と戻り値にできない事情が...
Service の作成
Gateway のインターフェイスの実装クラスを作成します。service/src/main/java/studying/service/MyServiceImpl.java
package studying.service; import java.util.Map; import com.rabbitmq.tools.json.JSONUtil; import studying.gateway.MyRequest; import studying.gateway.MyResponse; import studying.gateway.MyService; public class MyServiceImpl implements MyService { // (1) @Override public String echo(String message) { System.out.printf("Received '%s' %n", message); return "Hello? " + message; } // (2) @SuppressWarnings("unchecked") @Override public Object print(Object param) { System.out.printf("Received '%s' %n", param); MyRequest request = new MyRequest(); JSONUtil.tryFill(request, (Map<String, Object>) param); MyResponse response = new MyResponse(); response.setResult("Hello! " + request.getMessage()); return response; } }
実装内容は、以下のようになります。
- (1) echo()メソッド
-
受信したメッセージをコンソールに出力します。
メッセージに文字列 "Hello?" を付けて返します。 - (2) print()メソッド
-
受信したメッセージをコンソールに出力します。 メッセージは、一旦、Object型で受けて JSONUtil を使って MyRequestクラスに出力します。 MyRequestクラスからメッセージを取り出して文字列 "Hello!" を付けて返信データにします。 返信データを MyResponseクラスに設定して返します。
引数と戻り値
JsonRpcServer/JsonRpcClient というだけあってメッセージデータは、JSON になっているようです。 String型や int型など primitive型は、直接 JSON に変換できるようなので そのまま引数や戻り値にしても問題ないようです。 シンプルな JavaBean の形式をとっているクラスであえば、JSON に変換可能なようで 同じように引数や戻り値に使えるみたいです。 ただし、この場合、注意が必要で Proxy 越しに呼び出されるのが原因だと思うのですが 一旦 Object型にしないとうまく動かないようです。
RPC といっても今回は、Java クラスをシリアライズしたメッセージデータを使っている訳ではないので Service側と Client側で同じ Javaクラスを使っていないと通信できないことはないと思います。 飽くまでも今回のメッセージデータは、JSON なので Service側と Client側がメッセージデータに それぞれの Javaクラスを使っていたとしても JSON をそれぞれのクラスに変換(出力)できれば問題ないと思います。 今回は、Gateway という形でメッセージデータに同じクラスを使っていますが Service側と Client側がそれぞれでメッセージデータ用の Javaクラスを用意してもよいと思います。 (おおよそ、フィールド名を合わせておけばクラス名が違っていてもいけそうです。
次にサービスを公開してメッセージの受付を開始するクラスを作成します。service/src/main/java/studying/service/MyRpcServer.java
package studying.service; import java.io.IOException; import java.util.Map; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.tools.jsonrpc.JsonRpcServer; import studying.gateway.MyService; public class MyRpcServer { public static void main(String[] args) { System.out.printf("listening... %n"); // RabbitMQ Server 接続情報 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.110"); factory.setUsername("user01"); factory.setPassword("passwd"); Connection connection = null; try { // RabbitMQ Server に接続して Channel を生成する。 connection = factory.newConnection(); Channel channel = connection.createChannel(); // Queue と Exchange を宣言して Bind をする。 String queueName = "myRpcQueue"; String exchangeType = "direct"; String exchangeName = "myRpcExchange"; String routingKey = "myRpcKey"; boolean durable = true; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> arguments = null; channel.exchangeDeclare(exchangeName, exchangeType, durable); channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments); channel.queueBind(queueName, exchangeName, routingKey); // メッセージの受付を開始する。 JsonRpcServer server = new JsonRpcServer(channel, queueName, MyService.class, new MyServiceImpl()); server.mainloop(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
処理内容としては、 送受信処理に使う Queue と Exchange (direct タイプ) を宣言しています。 Queue名と Exchange名は、"myRpcQueue" と "myRpcExchange" になります。 RoutingKey を "myRpcKey" にして "myRpcQueue" と "myRpcExchange" を Bind しています。 MyServiceインターフェイスとその実装クラス MyServiceImpl を使って メッセージの受信処理を開始しています。
Client の作成
サービスの呼び出しクラスを作成します。 echo()メソッドと print()メソッドを呼び出して結果をコンソールに出力します。
以下の内容で作成します。client/src/main/java/studying/client/MyRpcClient.java
package studying.client; import java.io.IOException; import java.util.Map; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.tools.json.JSONUtil; import com.rabbitmq.tools.jsonrpc.JsonRpcClient; import studying.gateway.MyRequest; import studying.gateway.MyResponse; import studying.gateway.MyService; public class MyRpcClient { @SuppressWarnings("unchecked") public void doAction() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.110"); factory.setUsername("user01"); factory.setPassword("passwd"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "myRpcExchange"; String routingKey = "myRpcKey"; int timeout = 1000 * 5; // Stub を作成する。 JsonRpcClient client = new JsonRpcClient(channel, exchangeName, routingKey, timeout); MyService serivce = (MyService)client.createProxy(MyService.class); // (1) echo()メソッド呼び出し String message = serivce.echo("dolphin"); System.out.printf("echo: %s %n", message); // (2) print()メソッド呼び出し MyRequest request = new MyRequest(); request.setMessage("revolution"); Object source = serivce.print(request); MyResponse response = new MyResponse(); JSONUtil.fill(response, (Map<String, Object>)source); System.out.printf("print: %s %n", response.getResult()); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) { new MyRpcClient().doAction(); } }
Exchange と Queue の宣言と Bind は、Service側で行うので Client側では、送信時に Exchange と RoutingKey を指定するだけにします。 送信する Exchange と RoutingKey は、"myRpcExchange" と "myRpcKey" にしています。 送信するまでに最低限 Exchange が存在していないとうまくいかないので 先に Service側を起動することになります。
処理内容は、以下のようになります。
- (1) echo()メソッド呼び出し
-
引数と戻り値とも文字 型なのでそのまま送信して、その結果をコンソールに出力します。
- (2) print()メソッド呼び出し
-
メッセージを MyRequest クラスで送信します。 (インターフェイス的には、Object 型ですが実装は、MyRequest クラスになっています。 結果メッセージを一旦、Object 型で受けて JSONUtil を使って MyResponse クラスに出力します。 MyResponse クラスから結果メッセージを取り出してコンソールに出力します。
返信用の Queue
RabbitMQ の Tutorial 的には、返信メッセージの受付に Queue が宣言されているのですが JsonRpcServer/JsonRpcClient では、Direct reply-to (amq.rabbitmq.reply-to.XXX) という 疑似的な Queue が暗黙的に返信用の Queue の代わりに使われるため 明示的に返信用の Queue を宣言する必要はないようです。
実行する
Queue と Exchange の設定をしている Service側を先に起動します。 コマンドプロンプトを開いて、gradle から Service側を起動します。Service
cd /d C:\studying-rabbitmq-rpc gradle :service:clean :service:run -PmainClass=studying.service.MyRpcServer -q listening...
Service側が起動したのが確認できたら Client側を実行します。 Service側のコマンドプロンプトはそのままにしておいて 新たにコマンドプロンプトを開いて以下のコマンドを実行します。
以下のように表示されると思います。Client
cd /d C:\studying-rabbitmq-rpc gradle :client:clean :client:run -PmainClass=studying.client.MyRpcClient -q echo: Hello? dolphin print: Hello! revolution
Service側のコマンドプロンプトには、以下のように表示されると思います。Service
... listening... Received 'dolphin' Received '{message=revolution}'
また、以下のように jar ファイルを作って実行してみるのもいいと思います。
cd /d C:\studying-rabbitmq-rpc gradle :service:clean :service:distZip -PmainClass=studying.service.MyRpcServer -q gradle :client:clean :client:distZip -PmainClass=studying.client.MyRpcClient -q
おわりに
同期処理ではありますがレスポンスを取得できるようになったと思います。 (レスポンスを非同期処理で取得できるようにするのは、なかなか手間だと思います...
今回は、お手軽に、というのも裏テーマにあったので JsonRpcServer と JsonRpcClient を使いました。 使ってみたところ、Client側の実装が Java に限定されてしまいそうな懸念を持ちました。 しかも JsonRpcClientクラスを使わないとうまく通信できないかもしれません。 (RPC は、こんな感じ? JsonRpcServerクラスの doCall() メソッド、JsonRpcClientクラスの call()メソッドあたりを 読み解いてみると Java 以外の Client を使って今回のサービスにアクセスできるかもしれません。 それもどうかな? Service側と Client側がどちらも Java なら JMS でいいような気がします。 (RabbitMQ の JMS は有償だったような気が...
わたしは、Service側の実装が Java に依存するのには、なんら抵抗ないのですが Client側が特定のプログラム言語(Java であっても)に依存してしまうのはイマイチだったりします。 ここは、なんとか考えないと。