Skip to content

好用的MqttBroker:JMQTT2.1.0源码分析

JMQTT是cicizz在2019年开始开源的项目,当时我们公司刚好在做IoT的东西,需要一个Java语言的MQTT Broker,但是当时物联网刚刚兴起,别说Java写的了,就是其他语言写的,也就只有emqtt(现改名EMQX)算是能用的,其他的如古早的mosquitto都是玩具项目。所以JMQTT1.x是很厉害的,也是我参考了很多的项目。

我参与了JMQTT初期的一些开发,不过现在JMQTT已经发展到3.x版本了,被改了很多东西,如为了支持MQTT5增加了hivemq-client,为了优化集群增加了akka,然后增加了一些UI界面等,还有一些老代码的兼容。个人认为新版的代码已经很乱了,比如MQTT3是用Netty直接写的,MQTT5又用hivemq间接地用Netty,这种就很难管理。

我们从JMQTT2.1.0版来分析源码,会比较清晰一点。

架构分析

jmqtt_design.jpg

这是作者当时画的架构图。结构就是:通信 -> 服务 -> 存储 -> 集群。

包的结构也非常清晰:

  • broker:mqtt协议层,逻辑处理,BrokerStartup为启动类,BrokerController为初始化类,初始化所有的必备环境,其中acl,store的插件配置也必须在这里初始化
  • common:公共层,存放工具类,bean类等
  • remoting:通信层,连接管理,协议解析,心跳等
  • distribution:配置模块,主要是配置文件,启停命令等存放
  • example:客户端示例,目前只有java以及websocket
  • manage:管理模块,主要涉及动态配置和系统监控等功能
  • store:存储模块,提供了mqtt协议数据的几个接口,支持基于内存的和Rocksdb的本地存储

启动

BrokerStartup做的事情只是加载配置,最后new了一个BrokerController来做Broker服务器初始化。

BrokerController初始化:

java
public BrokerController(BrokerConfig brokerConfig, NettyConfig nettyConfig, StoreConfig storeConfig, ClusterConfig clusterConfig) {
    ……
    // 3种存储方式选择:RocksDB、Redis、内存
    switch (storeConfig.getStoreType()) {
        case 1:
            this.abstractMqttStore = new RDBMqttStore(storeConfig);
            break;
        case 2:
            this.abstractMqttStore = new RedisMqttStore(clusterConfig);
            break;
        case 3:
            this.abstractMqttStore = new DefaultMqttStore();
            break;
    }

    // 权限校验组件初始化
    this.connectPermission = new DefaultConnectPermission();
    this.pubSubPermission = new DefaultPubSubPermission();

    // 订阅和转发器初始化
    this.subscriptionMatcher = new DefaultSubscriptionTreeMatcher();
    this.messageDispatcher = new DefaultDispatcherMessage(brokerConfig.getPollThreadNum(), subscriptionMatcher, flowMessageStore, offlineMessageStore);

    // 集群事件监听器
    this.channelEventListener = new ClientLifeCycleHookService(willMessageStore, messageDispatcher);

    // RPC服务器
    this.remotingServer = new NettyRemotingServer(brokerConfig, nettyConfig, channelEventListener);
    this.httpServer = new HttpServer(nettyConfig);

    // 消息重发服务
    this.reSendMessageService = new ReSendMessageService(offlineMessageStore, flowMessageStore);

    int coreThreadNum = Runtime.getRuntime().availableProcessors();

    this.connectExecutor = new ThreadPoolExecutor(coreThreadNum * 2,
            coreThreadNum * 2,
            60000,
            TimeUnit.MILLISECONDS,
            connectQueue,
            new ThreadFactoryImpl("ConnectThread"),
            new RejectHandler("connect", 100000));
    this.pubExecutor = ……
    this.subExecutor = ……
    this.pingExecutor = ……

    // 如果开启了集群,需要广播消息和同步session
    if (checkClusterMode()) {
        switch (clusterConfig.getClusterComponentName()) {
            // 本地集群,Hazelcast实现
            case "local":
                this.clusterSessionManager = new DefaultClusterSessionManager(sessionStore, subscriptionStore);
                this.clusterMessageTransfer = new DefaultClusterMessageTransfer(messageDispatcher, clusterConfig);
                break;
            // 利用redis实现集群
            case "redis":
                this.clusterSessionManager = new RedisClusterSessionManager(sessionStore, subscriptionStore);
                this.clusterMessageTransfer = new RedisClusterMessageTransfer(messageDispatcher, (RedisMqttStore) abstractMqttStore);
                break;
        }
    }

}

可以看到这些信息:

  • 作者没有用IoC容器,不得不自己大量new很多service,当时他说不想依赖其他的东西,原生java比较容易被定制化和集成,这的确也有道理。
  • 存储服务实现了3种,内存、Redis、RocksDB,根据配置进行实例化
  • 权限校验分2种,连接身份校验和发布订阅校验,这个“发布订阅校验”,是很多开源Broker都没有考虑到但实际是需要的东西。因为别人不应该订阅你家摄像头“视频URL”的消息,同样你也不应该往别人家门锁发送“开门”的消息。
  • 订阅服务是DefaultSubscriptionTreeMatcher,很明显是个树结构,摆脱了“遍历通配符”的低效操作
  • 连接、发布、订阅、心跳,都有各自的线程池去处理。这里涉及一个课题,“业务应该在handler用自己的线程池,还是应该共享Netty的线程?”。我个人认为最佳实践是,“如果业务没有I/O,就用Netty的,如果有I/O,就用自己的线程池”。所以这里的设计是非常合理的。
  • 最后如果开启了集群,有2种实现,Hazelcast和Redis。Hazelcast是作者早期参考moquette的集群方式,因为不想用Redis这种集中存储。Hazelcast把数据分片存储在各个服务器上,并且自己实现了集群,对MQTT这里看似很友好,但实际上维护困难,不够可靠。这是后面用Redis做集群的原因。
java
public void start() {
    ……
    this.remotingServer.start();
    ……
}
@Override
public void start() {
    ……
    // TCP + MQTT
    if (nettyConfig.getStartTcp()) {
        startTcpServer(false, nettyConfig.getTcpPort());
    }
    // TCP + MQTT + SSL
    if (nettyConfig.getStartSslTcp()) {
        startTcpServer(true, nettyConfig.getSslTcpPort());
    }

    // WEBSOCKET + MQTT
    if (nettyConfig.getStartWebsocket()) {
        startWebsocketServer(false, nettyConfig.getWebsocketPort());
    }
    // WEBSOCKET + MQTT + SSL
    if (nettyConfig.getStartSslWebsocket()) {
        startWebsocketServer(true, nettyConfig.getSslWebsocketPort());
    }
}

这里就是比较熟悉的,用Netty服务器绑定端口监听消息的代码了。最后的一个handler是业务handler:

java
class NettyMqttHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        MqttMessage mqttMessage = (MqttMessage) obj;
        if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) {
            // 根据消息类型,把消息交给各个Processor处理
            MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
            Runnable runnable = () -> processorTable.get(messageType).getObject1().processRequest(ctx, mqttMessage);
            try {
                processorTable.get(messageType).getObject2().submit(runnable);
            } catch (RejectedExecutionException ex) {
                log.warn("Reject mqtt request,cause={}", ex.getMessage());
            }
        } else {
            ctx.close();
        }
    }
}

消息处理器

ConnectProcessor

java
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
   ……
   try {
       // 版本不对、clientId不对、黑名单、连接未授权等异常
       if (!versionValid(mqttVersion)) {
           returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
       } else if (!clientIdVerify(clientId)) {
           returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
       } else if (onBlackList(RemotingHelper.getRemoteAddr(ctx.channel()), clientId)) {
           returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
       } else if (!authentication(clientId, userName, password)) {
           returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
       } else {
           int heartbeatSec = connectMessage.variableHeader().keepAliveTimeSeconds();

           // 如果是集群,而且其他节点有连接,踢下线
           CommandReqOrResp response = null;
           if (this.clusterSessionManager != null) {
               response = clusterSessionManager.process(new CommandReqOrResp(CommandCode.CONNECT_QUERY_LAST_STATE));
           }
           if (Objects.nonNull(response) && Objects.nonNull(response.getBody()) && BooleanUtils.toBoolean(response.getBody().toString())) {
               ClientSession previousClient = ConnectManager.getInstance().getClient(clientId);
               if (previousClient != null) {
                   previousClient.getCtx().close();
                   ConnectManager.getInstance().removeClient(clientId);
               }
           }

           // 清理session标记处理,清理或者复用
           if (cleanSession) {
               clientSession = createNewClientSession(clientId, ctx);
               sessionPresent = false;
           } else {
               if (Objects.nonNull(response) && Objects.nonNull(response.getBody())) {
                   clientSession = reloadClientSession(ctx, clientId);
                   sessionPresent = true;
               } else {
                   clientSession = new ClientSession(clientId, false, ctx);
                   sessionPresent = false;
               }
           }

           // 标记上线
           sessionStore.setSession(clientId, true);

           // 遗嘱处理
           boolean willFlag = connectMessage.variableHeader().isWillFlag();
           if (willFlag) {
               boolean willRetain = connectMessage.variableHeader().isWillRetain();
               int willQos = connectMessage.variableHeader().willQos();
               String willTopic = connectMessage.payload().willTopic();
               byte[] willPayload = connectMessage.payload().willMessageInBytes();
               storeWillMsg(clientId, willRetain, willQos, willTopic, willPayload);
           }

           // 连接成功
           returnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;
           NettyUtil.setClientId(ctx.channel(), clientId);
           ConnectManager.getInstance().putClient(clientId, clientSession);
       }
       MqttConnAckMessage ackMessage = MessageUtil.getConnectAckMessage(returnCode, sessionPresent);
       ctx.writeAndFlush(ackMessage);
       if (returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED) {
           ctx.close();
           return;
       }
       // 消费重连期间错过的消息
       reConnect2SendMessage(clientId);
       // 新客户端上线通知
       newClientNotify(clientSession);
   } catch (Exception ex) {
       ……
   }
}

SubscribeProcessor

订阅这里比较特别的是:增加了订阅权限的判断。其他都是通用逻辑,如topic格式校验、订阅处理、订阅持久化、保留消息消费。

保留消息处理:保留消息存在Map<String/Topic/, Message>里面,逐个拿出来匹配。如果消息很多(比如设备量级),这个效率还是很低的。

我们来分析一下订阅树:DefaultSubscriptionTreeMatcher,它本身是一棵树:

  • 如何创建节点:当有订阅/取消订阅来搜索节点的时候,如果节点不存在,顺手创建
  • 如何保证线程安全:Object lock = new Object() + syncronized(lock),全局一把锁
  • 如何清理节点:不会清理节点!

清理节点是MQTT订阅树最大的难点,如果取消订阅的时候只清理掉订阅者,而不清理掉“没有订阅者的节点”,当很多设备都订阅自己的主题并且频繁随机断开重连,就会产生很多废弃且GC不掉的节点;如果清理,又会影响订阅的速度和成功率(因为设备通常是断开后立刻重连上同一个服务器)。这也是moquette后面弄出一些“墓碑节点”等特别复杂的设计的原因。

JMQTT这里直接没有清理,算是偏向于“简单稳定”而不是“复杂干净”的无奈平衡。

PublishProcessor

publish有点特别的是,消息不是直接发送的,而只是加入了MessageDispatcher的消息队列,让MessageDispatcher执行发送。这段代码在AbstractMessageProcessor:

java
protected void processMessage(Message message) {
    // 单机模式,向本地messageDispatcher增加消息
    if (null == clusterMessageTransfer) {
        this.messageDispatcher.appendMessage(message);
    }

    // 保留消息处理
    boolean retain = (boolean) message.getHeader(MessageHeader.RETAIN);
    if (retain) {
        int qos = (int) message.getHeader(MessageHeader.QOS);
        byte[] payload = message.getPayload();
        String topic = (String) message.getHeader(MessageHeader.TOPIC);
        //qos == 0 or payload is none,then clear previous retain message
        if (qos == 0 || payload == null || payload.length == 0) {
            this.retainMessageStore.removeRetainMessage(topic);
        } else {
            this.retainMessageStore.storeRetainMessage(topic, message);
        }
    }
    dispatcherMessage2Cluster(message);
}

private void dispatcherMessage2Cluster(Message message) {
    // 集群模式,广播消息(当各个节点收到消息后,仍然会执行各自的messageDispatcher.appendMessage)
    CommandReqOrResp commandReqOrResp = new CommandReqOrResp(CommandCode.MESSAGE_CLUSTER_TRANSFER, message);
    if (clusterMessageTransfer != null) {
        clusterMessageTransfer.sendMessage(commandReqOrResp);
    }
}

MessageDispatcher是如何消费消息的呢:

java
for (Message message : messages) {
   // 根据主题,获取订阅者
   Set<Subscription> subscriptions = subscriptionMatcher.match((String) message.getHeader(MessageHeader.TOPIC));
   // 每个订阅者下发消息
   for (Subscription subscription : subscriptions) {
       String clientId = subscription.getClientId();
       ClientSession clientSession = ConnectManager.getInstance().getClient(subscription.getClientId());
       if (ConnectManager.getInstance().containClient(clientId)) {
           int qos = MessageUtil.getMinQos((int) message.getHeader(MessageHeader.QOS), subscription.getQos());
           int messageId = clientSession.generateMessageId();
           message.putHeader(MessageHeader.QOS, qos);
           message.setMsgId(messageId);
           if (qos > 0) {
               flowMessageStore.cacheSendMsg(clientId, message);
           }
           MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false, qos, messageId);
           clientSession.getCtx().writeAndFlush(publishMessage);
       } else {
           offlineMessageStore.addOfflineMessage(clientId, message);
       }
   }
}

这样做有个非常大的好处,就是MessageDispatcher可以随时替换实现,比如用kafka来做。通常来说,publish的消息量是非常大的。

再来看看packetId的生成,当我们在下发消息的时候,需要主动创建packetId。这段代码在ClientSession的generateMessageId里:

java
private transient AtomicInteger messageIdCounter = new AtomicInteger(1);
public int generateMessageId(){
    int messageId = messageIdCounter.getAndIncrement();
    messageId = Math.abs( messageId % 0xFFFF);
    if(messageId == 0){
        return generateMessageId();
    }
    return messageId;
}

用AtomicInteger保证线程安全,用% 0xFFFF来循环,如果是0则重新生成(因为MQTT要求[1, 65535])

集群

作者对集群的实现都在cluster里面,定义了4个消息类型:

java
public final class CommandCode {
    public static final String CONNECT_QUERY_LAST_STATE = "CONNECT_QUERY_LAST_STATE";
    public static final String CONNECT_GET_SUBSCRIPTIONS = "CONNECT_GET_SUBSCRIPTIONS";
    public static final String MESSAGE_CLUSTER_TRANSFER = "MESSAGE_CLUSTER_TRANSFER";
    public static final String ERROR_CODE = "ERROR_CODE";
}
  • CONNECT_QUERY_LAST_STATE:在刚连上的时候,需要查询“之前的session”,来踢掉旧连接。这里可能会有疑问,session都放在redis里面统一存储的呀?因为有3种实现,如果是default本地内存的话,就必须从旧节点才能拿到session。
  • CONNECT_GET_SUBSCRIPTIONS:同样,在需要复用session的时候,需要恢复订阅,订阅信息也得从旧节点拿或者从统一存储拿。
  • MESSAGE_CLUSTER_TRANSFER:前面提到的广播publish消息用

Netty事件

作者把Netty有关的代码都放到remoting这个模块了。Netty事件相当于作者留的钩子。

注册了一个NettyConnectHandler extends ChannelDuplexHandler(收发双向都会经过这个handler):

  • channelActive -> CONNECT事件
  • channelInactive -> CLOSE事件
  • userEventTriggered -> READER_IDLE事件
  • exceptionCaught -> EXCEPTION

做的事情:

  • onChannelConnect:啥都没做
  • onChannelClose:遗嘱消息
  • onChannelIdle:啥都没做(因为心跳超时关闭连接已经在NettyConnectHandler处理了)
  • onChannelException:清理本地数据(关闭连接已经在NettyConnectHandler处理了)

总结

JMQTT 2.1.0 使用原生java+netty实现了MQTT Broker,对“存储”、“集群信息”、“消息”、“服务参数”都预留了大量接口,提供了大量实现,拆分得非常清楚,非常适合自定义组装或者二次开发。订阅树没有“清理废弃节点”,可能是个隐患。2.1.0的代码很清晰,看起来很舒服。

JMQTT 3.1.0 增加了MQTT5和akka集群的支持,共享订阅还在开发中,但代码很乱。我觉得可能类似EMQ一样,核心代码和其他UI之类的代码要分开多个项目,然后需要制定一些规则,比如一些命名规则,如订阅匹配SubscriptionMatcher有2个实现:

  • DefaultDispatcherMessage:默认订阅树,很好理解。
  • SubscriptionSupportGroupTreeMatcher:看名字完全看不出来是个啥,看注释才知道是“支持共享订阅的匹配器”,那是不是叫“SharedSubscriptionTreeMatcher”或者“MqttV5TreeMatcher”会更好…

好在今天和Cicizz聊天,他说已经去掉akka了,不过理由是“分发模式会导致内部消息流暴涨导致集群扩展有问题”。但是项目很久没更新了,因为没有动力。我们准备今年再做个规划评审,争取重新把JMQTT开发起来。

转载请注明出处https://bananaoven.com/articles/7268.html | 香蕉微波炉
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
具体问题具体杠
0/20000
发送
加载中...