RocketMQ消息追踪相关源码分析

在消息追踪这块,我一共遇到了两个问题:

第一个问题:在开启消息追踪功能后,消息追踪没有生效,看不到消费者组

第二个问题:消息追踪生效了,但是只显示一个消费者组的消息追踪信息

消息追踪未生效

第一次配置RocketMQ的消息追踪功能是参考官方文档中消息追踪说明配置的,其中有这么一段话

大致流程如下:

  1. 配置文件中traceTopicEnable设置为true

  2. 启动开启消息轨迹的Broker (看到这里我默认阿里云已经开启了)

1
nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &
  1. 在默认情况下,消息轨迹数据是存储于系统级的TraceTopic中(其名称为:RMQ_SYS_TRACE_TOPIC)。该Topic在Broker节点启动时,会自动创建出来(如上所叙,需要在Broker端的配置文件中将traceTopicEnable的开关变量设置为true。用户也可以自定义配置消息轨迹存储的topic(这里我也默认阿里云也如此配置)

  2. 然后就可以正确使用消息轨迹追踪了

然而。。并没有卵用,在消息发送成功且处理成功后,阿里云后台并没有看到消息,然后查询了巨多资料后,在SpringCloudAlibaba的issue中发现了这个,里面提到

picture 4

然后我恍然大悟,我认为的上述流程中的3是有问题,确实是默认开启了消息追踪但是并没有设置为默认的RMQ_SYS_TRACE_TOPIC这个topic,而是如上图一样,不同区域不同的topic。虽然感觉挫了点但是能用呀,然后就配置了公网的topic发现确实好用了,但是我有三个消费者组,只显示了一个,奇怪了,然后就去翻阿里云的文档,因为一开始为了用开源的RocketMQ也没怎么看文档,一直以为文档只介绍了ONS的SDK的用法,然后看到了picture 5

原来也给开源的SDK写了文档,然后一翻,看到了下面

picture 6

好吧,原来只需要配置一下AccessChannel就可以,然后他就会根据当前mq的地区配置不同的Topic,不愧是阿里贡献给Apache的项目,开源的竟然也可以无缝对接自家产品。这样配置就优雅了许多。

但是好像那个只显示一个消费者组的问题并没有解决。

仅显示一个消费者组问题

在我们的开源项目sisyphus中最开始对rocketMQ的配置是如下的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (consumerProperty.enableTrace) {
DefaultMQPushConsumer(MixAll.DEFAULT_CONSUMER_GROUP, hook,AllocateMessageQueueAveragely(),true, consumerProperty.traceTopic)
} else {
DefaultMQPushConsumer(MixAll.DEFAULT_CONSUMER_GROUP, hook,
AllocateMessageQueueAveragely())
}.apply {
this.namesrvAddr = chooseNameServerAddr(consumerProperty)
if (consumerProperty.accessChannel != null) {
this.accessChannel = consumerProperty.accessChannel
}
if (metadata.groupId.isNotEmpty()) {
this.consumerGroup
}
}

因为这里用了kotlin的语法,我大致说一下意思,就是先看一下有没有配置enableTrace,如果定义了则先构建一个带追踪服务的默认的消费者组然后再看看消费者有没有自定义配置消费组,如果有配置则更改为当前消费组。

当我点开DefaultMQPushConsumer的源码的时候就大致发现了问题(本文RocketMQ相关的源码版本是4.7.1):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public DefaultMQPushConsumer(String namespace, String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, String customizedTraceTopic) {
....

if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
this.traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(this.traceDispatcher));
} catch (Throwable var8) {
this.log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}

我们看到了,在消费者的构建函数中,如果开启了消息追踪,则会给当前消费者分配一个dispatch(调度器),这个调度器的消费者组按照上面的配置的话,应该就是默认的消费者组,按理说应该是自定义的消费者组才对。虽然和自己想的不一样,但是还是不清楚为什么会显示一个。

所以我们就来看一下这个dispatch到底是什么还有是怎么用的:

AsyncTraceDispatcher的构建函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
this.group = group;
this.type = type;

this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}

在这个构建函数中看到了熟悉的RMQ_SYS_TRACE_TOPIC,然后此时的group就是默认的group即DEFAULT_CONSUMER,然后还有一个traceProducer,猜测这个应该就是我们消费完消息之后,给消息追踪的topic发一个消息,以至于可以追踪消息。

继续看一下dispatch:

1
2
3
4
5
6
7
8
9
10
11
12
13
AsyncTraceDispatcher.java
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}

看了下此处start方法调用的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
DefaultMQPushConsumer.java

@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}

DefaultMQProducer.java

@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}

和刚才的猜测是一样的,那就重点看一下这个start方法应该就可以找到问题。

1
2
3
4
5
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}

这里开启了一个producer,那这个producer是怎么创建出来的,发现是在AsyncTraceDispatcher构造函数中创建出来的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
traceProducer = getAndCreateTraceProducer(rpcHook);

private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook);
traceProducerInstance.setProducerGroup(genGroupNameForTrace());
traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K
traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
}
return traceProducerInstance;
}

private String genGroupNameForTrace() {
return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ;
}

这里的group就是前面的DEFAULT_CONSUMER。到目前没有发现什么问题,然后只能回过头再继续看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
traceProducer.start();

DefaultMQProducer.java
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}

this.defaultMQProducerImpl.start();

public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

这块代码有些长,但是问题就出在了这里,因为每一个dispatch是每个消费者组有一个,然后按照我们之前的配置的话,每个消费者组都会启动一个属于自己的生产者组,但是他们的组都是默认的DEFAULT_CONSUMER,而每次都会走switch的CREATE_JUST,当第一个消费者组创建自己的内部的消息追踪的生产者组的时候是正常的,但当第二个消费者组创建自己的生产者的时候,会抛出这个异常

1
2
3
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);

所以后面两个的消费者组的消息追踪的生产者是创建失败的,所以导致只有一个消费者组有消息追踪。一切都明了了。至于修改的话就比较容易了:

1
2
3
4
5
6
7
8
9
10
if (consumerProperty.enableTrace) {
DefaultMQPushConsumer(metadata.groupId.takeIf { metadata.groupId.isNotEmpty() }
?: MixAll.DEFAULT_CONSUMER_GROUP, hook,
AllocateMessageQueueAveragely(),
true, consumerProperty.traceTopic)
} else {
DefaultMQPushConsumer(metadata.groupId.takeIf { metadata.groupId.isNotEmpty() }
?: MixAll.DEFAULT_CONSUMER_GROUP, hook,
AllocateMessageQueueAveragely())
}

解决完问题下面就是总结反思了,在想怎么才能最大程度上避免这种问题,看了源码,如果说到创建消费者的消息追踪的生产者的时候再去获取组,此时的组肯定是正确的,但是,这样做合理么,我总结了一下这样做确实可以避免这种问题,但是如果说这个参数放到后面再设置,那其他参数也需要这么操作,这样的话整个代码就不是很合理了,所以最终解决问题我觉着应该从自身出发,并且最大程度避免这个问题的一个方法是:

在使用kotlin的apply去懒设置配置的时候,当构建函数为必填的时候,就不要去使用apply去懒设置,这样应该就能最大程度上避免出现类似的错误。