一,引入依赖
1
2
3
4
5
6
7
8
9
10
11
|
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>
4.6
.
0
</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
|
二, 使用demo:
1,simple(简单消息生产)
/**
* 以单向模式发送消息
* 这种方式主要用在不特别关心发送结果的场景,例如日志发送
*/
public
static
void
OnewayProducer()
throws
Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"simple_producerGroup_test2"
);
// 设置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 启动Producer实例
producer.start();
for
(
int
i =
0
; i <
10
; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg =
new
Message(
"simple_topic_test2"
/* Topic */
,
"TagA"
/* Tag */
,
(
"Hello RocketMQ "
+ i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
/**
* 同步发送消息
* 在重要的通知消息,SMS通知,SMS营销系统等广泛的场景中使用可靠的同步传输。
*/
public
static
void
SyncProducer()
throws
Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"simple_producerGroup_test2"
);
// 设置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 启动Producer实例
producer.start();
for
(
int
i =
0
; i <
100
; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg =
new
Message(
"simple_topic_test2"
/* Topic */
,
"TagA"
/* Tag */
,
(
"Hello RocketMQ "
+ i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf(
"%s%n"
, sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
/**
* 异步发送消息
* 异步传输通常用于对时间敏感的业务场景中。
*/
public
static
void
AsyncProducer ()
throws
Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"simple_producerGroup_test3"
);
// 设置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(
0
);
int
messageCount =
100
;
// 根据消息数量实例化倒计时计算器
final
CountDownLatch2 countDownLatch =
new
CountDownLatch2(messageCount);
for
(
int
i =
0
; i < messageCount; i++) {
final
int
index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg =
new
Message(
"simple_topic_test3"
,
"TagA"
,
"OrderID188"
,
"Hello world"
.getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg,
new
SendCallback() {
@Override
public
void
onSuccess(SendResult sendResult) {
System.out.printf(
"%-10d OK %s %n"
, index,
sendResult.getMsgId());
}
@Override
public
void
onException(Throwable e) {
System.out.printf(
"%-10d Exception %s %n"
, index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(
5
, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
|
2,filter(过滤消息)
过滤消息有两种方式:Tag 和 SQL 表达式
SQL 基本语法:
RocketMQ 只定义了一些基本语法来支持这个特性,支持扩展。
- 数值比较,比如 >、>=、<、<=、BETWEEN、=;
- 字符比较,比如 =、<>、IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND、OR、NOT。
常量支持的额类型为:
- 数值,比如 123、3.1415;
- 字符,比如 ‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量;
- 布尔值,TRUE 或 FALSE。
只有使用 push 模式的消费者才能使用 SQL92 标准的 SQL 语句
注:消费者sql的使用需要rocketmq开启broker配置项的enablePropertyFilter=true,
默认没开启,如需开启使用前提前找对应开通人处理
生产者代码:
// 实例化消息生产者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"filter_producerGroup_test1"
);
// 设置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 启动Producer实例
producer.start();
for
(
int
i=
0
;i<
10
;i++) {
Message msg =
new
Message(
"filter_topic_test1"
,
"tagA"
,
(
"Hello RocketMQ "
+ i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty(
"a"
, String.valueOf(i));
SendResult sendResult = producer.send(msg);
// System.out.println(sendResult.getMessageQueue());
System.out.println(String.format(
"SendResult status:%s,brokerName:%s, queueId:%d, queueOffset:%s"
,
sendResult.getSendStatus(),
sendResult.getMessageQueue().getBrokerName(),sendResult.getMessageQueue().getQueueId(),
sendResult.getQueueOffset()));
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
|
消费者代码:
// 实例化消费者
DefaultMQPushConsumer consumer =
new
DefaultMQPushConsumer(
"filter_consumerGroup_test1"
);
// 设置NameServer的地址
consumer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
// 需要配置broker的enablePropertyFilter=true
consumer.subscribe(
"filter_topic_test1"
, MessageSelector.bySql(
"a between 0 and 3"
));
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(
new
MessageListenerConcurrently() {
@Override
public
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(
"%s Receive New Messages: %s %n"
, Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf(
"Consumer Started.%n"
);
|
3,batch(批量发送消息)
批量发送消息能显著提高传递小消息的性能,限制是这些批量消息应该有相同的 Topic,相同的 waitStoreMsgOK,而且不能是延时消息,此外,这一批消息的总大小不应超过 4MB。
// 实例化消息生产者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"batch_producerGroup_test2"
);
// 设置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 启动Producer实例
producer.start();
String topic =
"batch_topic_test2"
;
//批量处理1--每次批处理不超过4M
List<Message> messages =
new
ArrayList<>();
messages.add(
new
Message(topic,
"TagA"
,
"OrderID001"
,
"Hello world 0"
.getBytes()));
messages.add(
new
Message(topic,
"TagA"
,
"OrderID002"
,
"Hello world 1"
.getBytes()));
messages.add(
new
Message(topic,
"TagA"
,
"OrderID003"
,
"Hello world 2"
.getBytes()));
//批量处理2
//把大的消息分裂成若干个小的消息
List<Message> messages =
new
ArrayList<>();
for
(
int
i=
0
;i<
1000
;i++){
//如果每条消息是1kb字节数据 ,限制4M相当于400条数据处理一次,1000条需要处理三次
String msg=
"--10kb字节的数据--"
;
messages.add(
new
Message(topic,
"TagA"
, msg, msg.getBytes()));
}
ListSplitter splitter =
new
ListSplitter(messages);
int
j=
0
;
while
(splitter.hasNext()) {
try
{
List<Message> listItem = splitter.next();
producer.send(listItem);
System.out.println(
"---------------------------"
+
"批处理第"
+j+
"次"
+
"---------------------------"
);
j++;
}
catch
(Exception e) {
e.printStackTrace();
//处理error
}
}
try
{
producer.send(messages);
}
catch
(Exception e) {
e.printStackTrace();
//处理error
}
producer.shutdown();
|
消息分割工具类:
public
class
ListSplitter
implements
Iterator<List<Message>> {
private
final
int
SIZE_LIMIT =
4
*
1024
*
1024
;
//4M
private
final
List<Message> messages;
private
int
currIndex;
public
ListSplitter(List<Message> messages) {
this
.messages = messages;
}
@Override
public
boolean
hasNext() {
return
currIndex < messages.size();
}
@Override
public
List<Message> next() {
int
startIndex = getStartIndex();
int
nextIndex = startIndex;
int
totalSize =
0
;
for
(; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int
tmpSize = calcMessageSize(message);
if
(tmpSize + totalSize > SIZE_LIMIT) {
break
;
}
else
{
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return
subList;
}
@Override
public
void
remove() {
}
private
int
getStartIndex() {
Message currMessage = messages.get(currIndex);
int
tmpSize = calcMessageSize(currMessage);
if
(tmpSize > SIZE_LIMIT){
//发送的数据过大,SIZE_LIMIT需要设置合理,不能小于发送的消息里的单条数据
return
currIndex;
}
while
(tmpSize > SIZE_LIMIT) {
currIndex +=
1
;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return
currIndex;
}
private
int
calcMessageSize(Message message) {
int
tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for
(Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize +
20
;
// 增加⽇日志的开销20字节
return
tmpSize;
}
}
|
4,order(有序消息)
订单号相同的消息会被先后发送到同一个broker的同一个队列中,保证顺序性
生产代码:
// 订单流程
String[] msgs =
new
String[] {
"创建消息"
,
"付款消息"
,
"推送消息"
,
"完成消息"
};
for
(
int
i =
0
; i < msgs.length; i++) {
// 创建消息, 指定Topic、Tag、key和消息体
Message msg =
new
Message(
"OrderTopic"
,
"Order"
,
"i"
+ i, msgs[i].getBytes(
"UTF-8"
));
// 发送消息到一个Broker, 参数二: 消息队列的选择器, 参数三: 选择的业务标识
SendResult sendResult = producer.send(msg,
new
MessageQueueSelector() {
@Override
public
MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long
orderId = (
long
) arg;
long
index = orderId % mqs.size();
// 取模
return
mqs.get((
int
) index);
}
}, orderId);
// 动态传入订单id
}
|
消费代码:
// 订阅Topic、Tag
consumer.subscribe(
"OrderTopic"
,
"*"
);
// 设置回调函数, 处理消息
consumer.registerMessageListener(
new
MessageListenerOrderly() {
@Override
public
ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for
(MessageExt msg : msgs) {
// 消息
String message =
new
String(msg.getBody());
}
return
ConsumeOrderlyStatus.SUCCESS;
}
});
|
5,scheduled(延时消息)
应用场景:
比如电商场景,提交了一个订单就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
默认时延配置:
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";对应1~18,18个等级,1是1s,2是5s...,可通过配置修改时延的时长,可在申请开通时找对应开通人开通
生产者代码:
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer =
new
DefaultMQProducer(
"scheduled_producerGroup_test1"
);
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 启动生产者
producer.start();
int
totalMessagesToSend =
100
;
for
(
int
i =
0
; i < totalMessagesToSend; i++) {
Message message =
new
Message(
"scheduled_topic_test1"
, (
"Hello scheduled message "
+ i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(
3
);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
|
6,transaction(事务消息)
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionalMessageCheckListener
类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionTimeout
参数。 - 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
生产者代码:
// 使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,设置自定义线程池来处理这些检查请求。
// 执行本地事务后、需要根据执行结果对消息队列进行回复。
// 回传的事务状态:
// TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
// TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
// TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
TransactionListener transactionListener =
new
TransactionListenerImpl();
TransactionMQProducer producer =
new
TransactionMQProducer(
"transaction_producerGroup_test1"
);
ExecutorService executorService =
new
ThreadPoolExecutor(
2
,
5
,
100
, TimeUnit.SECONDS,
new
ArrayBlockingQueue<Runnable>(
2000
),
new
ThreadFactory() {
@Override
public
Thread newThread(Runnable r) {
Thread thread =
new
Thread(r);
thread.setName(
"client-transaction-msg-check-thread"
);
return
thread;
}
});
producer.setExecutorService(executorService);
//
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
producer.start();
String[] tags =
new
String[] {
"TagA"
,
"TagB"
,
"TagC"
,
"TagD"
,
"TagE"
};
for
(
int
i =
0
; i <
10
; i++) {
try
{
Message msg =
new
Message(
"transaction_topic_test1"
, tags[i % tags.length],
"KEY"
+ i,
(
"Hello RocketMQ "
+ i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg,
null
);
System.out.printf(
"%s%n"
, sendResult);
Thread.sleep(
10
);
}
catch
(MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for
(
int
i =
0
; i <
1000
; i++) {
Thread.sleep(
1000
);
}
producer.shutdown();
|
事务消息监听处理逻辑 :
public
class
TransactionListenerImpl
implements
TransactionListener {
// 当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。
// 返回三个事务状态之一。
// checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。
// 也是返回三个事务状态之一。
private
AtomicInteger transactionIndex =
new
AtomicInteger(
0
);
private
ConcurrentHashMap<String, Integer> localTrans =
new
ConcurrentHashMap<>();
@Override
public
LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.printf(
"#### executeLocalTransaction is executed, msgTransactionId=%s %n"
,
msg.getTransactionId());
int
value = transactionIndex.getAndIncrement();
int
status = value %
3
;
localTrans.put(msg.getTransactionId(), status);
return
LocalTransactionState.UNKNOW;
}
@Override
public
LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if
(
null
!= status) {
switch
(status) {
case
0
:
System.out.printf(
" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n"
, msg.getMsgId());
return
LocalTransactionState.UNKNOW;
case
1
:
System.out.printf(
" # ROLLBACK # Simulating %s related local transaction exec failed! %n"
, msg.getMsgId());
return
LocalTransactionState.COMMIT_MESSAGE;
case
2
:
System.out.printf(
" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n"
);
return
LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return
LocalTransactionState.COMMIT_MESSAGE;
}
}
|
7,开启消费端的广播模式(默认集群模式)
使用广播模式有两个条件:
1, 创建消费者组的时候开启consumerBroadcastEnable=true
2, 代码里加上
consumer.setMessageModel(MessageModel.BROADCASTING);