RabbitMQとはAMQP対応のメッセージキューイングシステムです。
何ができるかというと、遅い処理をキューに貯めてあとで処理することで高速なレスポンスを返すとができます。
もう少しわかりやすく表すとこんな感じです。
メッセージキューシステムを使わない場合
メッセージキューシステムを使う場合
では、さっそく使ってみます
RabbitMQのインストール
今回はrubyのbunnyでRabbitMQを使ってみました。
bunnyのインストール
$ curl -sSL https: //get .rvm.io | bash -s stable
$ export PATH=$PATH: /usr/local/rvm/bin/
$ rvm install ruby-2.1.1
$ rvm use ruby-2.1.1
$ gem install bunny --version ">= 0.9.1"
|
$ sudo yum install erlang
|
RabbitMQのインストール
$ curl -O http: //www .rabbitmq.com /releases/rabbitmq-server/v3 .3.5 /rabbitmq-server-3 .3.5-1.noarch.rpm
$ sudo rpm -ivh --nosignature rabbitmq-server-3.3.5-1.noarch.rpm
|
Management Plugin(RabbitMQ)のインストール
$ sudo /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
|
※必須ではないですが、管理が便利なので入れています
https://www.rabbitmq.com/management.html
RabbitMQの起動
$ sudo /sbin/service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
rabbitmq-server.
|
ユーザの追加と削除
$ sudo rabbitmqctl add_user hoge hoge
$ sudo rabbitmqctl set_permissions hoge ".*" ".*" ".*"
$ sudo rabbitmqctl set_user_tags hoge administrator
$ sudo rabbitmqctl delete_user guest
|
4台でクラスタ化しました。
https://www.rabbitmq.com/clustering.html
myserver
|
192.168.1.210 |
Consumer |
myserver2
|
192.168.1.212 |
Consumer |
myserver3
|
192.168.1.213 |
Publisher |
myserver4
|
192.168.1.214 |
Publisher |
myserver2、myserver3、myserver4で以下を実行しクラスタ化
$ sudo /sbin/service rabbitmq-server stop
$ vim /var/lib/rabbitmq/ .erlang.cookie
$ sudo /sbin/service rabbitmq-server start
$ sudo rabbitmqctl stop_app
Stopping node rabbit@myserver3 ...
... done .
$ sudo rabbitmqctl join_cluster -- ram rabbit@myserver
Clustering node rabbit@myserver3 with rabbit@myserver ...
... done .
$ sudo rabbitmqctl start_app
Starting node rabbit@myserver3 ...
... done .
|
管理画面で見るとこんな感じになります
実行
Publisher側のコード
require "bunny"
conn = Bunny. new ( :hostname => "localhost" , :username => "hoge" , :password => "hoge" )
conn.start
ch = conn.create_channel
q = ch.queue( "hello" )
msg = ARGV .empty? ? "Hello World!" : ARGV .join( " " )
ch.default_exchange.publish(msg, :routing_key => q.name)
puts " [x]Sent " + msg
conn.close
|
Consumerのコード
require "bunny"
conn = Bunny. new ( :username => "hoge" , :password => "hoge" )
conn.start
ch = conn.create_channel
q = ch.queue( "hello" )
puts "[*] Waiting fo messages in #{q.name}. To exit press CTR+C"
q.subscribe( :block => true ) do |delivery_info, properties, body|
puts " [x]Received #{body}"
end
|
実行する
Consumer1
Consumer2
Publisher1側
1秒おきにメッセージを送る
$ for i in ` seq 1 100`; do sleep 1;ruby -rubygems send.rb pub1_$i; done
|
Publisher2側
2秒おきにメッセージを送る
$ for i in ` seq 1 100`; do sleep 2;ruby -rubygems send.rb pub2_$i; done
|
実行結果
Publisher1、Publisher2から送られたメッセージがConsumer1、Consumer2で処理されています。
試したところこんな動きをしていました。
停止 |
停止 |
処理されずキューがたまる。Consumer起動時にたまったキューを処理 |
停止 |
稼働 |
Consumer2で処理 |
稼働 |
停止 |
Consumer1で処理 |
稼働 |
稼働 |
Consumer1、Consumer2で交互に処理(Round-Robin?) |
おしまい。
参考
http://www.rabbitmq.com/