入门级MqttBroker:MqttWk1.3.0源码分析

MqttWk可以算作开源MQTT Broker中的教科书,代码简洁易懂,虽然并不适合生产,但对照着MQTT白皮书的每个功能,都能在里面找到基本的实现,能够对Broker有个非常全面的认知以及有了兜底方案的底气(实在写不出,按这个思路先上生产也行,能用就行)。

作者wizzer其实是为了推广他写的Nutz,把MqttWk当作Nutz的一个应用场景来写的。Nutz/NutzBoot就是国产版Spring/Springboot,作者除了实现IoC、AOP等基本功能,还做了一些国内公司喜欢的小优化。在MqttWk中大量出现Nutz的代码,不过不必担心,只要会Spring就能够瞬间理解,只是换了个参数名字而已。

相关链接:

我们接下来从最新的1.3.0的版本的源码入手,分析MqttWk如何实现的基本功能。

MqttWk 1.3.0 实现了哪些功能

  • 协议:MQTT 3.1.1
  • QoS:0、1、2
  • 遗嘱消息、保留消息、消息分发重试、心跳机制
  • MQTT连接认证、SSL、Websocket
  • 单主题订阅、通配符订阅
  • 集群
  • kafka消息转发

MqttWk 包结构

1
2
3
4
5
6
7
8
9
MqttWk
├── mqtt-auth -- MQTT服务连接时用户名和密码认证
├── mqtt-broker -- MQTT服务器功能的核心实现
├── mqtt-common -- 公共类及其他模块使用的服务接口及对象
├── mqtt-store -- MQTT服务器会话信息(redis缓存及kafka加载)
├── mqtt-client -- MQTT客户端示例代码(配置文件修改数据库连接启动之)
├── mqtt-zoo -- 教程文档或文件
├── mqtt-test-kafka -- kafka消费者接收消息
├── mqtt-test-websocket -- websocket通信测试示例

MqttWk 功能实现分析

小提示:作者release的1.3.0版本代码,nutzboot.version用的是2.4.3-SNAPSHOT,在外网是找不到的,简单替换成2.4.2.v20201205就好,不影响看代码。

Broker的启动

mqtt-broker的MainLauncher就是入口,类似于springboot的Application,可以看到注入了一个RedisService和BrokerServer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MainLauncher {

@Inject("refer:$ioc")
private Ioc ioc;
@Inject
private PropertiesProxy conf;
@Inject
private RedisService redisService;
@Inject
private BrokerServer brokerServer;

public static void main(String[] args) throws Exception {
NbApp nb = new NbApp().setArgs(args).setPrintProcDoc(true);
nb.setMainPackage("cn.wizzer.iot");
nb.run();
}
}

BrokerServer就应该是启动类,入口是start方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Broker配置
private BrokerProperties brokerProperties;
// Netty的经典实现,boss和worker组
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
// ssl
private SslContext sslContext;
// 普通tcp和websocket的channel。channel是netty的一个经典模型,可以用它来操作I/O
private Channel channel;
private Channel websocketChannel;
// channel的集合(或者说容器),用来管理一堆channel,可以通过ChannelId获取指定的channel
private ChannelGroup channelGroup;
// 作者自定义的容器,能够通过自定义key,找到对应的channelId,从而找到channel发消息
private Map<String, ChannelId> channelIdMap;

public void start() throws Exception {

channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
channelIdMap = new HashMap<>();

// 这里很有参考意义,linux下用epoll,性能会得到很大提升。本地调试用nio
bossGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup(brokerProperties.getBossGroup_nThreads()) : new NioEventLoopGroup(brokerProperties.getBossGroup_nThreads());
workerGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup(brokerProperties.getWorkerGroup_nThreads()) : new NioEventLoopGroup(brokerProperties.getWorkerGroup_nThreads());

// 初始化ssl上下文
if (brokerProperties.getSslEnabled()) {
KeyStore keyStore = KeyStore.getInstance("PKCS12");
InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("keystore/server.pfx");
keyStore.load(inputStream, brokerProperties.getSslPassword().toCharArray());
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
kmf.init(keyStore, brokerProperties.getSslPassword().toCharArray());
sslContext = SslContextBuilder.forServer(kmf).build();
}

// 启动mqtt服务器
mqttServer();
if (brokerProperties.getWebsocketEnabled()) {
// 启动websocket服务器
websocketServer();
}
}

mqttServer是这样实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 准备一个启动器
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(brokerProperties.getUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// handler在初始化时就会执行
.handler(new LoggingHandler(LogLevel.INFO))
// childHandler会在客户端成功connect后才执行
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
// Netty提供的心跳检测,当作Broker的心跳检测器
channelPipeline.addFirst("idle", new IdleStateHandler(0, 0, brokerProperties.getKeepAlive()));
// Netty提供的SSL处理
if (brokerProperties.getSslEnabled()) {
SSLEngine sslEngine = sslContext.newEngine(socketChannel.alloc());
sslEngine.setUseClientMode(false); // 服务端模式
sslEngine.setNeedClientAuth(false); // 不需要验证客户端
channelPipeline.addLast("ssl", new SslHandler(sslEngine));
}
// Netty提供的MQTT反序列化器
channelPipeline.addLast("decoder", new MqttDecoder());
// Netty提供的MQTT序列化器
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
// 最后才启动BrokerHandler
channelPipeline.addLast("broker", ioc.get(BrokerHandler.class));
}
})
// 一些tcp参数配置
.option(ChannelOption.SO_BACKLOG, brokerProperties.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, brokerProperties.getSoKeepAlive());
if (Strings.isNotBlank(brokerProperties.getHost())) {
// 通过bind操作,启动netty服务器,开始监听连接
channel = sb.bind(brokerProperties.getHost(), brokerProperties.getPort()).sync().channel();
} else {
channel = sb.bind(brokerProperties.getPort()).sync().channel();
}

这里涉及了大量netty的知识,略过。重点关注:

  • 作者利用了Netty 4.1.77.Final实现了服务器,包括TCP调参、SSL解析、甚至是MQTT协议解析,都完全不用操心。这个版本的netty只有MQTT3.1.1协议解析器,Netty对MQTT5.0的支持还在开发中。有netty这么优秀好用的框架,实践中就请不要再使用原生socket从盘古开天地开始写起了。
  • netty最重要的模型就是pipeline,依次执行pipeline上的handler,最后执行到自定义的BrokerHandler,来做业务操作
  • 可能会有疑惑,decoder和encoder怎么写在一起?实际上encoder继承的是ChannelOutboundHandlerAdapter,而decoder是ChannelInboundHandlerAdapter,一个输出才会执行,一个输入才会执行,所以写在一起并不影响。

websocket类似,这里就略过了。

报文交互:BrokerHandler

报文交互继承了一个SimpleChannelInboundHandler的类,这个类会帮你做一些清理内存的操作,非常推荐使用。

我们之前在线上出过一个内存泄漏的问题,就是因为Netty在堆外内存创建MqttMessage报文对象后,并不会对它进行清理)(因为不知道你什么时候才不用了),而且又不能被GC回收,是需要业务手动清理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class BrokerHandler extends SimpleChannelInboundHandler<MqttMessage> {

@Inject
private ProtocolProcess protocolProcess;
@Inject
private BrokerProperties brokerProperties;
@Inject
private ChannelGroup channelGroup;
@Inject
private Map<String, ChannelId> channelIdMap;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 新连接接入
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接断开时
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
// 连接发送了消息
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 操作中发生了任何异常
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 用户自定义的事件触发了
}
}

先来看看一些简单的操作都做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// 把连接加入到channelGroup中管理
this.channelGroup.add(ctx.channel());
// 自定义key-value:brokerId_channelId -> channelId,方便查找
this.channelIdMap.put(brokerProperties.getId() + "_" + ctx.channel().id().asLongText(), ctx.channel().id());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
// 移除操作
this.channelGroup.remove(ctx.channel());
this.channelIdMap.remove(brokerProperties.getId() + "_" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof IOException) {
// 远程主机强迫关闭了一个现有的连接的异常,关闭channel
ctx.close();
} else {
super.exceptionCaught(ctx, cause);
}
}

然后用户事件触发,作者做的事情主要是检测连接心跳超时,然后发送遗嘱消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 心跳超时事件(由前面加的IdleStateHandler发起)
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
Channel channel = ctx.channel();
// 从channel的上下文拿到clientId
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
// 如果有遗嘱消息,则发送遗嘱消息
if (this.protocolProcess.getSessionStoreService().containsKey(clientId)) {
SessionStore sessionStore = this.protocolProcess.getSessionStoreService().get(clientId);
if (sessionStore.getWillMessage() != null) {
this.protocolProcess.publish().processPublish(ctx.channel(), sessionStore.getWillMessage());
}
}
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}

值得学习的一点就是这里channel.attr的使用,我们可以把一些clientId之类的参数存入,方便全局获取,有点类似于Threadlocal的作用。

当收到MQTT报文时,作者并没有直接在这里做处理,而是将它交由自定义的各个协议处理器来处理对应协议,这样handler的代码就会很清晰,值得参考。大部分Broker也都是这样做的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
// 报文解析失败,按照MQTT协议做对应返回
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
ctx.writeAndFlush(MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false),
null));
} else if (cause instanceof MqttIdentifierRejectedException) {
ctx.writeAndFlush(MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
null));
}
ctx.close();
return;
}

// 根据报文类型,执行不同的协议处理器protocolProcess
switch (msg.fixedHeader().messageType()) {
case CONNECT:
protocolProcess.connect().processConnect(ctx.channel(), (MqttConnectMessage) msg);
break;
case CONNACK:
break;
case PUBLISH:
protocolProcess.publish().processPublish(ctx.channel(), (MqttPublishMessage) msg);
break;
case PUBACK:
protocolProcess.pubAck().processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBREC:
protocolProcess.pubRec().processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBREL:
protocolProcess.pubRel().processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBCOMP:
protocolProcess.pubComp().processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case SUBSCRIBE:
protocolProcess.subscribe().processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
break;
case SUBACK:
break;
case UNSUBSCRIBE:
protocolProcess.unSubscribe().processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
break;
case UNSUBACK:
break;
case PINGREQ:
protocolProcess.pingReq().processPingReq(ctx.channel(), msg);
break;
case PINGRESP:
break;
case DISCONNECT:
protocolProcess.disConnect().processDisConnect(ctx.channel(), msg);
break;
default:
break;
}
}

MQTT Connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
public class Connect {
// 会话服务
private ISessionStoreService sessionStoreService;
// 主题订阅服务
private ISubscribeStoreService subscribeStoreService;
// QoS1、QoS2第一阶段的消息重发服务
private IDupPublishMessageStoreService dupPublishMessageStoreService;
// Qos2第二阶段的消息重发服务
private IDupPubRelMessageStoreService dupPubRelMessageStoreService;
// 认证服务
private IAuthService authService;
// Broker的参数
private BrokerProperties brokerProperties;
// 连接channel管理
private ChannelGroup channelGroup;
private Map<String, ChannelId> channelIdMap;

public void processConnect(Channel channel, MqttConnectMessage msg) {
// 消息解码器出现异常
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
// 不支持的协议版本
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
} else if (cause instanceof MqttIdentifierRejectedException) {
// 不合格的clientId
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
channel.close();
return;
}

// clientId为空或null的情况, 这里要求客户端必须提供clientId, 不管cleanSession是否为1, 此处没有参考标准协议实现
if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}

// 用户名和密码验证, 这里要求客户端连接时必须提供用户名和密码, 不管是否设置用户名标志和密码标志为1, 此处没有参考标准协议实现
if (brokerProperties.getMqttPasswordMust()) {
String username = msg.payload().userName();
String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
if (!authService.checkValid(username, password)) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
}

// 如果会话中已存储这个新连接的clientId, 就关闭之前该clientId的连接
if (sessionStoreService.containsKey(msg.payload().clientIdentifier())) {
SessionStore sessionStore = sessionStoreService.get(msg.payload().clientIdentifier());
boolean cleanSession = sessionStore.isCleanSession();
// 如果每次重连都要清除会话,就执行清除
if (cleanSession) {
sessionStoreService.remove(msg.payload().clientIdentifier());
subscribeStoreService.removeForClient(msg.payload().clientIdentifier());
dupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
dupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
}
try {
ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
if (channelId != null) {
Channel previous = channelGroup.find(channelId);
if (previous != null) previous.close();
}
} catch (Exception e) {
//e.printStackTrace();
}
} else {
//如果不存在session,则清除之前的其他缓存
subscribeStoreService.removeForClient(msg.payload().clientIdentifier());
dupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
dupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
}

// 处理连接心跳包
int expire = 0;
if (msg.variableHeader().keepAliveTimeSeconds() > 0) {
if (channel.pipeline().names().contains("idle")) {
channel.pipeline().remove("idle");
}
expire = Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f);
channel.pipeline().addFirst("idle", new IdleStateHandler(0, 0, expire));
}

// 存储遗嘱信息
SessionStore sessionStore = new SessionStore(brokerProperties.getId(), msg.payload().clientIdentifier(), channel.id().asLongText(), msg.variableHeader().isCleanSession(), null, expire);
if (msg.variableHeader().isWillFlag()) {
MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
new MqttPublishVariableHeader(msg.payload().willTopic(), 0), Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes()));
sessionStore.setWillMessage(willMessage);
}

// 存储会话
sessionStoreService.put(msg.payload().clientIdentifier(), sessionStore, expire);

// 将clientId存储到channel的map中
channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
boolean sessionPresent = sessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();

// 回复成功
MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent), null);
channel.writeAndFlush(okResp);

// 如果cleanSession为0, 需要重发同一clientId存储的未完成的QoS1和QoS2的DUP消息
if (!msg.variableHeader().isCleanSession()) {
List<DupPublishMessageStore> dupPublishMessageStoreList = dupPublishMessageStoreService.get(msg.payload().clientIdentifier());
List<DupPubRelMessageStore> dupPubRelMessageStoreList = dupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()), false, 0),
new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(), dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes()));
channel.writeAndFlush(publishMessage);
});
dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), null);
channel.writeAndFlush(pubRelMessage);
});
}
}

}

CONNECT主要处理的是:

  • cleanSession和Qos的处理
  • 心跳1.5倍
  • 遗嘱的处理
  • username+password身份校验
  • clientId、session等基本信息的存储

MQTT Subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class Subscribe {
// 存储服务
private ISubscribeStoreService subscribeStoreService;
// 报文标识符生成服务
private IMessageIdService messageIdService;
// 保留消息服务
private IRetainMessageStoreService retainMessageStoreService;

public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
// 获取要订阅的主题
List<MqttTopicSubscription> topicSubscriptions = msg.payload().topicSubscriptions();

if (this.validTopicFilter(topicSubscriptions)) {
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
List<Integer> mqttQoSList = new ArrayList<Integer>();

// 存储每个主题
topicSubscriptions.forEach(topicSubscription -> {
String topicFilter = topicSubscription.topicName();
MqttQoS mqttQoS = topicSubscription.qualityOfService();
SubscribeStore subscribeStore = new SubscribeStore(clientId, topicFilter, mqttQoS.value());
subscribeStoreService.put(topicFilter, subscribeStore);
mqttQoSList.add(mqttQoS.value());
});

// 订阅成功,返回报文
MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
new MqttSubAckPayload(mqttQoSList));
channel.writeAndFlush(subAckMessage);

// 接收订阅主题上的保留消息
topicSubscriptions.forEach(topicSubscription -> {
String topicFilter = topicSubscription.topicName();
MqttQoS mqttQoS = topicSubscription.qualityOfService();
this.sendRetainMessage(channel, topicFilter, mqttQoS);
});
} else {
channel.close();
}
}

private boolean validTopicFilter(List<MqttTopicSubscription> topicSubscriptions) {
for (MqttTopicSubscription topicSubscription : topicSubscriptions) {
String topicFilter = topicSubscription.topicName();
// 以#或+符号开头的、以/符号结尾的订阅按非法订阅处理, 这里没有参考标准协议
if (StrUtil.startWith(topicFilter, '+') || StrUtil.endWith(topicFilter, '/'))
return false;
if (StrUtil.contains(topicFilter, '#')) {
// 如果出现多个#符号的订阅按非法订阅处理
if (StrUtil.count(topicFilter, '#') > 1) return false;
}
if (StrUtil.contains(topicFilter, '+')) {
// 如果+符号和/+字符串出现的次数不等的情况按非法订阅处理
if (StrUtil.count(topicFilter, '+') != StrUtil.count(topicFilter, "/+")) return false;
}
}
return true;
}

private void sendRetainMessage(Channel channel, String topicFilter, MqttQoS mqttQoS) {
// 搜索主题过滤器的全部保留消息
List<RetainMessageStore> retainMessageStores = retainMessageStoreService.search(topicFilter);

retainMessageStores.forEach(retainMessageStore -> {
// 新的保留消息QoS取最小值
MqttQoS respQoS = retainMessageStore.getMqttQoS() > mqttQoS.value() ? mqttQoS : MqttQoS.valueOf(retainMessageStore.getMqttQoS());

// Qos0
if (respQoS == MqttQoS.AT_MOST_ONCE) {
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(……);
channel.writeAndFlush(publishMessage);
}

// Qos1
if (respQoS == MqttQoS.AT_LEAST_ONCE) {
int messageId = messageIdService.getNextMessageId();
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(……);
channel.writeAndFlush(publishMessage);
}

// Qos2
if (respQoS == MqttQoS.EXACTLY_ONCE) {
int messageId = messageIdService.getNextMessageId();
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(……);
channel.writeAndFlush(publishMessage);
}
});
}

}

回顾一下保留消息:发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。

订阅做了2件事:

  • 保存订阅
  • 接收订阅主题上的保留消息

这里可能会有个疑问:QoS1、Qos2不是要做重发什么的操作吗?实际上,QoS1、QoS2的回包,比如PUBACK,会在其他的processor处理,而不是在订阅的processor这里处理,所以只需要下发就行了。

这里重点关注一下订阅树是如何存储的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SubscribeStoreService implements ISubscribeStoreService {
// 普通订阅
private SubscribeNotWildcardCache subscribeNotWildcardCache;
// 通配符订阅
private SubscribeWildcardCache subscribeWildcardCache;

@Override
public void put(String topicFilter, SubscribeStore subscribeStore) {
if (StrUtil.contains(topicFilter, '#') || StrUtil.contains(topicFilter, '+')) {
subscribeWildcardCache.put(topicFilter, subscribeStore.getClientId(), subscribeStore);
} else {
subscribeNotWildcardCache.put(topicFilter, subscribeStore.getClientId(), subscribeStore);
}
}

@Override
public List<SubscribeStore> search(String topic) {
// 订阅结果
List<SubscribeStore> subscribeStores = new ArrayList<SubscribeStore>();
// 获取所有的普通主题,直接加入订阅结果
List<SubscribeStore> list = subscribeNotWildcardCache.all(topic);
if (list.size() > 0) {
subscribeStores.addAll(list);
}
// 获取所有的通配符主题,对每个主题进行匹配
subscribeWildcardCache.all().forEach((topicFilter, map) -> {
……
});
return subscribeStores;
}

}

订阅树竟然不是棵树,而是Redis存储的扁平化的k-v条目,key是topic,value是SubscribeStore。对于通配符主题,直接获取所有的通配符主题订阅,然后逐个匹配,这个效率极其低下,这也是mqttwk最大的问题。甚至我刚看的版本直接是*匹配,这个1.3.0版本虽然改成了scan,效率也是不行的。因为生产的通配符主题,通常也是会和设备相关的,也就是说通配符主题是设备量级。

当然也有好处,就是结构及其简单,很容易理解,作为教科书版本是没问题的。

MQTT Publish

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
public class Publish {
// session服务
private ISessionStoreService sessionStoreService;
// 订阅服务
private ISubscribeStoreService subscribeStoreService;
// 报文ID生成服务
private IMessageIdService messageIdService;
// 保留消息服务
private IRetainMessageStoreService retainMessageStoreService;
// QoS1/2 重发消息暂存服务
private IDupPublishMessageStoreService dupPublishMessageStoreService;
// 内部消息转发(kafka)
private InternalCommunication internalCommunication;

private ChannelGroup channelGroup;
private Map<String, ChannelId> channelIdMap;
private BrokerProperties brokerProperties;

public void processPublish(Channel channel, MqttPublishMessage msg) {
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();

// publish 延长session失效时间
if (sessionStoreService.containsKey(clientId)) {
SessionStore sessionStore = sessionStoreService.get(clientId);
ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
if (brokerProperties.getId().equals(sessionStore.getBrokerId()) && channelId != null) {
sessionStoreService.expire(clientId, sessionStore.getExpire());
}
}

// QoS=0
if (msg.fixedHeader().qosLevel() == MqttQoS.AT_MOST_ONCE) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
.setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
.setDup(false).setRetain(false).setClientId(clientId);
internalCommunication.internalSend(internalMessage);
this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
}

// QoS=1
if (msg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
.setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
.setDup(false).setRetain(false).setClientId(clientId);
internalCommunication.internalSend(internalMessage);
this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
this.sendPubAckMessage(channel, msg.variableHeader().packetId());
}

// QoS=2
if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
.setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
.setDup(false).setRetain(false).setClientId(clientId);
internalCommunication.internalSend(internalMessage);
this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
this.sendPubRecMessage(channel, msg.variableHeader().packetId());
}

// retain=1, 保留消息
if (msg.fixedHeader().isRetain()) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
if (messageBytes.length == 0) {
retainMessageStoreService.remove(msg.variableHeader().topicName());
} else {
RetainMessageStore retainMessageStore = new RetainMessageStore().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value())
.setMessageBytes(messageBytes);
retainMessageStoreService.put(msg.variableHeader().topicName(), retainMessageStore);
}
}
}

private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
List<SubscribeStore> subscribeStores = subscribeStoreService.search(topic);
subscribeStores.forEach(subscribeStore -> {
if (sessionStoreService.containsKey(subscribeStore.getClientId())) {
// 订阅者收到MQTT消息的QoS级别, 最终取决于发布消息的QoS和主题订阅的QoS
MqttQoS respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf(subscribeStore.getMqttQoS()) : mqttQoS;

if (respQoS == MqttQoS.AT_MOST_ONCE) {
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
new MqttPublishVariableHeader(topic, 0), Unpooled.buffer().writeBytes(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, respQoS.value());
SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId());
ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
if (channelId != null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
}

if (respQoS == MqttQoS.AT_LEAST_ONCE) {
int messageId = messageIdService.getNextMessageId();
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId);
DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId())
.setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId);
// 重发消息的存储
dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore);
SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId());
ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
if (channelId != null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
}

if (respQoS == MqttQoS.EXACTLY_ONCE) {
int messageId = messageIdService.getNextMessageId();
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId);
DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId())
.setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId);
// 重发消息的存储
dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore);
SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId());
ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
if (channelId != null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
}
}
});
}

private void sendPubAckMessage(Channel channel, int messageId) {
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), null);
channel.writeAndFlush(pubAckMessage);
}

private void sendPubRecMessage(Channel channel, int messageId) {
MqttMessage pubRecMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), null);
channel.writeAndFlush(pubRecMessage);
}

}

结构很清晰:

  • Qos0、1、2的处理
  • 保留消息的处理
  • 消息转发给第三方中间件

也有很多问题:
1、每发一次消息,就给session续一次命,session的过期时间就是心跳时间。这是不可靠的,一旦session过期,相关的重试的消息都会在下次connect的时候因没有会话而丢弃,导致QoS1、2失效。
2、QoS1、2的消息,也是存在Redis的,简单但不可靠。

值得学习的是,很容易纠结publish的QoS和subscribe的QoS如果不同,该以谁为准。这里就给出了结论:以最小的QoS为准。

集群功能

直接用redis的pub/sub广播消息,收到消息的broker,查找自己身上是否有连接对应的client,如果有且有订阅,则下发消息。这个消息量在生产中其实很大,不应该使用redis来做集群广播。

MessageIdService

这里值得参考的是报文Id的生成,报文Id是从0~65535循环的,所以作者用redis存了一个全局变量,然后取余:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MessageIdService implements IMessageIdService {

@Override
public int getNextMessageId() {
try {
while (true) {
int nextMsgId = (int) (redisService.incr("mqttwk:messageid:num") % 65536);
if (nextMsgId > 0) {
return nextMsgId;
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
return 0;
}

/**
* 每次重启的时候初始化
*/
public void init() {
redisService.del("mqttwk:messageid:num");
}
}

总结

MqttWk是一款教学入门级的Broker,作者用最简短的代码实现了MQTT协议规定的完整的功能,功能点清晰,用Redis搞定一切存储,很容易上手。但一些设计上过于简单,并不适应复杂的生产环境,尤其是当物联网设备达到十万、百万量级的时候,或者有一些业务需要用到通配符的时候。

不过真的非常感谢MqttWk,我刚做Broker的时候,看了很多开源的Broker,直到找到这个Broker,才入了门,比如一些QoS1、2的报文到底如何交互等,订阅到底如何处理等,其他Broker为了性能考虑数据结构、设计模式都写得十分复杂,加上很多都没用IoC框架而是原生代码,看个几天都理不清楚;MqttWk的精简反而十分易读,注释详尽、设计简单,甚至可以几分钟就理解完主要逻辑,非常推荐。

入门级MqttBroker:MqttWk1.3.0源码分析

https://www.bananaoven.com/posts/12000/

作者

香蕉微波炉

发布于

2023-01-14

更新于

2023-01-14

许可协议