Skip to content

Moquette源码分析(一)v0.10订阅树

一、M10 简介

Moquette 0.10(以下简称M10)的订阅树实现在moquette-0.10/broker/src/main/java/io/moquette/spi/impl/subscriptions,包括:

  • SubscriptionsDirectory:订阅树的增删改查
  • Token:片段,由topicFilter按“/”分割而成的最小字符串单位
  • Topic:主题/主题过滤器,包含一系列方法,比如比较主题和主题过滤器是否匹配
  • Subscription:订阅信息,<clientId, topicFilter,qos,active>
  • TreeNode:树节点

查看源码的时候应该从SubscriptionsDirectory开始阅读。

二、M10 订阅树表示

假设有如下订阅信息:

节点编号订阅客户端订阅主题订阅质量
N5Aabc/+/1230
N3Babc/#1
N3Aabc/#0
N4Babc/def0
N6Babc/def/1230
N6Cabc/def/1231
N7Dabc/def/4560

则可以构建一棵典型的M10订阅树: m10_sub_tree.png

量名数据类型数据含义
m_tokenToken该节点代表片段
m_childrenList<TreeNode>子节点
m_subscriptionsSet<ClientTopicCouple>[1]该节点订阅信息集合
subtreeSubscriptionsInteger子树订阅信息数量和[2]

[1] ClientTopicCouple是{clientId,topicFilter},相比Subscription少了一些字段,减少内存占用。
[2] 不是子节点数量之和,而是本节点订阅数量与每个子节点下面的订阅数量之和(递归遍历)

可以看到,M10订阅树的信息全在节点内(因为并没有所谓“边”的数据结构)。

三、M10 数据结构

3.1 节点 TreeNode

  1. 数据结构:TreeNode
  2. 属性值
    数据名数据类型数据含义
    m_tokenToken本节点片段,例如“abc”
    m_childrenList<TreeNode>子节点
    m_subscriptionSet<ClientTopicCouple>本节点订阅信息,例如{client123,"+"}
  3. 重要方法
  • copy:浅拷贝,新建一个TreeNode,拷贝TreeNode的所有属性,由于m_children和m_subscription是引用,这里直接拷贝了引用。
  • childWithToken:查询所有的子节点,找到其中具有token片段的节点,如果找不到,返回null
  • updateChild:移除旧子节点,增加新子节点
  • remove:移除本节点上特定的订阅信息
  • matches:查询树中所有匹配某个主题Token的订阅信息。该方法会递归比对每个Token是否和该节点上所有订阅信息的TopicToken匹配,如果匹配,加入结果。

3.2 主题(主题过滤器)Topic

  1. 数据结构:Topic
  2. 属性值
    数据名数据类型数据含义
    topicString主题或主题过滤器
    tokenstransient List<Token>划分的片段(不会被序列化)
    validtransient boolean是否是合法的主题(不会被序列化)
  3. 重要方法
  • getTokens:生成Token列表,只会生成1次,且会判断主题是否合法,比如“/abc/#/123”是非法的
  • isValid:主题是否合法
  • match:返回是否匹配某个订阅主题。要求本主题不含通配符,被匹配的主题可以含通配符。
  • asTopic:根据String生成Topic对象

3.3 片段 Token

  1. 数据结构:Token
  2. 属性值
    数据名数据类型数据含义
    nameString片段内容
  3. 重要方法
  • match:两个Token是否匹配。要求本Token可以含通配符,被比较的Token不能含通配符。

3.4 简要订阅信息 ClientTopicCouple

  1. 数据结构:ClientTopicCouple
  2. 属性值
    数据名数据类型数据含义
    topicFilterTopic主题过滤器
    clientIDStringMQTT协议的clientID,客户端ID

3.5 订阅信息 Subscription

  1. 数据结构:Subscription
  2. 属性值
    数据名数据类型数据含义
    topicFilterTopic主题过滤器
    clientIDStringMQTT协议的clientID,客户端
    requestQosMQTTQoS订阅质量
    activeboolean订阅是否有效,暂未使用这个字段

3.6 订阅树 SubscriptionsDirectory

  1. 数据结构:SubscriptionsDirectory
  2. 属性值
    数据名数据类型数据含义
    subscriptionsAtomicReference<TreeNode>根节点
  3. 重要方法
  • init:从持久化存储里初始化订阅信息
  • add:新增订阅
  • removeSubscription:删除一个订阅
  • removeForClient:移除一个客户端的所有订阅
  • matches:查询一个发布主题(不含通配符)匹配的订阅信息

四、M10 匹配订阅信息过程

M10的订阅匹配实现是DFS的:

java
// 算法4-1 递归匹配 
FUNCTION void matches(Queue<Token> tokens, List<ClientTopicCouple> matchingSubs)
    // 弹出队列头片段
    t = tokens.poll()
    // 如果已经匹配完了
    IF t == null
       // 说明完全匹配,加入所有的订阅信息
       matchingSubs.addAll(m_subscriptions)
      // 子节点如果含有通配符,也加入子节点订阅信息,因为通配符可以匹配空层,比如“/abc”是能够匹配上“/abc/#”的
       FOR childNode IN m_children
           IF childNode.m_token == "#" || childNode.m_token == "+"
               matchingSubs.addAll(childNode.m_subscriptions) 
           END
       END
       RETURN
    END


    // 如果本层token是通配符(对于订阅Topic来说不可能含有通配符),则加入所有订阅信息
    IF m_token == "#"
        matchingSubs.addAll(m_subscriptions) 
        RETURN
    END


    // 遍历子节点,是否有匹配的,如果匹配,递归调用
    FOR childNode in m_children
        IF childNode.m_token matches t
            matches(tokens, mathingSubs)
        END
    END
END

例如,我们仍然保持前面的订阅,订阅树也一样: m10_sub_tree.png

客户端X向主题abc/def/123发布消息,从根节点开始遍历:

java
1. 进入Root,弹出第一个token“abc”,查找子节点发现N1的m_token"abc"匹配
2. 进入N1,弹出第二个token“def”,发现N2的m_token“+”匹配,N3的m_token“#”也匹配,N4的m_token“def”也匹配
3.
3.1.1 进入子节点N2,弹出第三个token“123”,发现N5的m_token"123"匹配
3.1.2 进入子节点N5,队列空,匹配完成,将N5的m_subscriptions[{A, abc/+/123}]加入订阅信息集合
3.2.1 返回节点N1,进入子节点N3,队列空,匹配完成,将N3的订阅信息{A,“abc/#”}和订阅信息{B,“abc/#”}加入集合
3.3.1 返回节点N1,进入子节点N4,发现子节点N6的m_token"123"匹配
3.3.2 进入子节点N6,队列空,将N6的订阅信息[{B, abc/def/123},{C, abc/def/123}]加入集合
4. 返回N1,返回Root,查找结束

这样就成功匹配到了5条订阅信息:

节点编号订阅客户端订阅主题订阅质量
N5Aabc/+/1230
N3Babc/#1
N3Aabc/#0
N6Babc/def/1230
N6Cabc/def/1231

五、M10增加订阅

5.1 路径重建算法 RP

M10的增加订阅和取消订阅都会调用路径重建算法(recreatePath,RP)。在了解路径重建算法之前,需要知道M10订阅树一个性质:

【特性1】给定一个主题,一旦发现某个token之前没有建立过节点,其后的所有token都必将建立新节点

【解释】给定一个主题"abc/+/123/new1/new2/new3",前面的token"abc","+","123"由于A客户端的订阅已经构建了对应的节点TreeNode,发现其后的第一个没有构建过的token“new1”,则其后的“new2”、"new3"都肯定没有构建过。

【证明】假设后面的某个token Tn是以前建立过的,那么必然存在之前的某个订阅构建了从Root到Tn的完整路径,且当前主题S一定也遍历的这个路径(否则不会访问到Tn);但现在已经在这条路径上遇到了一个新节点T0,矛盾,所以Tn必然不可能被建立过。

RP算法如下:

java
// 算法5-2 路径重建recreatePath 
FUNCTION NodeCouple recreatePath(Topic topic, final TreeNode oldRoot)
    // 新建节点,浅拷贝子节点,增加两个指针
    newRoot = oldRoot.copy()
    parent = newRoot
    current = newRoot
    // 遍历片段
    For token IN Topic
       // 如果已经存在某个子节点具有这个token
       IF EXIST token IN children
           // 浅拷贝这个子节点,当前节点指针下移
           current = matchChild.copy()
           // 删掉旧子节点,增加新子节点
           parrent.update(matchChild, current)
           // 父节点指针下移
           parrent = current
       ELSE
           // 新建一个子节点
           matchChild = new TreeNode
           matchChild.m_token = token
           // 当前节点加入新子节点
           current.addChild(matchChild)
           // 当前节点指针下移
           current = matchChild
       END
    END
    
    RETURN NodeCouple(newRoot, current)
END

5.2 增加订阅过程

举一个详细的例子,假设现在客户端F产生了一个新订阅"abc/+/123/new1/new2/new3",我们看看是如何插入的:

  1. 建立初始数据结构 浅拷贝的newRoot,两个指针P,C: add_subscripton_1.png
  2. 发现“abc”有子节点,浅拷贝,指针下移 add_subscripton_2.png
  3. 发现“+”有子节点,浅拷贝,指针下移 add_subscripton_3.png
  4. 发现“123”有子节点,浅拷贝,指针下移 add_subscripton_4.png
  5. 发现“new1”没有节点了,增加节点 add_subscripton_5.png
  6. 同理new2、new3 add_subscripton_6.png
  7. 很明显,newRoot构建了一个新的树,包含了最新的订阅,current指向了创建的最后一个节点,我们更改颜色来更好地直观感受 add_subscripton_7.png

5.3 增加订阅剩余工作

目前只做了路径重建,没有完成订阅,剩下三个工作:

  1. 将订阅信息加入节点 有了current指针,就能够直接到达最后创建的节点,将订阅信息加入即可
  2. 重新计算订阅数量subtreeSubscriptions 重新执行一次递归计算(耗时)
  3. 替换旧树根节点:需要考虑并发,这里M10用的是经典的CAS,替换根节点,旧节点的内存回收由JAVA垃圾回收器来做。

我们来看一下CAS的这段源码:

java
do {
    oldRoot = subscriptions.get();
    couple = recreatePath(newSubscription.topicFilter, oldRoot);
    couple.createdNode.addSubscription(newSubscription); //createdNode could be null?
    couple.root.recalculateSubscriptionsSize();
    //spin lock repeating till we can, swap root, if can't swap just re-do the operation
} while (!subscriptions.compareAndSet(oldRoot, couple.root));

当CAS替换不成功的时候(比如同时有并发的插入操作且它们执行成功导致根节点发生变化),while条件判断为true,会丢弃掉之前生成的节点,重新执行生成操作。利用这个思路可以写很多CAS的代码,例如:

java
do {
	num = myAtomNum.intValue();
	if (num >=  maxNum) {
		do something 1 ……
                return;
	}
 } while (!myAtomNum.compareAndSet(num, num + 1));
 
// do something 2 ……

myAtomNum是一个原子类型,我们希望当myAtomNum的值在达到maxNum后执行“do something 1”,而没有达到时正常执行“do something 2”。如果仅仅写一个if,就会有并发问题。如果像上面这样写,则保证了并发安全。

  • 假设maxNum=50,num=40,此时有2个并发线程进入,一个成功执行do something 2,一个失败循环后成功执行do something 2。
  • 假设maxNum=50,num=49,此时有2个并发线程进入,一个成功执行do something 2,一个失败循环后成功执行do something 1。
  • 假设maxNum=50,num=50,此时有2个并发线程进入,则都将执行do something 1。

六、M10 取消订阅

M10取消订阅同样会使用RP算法,找到这棵树的最后一个订阅节点(current指针),然后移除其中的订阅信息。 例如A取消订阅“abc/+/123”,如下图所示: remove_subscription.png 再进行和订阅同样的后续操作。

七、总结

可以发现M10的订阅树缺点:

  1. 只要订阅过,哪怕取消订阅后,这个订阅的节点也不会删除
  2. 无论通配符订阅还是非通配符订阅,都需要遍历一遍树结构。
  3. 每次订阅、取消订阅,递归更新订阅数量的时间耗费是很大的,但是这个“统计订阅量”的需求并不需要实时,也不是频繁发生的
转载请注明出处https://bananaoven.com/articles/269.html | 香蕉微波炉
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
具体问题具体杠
0/20000
发送
加载中...