百度开源MqttBroker:BifroMQ分析
2023.07.17收到了百度开源IoT Broker的好消息,一直没有时间看。最近工作闲下来了,准备分析一下它的实现。
2019年开发Broker时,调研了很多实现方式,百度IoT是我特别想参考的实现,只可惜当时并没有开源代码可以参考,通过对实现方式的猜测,总结了这篇文章:《百度IoT:MQTT Broker架构设计》,不过现在开源版本改动了很多东西。
我们以BifroMQ v1.0.2版本来分析。
代码整体分析
base-cluster:集群通信
2019版百度用了Akka Cluster来做集群管理,但是开源版BifroMQ是自己重写了一套集群管理,用Gossip来做集群共识(Redis Cluster也用的Gossip),传输层支持TCP和UDP,底层是用Netty实现的。这么做的原因不太清楚,不过我也是开始也是受到EMQ的影响,认为Java的话Akka来做Broker集群通信会很方便。后面浅学了一下Akka,发现还是有很多弊端。或许百度也是如此放弃了Akka。base-crdt、base-hlc:不清楚
看不懂这个包做的事情,也没有任何注释,crdt、hlc也都是缩写不能从英文去推测……base-env:管理线程池
这里不清楚为什么线程池叫envbase-hookloader:容器
百度没有用Spring,这里做了容器base-kv:核心组件,复杂的kv引擎
这块值得我后面仔细学习,这是我最想学的百度IoT Broker的部分base-rpc:自建RPC通信
base-scheduler:基于caffeine做的调度器
bifromq-dist:订阅相关代码
bifromq-inbox:接受消息
bifromq-mqtt:mqttBroker服务器
这里你可以看到常见Broker的基本协议实现。目前只实现了v3版本的mqtt协议解析,支持mqtt over websocket,底层框架也是Netty4。看官网文档说后面会支持MQTT5,Netty的新版本已经支持MQTT5了,到时候可以直接用Netty的decoder/encoder。不过MQTT5的实现难点不在于协议解析,而是共享订阅这种功能。bifromq-plugin:插件
提供入鉴权、事件、桥接等插件bifromq-retain:处理保留消息
bifromq-session-dict:session管理
session的管理也是个难点bifromq-sysprops:默认值配置
类似自己实现了spring的application.propertiesbuild:启动相关
订阅浅析
之前的文章《百度IoT:MQTT Broker架构设计》提到过,对订阅过程很感兴趣。这里分析一下。
入口:订阅消息解析
订阅MQTT消息进入bifromq-mqtt/bifromq-mqtt-server的MQTT3SessionHandler
类的doSubscribe方法处理,对每个订阅执行doSubscribe
,有2个实现类:
- MQTT3PersistentSessionHandler
- MQTT3TransientSessionHandler
很明显,就是之前提到过的百度IoT对持久化session和非持久化session的处理。
我们只看持久化的,最终会调用bifromq-dist/bifromq-dist-client的DistClient
类的sub方法执行订阅。里面的invoke方法会调用bifromq-dist/bifromq-dist-server的DistService
的sub方法来接收处理。
DistService#sub方法
整体流程
- 把SubRequest请求转换为SubCall.AddTopicFilter,然后用subCallScheduler.schedule执行,名字含义是增加订阅
- 如果是普通主题,就把SubRequest请求转换为SubCall.InsertMatchRecord,然后用subCallScheduler.schedule执行,名字含义是插入匹配记录
- 否则(如果是共享订阅$share),就把SubRequest请求转换为SubCall.JoinMatchGroup,然后用subCallScheduler.schedule执行,名字含义是加入匹配组。看起来这里的代码是想为MQTT5的共享订阅做准备。
最后用CompletableFuture.allOf同步结果:
- 如果插入主题过滤器失败,则失败
- 如果是插入匹配记录或者加入匹配组,返回订阅QoS等级
AddTopicFilter的过程
1 | public CompletableFuture<Resp> schedule(Req request) { |
以AddTopicFilter为例,subCallScheduler本质是一个BatchCallScheduler批量提交调度器,所以这个submit实际上用的是ICallScheduler接口的default实现,即创建一个完成的CompletableFuture:
1 | default CompletableFuture<Req> submit(Req request) { |
然后会thenCompose,表示完成后执行,会调用BatchCallScheduler的find拿到BatchCallBuilderKey,然后用key去callQueues里面拿到BatchCallBuilder<Req, Resp>订阅请求对象:
1 | private final LoadingCache<BatchCallBuilderKey, BatchCallBuilder<Req, Resp>> callQueues; |
如果对象存在,则执行提交;如果对象不存在,则失败。
find的过程
这里find的实现是在SubCallScheduler类中,这里涉及了一些key的操作我暂时看不懂,本质上就是找到订阅消息的唯一标记,然后通过这个标记拿到BatchCallBuilder
builderOpt.get().submit攒批
1 | CompletableFuture<Resp> submit(Req request) { |
这里其实包了一层,调用的是BatchCallBuilder的submit,然后调用delegate.add,这里的delegate就是SubCallScheduler。add的操作也就是把消息插入BatchSubRequestBuilder这个对象容器中(方便后面进行批量执行)。值得一提的是,这里用的不是通常的ConcurrentHashMap,而是无锁的NonBlockingHashMap。
1 | // key: subInfoKeyUtf8, subKey: topicFilter |
add之后执行批量请求
在积累了足够多的请求后,会提交到preflightBatches队列中去
1 | offered = preflightBatches.offer(currentBatchRef); |
无论是否足够多,都会执行trigger方法。trigger方法用calling CAS来保证原子,经典手段了:
1 | private final AtomicBoolean calling = new AtomicBoolean(); |
然后去preflightBatches拉MonitoredBatchCall请求容器对象来执行(当然如果此时还没offer进去,这里也就拉不到)。执行方式是调用execute,这里也就是调用SubCallScheduler的execute。
1 | CompletableFuture<Void> task = batchCall.execute(); |
做的事情就是创建一个UpdateRequest,把前面的addTopicFilter
、insertMatchRecord
等等所有SubCall消息都放进去。
1 | UpdateRequest.Builder reqBuilder = UpdateRequest.newBuilder().setReqId(System.nanoTime()); |
最后会被kvStoreClient执行,:
1 | kvStoreClient.execute(range.leader, KVRangeRWRequest.newBuilder() |
执行订阅
这后面嵌套KV组件很深,我推测是会走到这:DistWorkerCoProc#mutate
1 | public Supplier<ByteString> mutate(ByteString input, IKVReader reader, IKVWriter writer) { |
然后batchUpdate -> addTopicFilter 写入KV组件,然后routeCache::invalidate/touch触发缓存重建,让订阅更新。
订阅缓存和订阅树
订阅缓存就是SubscriptionCache,每次重新加载时会去执行一次match操作。match这块还没太看懂,我不确定是订阅时触发match,还是发布时触发match。
订阅树没有特别的地方,就是TreeNode+Matcher的经典设计。不过这里会涉及KV的start、end指针,需要时间看。订阅树的结构都在bifromq-dist/bifromq-dist-rpc-definition
总结
- 代码没有用Spring,代价就是需要写容器、写配置、构造方法注入的参数非常之多,有点影响可读性。好处是不依赖框架容易集成,也不受制于框架bug或者框架更新之类。
- Java用的是17版,这个好评。1.8马上就被淘汰了,11只是个过渡提升不大,不如一口气17,以后可以用很久。
- RPC通信用的grpc-proto3,我可以学习一些用法,看看百度会怎么用。
- 大量使用了CompletableFuture,这让我学到了很多。代码逻辑会很清晰,执行效率更高
- KV设计是百度IoT的核心价值,代码很值得学习
- 有个想吐槽的地方是,里面有大量拼写错误,比如
TrieNode
的sortecChildList
实际想写sortedChildList
,看得出来开源得比较匆忙……
百度开源MqttBroker:BifroMQ分析