百度开源MqttBroker:BifroMQ分析

2023.07.17收到了百度开源IoT Broker的好消息,一直没有时间看。最近工作闲下来了,准备分析一下它的实现。

2019年开发Broker时,调研了很多实现方式,百度IoT是我特别想参考的实现,只可惜当时并没有开源代码可以参考,通过对实现方式的猜测,总结了这篇文章:《百度IoT:MQTT Broker架构设计》,不过现在开源版本改动了很多东西。

我们以BifroMQ v1.0.2版本来分析。

baidu.png

代码整体分析

  1. base-cluster:集群通信
    2019版百度用了Akka Cluster来做集群管理,但是开源版BifroMQ是自己重写了一套集群管理,用Gossip来做集群共识(Redis Cluster也用的Gossip),传输层支持TCP和UDP,底层是用Netty实现的。这么做的原因不太清楚,不过我也是开始也是受到EMQ的影响,认为Java的话Akka来做Broker集群通信会很方便。后面浅学了一下Akka,发现还是有很多弊端。或许百度也是如此放弃了Akka。

  2. base-crdt、base-hlc:不清楚
    看不懂这个包做的事情,也没有任何注释,crdt、hlc也都是缩写不能从英文去推测……

  3. base-env:管理线程池
    这里不清楚为什么线程池叫env

  4. base-hookloader:容器
    百度没有用Spring,这里做了容器

  5. base-kv:核心组件,复杂的kv引擎
    这块值得我后面仔细学习,这是我最想学的百度IoT Broker的部分

  6. base-rpc:自建RPC通信

  7. base-scheduler:基于caffeine做的调度器

  8. bifromq-dist:订阅相关代码

  9. bifromq-inbox:接受消息

  10. bifromq-mqtt:mqttBroker服务器
    这里你可以看到常见Broker的基本协议实现。目前只实现了v3版本的mqtt协议解析,支持mqtt over websocket,底层框架也是Netty4。看官网文档说后面会支持MQTT5,Netty的新版本已经支持MQTT5了,到时候可以直接用Netty的decoder/encoder。不过MQTT5的实现难点不在于协议解析,而是共享订阅这种功能。

  11. bifromq-plugin:插件
    提供入鉴权、事件、桥接等插件

  12. bifromq-retain:处理保留消息

  13. bifromq-session-dict:session管理
    session的管理也是个难点

  14. bifromq-sysprops:默认值配置
    类似自己实现了spring的application.properties

  15. build:启动相关

订阅浅析

之前的文章《百度IoT:MQTT Broker架构设计》提到过,对订阅过程很感兴趣。这里分析一下。

入口:订阅消息解析

订阅MQTT消息进入bifromq-mqtt/bifromq-mqtt-server的MQTT3SessionHandler类的doSubscribe方法处理,对每个订阅执行doSubscribe,有2个实现类:

  • MQTT3PersistentSessionHandler
  • MQTT3TransientSessionHandler

很明显,就是之前提到过的百度IoT对持久化session和非持久化session的处理。

我们只看持久化的,最终会调用bifromq-dist/bifromq-dist-client的DistClient类的sub方法执行订阅。里面的invoke方法会调用bifromq-dist/bifromq-dist-server的DistService的sub方法来接收处理。

DistService#sub方法

整体流程

  • 把SubRequest请求转换为SubCall.AddTopicFilter,然后用subCallScheduler.schedule执行,名字含义是增加订阅
  • 如果是普通主题,就把SubRequest请求转换为SubCall.InsertMatchRecord,然后用subCallScheduler.schedule执行,名字含义是插入匹配记录
  • 否则(如果是共享订阅$share),就把SubRequest请求转换为SubCall.JoinMatchGroup,然后用subCallScheduler.schedule执行,名字含义是加入匹配组。看起来这里的代码是想为MQTT5的共享订阅做准备。

最后用CompletableFuture.allOf同步结果:

  • 如果插入主题过滤器失败,则失败
  • 如果是插入匹配记录或者加入匹配组,返回订阅QoS等级

AddTopicFilter的过程

1
2
3
4
5
6
7
8
9
10
11
public CompletableFuture<Resp> schedule(Req request) {
return reqScheduler.submit(request)
.thenCompose(req -> {
Optional<BatchCallBuilder<Req, Resp>> builderOpt = find(req).map(callQueues::get);
if (builderOpt.isPresent()) {
return builderOpt.get().submit(req);
} else {
return CompletableFuture.failedFuture(DropException.BATCH_NOT_AVAILABLE);
}
});
}

以AddTopicFilter为例,subCallScheduler本质是一个BatchCallScheduler批量提交调度器,所以这个submit实际上用的是ICallScheduler接口的default实现,即创建一个完成的CompletableFuture:

1
2
3
default CompletableFuture<Req> submit(Req request) {
return CompletableFuture.completedFuture(request);
}

然后会thenCompose,表示完成后执行,会调用BatchCallScheduler的find拿到BatchCallBuilderKey,然后用key去callQueues里面拿到BatchCallBuilder<Req, Resp>订阅请求对象:

1
private final LoadingCache<BatchCallBuilderKey, BatchCallBuilder<Req, Resp>> callQueues;

如果对象存在,则执行提交;如果对象不存在,则失败。

find的过程

这里find的实现是在SubCallScheduler类中,这里涉及了一些key的操作我暂时看不懂,本质上就是找到订阅消息的唯一标记,然后通过这个标记拿到BatchCallBuilder

builderOpt.get().submit攒批

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
CompletableFuture<Resp> submit(Req request) {
long stamp = rwLock.readLock();
try {
return currentBatchRef.add(request);
} finally {
rwLock.unlockRead(stamp);
if (currentBatchRef.isEnough()) {
boolean offered = false;
stamp = rwLock.writeLock();
try {
if (currentBatchRef.isEnough()) {
offered = preflightBatches.offer(currentBatchRef);
assert offered;
MonitoredBatchCall newCall = reusableBatches.poll();
if (newCall == null) {
newCall = new MonitoredBatchCall();
}
currentBatchRef = newCall;
}
} finally {
rwLock.unlockWrite(stamp);
if (offered) {
meter.queueDepthSummary.record(preflightBatches.size());
}
}
}
trigger();
}
}

这里其实包了一层,调用的是BatchCallBuilder的submit,然后调用delegate.add,这里的delegate就是SubCallScheduler。add的操作也就是把消息插入BatchSubRequestBuilder这个对象容器中(方便后面进行批量执行)。值得一提的是,这里用的不是通常的ConcurrentHashMap,而是无锁的NonBlockingHashMap。

1
2
// key: subInfoKeyUtf8, subKey: topicFilter
private final Map<String, Map<String, QoS>> addTopicFilter = new NonBlockingHashMap<>();

add之后执行批量请求

在积累了足够多的请求后,会提交到preflightBatches队列中去

1
offered = preflightBatches.offer(currentBatchRef);

无论是否足够多,都会执行trigger方法。trigger方法用calling CAS来保证原子,经典手段了:

1
2
private final AtomicBoolean calling = new AtomicBoolean();
if (calling.compareAndSet(false, true)) {}

然后去preflightBatches拉MonitoredBatchCall请求容器对象来执行(当然如果此时还没offer进去,这里也就拉不到)。执行方式是调用execute,这里也就是调用SubCallScheduler的execute。

1
CompletableFuture<Void> task = batchCall.execute();

做的事情就是创建一个UpdateRequest,把前面的addTopicFilterinsertMatchRecord等等所有SubCall消息都放进去。

1
2
3
4
5
6
7
UpdateRequest.Builder reqBuilder = UpdateRequest.newBuilder().setReqId(System.nanoTime());
if (!addTopicFilter.isEmpty()) {
reqBuilder.setAddTopicFilter(com.baidu.bifromq.dist.rpc.proto.AddTopicFilter.newBuilder()
.putAllTopicFilter(Maps.transformValues(addTopicFilter,
e -> InboxSubInfo.newBuilder().putAllTopicFilters(e).build()))
.build());
}

最后会被kvStoreClient执行,:

1
2
3
4
5
6
7
8
9
10
11
kvStoreClient.execute(range.leader, KVRangeRWRequest.newBuilder()
.setReqId(request.getReqId())
.setVer(range.ver)
.setKvRangeId(range.id)
.setRwCoProc(DistServiceRWCoProcInput
.newBuilder()
.setUpdateRequest(request)
.build()
.toByteString())
.build())
.thenApply……

执行订阅

这后面嵌套KV组件很深,我推测是会走到这:DistWorkerCoProc#mutate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Supplier<ByteString> mutate(ByteString input, IKVReader reader, IKVWriter writer) {
CodedInputStream cis = input.newCodedInput();
cis.enableAliasing(true);
DistServiceRWCoProcInput coProcInput = DistServiceRWCoProcInput.parseFrom(cis);
log.trace("Receive rw co-proc request\n{}", coProcInput);
UpdateRequest request = coProcInput.getUpdateRequest();
Set<String> touchedTenants = Sets.newHashSet();
Set<ScopedTopic> touchedTopics = Sets.newHashSet();
ByteString output = DistServiceRWCoProcOutput.newBuilder()
.setUpdateReply(batchUpdate(request, reader, writer, touchedTenants, touchedTopics))
.build().toByteString();
return () -> {
touchedTopics.forEach(routeCache::invalidate);
touchedTenants.forEach(routeCache::touch);
return output;
};
}

然后batchUpdate -> addTopicFilter 写入KV组件,然后routeCache::invalidate/touch触发缓存重建,让订阅更新。

订阅缓存和订阅树

订阅缓存就是SubscriptionCache,每次重新加载时会去执行一次match操作。match这块还没太看懂,我不确定是订阅时触发match,还是发布时触发match。

订阅树没有特别的地方,就是TreeNode+Matcher的经典设计。不过这里会涉及KV的start、end指针,需要时间看。订阅树的结构都在bifromq-dist/bifromq-dist-rpc-definition

总结

  • 代码没有用Spring,代价就是需要写容器、写配置、构造方法注入的参数非常之多,有点影响可读性。好处是不依赖框架容易集成,也不受制于框架bug或者框架更新之类。
  • Java用的是17版,这个好评。1.8马上就被淘汰了,11只是个过渡提升不大,不如一口气17,以后可以用很久。
  • RPC通信用的grpc-proto3,我可以学习一些用法,看看百度会怎么用。
  • 大量使用了CompletableFuture,这让我学到了很多。代码逻辑会很清晰,执行效率更高
  • KV设计是百度IoT的核心价值,代码很值得学习
  • 有个想吐槽的地方是,里面有大量拼写错误,比如TrieNodesortecChildList实际想写sortedChildList,看得出来开源得比较匆忙……

百度开源MqttBroker:BifroMQ分析

https://www.bananaoven.com/posts/47509/

作者

香蕉微波炉

发布于

2023-08-29

更新于

2023-08-29

许可协议