1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
| public class Publish { private ISessionStoreService sessionStoreService; private ISubscribeStoreService subscribeStoreService; private IMessageIdService messageIdService; private IRetainMessageStoreService retainMessageStoreService; private IDupPublishMessageStoreService dupPublishMessageStoreService; private InternalCommunication internalCommunication;
private ChannelGroup channelGroup; private Map<String, ChannelId> channelIdMap; private BrokerProperties brokerProperties;
public void processPublish(Channel channel, MqttPublishMessage msg) { String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
if (sessionStoreService.containsKey(clientId)) { SessionStore sessionStore = sessionStoreService.get(clientId); ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId()); if (brokerProperties.getId().equals(sessionStore.getBrokerId()) && channelId != null) { sessionStoreService.expire(clientId, sessionStore.getExpire()); } }
if (msg.fixedHeader().qosLevel() == MqttQoS.AT_MOST_ONCE) { byte[] messageBytes = new byte[msg.payload().readableBytes()]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()) .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes) .setDup(false).setRetain(false).setClientId(clientId); internalCommunication.internalSend(internalMessage); this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false); }
if (msg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) { byte[] messageBytes = new byte[msg.payload().readableBytes()]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()) .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes) .setDup(false).setRetain(false).setClientId(clientId); internalCommunication.internalSend(internalMessage); this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false); this.sendPubAckMessage(channel, msg.variableHeader().packetId()); }
if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) { byte[] messageBytes = new byte[msg.payload().readableBytes()]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()) .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes) .setDup(false).setRetain(false).setClientId(clientId); internalCommunication.internalSend(internalMessage); this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false); this.sendPubRecMessage(channel, msg.variableHeader().packetId()); }
if (msg.fixedHeader().isRetain()) { byte[] messageBytes = new byte[msg.payload().readableBytes()]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); if (messageBytes.length == 0) { retainMessageStoreService.remove(msg.variableHeader().topicName()); } else { RetainMessageStore retainMessageStore = new RetainMessageStore().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value()) .setMessageBytes(messageBytes); retainMessageStoreService.put(msg.variableHeader().topicName(), retainMessageStore); } } }
private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) { List<SubscribeStore> subscribeStores = subscribeStoreService.search(topic); subscribeStores.forEach(subscribeStore -> { if (sessionStoreService.containsKey(subscribeStore.getClientId())) { MqttQoS respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf(subscribeStore.getMqttQoS()) : mqttQoS;
if (respQoS == MqttQoS.AT_MOST_ONCE) { MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), new MqttPublishVariableHeader(topic, 0), Unpooled.buffer().writeBytes(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, respQoS.value()); SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId()); ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId()); if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } }
if (respQoS == MqttQoS.AT_LEAST_ONCE) { int messageId = messageIdService.getNextMessageId(); MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId); DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId()) .setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId); dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore); SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId()); ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId()); if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } }
if (respQoS == MqttQoS.EXACTLY_ONCE) { int messageId = messageIdService.getNextMessageId(); MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId); DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId()) .setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId); dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore); SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId()); ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId()); if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } } } }); }
private void sendPubAckMessage(Channel channel, int messageId) { MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(messageId), null); channel.writeAndFlush(pubAckMessage); }
private void sendPubRecMessage(Channel channel, int messageId) { MqttMessage pubRecMessage = MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(messageId), null); channel.writeAndFlush(pubRecMessage); }
}
|