前言
基于MqttWk v1.0.7。
一、BrokerHandler
channelActive()
java
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.channelGroup.add(ctx.channel());
this.channelIdMap.put(ctx.channel().id().asShortText(), ctx.channel().id());
}当channel刚生效(客户端刚连接),将其channel信息存起来。
Netty默认提供了channel管理,这里自己做channel管理是为了方便根据channelId快速找到对应channel好下发消息。
channelInactive()
java
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
this.channelGroup.remove(ctx.channel());
this.channelIdMap.remove(ctx.channel().id().asShortText());
}当channel失效时,将channel信息移除。
exceptionCaught()
java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof IOException) {
// 远程主机强迫关闭了一个现有的连接的异常
ctx.close();
} else {
super.exceptionCaught(ctx, cause);
}
}连接发生IO异常时,关闭channel。
如果你想要做更多处理,比如发生自定义的异常处理逻辑,也可以在这里做。
在这个方法运行完后,会去因为关闭连接触发channelInactive,所以如果在channelInactive那里做了清理工作,这里就不要重复做了。
userEventTriggered()
java
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
Channel channel = ctx.channel();
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
// 发送遗嘱消息
if (this.protocolProcess.getSessionStoreService().containsKey(clientId)) {
SessionStore sessionStore = this.protocolProcess.getSessionStoreService().get(clientId);
if (sessionStore.getWillMessage() != null) {
this.protocolProcess.publish().processPublish(ctx.channel(), sessionStore.getWillMessage());
}
}
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}userEventTrigger可以编写自定义的事件发生后处理逻辑。这里是判断当客户端一直没有发送消息也没有接收的时候,就触发IdleState,然后发送遗嘱消息并关闭连接。我觉得这里的逻辑有问题,因为遗嘱消息的标准分发条件是:
- 服务端检测到了I/O错误或网络故障
- 客户端在KeepAlive的时间内未能通讯
- 客户端没有发送DISCONNECT报文直接关闭连接
- 由于协议错误关闭了网络连接
所以不只是“未能通讯”才分发,抛出异常也会分发的。
channelRead0()
java
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
ctx.writeAndFlush(MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false),
null));
} else if (cause instanceof MqttIdentifierRejectedException) {
ctx.writeAndFlush(MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
null));
}
ctx.close();
return;
}
switch (msg.fixedHeader().messageType()) {
case CONNECT:
protocolProcess.connect().processConnect(ctx.channel(), (MqttConnectMessage) msg);
break;
case CONNACK:
break;
case PUBLISH:
protocolProcess.publish().processPublish(ctx.channel(), (MqttPublishMessage) msg);
break;
case PUBACK:
protocolProcess.pubAck().processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBREC:
protocolProcess.pubRec().processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBREL:
protocolProcess.pubRel().processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBCOMP:
protocolProcess.pubComp().processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case SUBSCRIBE:
protocolProcess.subscribe().processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
break;
case SUBACK:
break;
case UNSUBSCRIBE:
protocolProcess.unSubscribe().processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
break;
case UNSUBACK:
break;
case PINGREQ:
protocolProcess.pingReq().processPingReq(ctx.channel(), msg);
break;
case PINGRESP:
break;
case DISCONNECT:
protocolProcess.disConnect().processDisConnect(ctx.channel(), msg);
break;
default:
break;
}
}这里用channelRead0而不是channelRead是因为作者继承的SimpleChannelInboundHandler<MqttMessage>而不是ChannelInboundHandlerAdapter。
ChannelInboundHandlerAdapter是Netty默认的入站消息处理类;SimpleChannelInboundHandler继承了它并实现了一些基础的方法,更方便一些。
在里面,首先判断是否是合法的包、合法的协议、合法的ClientID,然后根据不同的报文类型去分发到protocol里不同的处理类处理。
二、protocol包
1、PingReq
java
public void processPingReq(Channel channel, MqttMessage msg) {
MqttMessage pingRespMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0), null, null);
LOGGER.debug("PINGREQ - clientId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get());
channel.writeAndFlush(pingRespMessage);
}简单的心跳回包,心跳包是单向的,总是由客户端发给服务端,服务端回包即可。
2、Connect
java
public void processConnect(Channel channel, MqttConnectMessage msg) {
// 消息解码器出现异常
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
// 不支持的协议版本
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
} else if (cause instanceof MqttIdentifierRejectedException) {
// 不合格的clientId
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
channel.close();
return;
}
// clientId为空或null的情况, 这里要求客户端必须提供clientId, 不管cleanSession是否为1, 此处没有参考标准协议实现
if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
if (brokerProperties.getMqttPasswordMust()) {
// 用户名和密码验证, 这里要求客户端连接时必须提供用户名和密码, 不管是否设置用户名标志和密码标志为1, 此处没有参考标准协议实现
String username = msg.payload().userName();
String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
if (!authService.checkValid(username, password)) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
}
// 如果会话中已存储这个新连接的clientId, 就关闭之前该clientId的连接
if (sessionStoreService.containsKey(msg.payload().clientIdentifier())) {
SessionStore sessionStore = sessionStoreService.get(msg.payload().clientIdentifier());
Boolean cleanSession = sessionStore.isCleanSession();
if (cleanSession) {
sessionStoreService.remove(msg.payload().clientIdentifier());
subscribeStoreService.removeForClient(msg.payload().clientIdentifier());
dupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
dupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
}
try {
ChannelId channelId = channelIdMap.get(sessionStore.getChannelId());
if(channelId!=null) {
Channel previous = channelGroup.find(channelId);
if (previous != null) previous.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
// 处理遗嘱信息
SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel.id().asShortText(), msg.variableHeader().isCleanSession(), null);
if (msg.variableHeader().isWillFlag()) {
MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
new MqttPublishVariableHeader(msg.payload().willTopic(), 0), Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes()));
sessionStore.setWillMessage(willMessage);
}
// 处理连接心跳包
if (msg.variableHeader().keepAliveTimeSeconds() > 0) {
if (channel.pipeline().names().contains("idle")) {
channel.pipeline().remove("idle");
}
channel.pipeline().addFirst("idle", new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
}
// 至此存储会话信息及返回接受客户端连接
sessionStoreService.put(msg.payload().clientIdentifier(), sessionStore);
// 将clientId存储到channel的map中
channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
Boolean sessionPresent = sessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();
MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent), null);
channel.writeAndFlush(okResp);
LOGGER.debug("CONNECT - clientId: {}, cleanSession: {}", msg.payload().clientIdentifier(), msg.variableHeader().isCleanSession());
// 如果cleanSession为0, 需要重发同一clientId存储的未完成的QoS1和QoS2的DUP消息
if (!msg.variableHeader().isCleanSession()) {
List<DupPublishMessageStore> dupPublishMessageStoreList = dupPublishMessageStoreService.get(msg.payload().clientIdentifier());
List<DupPubRelMessageStore> dupPubRelMessageStoreList = dupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()), false, 0),
new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(), dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes()));
channel.writeAndFlush(publishMessage);
});
dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), null);
channel.writeAndFlush(pubRelMessage);
});
}
}1)这里又处理了一次非法值返回码,我想作者可能是忘记了前面处理过。
2)clientID要求必须不为空,协议规定为空是可以的,但只能出现在cleansession=1(非持久化)会话上,服务器帮助客户端生成一个随机clientID,一旦断开连接,客户端的ClientID就会发生变化,所以只能非持久化会话。这里没有实现这个逻辑,要求必须不为空。
3)用户名和密码认证,协议规定为空是可以的,这里要求必须提供。实际上真实的服务器可能不需要username,因为password很可能是JWT之类的数据,已经足够验证客户端身份了,不需要username。
4)获取旧会话,关闭相同clientID的连接。只能同时有一个clientID在线,如果有重复clientID则需要踢掉旧会话。这里的踢操作依赖于本地的channelgroup,所以只会踢掉本地的client,不会踢掉其他Broker上的client,可以说是一个Bug……作者集群做得不好。
5)处理遗嘱信息,如果有存起来就好。
6)连接心跳包,如果设置了keepAlive的值,则设置Netty的IdleHandler为那个值,如果为0,则保持默认的前面在BrokerServer设置的值。这里其实还应该判断keepAlive是否设置得合法,因为过短的心跳间隔会造成服务器压力很大,应该检查客户端要求的心跳间隔是否服务器能承受,否则就断开连接。作者没有检查。
7)如果cleansession为0(持久会话),则重新下发未ACK的QoS1、QoS2消息,第二阶段未完成的QoS2消息。作者保存这些消息都是没有顺序的,是直接Hash到Redis,但理论上应该保证重发顺序。
3、Disconnect
java
public void processDisConnect(Channel channel, MqttMessage msg) {
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
SessionStore sessionStore = sessionStoreService.get(clientId);
if (sessionStore != null && sessionStore.isCleanSession()) {
subscribeStoreService.removeForClient(clientId);
dupPublishMessageStoreService.removeByClient(clientId);
dupPubRelMessageStoreService.removeByClient(clientId);
}
LOGGER.debug("DISCONNECT - clientId: {}, cleanSession: {}", clientId, sessionStore.isCleanSession());
sessionStoreService.remove(clientId);
channel.close();
}根据是否是持久会话清理掉session即可。
4、Publish
java
public void processPublish(Channel channel, MqttPublishMessage msg) {
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
// QoS=0
if (msg.fixedHeader().qosLevel() == MqttQoS.AT_MOST_ONCE) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
.setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
.setDup(false).setRetain(false).setClientId(clientId);
internalCommunication.internalSend(internalMessage);
this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
}
// QoS=1
if (msg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
.setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
.setDup(false).setRetain(false).setClientId(clientId);
internalCommunication.internalSend(internalMessage);
this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
this.sendPubAckMessage(channel, msg.variableHeader().packetId());
}
// QoS=2
if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
.setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
.setDup(false).setRetain(false).setClientId(clientId);
internalCommunication.internalSend(internalMessage);
this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
this.sendPubRecMessage(channel, msg.variableHeader().packetId());
}
// retain=1, 保留消息
if (msg.fixedHeader().isRetain()) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
if (messageBytes.length == 0) {
retainMessageStoreService.remove(msg.variableHeader().topicName());
} else {
RetainMessageStore retainMessageStore = new RetainMessageStore().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value())
.setMessageBytes(messageBytes);
retainMessageStoreService.put(msg.variableHeader().topicName(), retainMessageStore);
}
}
}回顾一下Publish的流程:


如果是QoS0,直接发送;
如果是QoS1,除了发送还需要回复PubAck给发送者;
如果是QoS2,除了发送还需要回复PubRec给发送者;
如果是保留消息,长度为0则清除该主题的保留消息,否则保存该主题的保留消息。
如果开启了集群功能,会通过Redis广播;如果开启了Kafka转发,会进行转发。
java
private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
List<SubscribeStore> subscribeStores = subscribeStoreService.search(topic);
subscribeStores.forEach(subscribeStore -> {
if (sessionStoreService.containsKey(subscribeStore.getClientId())) {
// 订阅者收到MQTT消息的QoS级别, 最终取决于发布消息的QoS和主题订阅的QoS
MqttQoS respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf(subscribeStore.getMqttQoS()) : mqttQoS;
if (respQoS == MqttQoS.AT_MOST_ONCE) {
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
new MqttPublishVariableHeader(topic, 0), Unpooled.buffer().writeBytes(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, respQoS.value());
ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
if(channelId!=null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
}
if (respQoS == MqttQoS.AT_LEAST_ONCE) {
int messageId = messageIdService.getNextMessageId();
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId);
DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId())
.setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId);
dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore);
ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
if(channelId!=null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
}
if (respQoS == MqttQoS.EXACTLY_ONCE) {
int messageId = messageIdService.getNextMessageId();
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId);
DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId())
.setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId);
dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore);
ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
if(channelId!=null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
}
}
});
}和发送者通信的时候才会收到Publish。
发送的具体代码流程是:
搜寻订阅这个主题的订阅者;
如果是QoS0,直接下发(QoS0的messageId为0);
如果是QoS1,先获取一个未被占用的messageId,缓存一份DupPublish消息,再下发;
如果是QoS2,先获取一个未被占用的messageId,缓存一份DupPublish消息,再下发。
5、PubRel
java
public void processPubRel(Channel channel, MqttMessageIdVariableHeader variableHeader) {
MqttMessage pubCompMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
LOGGER.debug("PUBREL - clientId: {}, messageId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get(), variableHeader.messageId());
channel.writeAndFlush(pubCompMessage);
}和发送者通信的时候才会收到PubRel,处于QoS2第二阶段,服务器需要会送一个PubComp让发送者知道QoS2已经完成了。
6、PubAck
java
public void processPubAck(Channel channel, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
LOGGER.debug("PUBACK - clientId: {}, messageId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get(), messageId);
dupPublishMessageStoreService.remove((String) channel.attr(AttributeKey.valueOf("clientId")).get(), messageId);
}和接收者通信的时候才会收到PubAck,表明QoS1已经完成,移除缓存的DupPublish消息。
7、PubRec
java
public void processPubRec(Channel channel, MqttMessageIdVariableHeader variableHeader) {
MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
LOGGER.debug("PUBREC - clientId: {}, messageId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get(), variableHeader.messageId());
dupPublishMessageStoreService.remove((String) channel.attr(AttributeKey.valueOf("clientId")).get(), variableHeader.messageId());
DupPubRelMessageStore dupPubRelMessageStore = new DupPubRelMessageStore().setClientId((String) channel.attr(AttributeKey.valueOf("clientId")).get())
.setMessageId(variableHeader.messageId());
dupPubRelMessageStoreService.put((String) channel.attr(AttributeKey.valueOf("clientId")).get(), dupPubRelMessageStore);
channel.writeAndFlush(pubRelMessage);
}和接收者通信的时候才会收到PubRec,表明QoS2的第一阶段完成,移除缓存的Publish消息,重新缓存一份PubRel消息,并下发PubRel消息,开启第二阶段。
8、PubComp
java
public void processPubComp(Channel channel, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
LOGGER.debug("PUBCOMP - clientId: {}, messageId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get(), messageId);
dupPubRelMessageStoreService.remove((String) channel.attr(AttributeKey.valueOf("clientId")).get(), variableHeader.messageId());
}和接收者通信的时候才会收到PubComp,表明QoS2已经完成,移除缓存的PubRel消息。
9、Subscribe
java
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
List<MqttTopicSubscription> topicSubscriptions = msg.payload().topicSubscriptions();
if (this.validTopicFilter(topicSubscriptions)) {
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
List<Integer> mqttQoSList = new ArrayList<Integer>();
topicSubscriptions.forEach(topicSubscription -> {
String topicFilter = topicSubscription.topicName();
MqttQoS mqttQoS = topicSubscription.qualityOfService();
SubscribeStore subscribeStore = new SubscribeStore(clientId, topicFilter, mqttQoS.value());
subscribeStoreService.put(topicFilter, subscribeStore);
mqttQoSList.add(mqttQoS.value());
LOGGER.debug("SUBSCRIBE - clientId: {}, topFilter: {}, QoS: {}", clientId, topicFilter, mqttQoS.value());
});
MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
new MqttSubAckPayload(mqttQoSList));
channel.writeAndFlush(subAckMessage);
// 发布保留消息
topicSubscriptions.forEach(topicSubscription -> {
String topicFilter = topicSubscription.topicName();
MqttQoS mqttQoS = topicSubscription.qualityOfService();
this.sendRetainMessage(channel, topicFilter, mqttQoS);
});
} else {
channel.close();
}
}先校验所有的Topic是否正确,如果正确,则回复对应的SubAck报文,并把订阅信息存储起来。
对于有保留消息的主题,需要保存该主题的保留消息。
java
private boolean validTopicFilter(List<MqttTopicSubscription> topicSubscriptions) {
for (MqttTopicSubscription topicSubscription : topicSubscriptions) {
String topicFilter = topicSubscription.topicName();
// 以#或+符号开头的、以/符号结尾的订阅按非法订阅处理, 这里没有参考标准协议
if (StrUtil.startWith(topicFilter, '+') || StrUtil.endWith(topicFilter, '/'))
return false;
if (StrUtil.contains(topicFilter, '#')) {
// 如果出现多个#符号的订阅按非法订阅处理
if (StrUtil.count(topicFilter, '#') > 1) return false;
}
if (StrUtil.contains(topicFilter, '+')) {
//如果+符号和/+字符串出现的次数不等的情况按非法订阅处理
if (StrUtil.count(topicFilter, '+') != StrUtil.count(topicFilter, "/+")) return false;
}
}
return true;
}验证Topic的正确性只限制了一些格式,没有对“是否有权限订阅”进行权限验证。
10、Unsubscribe
java
public void processUnSubscribe(Channel channel, MqttUnsubscribeMessage msg) {
List<String> topicFilters = msg.payload().topics();
String clinetId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
topicFilters.forEach(topicFilter -> {
subscribeStoreService.remove(topicFilter, clinetId);
LOGGER.debug("UNSUBSCRIBE - clientId: {}, topicFilter: {}", clinetId, topicFilter);
});
MqttUnsubAckMessage unsubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
channel.writeAndFlush(unsubAckMessage);
}移除订阅信息就好了。
总结
MqttWk的协议处理有一些地方不妥,但是整体结构非常清晰。



粤公网安备44030602007943号