前言
基于MqttWk v1.0.7。
MqttWk是我见过最清晰、代码量最少的Broker了,分析其源码有利于初步了解。
一、代码结构分析
1、整体架构

- mqtt-auth:验证权限方面的代码
- mqtt-broker:Broker核心代码
- mqtt-common:抽象出来的持久化接口
- mqtt-store:持久化接口的Redis、Kafka实现
- mqtt-zoo:简单测试代码
2、mqtt-auth代码结构

验证服务的实现,还有一些验证工具类
3、mqtt-broker核心代码结构

- cluster:集群通信的实现,用的是Redis的发布订阅作消息总线
- codec:基于Websocket的MQTT通信需要特殊的编解码器,这里是为websocket写的编解码器
- config:Broker参数配置
- handler:Broker是基于netty做网络通信的,所以这里是netty连接的核心handler
- internal:内部通信代码(Redis集群通信和kafka转发)
- protocol:MQTT协议逻辑核心代码
- server:启动MQTT服务器的代码,主要是MQTT over websocket、MQTT TCP、链路加密SSL等分别启动在不同端口提供服务
- service:Kafka转发的实现,就是一个Producer
- MainLauncher:启动器,基于nutzboot,非常简洁,和Spring的Application启动器一样
- resources:密钥,参数配置。这里的参数配置是直接properties文件配置,config里的参数配置是基于类的配置,都和Spring的概念一样。
- protocol:在不同MQTT包发过来的时候,Broker应该做的逻辑操作

4、mqtt-common

- auth:验证接口
- message | DupPublish:QoS1/QoS2发布的缓存消息
- message | DupPubRel:QoS2第二阶段的消息
- message | MessageId:分布式的MessageId生成。这里做得不太好,所有的messageId都用这个,甚至没有区分不同客户端,更不用说入口和出口分开了。
- message | RetainMessage:保留消息的存储接口
- session:会话信息存储,包括clientID、遗嘱消息等等
- subscribe:订阅信息,实际上是一棵构建于Redis之上的订阅树
5、mqtt-store

- cache:Redis的增删改查实现,值得注意的是将精确订阅主题和通配符订阅主题分开存储了
- kafka:分区的方式,这里采用按Topic分区,可以分得更均匀,避免热点问题。
- message:消息存储的实现
- session:会话存储的实现
- starter:初始化Kafka存储
- subscribe:订阅信息存储
- storeutil:序列化、反序列化工具。因为netty默认的mqtt数据结构没有继承Serializable,序列化会出问题,所以作者自己写了一个。
6、mqtt-zoo

- keystore:密钥存储,用于验证
- mqtt-test-kafka:kafka测试
- mqtt-test-websocket:websocket连接测试
二、初步运行
1、安装好本地的单机版Redis,推荐使用Redis Desktop Manager来可视化Redis。
2、配置broker-application.properties中的redis
ini
redis.host=127.0.0.1
redis.port=6379
redis.mode=normal3、运行MainLauncher.java
nutzboot会打印参数,这是我最喜欢的一点。

4、用mqtt-spy测试
注意端口默认8885而不是1883,可以在application.properties里面配置。

5、观察Redis存储的内容
都是JSON格式的,因为作者使用的FastJson存储。




粤公网安备44030602007943号