前言
基于MqttWkv1.0.7。
MqttWk没有构建内存中的订阅树,直接利用存储在Redis中的订阅信息来获取订阅者。
订阅树算法分析
主要的代码在cn.wizzer.iot.mqtt.server.store.subscribe.SubscribeStoreService:
java
@Override
public List<SubscribeStore> search(String topic) {
List<SubscribeStore> subscribeStores = new ArrayList<SubscribeStore>();
List<SubscribeStore> list = subscribeNotWildcardCache.all(topic);
if (list.size() > 0) {
subscribeStores.addAll(list);
}
subscribeWildcardCache.all().forEach((topicFilter, map) -> {
if (StrUtil.split(topic, '/').size() >= StrUtil.split(topicFilter, '/').size()) {
List<String> splitTopics = StrUtil.split(topic, '/');//a
List<String> spliteTopicFilters = StrUtil.split(topicFilter, '/');//#
String newTopicFilter = "";
for (int i = 0; i < spliteTopicFilters.size(); i++) {
String value = spliteTopicFilters.get(i);
if (value.equals("+")) {
newTopicFilter = newTopicFilter + "+/";
} else if (value.equals("#")) {
newTopicFilter = newTopicFilter + "#/";
break;
} else {
newTopicFilter = newTopicFilter + splitTopics.get(i) + "/";
}
}
newTopicFilter = StrUtil.removeSuffix(newTopicFilter, "/");
if (topicFilter.equals(newTopicFilter)) {
Collection<SubscribeStore> collection = map.values();
List<SubscribeStore> list2 = new ArrayList<SubscribeStore>(collection);
subscribeStores.addAll(list2);
}
}
});
return subscribeStores;
}对于普通的精确订阅,直接根据主题名得到对应的订阅者。
对于通配符订阅,直接获取所有通配符订阅Topic和订阅者,一个一个比对是否匹配,匹配的话就把订阅者加进去。
普通精确订阅正是之前有人提出mosquitto的订阅树优化成Hash表的方法。我惊讶的是通配符订阅直接拉取了所有通配符Topic和订阅者,这个量不会巨大吗。后来仔细想想,设备端的订阅全都是以设备deviceId或者什么唯一标识符的精确订阅,根本不会订阅通配符主题,订阅通配符主题的只有后端服务,因为后端服务才会想要/+/connected这种全局的信息。后端服务总共就那么多,所以这样拉取也不会太多,一个一个遍历时长也能够接受。
同时采用Redis做集中存储的好处是,集群不需要同步内存中的订阅树,Redis的单线程保证了并发修改订阅信息不会出现问题。



粤公网安备44030602007943号