[Source Code Analysis] RocketMQ Message Tracing: A Source Code Walkthrough

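Message tracing comes in two forms: producer-side tracing of sent messages and consumer-side tracing of consumed messages. The two work in much the same way, and problems tend to show up mainly in consumer-side tracing, so this article focuses on the consumer. Consumers are further divided into PullConsumer and PushConsumer, which handle message tracing in essentially the same way, so the article follows consume-side tracing in the PushConsumer.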

Initializing the message trace service

Message tracing is initialized as part of consumer initialization, as the code below shows:

DefaultMQPushConsumer.java
```java
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    if (enableMsgTrace) { // flag: is message tracing enabled?
        try {
            // Line 1: create the asynchronous dispatcher
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
            dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
            // Line 2: store the dispatcher in this consumer's traceDispatcher field (remember this)
            traceDispatcher = dispatcher;
            // Line 3: register a hook on the current consumer
            this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
                new ConsumeMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}
```

Let's start with line 1, which creates a dispatcher. Here is how it is initialized:

AsyncTraceDispatcher.java
```java
public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
    // queueSize is greater than or equal to the n power of 2 of value
    this.queueSize = 2048;
    this.batchSize = 100;
    this.maxMsgSize = 128000;
    this.discardCount = new AtomicLong(0L);
    // Buffer of TraceContext objects waiting to be sent
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.group = group;
    this.type = type;

    // Important: the dispatcher also owns this queue, and what it holds are Runnables
    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    if (!UtilAll.isBlank(traceTopicName)) {
        this.traceTopicName = traceTopicName;
    } else {
        this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
    }
    // Create a thread pool whose work queue is appenderQueue
    this.traceExecutor = new ThreadPoolExecutor(//
        10, //
        20, //
        1000 * 60, //
        TimeUnit.MILLISECONDS, //
        this.appenderQueue, //
        new ThreadFactoryImpl("MQTraceSendThread_"));
    // Create a producer used to send the trace messages
    traceProducer = getAndCreateTraceProducer(rpcHook);
}
```

At this point the AsyncTraceDispatcher is fully initialized, and line 2 then stores it on the current consumer.
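The traceExecutor above combines a core size of 10, a maximum of 20 and the bounded appenderQueue (capacity 2048), and no RejectedExecutionHandler is passed, so the JDK's default AbortPolicy applies. With a bounded queue, a ThreadPoolExecutor only grows past its core size once the queue is full, and it rejects tasks once both the queue and the maximum pool size are exhausted. The sketch below illustrates that behavior with plain java.util.concurrent, scaling the numbers down to core 1, maximum 2 and queue capacity 1; these values, and the class name BoundedPoolDemo, are purely illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {
    /** Returns {threads after 3 tasks, tasks waiting in the queue, 1 if the 4th task was rejected}. */
    static int[] run() {
        // Same constructor shape as traceExecutor, scaled down: core 1, max 2, bounded queue of 1.
        // No RejectedExecutionHandler is passed, so the default AbortPolicy applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1000 * 60, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable busyTask = () -> {
            try {
                release.await(); // keep this worker busy until the latch opens
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(busyTask); // 1st: starts the core thread
        pool.execute(busyTask); // 2nd: core thread busy, so the task waits in the queue
        pool.execute(busyTask); // 3rd: queue full, so a second (non-core) thread is created
        boolean rejected = false;
        try {
            pool.execute(busyTask); // 4th: queue full and pool already at its maximum
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected ? 1 : 0};
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("threads=" + r[0] + ", queued=" + r[1] + ", 4th rejected=" + (r[2] == 1));
    }
}
```

Applied to the real numbers, this means the trace pool normally runs on its 10 core threads, and threads 11 to 20 only appear once 2048 send requests are already waiting.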

Next, let's see what the hook registered in line 3 is:

ConsumeMessageTraceHookImpl.java
```java
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
    // (the field declaration and the interface methods are omitted in this excerpt)

    public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {
        this.localDispatcher = localDispatcher;
    }
}
```

First, a look at the ConsumeMessageHook interface it implements:

```java
public interface ConsumeMessageHook {
    String hookName();

    void consumeMessageBefore(final ConsumeMessageContext context);

    void consumeMessageAfter(final ConsumeMessageContext context);
}
```

It has consumeMessageBefore and consumeMessageAfter. The names are descriptive enough that you can guess they are called before and after a message is consumed.

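That is all you need to know about initialization for now. In short, remember these three points:

  1. An AsyncTraceDispatcher and a ConsumeMessageTraceHookImpl are created and attached to the current consumer.
  2. The AsyncTraceDispatcher holds a queue of TraceContext objects (traceContextQueue), a queue of Runnables (appenderQueue) that backs a thread pool, and a producer.
  3. ConsumeMessageTraceHookImpl has two methods that are called before and after a message is consumed, and it also keeps a reference to the dispatcher above.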
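The three points can be mirrored in a few lines of plain Java. This is a hypothetical model, not RocketMQ code: SketchConsumer, SketchDispatcher and SketchHook stand in for DefaultMQPushConsumer, AsyncTraceDispatcher and ConsumeMessageTraceHookImpl, and only the wiring is modeled, namely that the dispatcher is created, stored and shared with a registered hook only when enableMsgTrace is true.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for DefaultMQPushConsumer: models only the enableMsgTrace wiring. */
final class SketchConsumer {
    final List<SketchHook> hooks = new ArrayList<>(); // stand-in for the registered ConsumeMessageHooks
    SketchDispatcher traceDispatcher;                 // stays null when tracing is off

    SketchConsumer(String consumerGroup, boolean enableMsgTrace) {
        if (enableMsgTrace) {
            SketchDispatcher dispatcher = new SketchDispatcher(consumerGroup); // line 1: create the dispatcher
            traceDispatcher = dispatcher;                                       // line 2: keep it on the consumer
            hooks.add(new SketchHook(traceDispatcher));                         // line 3: register a hook bound to it
        }
    }

    public static void main(String[] args) {
        SketchConsumer plain = new SketchConsumer("demo_group", false);
        SketchConsumer traced = new SketchConsumer("demo_group", true);
        System.out.println("plain:  hooks=" + plain.hooks.size() + ", dispatcher=" + plain.traceDispatcher);
        System.out.println("traced: hooks=" + traced.hooks.size()
                + ", hook shares dispatcher=" + (traced.hooks.get(0).dispatcher == traced.traceDispatcher));
    }
}

/** Hypothetical stand-in for AsyncTraceDispatcher. */
final class SketchDispatcher {
    final String group;

    SketchDispatcher(String group) {
        this.group = group;
    }
}

/** Hypothetical stand-in for ConsumeMessageTraceHookImpl: it only remembers its dispatcher. */
final class SketchHook {
    final SketchDispatcher dispatcher;

    SketchHook(SketchDispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }
}
```

The point of sharing one dispatcher object is that whatever the hook hands over later ends up in the same place the consumer starts up.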

Starting the message trace service

Once the consumer has been initialized, its own start() method is called to start it. The code is as follows:

DefaultMQPushConsumer.java
```java
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    // Line 1: start the consumer itself; we won't look into it here, it is enough to know it has started
    this.defaultMQPushConsumerImpl.start();
    // Whether tracing starts depends on whether a dispatcher exists; as we saw, it was stored during initialization
    if (null != traceDispatcher) {
        try {
            // Line 2: start the dispatcher
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
```

Now let's see how the dispatcher is started in line 2:

AsyncTraceDispatcher.java
```java
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    // Line 2.1: start a producer for the trace messages
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    // Line 2.2: start a daemon thread that runs AsyncRunnable
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}
```
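Two details in this start() are easy to miss. The isStarted.compareAndSet(false, true) guard starts the producer at most once even if start() is called repeatedly, but in the excerpt above the worker thread is created outside that guard, so each call would start another worker. And setDaemon(true) means the worker does not keep the JVM alive on its own. StartOnceDemo below is a hypothetical stdlib model of just that control flow, with counters in place of the real producer and AsyncRunnable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of AsyncTraceDispatcher.start(): counters replace the real producer and worker. */
final class StartOnceDemo {
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    final AtomicInteger producerStarts = new AtomicInteger();
    final AtomicInteger workerStarts = new AtomicInteger();

    void start() {
        if (isStarted.compareAndSet(false, true)) {
            producerStarts.incrementAndGet(); // stands in for traceProducer.start(): runs at most once
        }
        // Outside the guard, as in the excerpt: every call creates and starts a new worker thread.
        Thread worker = new Thread(() -> { }, "sketch-trace-worker");
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive by itself
        worker.start();
        workerStarts.incrementAndGet();
    }

    public static void main(String[] args) {
        StartOnceDemo dispatcher = new StartOnceDemo();
        dispatcher.start();
        dispatcher.start();
        System.out.println("producer starts=" + dispatcher.producerStarts.get()
                + ", worker threads started=" + dispatcher.workerStarts.get());
    }
}
```

compareAndSet succeeds for exactly one caller, which is why the producer counter stays at 1 no matter how many times start() runs, even from several threads.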

Line 2.2 creates an AsyncRunnable; here is that class's run method:

AsyncTraceDispatcher.java
```java
class AsyncRunnable implements Runnable {
    private boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
            for (int i = 0; i < batchSize; i++) {
                TraceContext context = null;
                try {
                    //get trace data element from blocking Queue - traceContextQueue
                    context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                }
                if (context != null) {
                    contexts.add(context);
                } else {
                    break;
                }
            }
            if (contexts.size() > 0) {
                AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                traceExecutor.submit(request);
            } else if (AsyncTraceDispatcher.this.stopped) {
                this.stopped = true;
            }
        }

    }
}
```

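Here we meet traceContextQueue, the TraceContext buffer (capacity 1024) created in the constructor; note that it is a different queue from appenderQueue, the Runnable queue that backs the thread pool. Each pass of the loop polls traceContextQueue with a 5 ms timeout, collecting up to batchSize (100) contexts and stopping early as soon as a poll times out. A non-empty batch is wrapped in an AsyncAppenderRequest and submitted to traceExecutor, the thread pool created during initialization.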
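The batching behavior of that loop can be reproduced with a plain BlockingQueue. In this sketch, drainBatch mirrors one pass of AsyncRunnable (poll with a 5 ms timeout, at most batchSize items, stop at the first timeout), while batchSizes is a hypothetical helper that drains a pre-filled queue and records batch sizes instead of submitting AsyncAppenderRequests. Unlike the excerpt, it restores the interrupt flag rather than silently swallowing InterruptedException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BatchDrainDemo {
    /** One pass of the AsyncRunnable loop: up to batchSize polls, stopping at the first 5 ms timeout. */
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int batchSize) {
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T item;
            try {
                item = queue.poll(5, TimeUnit.MILLISECONDS); // null if nothing arrives within 5 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the excerpt swallows this instead
                break;
            }
            if (item == null) {
                break;
            }
            batch.add(item);
        }
        return batch;
    }

    /** Hypothetical helper: fills a queue with `items` elements, drains it, and returns each batch size. */
    static List<Integer> batchSizes(int items, int batchSize) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024); // same capacity as traceContextQueue
        for (int i = 0; i < items; i++) {
            queue.offer(i);
        }
        List<Integer> sizes = new ArrayList<>();
        List<Integer> batch;
        while (!(batch = drainBatch(queue, batchSize)).isEmpty()) {
            sizes.add(batch.size()); // the real loop submits an AsyncAppenderRequest here
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(250, 100)); // prints [100, 100, 50]
    }
}
```

The short timeout is the design trade-off here: a busy queue yields full batches of 100, while a quiet one still gets flushed within a few milliseconds instead of waiting for a batch to fill up.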

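Summary: starting the consumer also starts a daemon thread that keeps polling a queue, taking contexts out and handing them off to be processed. Two questions are still open at this point:

  1. How does data get added to this queue?
  2. How does AsyncAppenderRequest run?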

How the message trace service works

The previous section left us with two questions; let's solve them one at a time.

How data gets into traceContextQueue


AsyncTraceDispatcher.java
```java
@Override
public boolean append(final Object ctx) {
    boolean result = traceContextQueue.offer((TraceContext) ctx);
    if (!result) {
        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
    }
    return result;
}
```

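AsyncTraceDispatcher's append method puts each context into traceContextQueue with offer, which does not block: if the queue is full, offer returns false, the context is dropped, and discardCount is incremented. So who calls append?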


ConsumeMessageTraceHookImpl.java
```java
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
    // ... code omitted
    if (beans.size() > 0) {
        traceContext.setTraceBeans(beans);
        traceContext.setTimeStamp(System.currentTimeMillis());
        localDispatcher.append(traceContext); // called here
    }
}

@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
    // ... code omitted
    localDispatcher.append(subAfterContext); // and called here
}
```

That settles it: a context is appended to the queue both before and after a message is consumed.
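Putting the two halves together: the hook appends one context before and one after the listener runs, and append never blocks. TraceHookDemo below is a hypothetical model of that path using a small ArrayBlockingQueue; contexts are reduced to strings such as "SubBefore:m1", and the capacity of 3 is chosen only so that the queue overflows quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the trace hook feeding append(); contexts are plain strings. */
final class TraceHookDemo {
    final BlockingQueue<String> traceContextQueue;
    final AtomicLong discardCount = new AtomicLong(0L);
    final List<String> events = new ArrayList<>(); // what the user's listener "did"

    TraceHookDemo(int capacity) {
        this.traceContextQueue = new ArrayBlockingQueue<>(capacity);
    }

    /** Same logic as append(): offer() never blocks, so a full queue means the context is dropped. */
    boolean append(String ctx) {
        boolean result = traceContextQueue.offer(ctx);
        if (!result) {
            discardCount.incrementAndGet();
        }
        return result;
    }

    /** Consume path: hook before, listener, hook after. */
    void consume(String msgId) {
        append("SubBefore:" + msgId);   // role of consumeMessageBefore
        events.add("consume:" + msgId); // the MessageListener would run here
        append("SubAfter:" + msgId);    // role of consumeMessageAfter
    }

    public static void main(String[] args) {
        TraceHookDemo demo = new TraceHookDemo(3); // tiny capacity so the last append overflows
        demo.consume("m1");
        demo.consume("m2");
        System.out.println("queued=" + demo.traceContextQueue + ", discarded=" + demo.discardCount.get());
    }
}
```

Because offer fails fast instead of waiting, a slow or unreachable trace backend costs lost trace records rather than slower message consumption.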

Second question: how does AsyncAppenderRequest run?

AsyncTraceDispatcher.java
```java
class AsyncAppenderRequest implements Runnable {
    List<TraceContext> contextList;

    public AsyncAppenderRequest(final List<TraceContext> contextList) {
        if (contextList != null) {
            this.contextList = contextList;
        } else {
            this.contextList = new ArrayList<TraceContext>(1);
        }
    }

    @Override
    public void run() {
        sendTraceData(contextList); // finally, the place where the messages are sent
    }
}
```

As the code shows, AsyncAppenderRequest is what sends the trace messages.
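The same Runnable-plus-executor pattern can be tried without a broker. In the sketch below, AsyncAppenderRequest keeps the excerpt's null-safe constructor, but sendTraceData is a hypothetical stand-in that records contexts into a shared list instead of producing messages, and a fixed pool of 2 threads plays the role of traceExecutor. Calling shutdown() and then awaitTermination waits for every already-submitted request to finish.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AppenderRequestDemo {
    /** Same shape as the excerpt: one batch per Runnable, with null turned into an empty list. */
    static final class AsyncAppenderRequest implements Runnable {
        private final List<String> contextList;
        private final List<String> sink; // hypothetical: where "sent" contexts are recorded

        AsyncAppenderRequest(List<String> contextList, List<String> sink) {
            this.contextList = contextList != null ? contextList : new ArrayList<String>(1);
            this.sink = sink;
        }

        @Override
        public void run() {
            sendTraceData(contextList, sink);
        }
    }

    /** Hypothetical stand-in for sendTraceData: records the batch instead of sending it. */
    static void sendTraceData(List<String> contextList, List<String> sink) {
        sink.addAll(contextList);
    }

    /** Submits each batch to a small pool, waits for completion, and returns everything "sent". */
    static List<String> sendAll(List<List<String>> batches) {
        List<String> sink = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService traceExecutor = Executors.newFixedThreadPool(2);
        for (List<String> batch : batches) {
            traceExecutor.submit(new AsyncAppenderRequest(batch, sink));
        }
        traceExecutor.shutdown(); // accept no new tasks; already-submitted requests still run
        try {
            traceExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> sent = sendAll(Arrays.asList(
                Arrays.asList("ctx1", "ctx2"), null, Arrays.asList("ctx3")));
        System.out.println(sent.size() + " contexts sent");
    }
}
```

The order in which batches land in the sink is not fixed, since two pool threads run requests concurrently; the total is what stays stable.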

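Summary: before and after a message is consumed, the trace hook appends a context to the queue; the daemon thread drains the queue in batches and submits each batch to the thread pool as an AsyncAppenderRequest, which then sends the trace messages. The sending code is below: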

AsyncTraceDispatcher.java
```java
public void sendTraceData(List<TraceContext> contextList) {
    Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
    for (TraceContext context : contextList) {
        if (context.getTraceBeans().isEmpty()) {
            continue;
        }
        // Topic value corresponding to original message entity content
        String topic = context.getTraceBeans().get(0).getTopic();
        String regionId = context.getRegionId();
        // Use original message entity's topic as key
        String key = topic;
        if (!StringUtils.isBlank(regionId)) {
            key = key + TraceConstants.CONTENT_SPLITOR + regionId;
        }
        List<TraceTransferBean> transBeanList = transBeanMap.get(key);
        if (transBeanList == null) {
            transBeanList = new ArrayList<TraceTransferBean>();
            transBeanMap.put(key, transBeanList);
        }
        TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
        transBeanList.add(traceData);
    }
    for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
        String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
        String dataTopic = entry.getKey();
        String regionId = null;
        if (key.length > 1) {
            dataTopic = key[0];
            regionId = key[1];
        }
        flushData(entry.getValue(), dataTopic, regionId);
    }
}
```
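The key handling in sendTraceData is easy to try in isolation. In this simplified sketch, Ctx is a hypothetical two-field stand-in for TraceContext, the blank check only approximates StringUtils.isBlank, and CONTENT_SPLITOR is assumed to be (char) 1, which matches the field separators visible in the trace body bytes shown in this article. Contexts with the same topic and region end up in one list, and splitting the key recovers the topic and region that are handed to flushData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TraceGroupingDemo {
    static final char CONTENT_SPLITOR = (char) 1; // assumed value of TraceConstants.CONTENT_SPLITOR

    /** Hypothetical stand-in for TraceContext: only the fields used to build the grouping key. */
    static final class Ctx {
        final String topic;
        final String regionId;

        Ctx(String topic, String regionId) {
            this.topic = topic;
            this.regionId = regionId;
        }
    }

    /** Builds the key the way sendTraceData does: topic, or topic + splitter + regionId. */
    static Map<String, List<Ctx>> group(List<Ctx> contexts) {
        Map<String, List<Ctx>> byKey = new HashMap<>();
        for (Ctx c : contexts) {
            String key = c.topic;
            if (c.regionId != null && !c.regionId.trim().isEmpty()) { // rough StringUtils.isBlank
                key = key + CONTENT_SPLITOR + c.regionId;
            }
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(c);
        }
        return byKey;
    }

    /** Reverses a key into {dataTopic, regionId}; regionId is null when the key has no splitter. */
    static String[] splitKey(String key) {
        String[] parts = key.split(String.valueOf(CONTENT_SPLITOR));
        return parts.length > 1 ? new String[] {parts[0], parts[1]} : new String[] {key, null};
    }

    public static void main(String[] args) {
        Map<String, List<Ctx>> groups = group(Arrays.asList(
                new Ctx("TopicA", "cn-qingdao-publictest"),
                new Ctx("TopicA", "cn-qingdao-publictest"),
                new Ctx("TopicB", null)));
        for (Map.Entry<String, List<Ctx>> e : groups.entrySet()) {
            String[] k = splitKey(e.getKey());
            System.out.println("topic=" + k[0] + ", region=" + k[1] + ", contexts=" + e.getValue().size());
        }
    }
}
```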

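I won't go through this code line by line. In short, it groups the contexts by the original message's topic (joined with the regionId via TraceConstants.CONTENT_SPLITOR when a regionId is present), encodes each context into a TraceTransferBean with TraceDataEncoder, and then calls flushData once per group. Here is what a resulting trace Message looks like: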

```text
Message{topic='rmq_sys_TRACE_DATA_cn-qingdao-publictest', flag=0, properties={KEYS=7F00000100077CD62F4389D8FA370048, WAIT=true}, body=[83, 117, 98, 66, 101, 102, 111, 114, 101, 1, 49, 54, 49, 57, 53, 49, 57, 48, 57, 57, 48, 57, 54, 1, 99, 110, 45, 113, 105, 110, 103, 100, 97, 111, 45, 112, 117, 98, 108, 105, 99, 116, 101, 115, 116, 1, 71, 73, 68, 95, 115, 116, 105, 99, 107, 101, 114, 1, 55, 70, 48, 48, 48, 48, 48, 49, 50, 52, 55, 65, 51, 66, 49, 57, 50, 68, 51, 50, 56, 57, 68, 56, 70, 67, 68, 56, 48, 48, 50, 48, 1, 55, 70, 48, 48, 48, 48, 48, 49, 48, 48, 48, 55, 55, 67, 68, 54, 50, 70, 52, 51, 56, 57, 68, 56, 70, 65, 51, 55, 48, 48, 52, 56, 1, 48, 1, 110, 117, 108, 108, 1, 49, 57, 50, 46, 49, 54, 56, 46, 51, 49, 46, 53, 52, 64, 57, 51, 51, 56, 2], transactionId='null'}
```

Decoded, the body reads as follows. The fields are separated by invisible control characters (byte 1 between fields, byte 2 at the end), and the consumer group and IP have been replaced with placeholders:

```text
SubBefore1619519099096cn-qingdao-publictest<consumer group>7F000001247A3B192D3289D8FCD800207F00000100077CD62F4389D8FA3700480null<ip>@9338
```
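The decoding can be reproduced directly from the byte array in the dump. The sketch below is plain Java: it turns the bytes into a UTF-8 string, cuts the record at the terminating (char) 2 and splits it on (char) 1. It only decodes the first record in the body. The field meanings noted in the comment are a reading of the values (the sixth field, for example, matches the KEYS property), not something taken from the RocketMQ documentation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class TraceBodyDecodeDemo {
    static final char CONTENT_SPLITOR = (char) 1; // separator between fields, as seen in the bytes
    static final char FIELD_SPLITOR = (char) 2;   // terminator at the end of a record

    // Body bytes copied from the Message dump, one field per line.
    static final byte[] BODY = {
        83, 117, 98, 66, 101, 102, 111, 114, 101, 1,
        49, 54, 49, 57, 53, 49, 57, 48, 57, 57, 48, 57, 54, 1,
        99, 110, 45, 113, 105, 110, 103, 100, 97, 111, 45, 112, 117, 98, 108, 105, 99, 116, 101, 115, 116, 1,
        71, 73, 68, 95, 115, 116, 105, 99, 107, 101, 114, 1,
        55, 70, 48, 48, 48, 48, 48, 49, 50, 52, 55, 65, 51, 66, 49, 57, 50, 68, 51, 50, 56, 57, 68, 56, 70, 67, 68, 56, 48, 48, 50, 48, 1,
        55, 70, 48, 48, 48, 48, 48, 49, 48, 48, 48, 55, 55, 67, 68, 54, 50, 70, 52, 51, 56, 57, 68, 56, 70, 65, 51, 55, 48, 48, 52, 56, 1,
        48, 1, 110, 117, 108, 108, 1,
        49, 57, 50, 46, 49, 54, 56, 46, 51, 49, 46, 53, 52, 64, 57, 51, 51, 56, 2
    };

    /** Decodes the first trace record in the body into its fields. */
    static List<String> decode(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        int end = text.indexOf(FIELD_SPLITOR);
        String record = end >= 0 ? text.substring(0, end) : text;
        return Arrays.asList(record.split(String.valueOf(CONTENT_SPLITOR)));
    }

    public static void main(String[] args) {
        List<String> fields = decode(BODY);
        // Apparent meaning: type, timestamp, region, group, request ID, msg ID, retry count, keys, client host
        for (int i = 0; i < fields.size(); i++) {
            System.out.println(i + ": " + fields.get(i));
        }
    }
}
```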

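Summary: this article looked at how message tracing runs from three angles: initialization, startup, and implementation. To keep the overall picture clear, some details were left out, but digging into them is well worthwhile for understanding RocketMQ.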