RabbitMQでメッセージキューイングシステムを実現する

RabbitMQとはAMQP対応のメッセージキューイングシステムです。

何ができるかというと、遅い処理をキューに貯めてあとで処理することで高速なレスポンスを返すとができます。

f:id:katashiyo515:20140921133042p:plain

f:id:katashiyo515:20140921133045p:plain

 

もう少しわかりやすく表すとこんな感じです。

メッセージキューシステムを使わない場合

f:id:katashiyo515:20140921133047p:plain

メッセージキューシステムを使う場合

f:id:katashiyo515:20140921133049p:plain

 

では、さっそく使ってみます

RabbitMQのインストール

今回はrubybunnyでRabbitMQを使ってみました。

bunnyのインストール
$ curl -sSL https://get.rvm.io | bash -s stable
export PATH=$PATH:/usr/local/rvm/bin/
$ rvm install ruby-2.1.1
$ rvm use ruby-2.1.1
$ gem install bunny --version ">= 0.9.1"
erlangのインストール
sudo yum install erlang
RabbitMQのインストール
$ curl -O http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.5/rabbitmq-server-3.3.5-1.noarch.rpm
sudo rpm -ivh --nosignature  rabbitmq-server-3.3.5-1.noarch.rpm
Management Plugin(RabbitMQ)のインストール
sudo /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management

※必須ではないですが、管理が便利なので入れています

https://www.rabbitmq.com/management.html

RabbitMQの起動

sudo /sbin/service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
rabbitmq-server.

ユーザの追加と削除

# hogeユーザをhogeパスワードで追加
sudo rabbitmqctl add_user hoge hoge
sudo rabbitmqctl set_permissions hoge ".*" ".*" ".*"
sudo rabbitmqctl set_user_tags hoge administrator
 
 
# guest削除
sudo rabbitmqctl delete_user guest

クラスタ

4台でクラスタ化しました。

https://www.rabbitmq.com/clustering.html

host名
ip
role
myserver
192.168.1.210 Consumer
myserver2
192.168.1.212 Consumer
myserver3
192.168.1.213 Publisher
myserver4
192.168.1.214 Publisher

myserver2、myserver3、myserver4で以下を実行しクラスタ

# RabbitMQサーバを停止
sudo /sbin/service rabbitmq-server stop
# cookieの中身を同じにする
$ vim /var/lib/rabbitmq/.erlang.cookie
# myserver
sudo /sbin/service rabbitmq-server start
sudo rabbitmqctl stop_app
Stopping node rabbit@myserver3 ...
...done.
sudo rabbitmqctl join_cluster --ram rabbit@myserver
Clustering node rabbit@myserver3 with rabbit@myserver ...
...done.
sudo rabbitmqctl start_app
Starting node rabbit@myserver3 ...
...done.

管理画面で見るとこんな感じになります

f:id:katashiyo515:20140921133052j:plain

 

 

実行

Publisher側のコード

send.rb
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
conn = Bunny.new(:hostname => "localhost":username => "hoge":password => "hoge")
conn.start
ch = conn.create_channel
q = ch.queue("hello")
msg = ARGV.empty? ? "Hello World!" ARGV.join(" ")
ch.default_exchange.publish(msg, :routing_key => q.name)
puts " [x]Sent " + msg
conn.close

 Consumerのコード

send.rb
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
conn = Bunny.new(:username => "hoge":password => "hoge")
conn.start
ch = conn.create_channel
q  = ch.queue("hello")
puts "[*] Waiting fo messages in #{q.name}. To exit press CTR+C"
q.subscribe(:block => truedo |delivery_info, properties, body|
    puts " [x]Received #{body}"
end

実行する

Consumer1

send.rb
$ ruby receive.rb

Consumer2

send.rb
$ ruby receive.rb

Publisher1側

1秒おきにメッセージを送る

send.rb
for in `seq 1 100`; do sleep 1;ruby -rubygems send.rb pub1_$i; done

Publisher2側

2秒おきにメッセージを送る

send.rb
for in `seq 1 100`; do sleep 2;ruby -rubygems send.rb pub2_$i; done
実行結果

Publisher1、Publisher2から送られたメッセージがConsumer1、Consumer2で処理されています。

f:id:katashiyo515:20140921133054j:plain

試したところこんな動きをしていました。

Consumer1
Consumer2
結果
停止 停止 処理されずキューがたまる。Consumer起動時にたまったキューを処理
停止 稼働 Consumer2で処理
稼働 停止 Consumer1で処理
稼働 稼働 Consumer1、Consumer2で交互に処理(Round-Robin?)

 

おしまい。

参考

http://www.rabbitmq.com/