Skip to content

MqttWk源码分析(三):BrokerHandler及协议处理分析

前言

基于MqttWk v1.0.7。

一、BrokerHandler

channelActive()

java
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    this.channelGroup.add(ctx.channel());
    this.channelIdMap.put(ctx.channel().id().asShortText(), ctx.channel().id());
}

当channel刚生效(客户端刚连接),将其channel信息存起来。

Netty默认提供了channel管理,这里自己做channel管理是为了方便根据channelId快速找到对应channel好下发消息。

channelInactive()

java
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);
    this.channelGroup.remove(ctx.channel());
    this.channelIdMap.remove(ctx.channel().id().asShortText());
}

当channel失效时,将channel信息移除。

exceptionCaught()

java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (cause instanceof IOException) {
        // 远程主机强迫关闭了一个现有的连接的异常
        ctx.close();
    } else {
        super.exceptionCaught(ctx, cause);
    }
}

连接发生IO异常时,关闭channel。

如果你想要做更多处理,比如发生自定义的异常处理逻辑,也可以在这里做。

在这个方法运行完后,会去因为关闭连接触发channelInactive,所以如果在channelInactive那里做了清理工作,这里就不要重复做了。

userEventTriggered()

java
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
        if (idleStateEvent.state() == IdleState.ALL_IDLE) {
            Channel channel = ctx.channel();
            String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
            // 发送遗嘱消息
            if (this.protocolProcess.getSessionStoreService().containsKey(clientId)) {
                SessionStore sessionStore = this.protocolProcess.getSessionStoreService().get(clientId);
                if (sessionStore.getWillMessage() != null) {
                    this.protocolProcess.publish().processPublish(ctx.channel(), sessionStore.getWillMessage());
                }
            }
            ctx.close();
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

userEventTrigger可以编写自定义的事件发生后处理逻辑。这里是判断当客户端一直没有发送消息也没有接收的时候,就触发IdleState,然后发送遗嘱消息并关闭连接。我觉得这里的逻辑有问题,因为遗嘱消息的标准分发条件是:

  • 服务端检测到了I/O错误或网络故障
  • 客户端在KeepAlive的时间内未能通讯
  • 客户端没有发送DISCONNECT报文直接关闭连接
  • 由于协议错误关闭了网络连接

所以不只是“未能通讯”才分发,抛出异常也会分发的。

channelRead0()

java
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
    if (msg.decoderResult().isFailure()) {
        Throwable cause = msg.decoderResult().cause();
        if (cause instanceof MqttUnacceptableProtocolVersionException) {
            ctx.writeAndFlush(MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false),
                    null));
        } else if (cause instanceof MqttIdentifierRejectedException) {
            ctx.writeAndFlush(MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
                    null));
        }
        ctx.close();
        return;
    }

    switch (msg.fixedHeader().messageType()) {
        case CONNECT:
            protocolProcess.connect().processConnect(ctx.channel(), (MqttConnectMessage) msg);
            break;
        case CONNACK:
            break;
        case PUBLISH:
            protocolProcess.publish().processPublish(ctx.channel(), (MqttPublishMessage) msg);
            break;
        case PUBACK:
            protocolProcess.pubAck().processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBREC:
            protocolProcess.pubRec().processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBREL:
            protocolProcess.pubRel().processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBCOMP:
            protocolProcess.pubComp().processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case SUBSCRIBE:
            protocolProcess.subscribe().processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
            break;
        case SUBACK:
            break;
        case UNSUBSCRIBE:
            protocolProcess.unSubscribe().processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
            break;
        case UNSUBACK:
            break;
        case PINGREQ:
            protocolProcess.pingReq().processPingReq(ctx.channel(), msg);
            break;
        case PINGRESP:
            break;
        case DISCONNECT:
            protocolProcess.disConnect().processDisConnect(ctx.channel(), msg);
            break;
        default:
            break;
    }
}

这里用channelRead0而不是channelRead是因为作者继承的SimpleChannelInboundHandler<MqttMessage>而不是ChannelInboundHandlerAdapter。

ChannelInboundHandlerAdapter是Netty默认的入站消息处理类;SimpleChannelInboundHandler继承了它并实现了一些基础的方法,更方便一些。

在里面,首先判断是否是合法的包、合法的协议、合法的ClientID,然后根据不同的报文类型去分发到protocol里不同的处理类处理。

二、protocol包

1、PingReq

java
public void processPingReq(Channel channel, MqttMessage msg) {
    MqttMessage pingRespMessage = MqttMessageFactory.newMessage(
        new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0), null, null);
    LOGGER.debug("PINGREQ - clientId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get());
    channel.writeAndFlush(pingRespMessage);
}

简单的心跳回包,心跳包是单向的,总是由客户端发给服务端,服务端回包即可。

2、Connect

java
public void processConnect(Channel channel, MqttConnectMessage msg) {
    // 消息解码器出现异常
    if (msg.decoderResult().isFailure()) {
        Throwable cause = msg.decoderResult().cause();
        if (cause instanceof MqttUnacceptableProtocolVersionException) {
            // 不支持的协议版本
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
            channel.writeAndFlush(connAckMessage);
            channel.close();
            return;
        } else if (cause instanceof MqttIdentifierRejectedException) {
            // 不合格的clientId
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
            channel.writeAndFlush(connAckMessage);
            channel.close();
            return;
        }
        channel.close();
        return;
    }
    // clientId为空或null的情况, 这里要求客户端必须提供clientId, 不管cleanSession是否为1, 此处没有参考标准协议实现
    if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
        MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
        channel.writeAndFlush(connAckMessage);
        channel.close();
        return;
    }
    if (brokerProperties.getMqttPasswordMust()) {
        // 用户名和密码验证, 这里要求客户端连接时必须提供用户名和密码, 不管是否设置用户名标志和密码标志为1, 此处没有参考标准协议实现
        String username = msg.payload().userName();
        String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
        if (!authService.checkValid(username, password)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
            channel.writeAndFlush(connAckMessage);
            channel.close();
            return;
        }
    }
    // 如果会话中已存储这个新连接的clientId, 就关闭之前该clientId的连接
    if (sessionStoreService.containsKey(msg.payload().clientIdentifier())) {
        SessionStore sessionStore = sessionStoreService.get(msg.payload().clientIdentifier());
        Boolean cleanSession = sessionStore.isCleanSession();
        if (cleanSession) {
            sessionStoreService.remove(msg.payload().clientIdentifier());
            subscribeStoreService.removeForClient(msg.payload().clientIdentifier());
            dupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
            dupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
        }
        try {
            ChannelId channelId = channelIdMap.get(sessionStore.getChannelId());
            if(channelId!=null) {
                Channel previous = channelGroup.find(channelId);
                if (previous != null) previous.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    // 处理遗嘱信息
    SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel.id().asShortText(), msg.variableHeader().isCleanSession(), null);
    if (msg.variableHeader().isWillFlag()) {
        MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
                new MqttPublishVariableHeader(msg.payload().willTopic(), 0), Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes()));
        sessionStore.setWillMessage(willMessage);
    }
    // 处理连接心跳包
    if (msg.variableHeader().keepAliveTimeSeconds() > 0) {
        if (channel.pipeline().names().contains("idle")) {
            channel.pipeline().remove("idle");
        }
        channel.pipeline().addFirst("idle", new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
    }
    // 至此存储会话信息及返回接受客户端连接
    sessionStoreService.put(msg.payload().clientIdentifier(), sessionStore);
    // 将clientId存储到channel的map中
    channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
    Boolean sessionPresent = sessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();
    MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent), null);
    channel.writeAndFlush(okResp);
    LOGGER.debug("CONNECT - clientId: {}, cleanSession: {}", msg.payload().clientIdentifier(), msg.variableHeader().isCleanSession());
    // 如果cleanSession为0, 需要重发同一clientId存储的未完成的QoS1和QoS2的DUP消息
    if (!msg.variableHeader().isCleanSession()) {
        List<DupPublishMessageStore> dupPublishMessageStoreList = dupPublishMessageStoreService.get(msg.payload().clientIdentifier());
        List<DupPubRelMessageStore> dupPubRelMessageStoreList = dupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
        dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
            MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()), false, 0),
                    new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(), dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes()));
            channel.writeAndFlush(publishMessage);
        });
        dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
            MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_MOST_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), null);
            channel.writeAndFlush(pubRelMessage);
        });
    }
}

1)这里又处理了一次非法值返回码,我想作者可能是忘记了前面处理过。

2)clientID要求必须不为空,协议规定为空是可以的,但只能出现在cleansession=1(非持久化)会话上,服务器帮助客户端生成一个随机clientID,一旦断开连接,客户端的ClientID就会发生变化,所以只能非持久化会话。这里没有实现这个逻辑,要求必须不为空。

3)用户名和密码认证,协议规定为空是可以的,这里要求必须提供。实际上真实的服务器可能不需要username,因为password很可能是JWT之类的数据,已经足够验证客户端身份了,不需要username。

4)获取旧会话,关闭相同clientID的连接。只能同时有一个clientID在线,如果有重复clientID则需要踢掉旧会话。这里的踢操作依赖于本地的channelgroup,所以只会踢掉本地的client,不会踢掉其他Broker上的client,可以说是一个Bug……作者集群做得不好。

5)处理遗嘱信息,如果有存起来就好。

6)连接心跳包,如果设置了keepAlive的值,则设置Netty的IdleHandler为那个值,如果为0,则保持默认的前面在BrokerServer设置的值。这里其实还应该判断keepAlive是否设置得合法,因为过短的心跳间隔会造成服务器压力很大,应该检查客户端要求的心跳间隔是否服务器能承受,否则就断开连接。作者没有检查。

7)如果cleansession为0(持久会话),则重新下发未ACK的QoS1、QoS2消息,第二阶段未完成的QoS2消息。作者保存这些消息都是没有顺序的,是直接Hash到Redis,但理论上应该保证重发顺序。

3、Disconnect

java
public void processDisConnect(Channel channel, MqttMessage msg) {
    String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
    SessionStore sessionStore = sessionStoreService.get(clientId);
    if (sessionStore != null && sessionStore.isCleanSession()) {
        subscribeStoreService.removeForClient(clientId);
        dupPublishMessageStoreService.removeByClient(clientId);
        dupPubRelMessageStoreService.removeByClient(clientId);
    }
    LOGGER.debug("DISCONNECT - clientId: {}, cleanSession: {}", clientId, sessionStore.isCleanSession());
    sessionStoreService.remove(clientId);
    channel.close();
}

根据是否是持久会话清理掉session即可。

4、Publish

java
public void processPublish(Channel channel, MqttPublishMessage msg) {
    String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
    // QoS=0
    if (msg.fixedHeader().qosLevel() == MqttQoS.AT_MOST_ONCE) {
        byte[] messageBytes = new byte[msg.payload().readableBytes()];
        msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
        InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
                .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
                .setDup(false).setRetain(false).setClientId(clientId);
        internalCommunication.internalSend(internalMessage);
        this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
    }
    // QoS=1
    if (msg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
        byte[] messageBytes = new byte[msg.payload().readableBytes()];
        msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
        InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
                .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
                .setDup(false).setRetain(false).setClientId(clientId);
        internalCommunication.internalSend(internalMessage);
        this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
        this.sendPubAckMessage(channel, msg.variableHeader().packetId());
    }
    // QoS=2
    if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
        byte[] messageBytes = new byte[msg.payload().readableBytes()];
        msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
        InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName())
                .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes)
                .setDup(false).setRetain(false).setClientId(clientId);
        internalCommunication.internalSend(internalMessage);
        this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
        this.sendPubRecMessage(channel, msg.variableHeader().packetId());
    }
    // retain=1, 保留消息
    if (msg.fixedHeader().isRetain()) {
        byte[] messageBytes = new byte[msg.payload().readableBytes()];
        msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
        if (messageBytes.length == 0) {
            retainMessageStoreService.remove(msg.variableHeader().topicName());
        } else {
            RetainMessageStore retainMessageStore = new RetainMessageStore().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value())
                    .setMessageBytes(messageBytes);
            retainMessageStoreService.put(msg.variableHeader().topicName(), retainMessageStore);
        }
    }
}

回顾一下Publish的流程:

e1aecacf471595e90bbe3720d391ec5c.png

f5aa9fccab7ff9437f956780b522e060.png

如果是QoS0,直接发送;

如果是QoS1,除了发送还需要回复PubAck给发送者;

如果是QoS2,除了发送还需要回复PubRec给发送者;

如果是保留消息,长度为0则清除该主题的保留消息,否则保存该主题的保留消息。

如果开启了集群功能,会通过Redis广播;如果开启了Kafka转发,会进行转发。

java
private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
    List<SubscribeStore> subscribeStores = subscribeStoreService.search(topic);
    subscribeStores.forEach(subscribeStore -> {
        if (sessionStoreService.containsKey(subscribeStore.getClientId())) {
            // 订阅者收到MQTT消息的QoS级别, 最终取决于发布消息的QoS和主题订阅的QoS
            MqttQoS respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf(subscribeStore.getMqttQoS()) : mqttQoS;
            if (respQoS == MqttQoS.AT_MOST_ONCE) {
                MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
                        new MqttPublishVariableHeader(topic, 0), Unpooled.buffer().writeBytes(messageBytes));
                LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, respQoS.value());
                ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
                if(channelId!=null) {
                    Channel channel = channelGroup.find(channelId);
                    if (channel != null) channel.writeAndFlush(publishMessage);
                }
            }
            if (respQoS == MqttQoS.AT_LEAST_ONCE) {
                int messageId = messageIdService.getNextMessageId();
                MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
                        new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes));
                LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId);
                DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId())
                        .setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId);
                dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore);
                ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
                if(channelId!=null) {
                    Channel channel = channelGroup.find(channelId);
                    if (channel != null) channel.writeAndFlush(publishMessage);
                }
            }
            if (respQoS == MqttQoS.EXACTLY_ONCE) {
                int messageId = messageIdService.getNextMessageId();
                MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
                        new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes));
                LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId);
                DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId())
                        .setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId);
                dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore);
                ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
                if(channelId!=null) {
                    Channel channel = channelGroup.find(channelId);
                    if (channel != null) channel.writeAndFlush(publishMessage);
                }
            }
        }
    });
}

和发送者通信的时候才会收到Publish。

发送的具体代码流程是:

搜寻订阅这个主题的订阅者;

如果是QoS0,直接下发(QoS0的messageId为0);

如果是QoS1,先获取一个未被占用的messageId,缓存一份DupPublish消息,再下发;

如果是QoS2,先获取一个未被占用的messageId,缓存一份DupPublish消息,再下发。

5、PubRel

java
public void processPubRel(Channel channel, MqttMessageIdVariableHeader variableHeader) {
    MqttMessage pubCompMessage = MqttMessageFactory.newMessage(
        new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
        MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
    LOGGER.debug("PUBREL - clientId: {}, messageId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get(), variableHeader.messageId());
    channel.writeAndFlush(pubCompMessage);
}

和发送者通信的时候才会收到PubRel,处于QoS2第二阶段,服务器需要会送一个PubComp让发送者知道QoS2已经完成了。

6、PubAck

java
public void processPubAck(Channel channel, MqttMessageIdVariableHeader variableHeader) {
    int messageId = variableHeader.messageId();
    LOGGER.debug("PUBACK - clientId: {}, messageId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get(), messageId);
    dupPublishMessageStoreService.remove((String) channel.attr(AttributeKey.valueOf("clientId")).get(), messageId);
}

和接收者通信的时候才会收到PubAck,表明QoS1已经完成,移除缓存的DupPublish消息。

7、PubRec

java
public void processPubRec(Channel channel, MqttMessageIdVariableHeader variableHeader) {
    MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
        new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_MOST_ONCE, false, 0),
        MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
    LOGGER.debug("PUBREC - clientId: {}, messageId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get(), variableHeader.messageId());
    dupPublishMessageStoreService.remove((String) channel.attr(AttributeKey.valueOf("clientId")).get(), variableHeader.messageId());
    DupPubRelMessageStore dupPubRelMessageStore = new DupPubRelMessageStore().setClientId((String) channel.attr(AttributeKey.valueOf("clientId")).get())
        .setMessageId(variableHeader.messageId());
    dupPubRelMessageStoreService.put((String) channel.attr(AttributeKey.valueOf("clientId")).get(), dupPubRelMessageStore);
    channel.writeAndFlush(pubRelMessage);
}

和接收者通信的时候才会收到PubRec,表明QoS2的第一阶段完成,移除缓存的Publish消息,重新缓存一份PubRel消息,并下发PubRel消息,开启第二阶段。

8、PubComp

java
public void processPubComp(Channel channel, MqttMessageIdVariableHeader variableHeader) {
    int messageId = variableHeader.messageId();
    LOGGER.debug("PUBCOMP - clientId: {}, messageId: {}", (String) channel.attr(AttributeKey.valueOf("clientId")).get(), messageId);
    dupPubRelMessageStoreService.remove((String) channel.attr(AttributeKey.valueOf("clientId")).get(), variableHeader.messageId());
}

和接收者通信的时候才会收到PubComp,表明QoS2已经完成,移除缓存的PubRel消息。

9、Subscribe

java
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
    List<MqttTopicSubscription> topicSubscriptions = msg.payload().topicSubscriptions();
    if (this.validTopicFilter(topicSubscriptions)) {
        String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
        List<Integer> mqttQoSList = new ArrayList<Integer>();
        topicSubscriptions.forEach(topicSubscription -> {
            String topicFilter = topicSubscription.topicName();
            MqttQoS mqttQoS = topicSubscription.qualityOfService();
            SubscribeStore subscribeStore = new SubscribeStore(clientId, topicFilter, mqttQoS.value());
            subscribeStoreService.put(topicFilter, subscribeStore);
            mqttQoSList.add(mqttQoS.value());
            LOGGER.debug("SUBSCRIBE - clientId: {}, topFilter: {}, QoS: {}", clientId, topicFilter, mqttQoS.value());
        });
        MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
            MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
            new MqttSubAckPayload(mqttQoSList));
        channel.writeAndFlush(subAckMessage);
        // 发布保留消息
        topicSubscriptions.forEach(topicSubscription -> {
            String topicFilter = topicSubscription.topicName();
            MqttQoS mqttQoS = topicSubscription.qualityOfService();
            this.sendRetainMessage(channel, topicFilter, mqttQoS);
        });
    } else {
        channel.close();
    }
}

先校验所有的Topic是否正确,如果正确,则回复对应的SubAck报文,并把订阅信息存储起来。

对于有保留消息的主题,需要保存该主题的保留消息。

java
private boolean validTopicFilter(List<MqttTopicSubscription> topicSubscriptions) {
    for (MqttTopicSubscription topicSubscription : topicSubscriptions) {
        String topicFilter = topicSubscription.topicName();
        // 以#或+符号开头的、以/符号结尾的订阅按非法订阅处理, 这里没有参考标准协议
        if (StrUtil.startWith(topicFilter, '+') || StrUtil.endWith(topicFilter, '/'))
            return false;
        if (StrUtil.contains(topicFilter, '#')) {
            // 如果出现多个#符号的订阅按非法订阅处理
            if (StrUtil.count(topicFilter, '#') > 1) return false;
        }
        if (StrUtil.contains(topicFilter, '+')) {
            //如果+符号和/+字符串出现的次数不等的情况按非法订阅处理
            if (StrUtil.count(topicFilter, '+') != StrUtil.count(topicFilter, "/+")) return false;
        }
    }
    return true;
}

验证Topic的正确性只限制了一些格式,没有对“是否有权限订阅”进行权限验证。

10、Unsubscribe

java
public void processUnSubscribe(Channel channel, MqttUnsubscribeMessage msg) {
    List<String> topicFilters = msg.payload().topics();
    String clinetId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
    topicFilters.forEach(topicFilter -> {
        subscribeStoreService.remove(topicFilter, clinetId);
        LOGGER.debug("UNSUBSCRIBE - clientId: {}, topicFilter: {}", clinetId, topicFilter);
    });
    MqttUnsubAckMessage unsubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
        new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
        MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
    channel.writeAndFlush(unsubAckMessage);
}

移除订阅信息就好了。

总结

MqttWk的协议处理有一些地方不妥,但是整体结构非常清晰。

转载请注明出处https://bananaoven.com/articles/250.html | 香蕉微波炉
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。