JMQTT是cicizz在2019年开始开源的项目,当时我们公司刚好在做IoT的东西,需要一个Java语言的MQTT Broker,但是当时物联网刚刚兴起,别说Java写的了,就是其他语言写的,也就只有emqtt(现改名EMQX)算是能用的,其他的如古早的mosquitto都是玩具项目。所以JMQTT1.x是很厉害的,也是我参考了很多的项目。
我参与了JMQTT初期的一些开发,不过现在JMQTT已经发展到3.x版本了,被改了很多东西,如为了支持MQTT5增加了hivemq-client,为了优化集群增加了akka,然后增加了一些UI界面等,还有一些老代码的兼容。个人认为新版的代码已经很乱了,比如MQTT3是用Netty直接写的,MQTT5又用hivemq间接地用Netty,这种就很难管理。
我们从JMQTT2.1.0版来分析源码,会比较清晰一点。
架构分析
这是作者当时画的架构图。结构就是:通信 -> 服务 -> 存储 -> 集群。
包的结构也非常清晰:
- broker:mqtt协议层,逻辑处理,BrokerStartup为启动类,BrokerController为初始化类,初始化所有的必备环境,其中acl,store的插件配置也必须在这里初始化
- common:公共层,存放工具类,bean类等
- remoting:通信层,连接管理,协议解析,心跳等
- distribution:配置模块,主要是配置文件,启停命令等存放
- example:客户端示例,目前只有java以及websocket
- manage:管理模块,主要涉及动态配置和系统监控等功能
- store:存储模块,提供了mqtt协议数据的几个接口,支持基于内存的和Rocksdb的本地存储
启动
BrokerStartup做的事情只是加载配置,最后new了一个BrokerController来做Broker服务器初始化。
BrokerController初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public BrokerController(BrokerConfig brokerConfig, NettyConfig nettyConfig, StoreConfig storeConfig, ClusterConfig clusterConfig) { …… switch (storeConfig.getStoreType()) { case 1: this.abstractMqttStore = new RDBMqttStore(storeConfig); break; case 2: this.abstractMqttStore = new RedisMqttStore(clusterConfig); break; case 3: this.abstractMqttStore = new DefaultMqttStore(); break; }
this.connectPermission = new DefaultConnectPermission(); this.pubSubPermission = new DefaultPubSubPermission();
this.subscriptionMatcher = new DefaultSubscriptionTreeMatcher(); this.messageDispatcher = new DefaultDispatcherMessage(brokerConfig.getPollThreadNum(), subscriptionMatcher, flowMessageStore, offlineMessageStore);
this.channelEventListener = new ClientLifeCycleHookService(willMessageStore, messageDispatcher);
this.remotingServer = new NettyRemotingServer(brokerConfig, nettyConfig, channelEventListener); this.httpServer = new HttpServer(nettyConfig);
this.reSendMessageService = new ReSendMessageService(offlineMessageStore, flowMessageStore);
int coreThreadNum = Runtime.getRuntime().availableProcessors();
this.connectExecutor = new ThreadPoolExecutor(coreThreadNum * 2, coreThreadNum * 2, 60000, TimeUnit.MILLISECONDS, connectQueue, new ThreadFactoryImpl("ConnectThread"), new RejectHandler("connect", 100000)); this.pubExecutor = …… this.subExecutor = …… this.pingExecutor = ……
if (checkClusterMode()) { switch (clusterConfig.getClusterComponentName()) { case "local": this.clusterSessionManager = new DefaultClusterSessionManager(sessionStore, subscriptionStore); this.clusterMessageTransfer = new DefaultClusterMessageTransfer(messageDispatcher, clusterConfig); break; case "redis": this.clusterSessionManager = new RedisClusterSessionManager(sessionStore, subscriptionStore); this.clusterMessageTransfer = new RedisClusterMessageTransfer(messageDispatcher, (RedisMqttStore) abstractMqttStore); break; } }
}
|
可以看到这些信息:
- 作者没有用IoC容器,不得不自己大量new很多service,当时他说不想依赖其他的东西,原生java比较容易被定制化和集成,这的确也有道理。
- 存储服务实现了3种,内存、Redis、RocksDB,根据配置进行实例化
- 权限校验分2种,连接身份校验和发布订阅校验,这个“发布订阅校验”,是很多开源Broker都没有考虑到但实际是需要的东西。因为别人不应该订阅你家摄像头“视频URL”的消息,同样你也不应该往别人家门锁发送“开门”的消息。
- 订阅服务是DefaultSubscriptionTreeMatcher,很明显是个树结构,摆脱了“遍历通配符”的低效操作
- 连接、发布、订阅、心跳,都有各自的线程池去处理。这里涉及一个课题,“业务应该在handler用自己的线程池,还是应该共享Netty的线程?”。我个人认为最佳实践是,“如果业务没有I/O,就用Netty的,如果有I/O,就用自己的线程池”。所以这里的设计是非常合理的。
- 最后如果开启了集群,有2种实现,Hazelcast和Redis。Hazelcast是作者早期参考moquette的集群方式,因为不想用Redis这种集中存储。Hazelcast把数据分片存储在各个服务器上,并且自己实现了集群,对MQTT这里看似很友好,但实际上维护困难,不够可靠。这是后面用Redis做集群的原因。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public void start() { …… this.remotingServer.start(); …… } @Override public void start() { …… if (nettyConfig.getStartTcp()) { startTcpServer(false, nettyConfig.getTcpPort()); } if (nettyConfig.getStartSslTcp()) { startTcpServer(true, nettyConfig.getSslTcpPort()); }
if (nettyConfig.getStartWebsocket()) { startWebsocketServer(false, nettyConfig.getWebsocketPort()); } if (nettyConfig.getStartSslWebsocket()) { startWebsocketServer(true, nettyConfig.getSslWebsocketPort()); } }
|
这里就是比较熟悉的,用Netty服务器绑定端口监听消息的代码了。最后的一个handler是业务handler:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class NettyMqttHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object obj) { MqttMessage mqttMessage = (MqttMessage) obj; if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) { MqttMessageType messageType = mqttMessage.fixedHeader().messageType(); Runnable runnable = () -> processorTable.get(messageType).getObject1().processRequest(ctx, mqttMessage); try { processorTable.get(messageType).getObject2().submit(runnable); } catch (RejectedExecutionException ex) { log.warn("Reject mqtt request,cause={}", ex.getMessage()); } } else { ctx.close(); } } }
|
消息处理器
ConnectProcessor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| @Override public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) { …… try { if (!versionValid(mqttVersion)) { returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION; } else if (!clientIdVerify(clientId)) { returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED; } else if (onBlackList(RemotingHelper.getRemoteAddr(ctx.channel()), clientId)) { returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; } else if (!authentication(clientId, userName, password)) { returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD; } else { int heartbeatSec = connectMessage.variableHeader().keepAliveTimeSeconds();
CommandReqOrResp response = null; if (this.clusterSessionManager != null) { response = clusterSessionManager.process(new CommandReqOrResp(CommandCode.CONNECT_QUERY_LAST_STATE)); } if (Objects.nonNull(response) && Objects.nonNull(response.getBody()) && BooleanUtils.toBoolean(response.getBody().toString())) { ClientSession previousClient = ConnectManager.getInstance().getClient(clientId); if (previousClient != null) { previousClient.getCtx().close(); ConnectManager.getInstance().removeClient(clientId); } }
if (cleanSession) { clientSession = createNewClientSession(clientId, ctx); sessionPresent = false; } else { if (Objects.nonNull(response) && Objects.nonNull(response.getBody())) { clientSession = reloadClientSession(ctx, clientId); sessionPresent = true; } else { clientSession = new ClientSession(clientId, false, ctx); sessionPresent = false; } }
sessionStore.setSession(clientId, true);
boolean willFlag = connectMessage.variableHeader().isWillFlag(); if (willFlag) { boolean willRetain = connectMessage.variableHeader().isWillRetain(); int willQos = connectMessage.variableHeader().willQos(); String willTopic = connectMessage.payload().willTopic(); byte[] willPayload = connectMessage.payload().willMessageInBytes(); storeWillMsg(clientId, willRetain, willQos, willTopic, willPayload); }
returnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED; NettyUtil.setClientId(ctx.channel(), clientId); ConnectManager.getInstance().putClient(clientId, clientSession); } MqttConnAckMessage ackMessage = MessageUtil.getConnectAckMessage(returnCode, sessionPresent); ctx.writeAndFlush(ackMessage); if (returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED) { ctx.close(); return; } reConnect2SendMessage(clientId); newClientNotify(clientSession); } catch (Exception ex) { …… } }
|
SubscribeProcessor
订阅这里比较特别的是:增加了订阅权限的判断。其他都是通用逻辑,如topic格式校验、订阅处理、订阅持久化、保留消息消费。
保留消息处理:保留消息存在Map<String/*Topic*/, Message>里面,逐个拿出来匹配。如果消息很多(比如设备量级),这个效率还是很低的。
我们来分析一下订阅树:DefaultSubscriptionTreeMatcher,它本身是一棵树:
- 如何创建节点:当有订阅/取消订阅来搜索节点的时候,如果节点不存在,顺手创建
- 如何保证线程安全:Object lock = new Object() + syncronized(lock),全局一把锁
- 如何清理节点:不会清理节点!
清理节点是MQTT订阅树最大的难点,如果取消订阅的时候只清理掉订阅者,而不清理掉“没有订阅者的节点”,当很多设备都订阅自己的主题并且频繁随机断开重连,就会产生很多废弃且GC不掉的节点;如果清理,又会影响订阅的速度和成功率(因为设备通常是断开后立刻重连上同一个服务器)。这也是moquette后面弄出一些“墓碑节点”等特别复杂的设计的原因。
JMQTT这里直接没有清理,算是偏向于“简单稳定”而不是“复杂干净”的无奈平衡。
PublishProcessor
publish有点特别的是,消息不是直接发送的,而只是加入了MessageDispatcher的消息队列,让MessageDispatcher执行发送。这段代码在AbstractMessageProcessor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| protected void processMessage(Message message) { if (null == clusterMessageTransfer) { this.messageDispatcher.appendMessage(message); }
boolean retain = (boolean) message.getHeader(MessageHeader.RETAIN); if (retain) { int qos = (int) message.getHeader(MessageHeader.QOS); byte[] payload = message.getPayload(); String topic = (String) message.getHeader(MessageHeader.TOPIC); if (qos == 0 || payload == null || payload.length == 0) { this.retainMessageStore.removeRetainMessage(topic); } else { this.retainMessageStore.storeRetainMessage(topic, message); } } dispatcherMessage2Cluster(message); }
private void dispatcherMessage2Cluster(Message message) { CommandReqOrResp commandReqOrResp = new CommandReqOrResp(CommandCode.MESSAGE_CLUSTER_TRANSFER, message); if (clusterMessageTransfer != null) { clusterMessageTransfer.sendMessage(commandReqOrResp); } }
|
MessageDispatcher是如何消费消息的呢:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| for (Message message : messages) { Set<Subscription> subscriptions = subscriptionMatcher.match((String) message.getHeader(MessageHeader.TOPIC)); for (Subscription subscription : subscriptions) { String clientId = subscription.getClientId(); ClientSession clientSession = ConnectManager.getInstance().getClient(subscription.getClientId()); if (ConnectManager.getInstance().containClient(clientId)) { int qos = MessageUtil.getMinQos((int) message.getHeader(MessageHeader.QOS), subscription.getQos()); int messageId = clientSession.generateMessageId(); message.putHeader(MessageHeader.QOS, qos); message.setMsgId(messageId); if (qos > 0) { flowMessageStore.cacheSendMsg(clientId, message); } MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false, qos, messageId); clientSession.getCtx().writeAndFlush(publishMessage); } else { offlineMessageStore.addOfflineMessage(clientId, message); } } }
|
这样做有个非常大的好处,就是MessageDispatcher可以随时替换实现,比如用kafka来做。通常来说,publish的消息量是非常大的。
再来看看packetId的生成,当我们在下发消息的时候,需要主动创建packetId。这段代码在ClientSession的generateMessageId里:
1 2 3 4 5 6 7 8 9
| private transient AtomicInteger messageIdCounter = new AtomicInteger(1); public int generateMessageId(){ int messageId = messageIdCounter.getAndIncrement(); messageId = Math.abs( messageId % 0xFFFF); if(messageId == 0){ return generateMessageId(); } return messageId; }
|
用AtomicInteger保证线程安全,用% 0xFFFF来循环,如果是0则重新生成(因为MQTT要求[1, 65535])
集群
作者对集群的实现都在cluster里面,定义了4个消息类型:
1 2 3 4 5 6
| public final class CommandCode { public static final String CONNECT_QUERY_LAST_STATE = "CONNECT_QUERY_LAST_STATE"; public static final String CONNECT_GET_SUBSCRIPTIONS = "CONNECT_GET_SUBSCRIPTIONS"; public static final String MESSAGE_CLUSTER_TRANSFER = "MESSAGE_CLUSTER_TRANSFER"; public static final String ERROR_CODE = "ERROR_CODE"; }
|
- CONNECT_QUERY_LAST_STATE:在刚连上的时候,需要查询“之前的session”,来踢掉旧连接。这里可能会有疑问,session都放在redis里面统一存储的呀?因为有3种实现,如果是default本地内存的话,就必须从旧节点才能拿到session。
- CONNECT_GET_SUBSCRIPTIONS:同样,在需要复用session的时候,需要恢复订阅,订阅信息也得从旧节点拿或者从统一存储拿。
- MESSAGE_CLUSTER_TRANSFER:前面提到的广播publish消息用
Netty事件
作者把Netty有关的代码都放到remoting这个模块了。Netty事件相当于作者留的钩子。
注册了一个NettyConnectHandler extends ChannelDuplexHandler(收发双向都会经过这个handler):
- channelActive -> CONNECT事件
- channelInactive -> CLOSE事件
- userEventTriggered -> READER_IDLE事件
- exceptionCaught -> EXCEPTION
做的事情:
- onChannelConnect:啥都没做
- onChannelClose:遗嘱消息
- onChannelIdle:啥都没做(因为心跳超时关闭连接已经在NettyConnectHandler处理了)
- onChannelException:清理本地数据(关闭连接已经在NettyConnectHandler处理了)
总结
JMQTT 2.1.0 使用原生java+netty实现了MQTT Broker,对“存储”、“集群信息”、“消息”、“服务参数”都预留了大量接口,提供了大量实现,拆分得非常清楚,非常适合自定义组装或者二次开发。订阅树没有“清理废弃节点”,可能是个隐患。2.1.0的代码很清晰,看起来很舒服。
JMQTT 3.1.0 增加了MQTT5和akka集群的支持,共享订阅还在开发中,但代码很乱。我觉得可能类似EMQ一样,核心代码和其他UI之类的代码要分开多个项目,然后需要制定一些规则,比如一些命名规则,如订阅匹配SubscriptionMatcher有2个实现:
- DefaultDispatcherMessage:默认订阅树,很好理解。
- SubscriptionSupportGroupTreeMatcher:看名字完全看不出来是个啥,看注释才知道是“支持共享订阅的匹配器”,那是不是叫“SharedSubscriptionTreeMatcher”或者“MqttV5TreeMatcher”会更好…
好在今天和Cicizz聊天,他说已经去掉akka了,不过理由是“分发模式会导致内部消息流暴涨导致集群扩展有问题”。但是项目很久没更新了,因为没有动力。我们准备今年再做个规划评审,争取重新把JMQTT开发起来。