
| public class Publish { private ISessionStoreService sessionStoreService; private ISubscribeStoreService subscribeStoreService; private IMessageIdService messageIdService; private IRetainMessageStoreService retainMessageStoreService; private IDupPublishMessageStoreService dupPublishMessageStoreService; private InternalCommunication internalCommunication;
private ChannelGroup channelGroup; private Map<String, ChannelId> channelIdMap; private BrokerProperties brokerProperties;
public void processPublish(Channel channel, MqttPublishMessage msg) { String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
if (sessionStoreService.containsKey(clientId)) { SessionStore sessionStore = sessionStoreService.get(clientId); ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId()); if (brokerProperties.getId().equals(sessionStore.getBrokerId()) && channelId != null) { sessionStoreService.expire(clientId, sessionStore.getExpire()); } }
if (msg.fixedHeader().qosLevel() == MqttQoS.AT_MOST_ONCE) { byte[] messageBytes = new byte[msg.payload().readableBytes()]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()) .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes) .setDup(false).setRetain(false).setClientId(clientId); internalCommunication.internalSend(internalMessage); this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false); }
if (msg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) { byte[] messageBytes = new byte[msg.payload().readableBytes()]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()) .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes) .setDup(false).setRetain(false).setClientId(clientId); internalCommunication.internalSend(internalMessage); this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false); this.sendPubAckMessage(channel, msg.variableHeader().packetId()); }
if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) { byte[] messageBytes = new byte[msg.payload().readableBytes()]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); InternalMessage internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()) .setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes) .setDup(false).setRetain(false).setClientId(clientId); internalCommunication.internalSend(internalMessage); this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false); this.sendPubRecMessage(channel, msg.variableHeader().packetId()); }
if (msg.fixedHeader().isRetain()) { byte[] messageBytes = new byte[msg.payload().readableBytes()]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); if (messageBytes.length == 0) { retainMessageStoreService.remove(msg.variableHeader().topicName()); } else { RetainMessageStore retainMessageStore = new RetainMessageStore().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value()) .setMessageBytes(messageBytes); retainMessageStoreService.put(msg.variableHeader().topicName(), retainMessageStore); } } }
private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) { List<SubscribeStore> subscribeStores = subscribeStoreService.search(topic); subscribeStores.forEach(subscribeStore -> { if (sessionStoreService.containsKey(subscribeStore.getClientId())) { MqttQoS respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf(subscribeStore.getMqttQoS()) : mqttQoS;
if (respQoS == MqttQoS.AT_MOST_ONCE) { MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), new MqttPublishVariableHeader(topic, 0), Unpooled.buffer().writeBytes(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, respQoS.value()); SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId()); ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId()); if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } }
if (respQoS == MqttQoS.AT_LEAST_ONCE) { int messageId = messageIdService.getNextMessageId(); MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId); DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId()) .setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId); dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore); SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId()); ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId()); if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } }
if (respQoS == MqttQoS.EXACTLY_ONCE) { int messageId = messageIdService.getNextMessageId(); MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), new MqttPublishVariableHeader(topic, messageId), Unpooled.buffer().writeBytes(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId); DupPublishMessageStore dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId()) .setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId); dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore); SessionStore sessionStore = sessionStoreService.get(subscribeStore.getClientId()); ChannelId channelId = channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId()); if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } } } }); }
private void sendPubAckMessage(Channel channel, int messageId) { MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(messageId), null); channel.writeAndFlush(pubAckMessage); }
private void sendPubRecMessage(Channel channel, int messageId) { MqttMessage pubRecMessage = MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(messageId), null); channel.writeAndFlush(pubRecMessage); }
}
|