RabbitMQ是基於erlang開發的消息服務,官網為:https://www.rabbitmq.com,RabbitMQ要依賴erlang運行,所以要先安裝erlang環境,rabbitmq可以用rpm或通用二進制包安裝,這里使用二進制包的方式安裝,版本為3.8.1,對應的erlang版本為22.1,注意版本的對應關系,如果相差太大可能會無法啟動服務,rabbit和erlang的版本對應關系參考:https://www.rabbitmq.com/which-erlang.html
首先安裝erlang,在centos下使用yum安裝的erlang版本太低,rabbitmq無法啟動,這里使用編譯源碼的方式安裝,參考文檔為:https://github.com/erlang/otp/blob/maint/HOWTO/INSTALL.md,文檔寫的很詳細,erlang包下載地址:https://www.erlang.org/downloads,這里下載的包為:otp_src_22.1.tar.gz,安裝之前注意下面的一些必要環境:
1. GNU make,這個一般機器都有
2. gcc編譯環境,這里用的4.8版本
3. Perl 5,默認是5.16
4. ncurses-devel開發包,安裝命令: yum install ncurses-devel
5. sed命令,幾乎所有linux系統都有
6. openssl開發包,版本要大於0.9.8,安裝命令: yum install openssl-devel
基本上上面這些就夠了,其他的都是一些附加的擴展或者編譯文檔用到的,具體組件參考上面的安裝鏈接頁面
執行下面的命令解壓erlang源碼:
tar -xvzf otp_src_22.1.tar.gz cd otp_src_22.1/ export ERL_TOP=`pwd`
開始編譯源碼,安裝目錄設置為:/usr/local/erlang
./configure --prefix=/usr/local/erlang make make install
現在erlang就安裝到/usr/local/erlang了,這時候要配置環境變量方便直接調用erl命令,導入變量命令: export PATH=$PATH:/usr/local/erlang/bin ,可以將這條指令添加到profile配置文件然后source永久生效,添加變量后執行erl如果正常進入提示符說明環境可以了:
然后下一步開始安裝rabbitmq,這里安裝包的名稱為:rabbitmq-server-generic-unix-3.8.1.tar.xz,需要先用xz命令進行解壓:
xz -d rabbitmq-server-generic-unix-3.8.1.tar.xz tar -xvf rabbitmq-server-generic-unix-3.8.1.tar
執行之后當前目錄下會解壓出來rabbitmq_server-3.8.1這個目錄,直接移動到安裝的位置即可,比如: mv rabbitmq_server-3.8.1 /usr/local 將rabbitmq安裝到/usr/local/下,在rabbitmq服務啟動前可以手動去調整系統最大文件數等參數來優化mq的性能,具體參考:https://www.rabbitmq.com/install-generic-unix.html,同樣為了方便可以將/usr/local/rabbitmq_server-3.8.1/sbin目錄添加到環境變量
其實現在已經可以啟動服務了,目前參數都是默認的,要配置自定義參數還缺少一個配置文件,這里配置文件默認位置為:/usr/local/rabbitmq_server-3.8.1/etc/rabbitmq/rabbitmq.conf,這個文件不存在需要手動創建,具體的配置示例參考:https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.conf.example,現在可以將配示例全部復制到新建的配置文件中,也就是安裝目錄下的etc/rabbitmq/rabbitmq.conf,比如端口號默認是5672,我們可以修改為5682,配置項為:listeners.tcp.default,如下:
更多配置參考:https://www.rabbitmq.com/configure.html,然后可以啟動rabbitmq服務了,命令如下:
前台啟動: ./sbin/rabbitmq-server
后台啟動: ./sbin/rabbitmq-server -detached
停止服務: sbin/rabbitmqctl shutdown
啟動服務之后可以測試一下消息隊列方式是不是好用,這是使用python pika模塊,生產消息代碼如下:new_task.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost', port=5682)) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) for i in range(100): message = '%d. Hello!' % i channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print("Queue sent %r" % message) connection.close()
消費消息代碼如下:worker.py
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost',port=5682)) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print("Received %r" % body) #time.sleep(body.count(b'.')) #print("Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()
這兩段代碼就是官網上給出的work queues示例代碼,首先啟動消費的腳本,這里開兩個窗口分別執行python3 worker.py,啟動后應該是阻塞的,然后再啟動生產消息的腳本python3 new_task.py,我這里用的是python3,具體根據自己的環境執行,執行后如果生產消息正常退出並且兩個worker分別輸出不一樣序號的消息,也就是說生產的消息被分割到兩個進程處理,每個進程消費到數據都不重復,這樣的話說明測試通過,服務是正常的
實際上rabbitmq支持5種不同的消息模型,可以根據不同的場景來應用,這幾種消息模型如下:
這里只是簡單敘述了rabbitmq非常初級的應用,更深入的后續再補充.