Skip to content

MqttWk源码分析(一):代码结构和初步运行

前言

基于MqttWk v1.0.7。

MqttWk是我见过最清晰、代码量最少的Broker了,分析其源码有利于初步了解。

一、代码结构分析

1、整体架构

2956a85900b0df5126c3f955b94b9510.png

  • mqtt-auth:验证权限方面的代码
  • mqtt-broker:Broker核心代码
  • mqtt-common:抽象出来的持久化接口
  • mqtt-store:持久化接口的Redis、Kafka实现
  • mqtt-zoo:简单测试代码

2、mqtt-auth代码结构

7ca8cda1b7be40057b9605d9a3fa17c9.png

验证服务的实现,还有一些验证工具类

3、mqtt-broker核心代码结构

8308ec9055b074449ad4b9e1172fdf72.png

  • cluster:集群通信的实现,用的是Redis的发布订阅作消息总线
  • codec:基于Websocket的MQTT通信需要特殊的编解码器,这里是为websocket写的编解码器
  • config:Broker参数配置
  • handler:Broker是基于netty做网络通信的,所以这里是netty连接的核心handler
  • internal:内部通信代码(Redis集群通信和kafka转发)
  • protocol:MQTT协议逻辑核心代码
  • server:启动MQTT服务器的代码,主要是MQTT over websocket、MQTT TCP、链路加密SSL等分别启动在不同端口提供服务
  • service:Kafka转发的实现,就是一个Producer
  • MainLauncher:启动器,基于nutzboot,非常简洁,和Spring的Application启动器一样
  • resources:密钥,参数配置。这里的参数配置是直接properties文件配置,config里的参数配置是基于类的配置,都和Spring的概念一样。
  • protocol:在不同MQTT包发过来的时候,Broker应该做的逻辑操作

b37da059386dd791a429a29bf3ad3a34.png

4、mqtt-common

646a8121cc2c006b159f73dd49b86f61.png

  • auth:验证接口
  • message | DupPublish:QoS1/QoS2发布的缓存消息
  • message | DupPubRel:QoS2第二阶段的消息
  • message | MessageId:分布式的MessageId生成。这里做得不太好,所有的messageId都用这个,甚至没有区分不同客户端,更不用说入口和出口分开了。
  • message | RetainMessage:保留消息的存储接口
  • session:会话信息存储,包括clientID、遗嘱消息等等
  • subscribe:订阅信息,实际上是一棵构建于Redis之上的订阅树

5、mqtt-store

01988d72d9531b4d1660077c2f8f0ce9.png

  • cache:Redis的增删改查实现,值得注意的是将精确订阅主题和通配符订阅主题分开存储了
  • kafka:分区的方式,这里采用按Topic分区,可以分得更均匀,避免热点问题。
  • message:消息存储的实现
  • session:会话存储的实现
  • starter:初始化Kafka存储
  • subscribe:订阅信息存储
  • storeutil:序列化、反序列化工具。因为netty默认的mqtt数据结构没有继承Serializable,序列化会出问题,所以作者自己写了一个。

6、mqtt-zoo

1cf6c0aa5dfe22ff927e1cff9b72ccc5.png

  • keystore:密钥存储,用于验证
  • mqtt-test-kafka:kafka测试
  • mqtt-test-websocket:websocket连接测试

二、初步运行

1、安装好本地的单机版Redis,推荐使用Redis Desktop Manager来可视化Redis。

2、配置broker-application.properties中的redis

ini
redis.host=127.0.0.1
redis.port=6379
redis.mode=normal

3、运行MainLauncher.java

nutzboot会打印参数,这是我最喜欢的一点。

8f279368609d47bc1b39f8c0cc5670a5.png

4、用mqtt-spy测试

注意端口默认8885而不是1883,可以在application.properties里面配置。

8456732996eb39cb415d76b1605c9d94.png

5、观察Redis存储的内容

都是JSON格式的,因为作者使用的FastJson存储。

4e78db866ee254c9005d5051c13df2cd.png

转载请注明出处https://bananaoven.com/articles/247.html | 香蕉微波炉
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。