Moquette源码分析(一)v0.10订阅树
一、M10 简介
Moquette 0.10(以下简称M10)的订阅树实现在moquette-0.10/broker/src/main/java/io/moquette/spi/impl/subscriptions,包括:
- SubscriptionsDirectory:订阅树的增删改查
- Token:片段,由topicFilter按“/”分割而成的最小字符串单位
- Topic:主题/主题过滤器,包含一系列方法,比如比较主题和主题过滤器是否匹配
- Subscription:订阅信息,{clientId, topicFilter,qos,active}
- TreeNode:树节点
查看源码的时候应该从SubscriptionsDirectory开始阅读。
二、M10 订阅树表示
假设有如下订阅信息:
节点编号 | 订阅客户端 | 订阅主题 | 订阅质量 |
---|---|---|---|
N5 | A | abc/+/123 | 0 |
N3 | B | abc/# | 1 |
N3 | A | abc/# | 0 |
N4 | B | abc/def | 0 |
N6 | B | abc/def/123 | 0 |
N6 | C | abc/def/123 | 1 |
N7 | D | abc/def/456 | 0 |
则可以构建一棵典型的M10订阅树:
量名 | 数据类型 | 数据含义 |
---|---|---|
m_token | Token | 该节点代表片段 |
m_children | List<TreeNode> | 子节点 |
m_subscriptions | Set<ClientTopicCouple>[1] | 该节点订阅信息集合 |
subtreeSubscriptions | Integer | 子树订阅信息数量和[2] |
[1] ClientTopicCouple是{clientId,topicFilter},相比Subscription少了一些字段,减少内存占用。
[2] 不是子节点数量之和,而是本节点订阅数量与每个子节点下面的订阅数量之和(递归遍历)
可以看到,M10订阅树的信息全在节点内(因为并没有所谓“边”的数据结构)。
三、M10 数据结构
3.1 节点 TreeNode
- 数据结构:TreeNode
- 属性值
数据名 数据类型 数据含义 m_token Token 本节点片段,例如“abc” m_children List<TreeNode> 子节点 m_subscription Set<ClientTopicCouple> 本节点订阅信息,例如{client123,”+”} - 重要方法
- copy:浅拷贝,新建一个TreeNode,拷贝TreeNode的所有属性,由于m_children和m_subscription是引用,这里直接拷贝了引用。
- childWithToken:查询所有的子节点,找到其中具有token片段的节点,如果找不到,返回null
- updateChild:移除旧子节点,增加新子节点
- remove:移除本节点上特定的订阅信息
- matches:查询树中所有匹配某个主题Token的订阅信息。该方法会递归比对每个Token是否和该节点上所有订阅信息的TopicToken匹配,如果匹配,加入结果。
3.2 主题(主题过滤器)Topic
- 数据结构:Topic
- 属性值
数据名 数据类型 数据含义 topic String 主题或主题过滤器 tokens transient List<Token> valid transient boolean - 重要方法
- getTokens:生成Token列表,只会生成1次,且会判断主题是否合法,比如“/abc/#/123”是非法的
- isValid:主题是否合法
- match:返回是否匹配某个订阅主题。要求本主题不含通配符,被匹配的主题可以含通配符。
- asTopic:根据String生成Topic对象
3.3 片段 Token
- 数据结构:Token
- 属性值
数据名 数据类型 数据含义 name String 片段内容 - 重要方法
- match:两个Token是否匹配。要求本Token可以含通配符,被比较的Token不能含通配符。
3.4 简要订阅信息 ClientTopicCouple
- 数据结构:ClientTopicCouple
- 属性值
数据名 数据类型 数据含义 topicFilter Topic 主题过滤器 clientID String MQTT协议的clientID,客户端ID
3.5 订阅信息 Subscription
- 数据结构:Subscription
- 属性值
数据名 数据类型 数据含义 topicFilter Topic 主题过滤器 clientID String MQTT协议的clientID,客户端 requestQos MQTTQoS 订阅质量 active boolean 订阅是否有效,暂未使用这个字段
3.6 订阅树 SubscriptionsDirectory
- 数据结构:SubscriptionsDirectory
- 属性值
数据名 数据类型 数据含义 subscriptions AtomicReference<TreeNode> 根节点 - 重要方法
- init:从持久化存储里初始化订阅信息
- add:新增订阅
- removeSubscription:删除一个订阅
- removeForClient:移除一个客户端的所有订阅
- matches:查询一个发布主题(不含通配符)匹配的订阅信息
四、M10 匹配订阅信息过程
M10的订阅匹配实现是DFS的:
1 | // 算法4-1 递归匹配 |
例如,我们仍然保持前面的订阅,订阅树也一样:
客户端X向主题abc/def/123发布消息,从根节点开始遍历:
- 进入Root,弹出第一个token“abc”,查找子节点发现N1的m_token”abc”匹配
- 进入N1,弹出第二个token“def”,发现N2的m_token“+”匹配,N3的m_token“#”也匹配,N4的m_token“def”也匹配
- 3.1.1 进入子节点N2,弹出第三个token“123”,发现N5的m_token”123”匹配
3.1.2 进入子节点N5,队列空,匹配完成,将N5的m_subscriptions[{A, abc/+/123}]加入订阅信息集合
3.2.1 返回节点N1,进入子节点N3,队列空,匹配完成,将N3的订阅信息{A,“abc/#”}和订阅信息{B,“abc/#”}加入集合
3.3.1 返回节点N1,进入子节点N4,发现子节点N6的m_token”123”匹配
3.3.2 进入子节点N6,队列空,将N6的订阅信息[{B, abc/def/123},{C, abc/def/123}]加入集合 - 返回N1,返回Root,查找结束
这样就成功匹配到了5条订阅信息:
节点编号 | 订阅客户端 | 订阅主题 | 订阅质量 |
---|---|---|---|
N5 | A | abc/+/123 | 0 |
N3 | B | abc/# | 1 |
N3 | A | abc/# | 0 |
N6 | B | abc/def/123 | 0 |
N6 | C | abc/def/123 | 1 |
五、M10增加订阅
5.1 路径重建算法 RP
M10的增加订阅和取消订阅都会调用路径重建算法(recreatePath,RP)。在了解路径重建算法之前,需要知道M10订阅树一个性质:
【特性1】给定一个主题,一旦发现某个token之前没有建立过节点,其后的所有token都必将建立新节点
【解释】给定一个主题”abc/+/123/new1/new2/new3”,前面的token”abc”,”+”,”123”由于A客户端的订阅已经构建了对应的节点TreeNode,发现其后的第一个没有构建过的token“new1”,则其后的“new2”、”new3”都肯定没有构建过。
【证明】假设后面的某个token Tn是以前建立过的,那么必然存在之前的某个订阅构建了从Root到Tn的完整路径,且当前主题S一定也遍历的这个路径(否则不会访问到Tn);但现在已经在这条路径上遇到了一个新节点T0,矛盾,所以Tn必然不可能被建立过。
RP算法如下:
1 | // 算法5-2 路径重建recreatePath |
5.2 增加订阅过程
举一个详细的例子,假设现在客户端F产生了一个新订阅”abc/+/123/new1/new2/new3”,我们看看是如何插入的:
(1)建立初始数据结构
浅拷贝的newRoot,两个指针P,C:
(2)发现“abc”有子节点,浅拷贝,指针下移
(3)发现“+”有子节点,浅拷贝,指针下移
(4)发现“123”有子节点,浅拷贝,指针下移
(5)发现“new1”没有节点了,增加节点
(6)同理new2、new3
(7)很明显,newRoot构建了一个新的树,包含了最新的订阅,current指向了创建的最后一个节点,我们更改颜色来更好地直观感受
5.3 增加订阅剩余工作
目前只做了路径重建,没有完成订阅,剩下三个工作:
- 将订阅信息加入节点 有了current指针,就能够直接到达最后创建的节点,将订阅信息加入即可
- 重新计算订阅数量subtreeSubscriptions 重新执行一次递归计算(耗时)
- 替换旧树根节点:需要考虑并发,这里M10用的是经典的CAS,替换根节点,旧节点的内存回收由JAVA垃圾回收器来做。
我们来看一下CAS的这段源码:
1 | do { |
当CAS替换不成功的时候(比如同时有并发的插入操作且它们执行成功导致根节点发生变化),while条件判断为true,会丢弃掉之前生成的节点,重新执行生成操作。利用这个思路可以写很多CAS的代码,例如:
1 | do { |
myAtomNum是一个原子类型,我们希望当myAtomNum的值在达到maxNum后执行“do something 1”,而没有达到时正常执行“do something 2”。如果仅仅写一个if,就会有并发问题。如果像上面这样写,则保证了并发安全。
- 假设maxNum=50,num=40,此时有2个并发线程进入,一个成功执行do something 2,一个失败循环后成功执行do something 2。
- 假设maxNum=50,num=49,此时有2个并发线程进入,一个成功执行do something 2,一个失败循环后成功执行do something 1。
- 假设maxNum=50,num=50,此时有2个并发线程进入,则都将执行do something 1。
六、M10 取消订阅
M10取消订阅同样会使用RP算法,找到这棵树的最后一个订阅节点(current指针),然后移除其中的订阅信息。
例如A取消订阅“abc/+/123”,如下图所示:
再进行和订阅同样的后续操作。
七、总结
可以发现M10的订阅树缺点:
- 只要订阅过,哪怕取消订阅后,这个订阅的节点也不会删除
- 无论通配符订阅还是非通配符订阅,都需要遍历一遍树结构。
- 每次订阅、取消订阅,递归更新订阅数量的时间耗费是很大的,但是这个“统计订阅量”的需求并不需要实时,也不是频繁发生的
Moquette源码分析(一)v0.10订阅树