JMQTT是cicizz在2019年开始开源的项目,当时我们公司刚好在做IoT的东西,需要一个Java语言的MQTT Broker,但是当时物联网刚刚兴起,别说Java写的了,就是其他语言写的,也就只有emqtt(现改名EMQX)算是能用的,其他的如古早的mosquitto都是玩具项目。所以JMQTT1.x是很厉害的,也是我参考了很多的项目。
我参与了JMQTT初期的一些开发,不过现在JMQTT已经发展到3.x版本了,被改了很多东西,如为了支持MQTT5增加了hivemq-client,为了优化集群增加了akka,然后增加了一些UI界面等,还有一些老代码的兼容。个人认为新版的代码已经很乱了,比如MQTT3是用Netty直接写的,MQTT5又用hivemq间接地用Netty,这种就很难管理。
我们从JMQTT2.1.0版来分析源码,会比较清晰一点。
- github:https://github.com/Cicizz/jmqtt
- 2.1.0: https://github.com/Cicizz/jmqtt/releases/tag/2.1.0
- 作者的设计介绍:https://www.yuque.com/tristan-ku8np/zze/xghq80
- QQ群:578185385
架构分析

这是作者当时画的架构图。结构就是:通信 -> 服务 -> 存储 -> 集群。
包的结构也非常清晰:
- broker:mqtt协议层,逻辑处理,BrokerStartup为启动类,BrokerController为初始化类,初始化所有的必备环境,其中acl,store的插件配置也必须在这里初始化
- common:公共层,存放工具类,bean类等
- remoting:通信层,连接管理,协议解析,心跳等
- distribution:配置模块,主要是配置文件,启停命令等存放
- example:客户端示例,目前只有java以及websocket
- manage:管理模块,主要涉及动态配置和系统监控等功能
- store:存储模块,提供了mqtt协议数据的几个接口,支持基于内存的和Rocksdb的本地存储
启动
BrokerStartup做的事情只是加载配置,最后new了一个BrokerController来做Broker服务器初始化。
BrokerController初始化:
java
public BrokerController(BrokerConfig brokerConfig, NettyConfig nettyConfig, StoreConfig storeConfig, ClusterConfig clusterConfig) {
……
// 3种存储方式选择:RocksDB、Redis、内存
switch (storeConfig.getStoreType()) {
case 1:
this.abstractMqttStore = new RDBMqttStore(storeConfig);
break;
case 2:
this.abstractMqttStore = new RedisMqttStore(clusterConfig);
break;
case 3:
this.abstractMqttStore = new DefaultMqttStore();
break;
}
// 权限校验组件初始化
this.connectPermission = new DefaultConnectPermission();
this.pubSubPermission = new DefaultPubSubPermission();
// 订阅和转发器初始化
this.subscriptionMatcher = new DefaultSubscriptionTreeMatcher();
this.messageDispatcher = new DefaultDispatcherMessage(brokerConfig.getPollThreadNum(), subscriptionMatcher, flowMessageStore, offlineMessageStore);
// 集群事件监听器
this.channelEventListener = new ClientLifeCycleHookService(willMessageStore, messageDispatcher);
// RPC服务器
this.remotingServer = new NettyRemotingServer(brokerConfig, nettyConfig, channelEventListener);
this.httpServer = new HttpServer(nettyConfig);
// 消息重发服务
this.reSendMessageService = new ReSendMessageService(offlineMessageStore, flowMessageStore);
int coreThreadNum = Runtime.getRuntime().availableProcessors();
this.connectExecutor = new ThreadPoolExecutor(coreThreadNum * 2,
coreThreadNum * 2,
60000,
TimeUnit.MILLISECONDS,
connectQueue,
new ThreadFactoryImpl("ConnectThread"),
new RejectHandler("connect", 100000));
this.pubExecutor = ……
this.subExecutor = ……
this.pingExecutor = ……
// 如果开启了集群,需要广播消息和同步session
if (checkClusterMode()) {
switch (clusterConfig.getClusterComponentName()) {
// 本地集群,Hazelcast实现
case "local":
this.clusterSessionManager = new DefaultClusterSessionManager(sessionStore, subscriptionStore);
this.clusterMessageTransfer = new DefaultClusterMessageTransfer(messageDispatcher, clusterConfig);
break;
// 利用redis实现集群
case "redis":
this.clusterSessionManager = new RedisClusterSessionManager(sessionStore, subscriptionStore);
this.clusterMessageTransfer = new RedisClusterMessageTransfer(messageDispatcher, (RedisMqttStore) abstractMqttStore);
break;
}
}
}可以看到这些信息:
- 作者没有用IoC容器,不得不自己大量new很多service,当时他说不想依赖其他的东西,原生java比较容易被定制化和集成,这的确也有道理。
- 存储服务实现了3种,内存、Redis、RocksDB,根据配置进行实例化
- 权限校验分2种,连接身份校验和发布订阅校验,这个“发布订阅校验”,是很多开源Broker都没有考虑到但实际是需要的东西。因为别人不应该订阅你家摄像头“视频URL”的消息,同样你也不应该往别人家门锁发送“开门”的消息。
- 订阅服务是DefaultSubscriptionTreeMatcher,很明显是个树结构,摆脱了“遍历通配符”的低效操作
- 连接、发布、订阅、心跳,都有各自的线程池去处理。这里涉及一个课题,“业务应该在handler用自己的线程池,还是应该共享Netty的线程?”。我个人认为最佳实践是,“如果业务没有I/O,就用Netty的,如果有I/O,就用自己的线程池”。所以这里的设计是非常合理的。
- 最后如果开启了集群,有2种实现,Hazelcast和Redis。Hazelcast是作者早期参考moquette的集群方式,因为不想用Redis这种集中存储。Hazelcast把数据分片存储在各个服务器上,并且自己实现了集群,对MQTT这里看似很友好,但实际上维护困难,不够可靠。这是后面用Redis做集群的原因。
java
public void start() {
……
this.remotingServer.start();
……
}
@Override
public void start() {
……
// TCP + MQTT
if (nettyConfig.getStartTcp()) {
startTcpServer(false, nettyConfig.getTcpPort());
}
// TCP + MQTT + SSL
if (nettyConfig.getStartSslTcp()) {
startTcpServer(true, nettyConfig.getSslTcpPort());
}
// WEBSOCKET + MQTT
if (nettyConfig.getStartWebsocket()) {
startWebsocketServer(false, nettyConfig.getWebsocketPort());
}
// WEBSOCKET + MQTT + SSL
if (nettyConfig.getStartSslWebsocket()) {
startWebsocketServer(true, nettyConfig.getSslWebsocketPort());
}
}这里就是比较熟悉的,用Netty服务器绑定端口监听消息的代码了。最后的一个handler是业务handler:
java
class NettyMqttHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj) {
MqttMessage mqttMessage = (MqttMessage) obj;
if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) {
// 根据消息类型,把消息交给各个Processor处理
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
Runnable runnable = () -> processorTable.get(messageType).getObject1().processRequest(ctx, mqttMessage);
try {
processorTable.get(messageType).getObject2().submit(runnable);
} catch (RejectedExecutionException ex) {
log.warn("Reject mqtt request,cause={}", ex.getMessage());
}
} else {
ctx.close();
}
}
}消息处理器
ConnectProcessor
java
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
……
try {
// 版本不对、clientId不对、黑名单、连接未授权等异常
if (!versionValid(mqttVersion)) {
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
} else if (!clientIdVerify(clientId)) {
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
} else if (onBlackList(RemotingHelper.getRemoteAddr(ctx.channel()), clientId)) {
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
} else if (!authentication(clientId, userName, password)) {
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
} else {
int heartbeatSec = connectMessage.variableHeader().keepAliveTimeSeconds();
// 如果是集群,而且其他节点有连接,踢下线
CommandReqOrResp response = null;
if (this.clusterSessionManager != null) {
response = clusterSessionManager.process(new CommandReqOrResp(CommandCode.CONNECT_QUERY_LAST_STATE));
}
if (Objects.nonNull(response) && Objects.nonNull(response.getBody()) && BooleanUtils.toBoolean(response.getBody().toString())) {
ClientSession previousClient = ConnectManager.getInstance().getClient(clientId);
if (previousClient != null) {
previousClient.getCtx().close();
ConnectManager.getInstance().removeClient(clientId);
}
}
// 清理session标记处理,清理或者复用
if (cleanSession) {
clientSession = createNewClientSession(clientId, ctx);
sessionPresent = false;
} else {
if (Objects.nonNull(response) && Objects.nonNull(response.getBody())) {
clientSession = reloadClientSession(ctx, clientId);
sessionPresent = true;
} else {
clientSession = new ClientSession(clientId, false, ctx);
sessionPresent = false;
}
}
// 标记上线
sessionStore.setSession(clientId, true);
// 遗嘱处理
boolean willFlag = connectMessage.variableHeader().isWillFlag();
if (willFlag) {
boolean willRetain = connectMessage.variableHeader().isWillRetain();
int willQos = connectMessage.variableHeader().willQos();
String willTopic = connectMessage.payload().willTopic();
byte[] willPayload = connectMessage.payload().willMessageInBytes();
storeWillMsg(clientId, willRetain, willQos, willTopic, willPayload);
}
// 连接成功
returnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;
NettyUtil.setClientId(ctx.channel(), clientId);
ConnectManager.getInstance().putClient(clientId, clientSession);
}
MqttConnAckMessage ackMessage = MessageUtil.getConnectAckMessage(returnCode, sessionPresent);
ctx.writeAndFlush(ackMessage);
if (returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED) {
ctx.close();
return;
}
// 消费重连期间错过的消息
reConnect2SendMessage(clientId);
// 新客户端上线通知
newClientNotify(clientSession);
} catch (Exception ex) {
……
}
}SubscribeProcessor
订阅这里比较特别的是:增加了订阅权限的判断。其他都是通用逻辑,如topic格式校验、订阅处理、订阅持久化、保留消息消费。
保留消息处理:保留消息存在Map<String/Topic/, Message>里面,逐个拿出来匹配。如果消息很多(比如设备量级),这个效率还是很低的。
我们来分析一下订阅树:DefaultSubscriptionTreeMatcher,它本身是一棵树:
- 如何创建节点:当有订阅/取消订阅来搜索节点的时候,如果节点不存在,顺手创建
- 如何保证线程安全:Object lock = new Object() + syncronized(lock),全局一把锁
- 如何清理节点:不会清理节点!
清理节点是MQTT订阅树最大的难点,如果取消订阅的时候只清理掉订阅者,而不清理掉“没有订阅者的节点”,当很多设备都订阅自己的主题并且频繁随机断开重连,就会产生很多废弃且GC不掉的节点;如果清理,又会影响订阅的速度和成功率(因为设备通常是断开后立刻重连上同一个服务器)。这也是moquette后面弄出一些“墓碑节点”等特别复杂的设计的原因。
JMQTT这里直接没有清理,算是偏向于“简单稳定”而不是“复杂干净”的无奈平衡。
PublishProcessor
publish有点特别的是,消息不是直接发送的,而只是加入了MessageDispatcher的消息队列,让MessageDispatcher执行发送。这段代码在AbstractMessageProcessor:
java
protected void processMessage(Message message) {
// 单机模式,向本地messageDispatcher增加消息
if (null == clusterMessageTransfer) {
this.messageDispatcher.appendMessage(message);
}
// 保留消息处理
boolean retain = (boolean) message.getHeader(MessageHeader.RETAIN);
if (retain) {
int qos = (int) message.getHeader(MessageHeader.QOS);
byte[] payload = message.getPayload();
String topic = (String) message.getHeader(MessageHeader.TOPIC);
//qos == 0 or payload is none,then clear previous retain message
if (qos == 0 || payload == null || payload.length == 0) {
this.retainMessageStore.removeRetainMessage(topic);
} else {
this.retainMessageStore.storeRetainMessage(topic, message);
}
}
dispatcherMessage2Cluster(message);
}
private void dispatcherMessage2Cluster(Message message) {
// 集群模式,广播消息(当各个节点收到消息后,仍然会执行各自的messageDispatcher.appendMessage)
CommandReqOrResp commandReqOrResp = new CommandReqOrResp(CommandCode.MESSAGE_CLUSTER_TRANSFER, message);
if (clusterMessageTransfer != null) {
clusterMessageTransfer.sendMessage(commandReqOrResp);
}
}MessageDispatcher是如何消费消息的呢:
java
for (Message message : messages) {
// 根据主题,获取订阅者
Set<Subscription> subscriptions = subscriptionMatcher.match((String) message.getHeader(MessageHeader.TOPIC));
// 每个订阅者下发消息
for (Subscription subscription : subscriptions) {
String clientId = subscription.getClientId();
ClientSession clientSession = ConnectManager.getInstance().getClient(subscription.getClientId());
if (ConnectManager.getInstance().containClient(clientId)) {
int qos = MessageUtil.getMinQos((int) message.getHeader(MessageHeader.QOS), subscription.getQos());
int messageId = clientSession.generateMessageId();
message.putHeader(MessageHeader.QOS, qos);
message.setMsgId(messageId);
if (qos > 0) {
flowMessageStore.cacheSendMsg(clientId, message);
}
MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false, qos, messageId);
clientSession.getCtx().writeAndFlush(publishMessage);
} else {
offlineMessageStore.addOfflineMessage(clientId, message);
}
}
}这样做有个非常大的好处,就是MessageDispatcher可以随时替换实现,比如用kafka来做。通常来说,publish的消息量是非常大的。
再来看看packetId的生成,当我们在下发消息的时候,需要主动创建packetId。这段代码在ClientSession的generateMessageId里:
java
private transient AtomicInteger messageIdCounter = new AtomicInteger(1);
public int generateMessageId(){
int messageId = messageIdCounter.getAndIncrement();
messageId = Math.abs( messageId % 0xFFFF);
if(messageId == 0){
return generateMessageId();
}
return messageId;
}用AtomicInteger保证线程安全,用% 0xFFFF来循环,如果是0则重新生成(因为MQTT要求[1, 65535])
集群
作者对集群的实现都在cluster里面,定义了4个消息类型:
java
public final class CommandCode {
public static final String CONNECT_QUERY_LAST_STATE = "CONNECT_QUERY_LAST_STATE";
public static final String CONNECT_GET_SUBSCRIPTIONS = "CONNECT_GET_SUBSCRIPTIONS";
public static final String MESSAGE_CLUSTER_TRANSFER = "MESSAGE_CLUSTER_TRANSFER";
public static final String ERROR_CODE = "ERROR_CODE";
}- CONNECT_QUERY_LAST_STATE:在刚连上的时候,需要查询“之前的session”,来踢掉旧连接。这里可能会有疑问,session都放在redis里面统一存储的呀?因为有3种实现,如果是default本地内存的话,就必须从旧节点才能拿到session。
- CONNECT_GET_SUBSCRIPTIONS:同样,在需要复用session的时候,需要恢复订阅,订阅信息也得从旧节点拿或者从统一存储拿。
- MESSAGE_CLUSTER_TRANSFER:前面提到的广播publish消息用
Netty事件
作者把Netty有关的代码都放到remoting这个模块了。Netty事件相当于作者留的钩子。
注册了一个NettyConnectHandler extends ChannelDuplexHandler(收发双向都会经过这个handler):
- channelActive -> CONNECT事件
- channelInactive -> CLOSE事件
- userEventTriggered -> READER_IDLE事件
- exceptionCaught -> EXCEPTION
做的事情:
- onChannelConnect:啥都没做
- onChannelClose:遗嘱消息
- onChannelIdle:啥都没做(因为心跳超时关闭连接已经在NettyConnectHandler处理了)
- onChannelException:清理本地数据(关闭连接已经在NettyConnectHandler处理了)
总结
JMQTT 2.1.0 使用原生java+netty实现了MQTT Broker,对“存储”、“集群信息”、“消息”、“服务参数”都预留了大量接口,提供了大量实现,拆分得非常清楚,非常适合自定义组装或者二次开发。订阅树没有“清理废弃节点”,可能是个隐患。2.1.0的代码很清晰,看起来很舒服。
JMQTT 3.1.0 增加了MQTT5和akka集群的支持,共享订阅还在开发中,但代码很乱。我觉得可能类似EMQ一样,核心代码和其他UI之类的代码要分开多个项目,然后需要制定一些规则,比如一些命名规则,如订阅匹配SubscriptionMatcher有2个实现:
- DefaultDispatcherMessage:默认订阅树,很好理解。
- SubscriptionSupportGroupTreeMatcher:看名字完全看不出来是个啥,看注释才知道是“支持共享订阅的匹配器”,那是不是叫“SharedSubscriptionTreeMatcher”或者“MqttV5TreeMatcher”会更好…
好在今天和Cicizz聊天,他说已经去掉akka了,不过理由是“分发模式会导致内部消息流暴涨导致集群扩展有问题”。但是项目很久没更新了,因为没有动力。我们准备今年再做个规划评审,争取重新把JMQTT开发起来。



粤公网安备44030602007943号