好用的MqttBroker:JMQTT2.1.0源码分析

JMQTT是cicizz在2019年开始开源的项目,当时我们公司刚好在做IoT的东西,需要一个Java语言的MQTT Broker,但是当时物联网刚刚兴起,别说Java写的了,就是其他语言写的,也就只有emqtt(现改名EMQX)算是能用的,其他的如古早的mosquitto都是玩具项目。所以JMQTT1.x是很厉害的,也是我参考了很多的项目。

我参与了JMQTT初期的一些开发,不过现在JMQTT已经发展到3.x版本了,被改了很多东西,如为了支持MQTT5增加了hivemq-client,为了优化集群增加了akka,然后增加了一些UI界面等,还有一些老代码的兼容。个人认为新版的代码已经很乱了,比如MQTT3是用Netty直接写的,MQTT5又用hivemq间接地用Netty,这种就很难管理。

我们从JMQTT2.1.0版来分析源码,会比较清晰一点。

架构分析

jmqtt_design.jpg

这是作者当时画的架构图。结构就是:通信 -> 服务 -> 存储 -> 集群。

包的结构也非常清晰:

  • broker:mqtt协议层,逻辑处理,BrokerStartup为启动类,BrokerController为初始化类,初始化所有的必备环境,其中acl,store的插件配置也必须在这里初始化
  • common:公共层,存放工具类,bean类等
  • remoting:通信层,连接管理,协议解析,心跳等
  • distribution:配置模块,主要是配置文件,启停命令等存放
  • example:客户端示例,目前只有java以及websocket
  • manage:管理模块,主要涉及动态配置和系统监控等功能
  • store:存储模块,提供了mqtt协议数据的几个接口,支持基于内存的和Rocksdb的本地存储

启动

BrokerStartup做的事情只是加载配置,最后new了一个BrokerController来做Broker服务器初始化。

BrokerController初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public BrokerController(BrokerConfig brokerConfig, NettyConfig nettyConfig, StoreConfig storeConfig, ClusterConfig clusterConfig) {
……
// 3种存储方式选择:RocksDB、Redis、内存
switch (storeConfig.getStoreType()) {
case 1:
this.abstractMqttStore = new RDBMqttStore(storeConfig);
break;
case 2:
this.abstractMqttStore = new RedisMqttStore(clusterConfig);
break;
case 3:
this.abstractMqttStore = new DefaultMqttStore();
break;
}

// 权限校验组件初始化
this.connectPermission = new DefaultConnectPermission();
this.pubSubPermission = new DefaultPubSubPermission();

// 订阅和转发器初始化
this.subscriptionMatcher = new DefaultSubscriptionTreeMatcher();
this.messageDispatcher = new DefaultDispatcherMessage(brokerConfig.getPollThreadNum(), subscriptionMatcher, flowMessageStore, offlineMessageStore);

// 集群事件监听器
this.channelEventListener = new ClientLifeCycleHookService(willMessageStore, messageDispatcher);

// RPC服务器
this.remotingServer = new NettyRemotingServer(brokerConfig, nettyConfig, channelEventListener);
this.httpServer = new HttpServer(nettyConfig);

// 消息重发服务
this.reSendMessageService = new ReSendMessageService(offlineMessageStore, flowMessageStore);

int coreThreadNum = Runtime.getRuntime().availableProcessors();

this.connectExecutor = new ThreadPoolExecutor(coreThreadNum * 2,
coreThreadNum * 2,
60000,
TimeUnit.MILLISECONDS,
connectQueue,
new ThreadFactoryImpl("ConnectThread"),
new RejectHandler("connect", 100000));
this.pubExecutor = ……
this.subExecutor = ……
this.pingExecutor = ……

// 如果开启了集群,需要广播消息和同步session
if (checkClusterMode()) {
switch (clusterConfig.getClusterComponentName()) {
// 本地集群,Hazelcast实现
case "local":
this.clusterSessionManager = new DefaultClusterSessionManager(sessionStore, subscriptionStore);
this.clusterMessageTransfer = new DefaultClusterMessageTransfer(messageDispatcher, clusterConfig);
break;
// 利用redis实现集群
case "redis":
this.clusterSessionManager = new RedisClusterSessionManager(sessionStore, subscriptionStore);
this.clusterMessageTransfer = new RedisClusterMessageTransfer(messageDispatcher, (RedisMqttStore) abstractMqttStore);
break;
}
}

}

可以看到这些信息:

  • 作者没有用IoC容器,不得不自己大量new很多service,当时他说不想依赖其他的东西,原生java比较容易被定制化和集成,这的确也有道理。
  • 存储服务实现了3种,内存、Redis、RocksDB,根据配置进行实例化
  • 权限校验分2种,连接身份校验和发布订阅校验,这个“发布订阅校验”,是很多开源Broker都没有考虑到但实际是需要的东西。因为别人不应该订阅你家摄像头“视频URL”的消息,同样你也不应该往别人家门锁发送“开门”的消息。
  • 订阅服务是DefaultSubscriptionTreeMatcher,很明显是个树结构,摆脱了“遍历通配符”的低效操作
  • 连接、发布、订阅、心跳,都有各自的线程池去处理。这里涉及一个课题,“业务应该在handler用自己的线程池,还是应该共享Netty的线程?”。我个人认为最佳实践是,“如果业务没有I/O,就用Netty的,如果有I/O,就用自己的线程池”。所以这里的设计是非常合理的。
  • 最后如果开启了集群,有2种实现,Hazelcast和Redis。Hazelcast是作者早期参考moquette的集群方式,因为不想用Redis这种集中存储。Hazelcast把数据分片存储在各个服务器上,并且自己实现了集群,对MQTT这里看似很友好,但实际上维护困难,不够可靠。这是后面用Redis做集群的原因。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public void start() {
    ……
    this.remotingServer.start();
    ……
    }
    @Override
    public void start() {
    ……
    // TCP + MQTT
    if (nettyConfig.getStartTcp()) {
    startTcpServer(false, nettyConfig.getTcpPort());
    }
    // TCP + MQTT + SSL
    if (nettyConfig.getStartSslTcp()) {
    startTcpServer(true, nettyConfig.getSslTcpPort());
    }

    // WEBSOCKET + MQTT
    if (nettyConfig.getStartWebsocket()) {
    startWebsocketServer(false, nettyConfig.getWebsocketPort());
    }
    // WEBSOCKET + MQTT + SSL
    if (nettyConfig.getStartSslWebsocket()) {
    startWebsocketServer(true, nettyConfig.getSslWebsocketPort());
    }
    }
    这里就是比较熟悉的,用Netty服务器绑定端口监听消息的代码了。最后的一个handler是业务handler:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    class NettyMqttHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
    MqttMessage mqttMessage = (MqttMessage) obj;
    if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) {
    // 根据消息类型,把消息交给各个Processor处理
    MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
    Runnable runnable = () -> processorTable.get(messageType).getObject1().processRequest(ctx, mqttMessage);
    try {
    processorTable.get(messageType).getObject2().submit(runnable);
    } catch (RejectedExecutionException ex) {
    log.warn("Reject mqtt request,cause={}", ex.getMessage());
    }
    } else {
    ctx.close();
    }
    }
    }

消息处理器

ConnectProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
……
try {
// 版本不对、clientId不对、黑名单、连接未授权等异常
if (!versionValid(mqttVersion)) {
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
} else if (!clientIdVerify(clientId)) {
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
} else if (onBlackList(RemotingHelper.getRemoteAddr(ctx.channel()), clientId)) {
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
} else if (!authentication(clientId, userName, password)) {
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
} else {
int heartbeatSec = connectMessage.variableHeader().keepAliveTimeSeconds();

// 如果是集群,而且其他节点有连接,踢下线
CommandReqOrResp response = null;
if (this.clusterSessionManager != null) {
response = clusterSessionManager.process(new CommandReqOrResp(CommandCode.CONNECT_QUERY_LAST_STATE));
}
if (Objects.nonNull(response) && Objects.nonNull(response.getBody()) && BooleanUtils.toBoolean(response.getBody().toString())) {
ClientSession previousClient = ConnectManager.getInstance().getClient(clientId);
if (previousClient != null) {
previousClient.getCtx().close();
ConnectManager.getInstance().removeClient(clientId);
}
}

// 清理session标记处理,清理或者复用
if (cleanSession) {
clientSession = createNewClientSession(clientId, ctx);
sessionPresent = false;
} else {
if (Objects.nonNull(response) && Objects.nonNull(response.getBody())) {
clientSession = reloadClientSession(ctx, clientId);
sessionPresent = true;
} else {
clientSession = new ClientSession(clientId, false, ctx);
sessionPresent = false;
}
}

// 标记上线
sessionStore.setSession(clientId, true);

// 遗嘱处理
boolean willFlag = connectMessage.variableHeader().isWillFlag();
if (willFlag) {
boolean willRetain = connectMessage.variableHeader().isWillRetain();
int willQos = connectMessage.variableHeader().willQos();
String willTopic = connectMessage.payload().willTopic();
byte[] willPayload = connectMessage.payload().willMessageInBytes();
storeWillMsg(clientId, willRetain, willQos, willTopic, willPayload);
}

// 连接成功
returnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;
NettyUtil.setClientId(ctx.channel(), clientId);
ConnectManager.getInstance().putClient(clientId, clientSession);
}
MqttConnAckMessage ackMessage = MessageUtil.getConnectAckMessage(returnCode, sessionPresent);
ctx.writeAndFlush(ackMessage);
if (returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED) {
ctx.close();
return;
}
// 消费重连期间错过的消息
reConnect2SendMessage(clientId);
// 新客户端上线通知
newClientNotify(clientSession);
} catch (Exception ex) {
……
}
}

SubscribeProcessor

订阅这里比较特别的是:增加了订阅权限的判断。其他都是通用逻辑,如topic格式校验、订阅处理、订阅持久化、保留消息消费。

保留消息处理:保留消息存在Map<String/*Topic*/, Message>里面,逐个拿出来匹配。如果消息很多(比如设备量级),这个效率还是很低的。

我们来分析一下订阅树:DefaultSubscriptionTreeMatcher,它本身是一棵树:

  • 如何创建节点:当有订阅/取消订阅来搜索节点的时候,如果节点不存在,顺手创建
  • 如何保证线程安全:Object lock = new Object() + syncronized(lock),全局一把锁
  • 如何清理节点:不会清理节点!

清理节点是MQTT订阅树最大的难点,如果取消订阅的时候只清理掉订阅者,而不清理掉“没有订阅者的节点”,当很多设备都订阅自己的主题并且频繁随机断开重连,就会产生很多废弃且GC不掉的节点;如果清理,又会影响订阅的速度和成功率(因为设备通常是断开后立刻重连上同一个服务器)。这也是moquette后面弄出一些“墓碑节点”等特别复杂的设计的原因。

JMQTT这里直接没有清理,算是偏向于“简单稳定”而不是“复杂干净”的无奈平衡。

PublishProcessor

publish有点特别的是,消息不是直接发送的,而只是加入了MessageDispatcher的消息队列,让MessageDispatcher执行发送。这段代码在AbstractMessageProcessor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected void processMessage(Message message) {
// 单机模式,向本地messageDispatcher增加消息
if (null == clusterMessageTransfer) {
this.messageDispatcher.appendMessage(message);
}

// 保留消息处理
boolean retain = (boolean) message.getHeader(MessageHeader.RETAIN);
if (retain) {
int qos = (int) message.getHeader(MessageHeader.QOS);
byte[] payload = message.getPayload();
String topic = (String) message.getHeader(MessageHeader.TOPIC);
//qos == 0 or payload is none,then clear previous retain message
if (qos == 0 || payload == null || payload.length == 0) {
this.retainMessageStore.removeRetainMessage(topic);
} else {
this.retainMessageStore.storeRetainMessage(topic, message);
}
}
dispatcherMessage2Cluster(message);
}

private void dispatcherMessage2Cluster(Message message) {
// 集群模式,广播消息(当各个节点收到消息后,仍然会执行各自的messageDispatcher.appendMessage)
CommandReqOrResp commandReqOrResp = new CommandReqOrResp(CommandCode.MESSAGE_CLUSTER_TRANSFER, message);
if (clusterMessageTransfer != null) {
clusterMessageTransfer.sendMessage(commandReqOrResp);
}
}

MessageDispatcher是如何消费消息的呢:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for (Message message : messages) {
// 根据主题,获取订阅者
Set<Subscription> subscriptions = subscriptionMatcher.match((String) message.getHeader(MessageHeader.TOPIC));
// 每个订阅者下发消息
for (Subscription subscription : subscriptions) {
String clientId = subscription.getClientId();
ClientSession clientSession = ConnectManager.getInstance().getClient(subscription.getClientId());
if (ConnectManager.getInstance().containClient(clientId)) {
int qos = MessageUtil.getMinQos((int) message.getHeader(MessageHeader.QOS), subscription.getQos());
int messageId = clientSession.generateMessageId();
message.putHeader(MessageHeader.QOS, qos);
message.setMsgId(messageId);
if (qos > 0) {
flowMessageStore.cacheSendMsg(clientId, message);
}
MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false, qos, messageId);
clientSession.getCtx().writeAndFlush(publishMessage);
} else {
offlineMessageStore.addOfflineMessage(clientId, message);
}
}
}

这样做有个非常大的好处,就是MessageDispatcher可以随时替换实现,比如用kafka来做。通常来说,publish的消息量是非常大的。

再来看看packetId的生成,当我们在下发消息的时候,需要主动创建packetId。这段代码在ClientSession的generateMessageId里:

1
2
3
4
5
6
7
8
9
private transient AtomicInteger messageIdCounter = new AtomicInteger(1);
public int generateMessageId(){
int messageId = messageIdCounter.getAndIncrement();
messageId = Math.abs( messageId % 0xFFFF);
if(messageId == 0){
return generateMessageId();
}
return messageId;
}

用AtomicInteger保证线程安全,用% 0xFFFF来循环,如果是0则重新生成(因为MQTT要求[1, 65535])

集群

作者对集群的实现都在cluster里面,定义了4个消息类型:

1
2
3
4
5
6
public final class CommandCode {
public static final String CONNECT_QUERY_LAST_STATE = "CONNECT_QUERY_LAST_STATE";
public static final String CONNECT_GET_SUBSCRIPTIONS = "CONNECT_GET_SUBSCRIPTIONS";
public static final String MESSAGE_CLUSTER_TRANSFER = "MESSAGE_CLUSTER_TRANSFER";
public static final String ERROR_CODE = "ERROR_CODE";
}
  • CONNECT_QUERY_LAST_STATE:在刚连上的时候,需要查询“之前的session”,来踢掉旧连接。这里可能会有疑问,session都放在redis里面统一存储的呀?因为有3种实现,如果是default本地内存的话,就必须从旧节点才能拿到session。
  • CONNECT_GET_SUBSCRIPTIONS:同样,在需要复用session的时候,需要恢复订阅,订阅信息也得从旧节点拿或者从统一存储拿。
  • MESSAGE_CLUSTER_TRANSFER:前面提到的广播publish消息用

Netty事件

作者把Netty有关的代码都放到remoting这个模块了。Netty事件相当于作者留的钩子。

注册了一个NettyConnectHandler extends ChannelDuplexHandler(收发双向都会经过这个handler):

  • channelActive -> CONNECT事件
  • channelInactive -> CLOSE事件
  • userEventTriggered -> READER_IDLE事件
  • exceptionCaught -> EXCEPTION

做的事情:

  • onChannelConnect:啥都没做
  • onChannelClose:遗嘱消息
  • onChannelIdle:啥都没做(因为心跳超时关闭连接已经在NettyConnectHandler处理了)
  • onChannelException:清理本地数据(关闭连接已经在NettyConnectHandler处理了)

总结

JMQTT 2.1.0 使用原生java+netty实现了MQTT Broker,对“存储”、“集群信息”、“消息”、“服务参数”都预留了大量接口,提供了大量实现,拆分得非常清楚,非常适合自定义组装或者二次开发。订阅树没有“清理废弃节点”,可能是个隐患。2.1.0的代码很清晰,看起来很舒服。

JMQTT 3.1.0 增加了MQTT5和akka集群的支持,共享订阅还在开发中,但代码很乱。我觉得可能类似EMQ一样,核心代码和其他UI之类的代码要分开多个项目,然后需要制定一些规则,比如一些命名规则,如订阅匹配SubscriptionMatcher有2个实现:

  • DefaultDispatcherMessage:默认订阅树,很好理解。
  • SubscriptionSupportGroupTreeMatcher:看名字完全看不出来是个啥,看注释才知道是“支持共享订阅的匹配器”,那是不是叫“SharedSubscriptionTreeMatcher”或者“MqttV5TreeMatcher”会更好…

好在今天和Cicizz聊天,他说已经去掉akka了,不过理由是“分发模式会导致内部消息流暴涨导致集群扩展有问题”。但是项目很久没更新了,因为没有动力。我们准备今年再做个规划评审,争取重新把JMQTT开发起来。

好用的MqttBroker:JMQTT2.1.0源码分析

https://www.bananaoven.com/posts/7268/

作者

香蕉微波炉

发布于

2023-01-17

更新于

2023-01-17

许可协议