RocketMq入門 配置的acl沒有生效


修改RocketMq源碼  distribution/conf/broker.conf 和 distribution/conf/plain_acl.yml之后,配置文件未生效

在啟動broker.startup時,配置運行環境

 

 Program arguments: 配置文件 輸入 -c C:\Users\Administrator\Desktop\rocketmq-all-4.7.1-source-release\distribution\conf\broker.conf

Environment variables:設置環境變量 輸入 ROCKETMQ_HOME=C:\Users\Administrator\Desktop\rocketmq-all-4.7.1-source-release\distribution

rocketmq中只有broker需要設置環境變量

producer使用acl:

public static void main(String[] args) throws Exception {

    //Instantiate with a producer group name.
    DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name",getAclRPCHook());

    // Specify name server addresses.
    producer.setNamesrvAddr("localhost:9876");

    //Launch the instance.
    producer.start();

    for (int i = 0; i < 10; i++) {
        //Create a message instance, specifying topic, tag and message body.
        Message msg = new Message("TopicTest",
                "TagA",
                ("Hello RocketMQ wangshuai").getBytes(RemotingHelper.DEFAULT_CHARSET));

        //Call send message to deliver message to one of brokers.
        SendResult sendResult = producer.send(msg);

        System.out.printf("%s%n", sendResult);
    }
    //Shut down once the producer instance is not longer in use.
    producer.shutdown();
}
static RPCHook getAclRPCHook(){
    return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
}

 

consumer使用acl:

public static void main(String[] args) throws InterruptedException, MQClientException {

   // Instantiate with specified consumer group name.
   DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name",getAclRPCHook(),new AllocateMessageQueueAveragely());

   // Specify name server addresses.
   consumer.setNamesrvAddr("localhost:9876");

   // Subscribe one more more topics to consume.
   consumer.subscribe("TopicTest", "*");

   // Register callback to execute on arrival of messages fetched from brokers.
//      注冊回調 以便獲取到達broker的消息
   consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
           System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

     //Launch the consumer instance.
     consumer.start();

     System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRPCHook(){
    return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
}

 

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM