NewLife.RocketMQ使用


百度連接rocketmq,然后報錯了

找到了內網的ip地址,報

 

 然后百度,修改brokerIP1的地址

 :https://www.cnblogs.com/smail-bao/p/6905460.html;

https://blog.csdn.net/chuanhejiu9868/article/details/100956381 

#進入rocketmq根目錄
cd incubator-rocketmq/distribution/target/apache-rocketmq
#編寫配置文件,並寫好配置
echo "brokerIP1=10.19.73.64的外網IP" > broker.properties
#啟動 mqnamesrv 
nohup sh bin/mqnamesrv &

#重點:mrbroker 啟動時通過 -c 加載配置文件
nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c /opt/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq/broker.properties &

這里我們停止RocketMQ使用
./bin/mqshutdown broker
./bin/mqshutdown namesrv

  然后發布消息

 

 

 再測試下消費

代碼

 static void Main(string[] args)
        {
            //XTrace.UseConsole();

            //ConsumerData();

            for (int i = 0; i <= 10; i++)
            {
                Console.WriteLine("請發布消息");
                string wxMessage = Console.ReadLine();
                ProducerData(wxMessage, i.ToString());
            }
            Console.Read();
        }
        /// <summary>
        /// 生產者
        /// </summary>
        private static void ProducerData(string wxMessage, string key)
        {
            try
            {
                var producer = new Producer
                {
                    Topic = "nx_test1",
                    NameServerAddress = "ip:9876",
                    Group = "測試",
                    //Log = XTrace.Log,
                };
                producer.Start();

                //發送消息方式一,可以設置key
                var msg = new Message()
                {
                    BodyString = wxMessage,
                    Keys = key,
                    Tags = "TagC",
                    Flag = 0,
                    WaitStoreMsgOK = true
                };
                var data = producer.Publish(msg);
                //var data = producer.PublishAsync(msg);
                Console.WriteLine(JsonConvert.SerializeObject(data));
                //producer.Publish(JsonConvert.SerializeObject(wxMessage), "測試", "111", 6000);
                Console.WriteLine("生產者" + JsonConvert.SerializeObject(msg));
                //釋放連接
                producer.Dispose();
            }
            catch (Exception ex)
            {
                Console.WriteLine("寫入消息隊列出錯:" + ex.ToString());
            }
        }
        /// <summary>
        /// 消費者
        /// </summary>
        private static void ConsumerData()
        {
            var consumer = new Consumer
            {
                //Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet",
                //AccessKey = "LTAINsp1qKfO61c5",
                //SecretKey = "BvX6DpQffUz8xKIQ0u13EMxBW6YJmp",
                Topic = "nx_test1",
                Group = "測試",
                NameServerAddress = "ip:9876",
                BatchSize = 1,
                //Log = XTrace.Log,
            };

            consumer.OnConsume = (q, ms) =>
            {
                //string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
                //Console.WriteLine(mInfo);
                foreach (var item in ms.ToList())
                {
                    string msg = $"消息:msgId={item.MsgId},key={item.Keys},產生時間【{item.BornTimestamp.ToDateTime()}】,內容>{System.Text.Encoding.Default.GetString(item.Body)}";
                    Console.WriteLine(msg);
                }
                //   return false;//通知消息隊:不消費消息
                return true;        //通知消息隊:消費了消息
            };

            consumer.Start();
        }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM