Moquette源码分析(一)v0.10订阅树

一、M10 简介

Moquette 0.10(以下简称M10)的订阅树实现在moquette-0.10/broker/src/main/java/io/moquette/spi/impl/subscriptions,包括:

  • SubscriptionsDirectory:订阅树的增删改查
  • Token:片段,由topicFilter按“/”分割而成的最小字符串单位
  • Topic:主题/主题过滤器,包含一系列方法,比如比较主题和主题过滤器是否匹配
  • Subscription:订阅信息,{clientId, topicFilter,qos,active}
  • TreeNode:树节点

查看源码的时候应该从SubscriptionsDirectory开始阅读。

二、M10 订阅树表示

假设有如下订阅信息:

节点编号 订阅客户端 订阅主题 订阅质量
N5 A abc/+/123 0
N3 B abc/# 1
N3 A abc/# 0
N4 B abc/def 0
N6 B abc/def/123 0
N6 C abc/def/123 1
N7 D abc/def/456 0

则可以构建一棵典型的M10订阅树:
m10_sub_tree.png

量名 数据类型 数据含义
m_token Token 该节点代表片段
m_children List<TreeNode> 子节点
m_subscriptions Set<ClientTopicCouple>[1] 该节点订阅信息集合
subtreeSubscriptions Integer 子树订阅信息数量和[2]

[1] ClientTopicCouple是{clientId,topicFilter},相比Subscription少了一些字段,减少内存占用。
[2] 不是子节点数量之和,而是本节点订阅数量与每个子节点下面的订阅数量之和(递归遍历)

可以看到,M10订阅树的信息全在节点内(因为并没有所谓“边”的数据结构)。

三、M10 数据结构

3.1 节点 TreeNode

  1. 数据结构:TreeNode
  2. 属性值
    数据名 数据类型 数据含义
    m_token Token 本节点片段,例如“abc”
    m_children List<TreeNode> 子节点
    m_subscription Set<ClientTopicCouple> 本节点订阅信息,例如{client123,”+”}
  3. 重要方法
  • copy:浅拷贝,新建一个TreeNode,拷贝TreeNode的所有属性,由于m_children和m_subscription是引用,这里直接拷贝了引用。
  • childWithToken:查询所有的子节点,找到其中具有token片段的节点,如果找不到,返回null
  • updateChild:移除旧子节点,增加新子节点
  • remove:移除本节点上特定的订阅信息
  • matches:查询树中所有匹配某个主题Token的订阅信息。该方法会递归比对每个Token是否和该节点上所有订阅信息的TopicToken匹配,如果匹配,加入结果。

3.2 主题(主题过滤器)Topic

  1. 数据结构:Topic
  2. 属性值
    数据名 数据类型 数据含义
    topic String 主题或主题过滤器
    tokens transient List<Token>
    valid transient boolean
  3. 重要方法
  • getTokens:生成Token列表,只会生成1次,且会判断主题是否合法,比如“/abc/#/123”是非法的
  • isValid:主题是否合法
  • match:返回是否匹配某个订阅主题。要求本主题不含通配符,被匹配的主题可以含通配符。
  • asTopic:根据String生成Topic对象

3.3 片段 Token

  1. 数据结构:Token
  2. 属性值
    数据名 数据类型 数据含义
    name String 片段内容
  3. 重要方法
  • match:两个Token是否匹配。要求本Token可以含通配符,被比较的Token不能含通配符。

3.4 简要订阅信息 ClientTopicCouple

  1. 数据结构:ClientTopicCouple
  2. 属性值
    数据名 数据类型 数据含义
    topicFilter Topic 主题过滤器
    clientID String MQTT协议的clientID,客户端ID

3.5 订阅信息 Subscription

  1. 数据结构:Subscription
  2. 属性值
    数据名 数据类型 数据含义
    topicFilter Topic 主题过滤器
    clientID String MQTT协议的clientID,客户端
    requestQos MQTTQoS 订阅质量
    active boolean 订阅是否有效,暂未使用这个字段

3.6 订阅树 SubscriptionsDirectory

  1. 数据结构:SubscriptionsDirectory
  2. 属性值
    数据名 数据类型 数据含义
    subscriptions AtomicReference<TreeNode> 根节点
  3. 重要方法
  • init:从持久化存储里初始化订阅信息
  • add:新增订阅
  • removeSubscription:删除一个订阅
  • removeForClient:移除一个客户端的所有订阅
  • matches:查询一个发布主题(不含通配符)匹配的订阅信息

四、M10 匹配订阅信息过程

M10的订阅匹配实现是DFS的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 算法4-1 递归匹配 
FUNCTION void matches(Queue<Token> tokens, List<ClientTopicCouple> matchingSubs)
// 弹出队列头片段
t = tokens.poll()
// 如果已经匹配完了
IF t == null
// 说明完全匹配,加入所有的订阅信息
matchingSubs.addAll(m_subscriptions)
// 子节点如果含有通配符,也加入子节点订阅信息,因为通配符可以匹配空层,比如“/abc”是能够匹配上“/abc/#”的
FOR childNode IN m_children
IF childNode.m_token == "#" || childNode.m_token == "+"
matchingSubs.addAll(childNode.m_subscriptions)
END
END
RETURN
END


// 如果本层token是通配符(对于订阅Topic来说不可能含有通配符),则加入所有订阅信息
IF m_token == "#"
matchingSubs.addAll(m_subscriptions)
RETURN
END


// 遍历子节点,是否有匹配的,如果匹配,递归调用
FOR childNode in m_children
IF childNode.m_token matches t
matches(tokens, mathingSubs)
END
END
END

例如,我们仍然保持前面的订阅,订阅树也一样:
m10_sub_tree.png

客户端X向主题abc/def/123发布消息,从根节点开始遍历:

  1. 进入Root,弹出第一个token“abc”,查找子节点发现N1的m_token”abc”匹配
  2. 进入N1,弹出第二个token“def”,发现N2的m_token“+”匹配,N3的m_token“#”也匹配,N4的m_token“def”也匹配
  3. 3.1.1 进入子节点N2,弹出第三个token“123”,发现N5的m_token”123”匹配
    3.1.2 进入子节点N5,队列空,匹配完成,将N5的m_subscriptions[{A, abc/+/123}]加入订阅信息集合
    3.2.1 返回节点N1,进入子节点N3,队列空,匹配完成,将N3的订阅信息{A,“abc/#”}和订阅信息{B,“abc/#”}加入集合
    3.3.1 返回节点N1,进入子节点N4,发现子节点N6的m_token”123”匹配
    3.3.2 进入子节点N6,队列空,将N6的订阅信息[{B, abc/def/123},{C, abc/def/123}]加入集合
  4. 返回N1,返回Root,查找结束

这样就成功匹配到了5条订阅信息:

节点编号 订阅客户端 订阅主题 订阅质量
N5 A abc/+/123 0
N3 B abc/# 1
N3 A abc/# 0
N6 B abc/def/123 0
N6 C abc/def/123 1

五、M10增加订阅

5.1 路径重建算法 RP

M10的增加订阅和取消订阅都会调用路径重建算法(recreatePath,RP)。在了解路径重建算法之前,需要知道M10订阅树一个性质:
【特性1】给定一个主题,一旦发现某个token之前没有建立过节点,其后的所有token都必将建立新节点
【解释】给定一个主题”abc/+/123/new1/new2/new3”,前面的token”abc”,”+”,”123”由于A客户端的订阅已经构建了对应的节点TreeNode,发现其后的第一个没有构建过的token“new1”,则其后的“new2”、”new3”都肯定没有构建过。
【证明】假设后面的某个token Tn是以前建立过的,那么必然存在之前的某个订阅构建了从Root到Tn的完整路径,且当前主题S一定也遍历的这个路径(否则不会访问到Tn);但现在已经在这条路径上遇到了一个新节点T0,矛盾,所以Tn必然不可能被建立过。

RP算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 算法5-2 路径重建recreatePath 
FUNCTION NodeCouple recreatePath(Topic topic, final TreeNode oldRoot)
// 新建节点,浅拷贝子节点,增加两个指针
newRoot = oldRoot.copy()
parent = newRoot
current = newRoot
// 遍历片段
For token IN Topic
// 如果已经存在某个子节点具有这个token
IF EXIST token IN children
// 浅拷贝这个子节点,当前节点指针下移
current = matchChild.copy()
// 删掉旧子节点,增加新子节点
parrent.update(matchChild, current)
// 父节点指针下移
parrent = current
ELSE
// 新建一个子节点
matchChild = new TreeNode
matchChild.m_token = token
// 当前节点加入新子节点
current.addChild(matchChild)
// 当前节点指针下移
current = matchChild
END
END

RETURN NodeCouple(newRoot, current)
END

5.2 增加订阅过程

举一个详细的例子,假设现在客户端F产生了一个新订阅”abc/+/123/new1/new2/new3”,我们看看是如何插入的:
(1)建立初始数据结构
浅拷贝的newRoot,两个指针P,C:
add_subscripton_1.png
(2)发现“abc”有子节点,浅拷贝,指针下移
add_subscripton_2.png
(3)发现“+”有子节点,浅拷贝,指针下移
add_subscripton_3.png
(4)发现“123”有子节点,浅拷贝,指针下移
add_subscripton_4.png
(5)发现“new1”没有节点了,增加节点
add_subscripton_5.png
(6)同理new2、new3
add_subscripton_6.png
(7)很明显,newRoot构建了一个新的树,包含了最新的订阅,current指向了创建的最后一个节点,我们更改颜色来更好地直观感受
add_subscripton_7.png

5.3 增加订阅剩余工作

目前只做了路径重建,没有完成订阅,剩下三个工作:

  1. 将订阅信息加入节点 有了current指针,就能够直接到达最后创建的节点,将订阅信息加入即可
  2. 重新计算订阅数量subtreeSubscriptions 重新执行一次递归计算(耗时)
  3. 替换旧树根节点:需要考虑并发,这里M10用的是经典的CAS,替换根节点,旧节点的内存回收由JAVA垃圾回收器来做。

我们来看一下CAS的这段源码:

1
2
3
4
5
6
7
do {
oldRoot = subscriptions.get();
couple = recreatePath(newSubscription.topicFilter, oldRoot);
couple.createdNode.addSubscription(newSubscription); //createdNode could be null?
couple.root.recalculateSubscriptionsSize();
//spin lock repeating till we can, swap root, if can't swap just re-do the operation
} while (!subscriptions.compareAndSet(oldRoot, couple.root));

当CAS替换不成功的时候(比如同时有并发的插入操作且它们执行成功导致根节点发生变化),while条件判断为true,会丢弃掉之前生成的节点,重新执行生成操作。利用这个思路可以写很多CAS的代码,例如:

1
2
3
4
5
6
7
8
9
do {
num = myAtomNum.intValue();
if (num >= maxNum) {
do something 1 ……
return;
}
} while (!myAtomNum.compareAndSet(num, num + 1));

// do something 2 ……

myAtomNum是一个原子类型,我们希望当myAtomNum的值在达到maxNum后执行“do something 1”,而没有达到时正常执行“do something 2”。如果仅仅写一个if,就会有并发问题。如果像上面这样写,则保证了并发安全。

  • 假设maxNum=50,num=40,此时有2个并发线程进入,一个成功执行do something 2,一个失败循环后成功执行do something 2。
  • 假设maxNum=50,num=49,此时有2个并发线程进入,一个成功执行do something 2,一个失败循环后成功执行do something 1。
  • 假设maxNum=50,num=50,此时有2个并发线程进入,则都将执行do something 1。

六、M10 取消订阅

M10取消订阅同样会使用RP算法,找到这棵树的最后一个订阅节点(current指针),然后移除其中的订阅信息。
例如A取消订阅“abc/+/123”,如下图所示:
remove_subscription.png
再进行和订阅同样的后续操作。

七、总结

可以发现M10的订阅树缺点:

  1. 只要订阅过,哪怕取消订阅后,这个订阅的节点也不会删除
  2. 无论通配符订阅还是非通配符订阅,都需要遍历一遍树结构。
  3. 每次订阅、取消订阅,递归更新订阅数量的时间耗费是很大的,但是这个“统计订阅量”的需求并不需要实时,也不是频繁发生的

Moquette源码分析(一)v0.10订阅树

https://www.bananaoven.com/posts/46181/

作者

香蕉微波炉

发布于

2019-09-13

更新于

2019-09-13

许可协议