【源码分析】RocketMQ消息追踪源码详解
消息追踪有生产者发送消息追踪和消费者消费消息追踪,因为他们原理差不多,并且容易出现问题的地方主要集中在消费者消息追踪,所以这篇文章主要去讲一下消费者的消息追踪,而消费者又分为PullConsumer和PushConsumer,在消息追踪这块基本是一样的,所以最终选择的是PushConsumer的消费消息的消息追踪
消息追踪服务的初始化
消息追踪的初始化是随着消费者初始化进行初始化的,见下面的代码分析
1 |
|
先分析行1:这里初始化了一个Dispatcher,我们来看一下这里是怎么进行初始化的:
1 |
|
上面就初始化好了一个AsyncTraceDispatcher,然后行2把这个dispatcher给保存到了当前消费者。
下面看一下行3中的这个Hook是个什么
1 |
|
有个consumeMessageBefore和consumeMessageAfter,这名字起的还是很形象的,大致猜出他们是消息消费前消费后会去调用这两个方法。
初始化这里先知道这些就可以了,总结来说记住如下三点就可以了:
- 初始化了一个AsyncTraceDispatcher和一个ConsumeMessageTraceHookImpl并保存到了当前的消费者上
- 其中AsyncTraceDispatcher里面有一个存Runnable的队列,一个线程池,一个生产者;
- ConsumeMessageTraceHookImpl有两个方法发消息前和发消息后会调用,并且也保存了上面的Dispatcher
消息追踪服务的启动
当消费者初始化完毕后会调用其自己的start()方法进行消费者的启动,代码如下:
1 |
|
来看下行2Dispatcher是怎么启动的
1 |
|
在行2.2我们看到有一个AsyncRunnable,看下这个类的run方法是啥
1 |
|
在这里看到了一个熟悉的traceContextQueue,这也是初始化的时候我说需要记下来的,就是从这里面每5毫秒取出context然后包装成AsyncAppenderRequest,再扔到初始化的时候初始化的线程池。
小结:当消费者启动的时候顺带开了一个守护进程,这个守护进程会一直去遍历一个队列从中取出context去跑。然后此时还剩下两个问题没有解决
- 这个队列是怎么添加的数据
- 这个AsyncAppenderRequest是怎么运行的
消息追踪服务的实现
在上一节我们发现了两个问题,让我们一个一个来解决
####traceContextQueue是如何添加的数据
1 |
|
AsyncTraceDispatcher有一个append方法会一直向队列中加入context,那这个append是谁调的呢
1 |
|
清晰了,原来是消费前消费后都会把context加入到队列中
第二个问题AsyncAppenderRequest是怎么运行的
1 |
|
如上代码所示,这个AsyncAppenderRequest就是来发送消息追踪的消息的。
小结:当消费者发送消息的时候会把要发送消息的context加入到队列中,然后让守护进程去发消息追踪的消息,发消息的代码在下面
1 |
|
这里的代码就不详细讲解了可以看一下发送消息的Message是什么样的
1 |
|
总结:本文从消息追踪功能的初始化,启动,和实现仨个方面进行了讲解了一下消息追踪是怎么跑起来的,为了方便对整体有个认知,这里有些细节没有去讲,去了解和认识RocketMQ还是很有好处的。