轉自:http://www.changeself.net/archives/rocketmq入門(3)拉取消息.html
RocketMQ不止可以直接推送消息,在消費端注冊監聽器進行監聽,還可以由消費端決定自己去拉取數據
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/**
* PullConsumer,訂閱消息
*/
public
class
PullConsumer
{
//Java緩存
private
static
final
Map
<
MessageQueue
,
Long
>
offseTable
=
new
HashMap
<
MessageQueue
,
Long
>
(
)
;
public
static
void
main
(
String
[
]
args
)
throws
MQClientException
{
DefaultMQPullConsumer
consumer
=
new
DefaultMQPullConsumer
(
"PullConsumerGroup"
)
;
consumer
.
setNamesrvAddr
(
"127.0.0.1:9876"
)
;
consumer
.
start
(
)
;
//拉取訂閱主題的隊列,默認隊列大小是4
Set
<MessageQueue>
mqs
=
consumer
.
fetchSubscribeMessageQueues
(
"TopicTestMapBody"
)
;
for
(
MessageQueue
mq
:
mqs
)
{
System
.
out
.
println
(
"Consume from the queue: "
+
mq
)
;
SINGLE_MQ
:
while
(
true
)
{
try
{
PullResult
pullResult
=
consumer
.
pullBlockIfNotFound
(
mq
,
null
,
getMessageQueueOffset
(
mq
)
,
32
)
;
List
<MessageExt>
list
=
pullResult
.
getMsgFoundList
(
)
;
if
(
list
!=
null
&&
list
.
size
(
)
<
100
)
{
for
(
MessageExt
msg
:
list
)
{
System
.
out
.
println
(
SerializableInterface
.
deserialize
(
msg
.
getBody
(
)
)
)
;
}
}
System
.
out
.
println
(
pullResult
.
getNextBeginOffset
(
)
)
;
putMessageQueueOffset
(
mq
,
pullResult
.
getNextBeginOffset
(
)
)
;
switch
(
pullResult
.
getPullStatus
(
)
)
{
case
FOUND
:
// TODO
break
;
case
NO_MATCHED_MSG
:
break
;
case
NO_NEW_MSG
:
break
SINGLE_MQ
;
case
OFFSET_ILLEGAL
:
break
;
default
:
break
;
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
(
)
;
}
}
}
consumer
.
shutdown
(
)
;
}
private
static
void
putMessageQueueOffset
(
MessageQueue
mq
,
long
offset
)
{
offseTable
.
put
(
mq
,
offset
)
;
}
private
static
long
getMessageQueueOffset
(
MessageQueue
mq
)
{
Long
offset
=
offseTable
.
get
(
mq
)
;
if
(
offset
!=
null
)
{
System
.
out
.
println
(
offset
)
;
return
offset
;
}
return
0
;
}
|
剛開始的沒有細看PullResult對象,以為拉取到的結果沒有MessageExt對象還跑到群里面問別人,犯2了
特別要注意 靜態變量offsetTable的作用,拉取的是按照從offset(理解為下標)位置開始拉取,拉取N條,offsetTable記錄下次拉取的offset位置