java原生整合rocketmq


一,引入依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version> 4.6 . 0 </version>
     <exclusions>
         <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </exclusion>
     </exclusions>
</dependency>

 

 

二, 使用demo:

1,simple(简单消息生产)

/**
  * 以单向模式发送消息
  * 这种方式主要用在不特别关心发送结果的场景,例如日志发送
  */
public static void OnewayProducer() throws Exception {
     // 实例化消息生产者Producer
     DefaultMQProducer producer = new DefaultMQProducer( "simple_producerGroup_test2" );
     // 设置NameServer的地址
     producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
     // 启动Producer实例
     producer.start();
     for ( int i = 0 ; i < 10 ; i++) {
         // 创建消息,并指定Topic,Tag和消息体
         Message msg = new Message( "simple_topic_test2" /* Topic */ ,
                 "TagA" /* Tag */ ,
                 ( "Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
         );
         // 发送单向消息,没有任何返回结果
         producer.sendOneway(msg);
 
     }
     // 如果不再发送消息,关闭Producer实例。
     producer.shutdown();
}
 
/**
  * 同步发送消息
  * 在重要的通知消息,SMS通知,SMS营销系统等广泛的场景中使用可靠的同步传输。
  */
public static void SyncProducer() throws Exception {
     // 实例化消息生产者Producer
     DefaultMQProducer producer = new DefaultMQProducer( "simple_producerGroup_test2" );
     // 设置NameServer的地址
     producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
     // 启动Producer实例
     producer.start();
     for ( int i = 0 ; i < 100 ; i++) {
         // 创建消息,并指定Topic,Tag和消息体
         Message msg = new Message( "simple_topic_test2" /* Topic */ ,
                 "TagA" /* Tag */ ,
                 ( "Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
         );
         // 发送消息到一个Broker
         SendResult sendResult = producer.send(msg);
         // 通过sendResult返回消息是否成功送达
         System.out.printf( "%s%n" , sendResult);
     }
     // 如果不再发送消息,关闭Producer实例。
     producer.shutdown();
}
 
/**
  * 异步发送消息
  * 异步传输通常用于对时间敏感的业务场景中。
  */
public static void AsyncProducer () throws Exception {
     // 实例化消息生产者Producer
     DefaultMQProducer producer = new DefaultMQProducer( "simple_producerGroup_test3" );
     // 设置NameServer的地址
     producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
     // 启动Producer实例
     producer.start();
     producer.setRetryTimesWhenSendAsyncFailed( 0 );
 
     int messageCount = 100 ;
     // 根据消息数量实例化倒计时计算器
     final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
     for ( int i = 0 ; i < messageCount; i++) {
         final int index = i;
         // 创建消息,并指定Topic,Tag和消息体
         Message msg = new Message( "simple_topic_test3" ,
                 "TagA" ,
                 "OrderID188" ,
                 "Hello world" .getBytes(RemotingHelper.DEFAULT_CHARSET));
         // SendCallback接收异步返回结果的回调
         producer.send(msg, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 System.out.printf( "%-10d OK %s %n" , index,
                         sendResult.getMsgId());
             }
             @Override
             public void onException(Throwable e) {
                 System.out.printf( "%-10d Exception %s %n" , index, e);
                 e.printStackTrace();
             }
         });
     }
     // 等待5s
     countDownLatch.await( 5 , TimeUnit.SECONDS);
     // 如果不再发送消息,关闭Producer实例。
     producer.shutdown();
}
 
 

2,filter(过滤消息)

 

过滤消息有两种方式:Tag 和 SQL 表达式

SQL 基本语法:

RocketMQ 只定义了一些基本语法来支持这个特性,支持扩展。

  • 数值比较,比如 >、>=、<、<=、BETWEEN、=;
  • 字符比较,比如 =、<>、IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND、OR、NOT。

常量支持的额类型为:

  • 数值,比如 123、3.1415;
  • 字符,比如 ‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量;
  • 布尔值,TRUE 或 FALSE。

只有使用 push 模式的消费者才能使用 SQL92 标准的 SQL 语句

注:消费者sql的使用需要rocketmq开启broker配置项的enablePropertyFilter=true,

默认没开启,如需开启使用前提前找对应开通人处理

生产者代码:

         // 实例化消息生产者Producer
         DefaultMQProducer producer = new DefaultMQProducer( "filter_producerGroup_test1" );
         // 设置NameServer的地址
         producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
         // 启动Producer实例
         producer.start();
         for ( int i= 0 ;i< 10 ;i++) {
             Message msg = new Message( "filter_topic_test1" ,
                     "tagA" ,
                     ( "Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
             );
             // 设置一些属性
             msg.putUserProperty( "a" , String.valueOf(i));
             SendResult sendResult = producer.send(msg);
//            System.out.println(sendResult.getMessageQueue());
             System.out.println(String.format( "SendResult status:%s,brokerName:%s, queueId:%d, queueOffset:%s" ,
                     sendResult.getSendStatus(),
                     sendResult.getMessageQueue().getBrokerName(),sendResult.getMessageQueue().getQueueId(),
                     sendResult.getQueueOffset()));
         }
 
         // 如果不再发送消息,关闭Producer实例。
         producer.shutdown();

消费者代码:

// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "filter_consumerGroup_test1" );
// 设置NameServer的地址
consumer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
// 需要配置broker的enablePropertyFilter=true
consumer.subscribe( "filter_topic_test1" , MessageSelector.bySql( "a between 0 and 3" ));
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener( new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         System.out.printf( "%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgs);
         // 标记该消息已经被成功消费
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
});
// 启动消费者实例
consumer.start();
System.out.printf( "Consumer Started.%n" );

 

3,batch(批量发送消息)

批量发送消息能显著提高传递小消息的性能,限制是这些批量消息应该有相同的 Topic,相同的 waitStoreMsgOK,而且不能是延时消息,此外,这一批消息的总大小不应超过 4MB。

 

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer( "batch_producerGroup_test2" );
// 设置NameServer的地址
producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
// 启动Producer实例
producer.start();
String topic = "batch_topic_test2" ;
//批量处理1--每次批处理不超过4M
List<Message> messages = new ArrayList<>();
messages.add( new Message(topic, "TagA" , "OrderID001" , "Hello world 0" .getBytes()));
messages.add( new Message(topic, "TagA" , "OrderID002" , "Hello world 1" .getBytes()));
messages.add( new Message(topic, "TagA" , "OrderID003" , "Hello world 2" .getBytes()));
//批量处理2
//把大的消息分裂成若干个小的消息
List<Message> messages = new ArrayList<>();
for ( int i= 0 ;i< 1000 ;i++){
     //如果每条消息是1kb字节数据 ,限制4M相当于400条数据处理一次,1000条需要处理三次
     String msg= "--10kb字节的数据--" ;
     messages.add( new Message(topic, "TagA" , msg, msg.getBytes()));
}
ListSplitter splitter = new ListSplitter(messages);
int j= 0 ;
while (splitter.hasNext()) {
     try {
         List<Message>  listItem = splitter.next();
         producer.send(listItem);
         System.out.println( "---------------------------" + "批处理第" +j+ "次" + "---------------------------" );
         j++;
     } catch (Exception e) {
         e.printStackTrace();
         //处理error
     }
}
try {
     producer.send(messages);
} catch (Exception e) {
     e.printStackTrace();
     //处理error
}
producer.shutdown();

消息分割工具类:

public class ListSplitter implements Iterator<List<Message>> {
     private final int SIZE_LIMIT = 4 * 1024 * 1024 //4M
     private final List<Message> messages;
     private int currIndex;
 
     public ListSplitter(List<Message> messages) {
         this .messages = messages;
     }
 
     @Override
     public boolean hasNext() {
         return currIndex < messages.size();
     }
 
     @Override
     public List<Message> next() {
         int startIndex = getStartIndex();
         int nextIndex = startIndex;
         int totalSize = 0 ;
         for (; nextIndex < messages.size(); nextIndex++) {
             Message message = messages.get(nextIndex);
             int tmpSize = calcMessageSize(message);
             if (tmpSize + totalSize > SIZE_LIMIT) {
                 break ;
             } else {
                 totalSize += tmpSize;
             }
         }
         List<Message> subList = messages.subList(startIndex, nextIndex);
         currIndex = nextIndex;
         return subList;
     }
 
     @Override
     public void remove() {
 
     }
 
     private int getStartIndex() {
         Message currMessage = messages.get(currIndex);
         int tmpSize = calcMessageSize(currMessage);
         if (tmpSize > SIZE_LIMIT){
             //发送的数据过大,SIZE_LIMIT需要设置合理,不能小于发送的消息里的单条数据
             return currIndex;
         }
         while (tmpSize > SIZE_LIMIT) {
             currIndex += 1 ;
             Message message = messages.get(currIndex);
             tmpSize = calcMessageSize(message);
         }
         return currIndex;
     }
 
     private int calcMessageSize(Message message) {
         int tmpSize = message.getTopic().length() + message.getBody().length;
         Map<String, String> properties = message.getProperties();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             tmpSize += entry.getKey().length() + entry.getValue().length();
         }
         tmpSize = tmpSize + 20 ; // 增加⽇日志的开销20字节
         return tmpSize;
     }
}

 

4,order(有序消息)

 

订单号相同的消息会被先后发送到同一个broker的同一个队列中,保证顺序性

生产代码:

// 订单流程
String[] msgs = new String[] { "创建消息" , "付款消息" , "推送消息" , "完成消息" };
for ( int i = 0 ; i < msgs.length; i++) {
     // 创建消息, 指定Topic、Tag、key和消息体
     Message msg = new Message( "OrderTopic" , "Order" , "i" + i, msgs[i].getBytes( "UTF-8" ));
     // 发送消息到一个Broker, 参数二: 消息队列的选择器, 参数三: 选择的业务标识
     SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
         @Override
         public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
             long orderId = ( long ) arg;
             long index = orderId % mqs.size(); // 取模
             return mqs.get(( int ) index);
         }
     }, orderId); // 动态传入订单id
}

消费代码:

// 订阅Topic、Tag
  consumer.subscribe( "OrderTopic" , "*" );
// 设置回调函数, 处理消息
  consumer.registerMessageListener( new MessageListenerOrderly() {
      @Override
      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
          for (MessageExt msg : msgs) {
              // 消息
              String message = new String(msg.getBody());
          }
          return ConsumeOrderlyStatus.SUCCESS;
      }
  });

 

5,scheduled(延时消息)

应用场景:

比如电商场景,提交了一个订单就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

默认时延配置:

 

messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";对应1~18,18个等级,1是1s,2是5s...,可通过配置修改时延的时长,可在申请开通时找对应开通人开通

生产者代码:

// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer( "scheduled_producerGroup_test1" );
producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
// 启动生产者
producer.start();
int totalMessagesToSend = 100 ;
for ( int i = 0 ; i < totalMessagesToSend; i++) {
     Message message = new Message( "scheduled_topic_test1" , ( "Hello scheduled message " + i).getBytes());
     // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
     message.setDelayTimeLevel( 3 );
     // 发送消息
     producer.send(message);
}
// 关闭生产者
producer.shutdown();

 

6,transaction(事务消息)

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

生产者代码:

// 使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,设置自定义线程池来处理这些检查请求。
// 执行本地事务后、需要根据执行结果对消息队列进行回复。
// 回传的事务状态:
// TransactionStatus.CommitTransaction:   提交事务,它允许消费者消费此消息。
// TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
// TransactionStatus.Unknown:             中间状态,它代表需要检查消息队列来确定状态。
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer( "transaction_producerGroup_test1" );
ExecutorService executorService = new ThreadPoolExecutor( 2 , 5 , 100 , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( 2000 ), new ThreadFactory() {
     @Override
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setName( "client-transaction-msg-check-thread" );
         return thread;
     }
});
producer.setExecutorService(executorService);
//
producer.setTransactionListener(transactionListener);
 
producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
 
producer.start();
String[] tags = new String[] { "TagA" , "TagB" , "TagC" , "TagD" , "TagE" };
for ( int i = 0 ; i < 10 ; i++) {
     try {
         Message msg =
                 new Message( "transaction_topic_test1" , tags[i % tags.length], "KEY" + i,
                         ( "Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         SendResult sendResult = producer.sendMessageInTransaction(msg, null );
         System.out.printf( "%s%n" , sendResult);
         Thread.sleep( 10 );
     } catch (MQClientException | UnsupportedEncodingException e) {
         e.printStackTrace();
     }
}
for ( int i = 0 ; i < 1000 ; i++) {
     Thread.sleep( 1000 );
}
producer.shutdown();

事务消息监听处理逻辑 :

public class TransactionListenerImpl implements TransactionListener {
     // 当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。
     // 返回三个事务状态之一。
     // checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。
     // 也是返回三个事务状态之一。
     private AtomicInteger transactionIndex = new AtomicInteger( 0 );
     private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
     @Override
     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
         System.out.printf( "#### executeLocalTransaction is executed, msgTransactionId=%s %n" ,
                 msg.getTransactionId());
 
         int value = transactionIndex.getAndIncrement();
         int status = value % 3 ;
         localTrans.put(msg.getTransactionId(), status);
         return LocalTransactionState.UNKNOW;
     }
     @Override
     public LocalTransactionState checkLocalTransaction(MessageExt msg) {
         Integer status = localTrans.get(msg.getTransactionId());
         if ( null != status) {
             switch (status) {
                 case 0 :
                     System.out.printf( "    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n" , msg.getMsgId());
                     return LocalTransactionState.UNKNOW;
                 case 1 :
                     System.out.printf( "    # ROLLBACK # Simulating %s related local transaction exec failed! %n" , msg.getMsgId());
                     return LocalTransactionState.COMMIT_MESSAGE;
                 case 2 :
                     System.out.printf( "    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n" );
                     return LocalTransactionState.ROLLBACK_MESSAGE;
             }
         }
         return LocalTransactionState.COMMIT_MESSAGE;
     }
}

 

7,开启消费端的广播模式(默认集群模式)

使用广播模式有两个条件:

1, 创建消费者组的时候开启consumerBroadcastEnable=true

2, 代码里加上

consumer.setMessageModel(MessageModel.BROADCASTING);


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM