2023.07.17收到了百度开源IoT Broker的好消息,一直没有时间看。最近工作闲下来了,准备分析一下它的实现。
2019年开发Broker时,调研了很多实现方式,百度IoT是我特别想参考的实现,只可惜当时并没有开源代码可以参考,通过对实现方式的猜测,总结了这篇文章:《百度IoT:MQTT Broker架构设计》,不过现在开源版本改动了很多东西。
我们以BifroMQ v1.0.2版本来分析。

代码整体分析
base-cluster:集群通信 2019版百度用了Akka Cluster来做集群管理,但是开源版BifroMQ是自己重写了一套集群管理,用Gossip来做集群共识(Redis Cluster也用的Gossip),传输层支持TCP和UDP,底层是用Netty实现的。这么做的原因不太清楚,不过我也是开始也是受到EMQ的影响,认为Java的话Akka来做Broker集群通信会很方便。后面浅学了一下Akka,发现还是有很多弊端。或许百度也是如此放弃了Akka。
base-crdt、base-hlc:不清楚 看不懂这个包做的事情,也没有任何注释,crdt、hlc也都是缩写不能从英文去推测……
base-env:管理线程池 这里不清楚为什么线程池叫env
base-hookloader:容器 百度没有用Spring,这里做了容器
base-kv:核心组件,复杂的kv引擎 这块值得我后面仔细学习,这是我最想学的百度IoT Broker的部分
base-rpc:自建RPC通信
base-scheduler:基于caffeine做的调度器
bifromq-dist:订阅相关代码
bifromq-inbox:接受消息
bifromq-mqtt:mqttBroker服务器 这里你可以看到常见Broker的基本协议实现。目前只实现了v3版本的mqtt协议解析,支持mqtt over websocket,底层框架也是Netty4。看官网文档说后面会支持MQTT5,Netty的新版本已经支持MQTT5了,到时候可以直接用Netty的decoder/encoder。不过MQTT5的实现难点不在于协议解析,而是共享订阅这种功能。
bifromq-plugin:插件 提供入鉴权、事件、桥接等插件
bifromq-retain:处理保留消息
bifromq-session-dict:session管理 session的管理也是个难点
bifromq-sysprops:默认值配置 类似自己实现了spring的application.properties
build:启动相关
订阅浅析
之前的文章《百度IoT:MQTT Broker架构设计》提到过,对订阅过程很感兴趣。这里分析一下。
入口:订阅消息解析
订阅MQTT消息进入bifromq-mqtt/bifromq-mqtt-server的MQTT3SessionHandler类的doSubscribe方法处理,对每个订阅执行doSubscribe,有2个实现类:
- MQTT3PersistentSessionHandler
- MQTT3TransientSessionHandler
很明显,就是之前提到过的百度IoT对持久化session和非持久化session的处理。
我们只看持久化的,最终会调用bifromq-dist/bifromq-dist-client的DistClient类的sub方法执行订阅。里面的invoke方法会调用bifromq-dist/bifromq-dist-server的DistService的sub方法来接收处理。
DistService#sub方法
整体流程
- 把SubRequest请求转换为SubCall.AddTopicFilter,然后用subCallScheduler.schedule执行,名字含义是增加订阅
- 如果是普通主题,就把SubRequest请求转换为SubCall.InsertMatchRecord,然后用subCallScheduler.schedule执行,名字含义是插入匹配记录
- 否则(如果是共享订阅$share),就把SubRequest请求转换为SubCall.JoinMatchGroup,然后用subCallScheduler.schedule执行,名字含义是加入匹配组。看起来这里的代码是想为MQTT5的共享订阅做准备。
最后用CompletableFuture.allOf同步结果:
- 如果插入主题过滤器失败,则失败
- 如果是插入匹配记录或者加入匹配组,返回订阅QoS等级
AddTopicFilter的过程
java
public CompletableFuture<Resp> schedule(Req request) {
return reqScheduler.submit(request)
.thenCompose(req -> {
Optional<BatchCallBuilder<Req, Resp>> builderOpt = find(req).map(callQueues::get);
if (builderOpt.isPresent()) {
return builderOpt.get().submit(req);
} else {
return CompletableFuture.failedFuture(DropException.BATCH_NOT_AVAILABLE);
}
});
}以AddTopicFilter为例,subCallScheduler本质是一个BatchCallScheduler批量提交调度器,所以这个submit实际上用的是ICallScheduler接口的default实现,即创建一个完成的CompletableFuture:
java
default CompletableFuture<Req> submit(Req request) {
return CompletableFuture.completedFuture(request);
}然后会thenCompose,表示完成后执行,会调用BatchCallScheduler的find拿到BatchCallBuilderKey,然后用key去callQueues里面拿到BatchCallBuilder<Req, Resp>订阅请求对象:
java
private final LoadingCache<BatchCallBuilderKey, BatchCallBuilder<Req, Resp>> callQueues;如果对象存在,则执行提交;如果对象不存在,则失败。
find的过程
这里find的实现是在SubCallScheduler类中,这里涉及了一些key的操作我暂时看不懂,本质上就是找到订阅消息的唯一标记,然后通过这个标记拿到BatchCallBuilder
builderOpt.get().submit攒批
java
CompletableFuture<Resp> submit(Req request) {
long stamp = rwLock.readLock();
try {
return currentBatchRef.add(request);
} finally {
rwLock.unlockRead(stamp);
if (currentBatchRef.isEnough()) {
boolean offered = false;
stamp = rwLock.writeLock();
try {
if (currentBatchRef.isEnough()) {
offered = preflightBatches.offer(currentBatchRef);
assert offered;
MonitoredBatchCall newCall = reusableBatches.poll();
if (newCall == null) {
newCall = new MonitoredBatchCall();
}
currentBatchRef = newCall;
}
} finally {
rwLock.unlockWrite(stamp);
if (offered) {
meter.queueDepthSummary.record(preflightBatches.size());
}
}
}
trigger();
}
}这里其实包了一层,调用的是BatchCallBuilder的submit,然后调用delegate.add,这里的delegate就是SubCallScheduler。add的操作也就是把消息插入BatchSubRequestBuilder这个对象容器中(方便后面进行批量执行)。值得一提的是,这里用的不是通常的ConcurrentHashMap,而是无锁的NonBlockingHashMap。
java
// key: subInfoKeyUtf8, subKey: topicFilter
private final Map<String, Map<String, QoS>> addTopicFilter = new NonBlockingHashMap<>();add之后执行批量请求
在积累了足够多的请求后,会提交到preflightBatches队列中去
offered = preflightBatches.offer(currentBatchRef);无论是否足够多,都会执行trigger方法。trigger方法用calling CAS来保证原子,经典手段了:
private final AtomicBoolean calling = new AtomicBoolean();
if (calling.compareAndSet(false, true)) {}然后去preflightBatches拉MonitoredBatchCall请求容器对象来执行(当然如果此时还没offer进去,这里也就拉不到)。执行方式是调用execute,这里也就是调用SubCallScheduler的execute。
java
CompletableFuture<Void> task = batchCall.execute();做的事情就是创建一个UpdateRequest,把前面的addTopicFilter、insertMatchRecord等等所有SubCall消息都放进去。
java
UpdateRequest.Builder reqBuilder = UpdateRequest.newBuilder().setReqId(System.nanoTime());
if (!addTopicFilter.isEmpty()) {
reqBuilder.setAddTopicFilter(com.baidu.bifromq.dist.rpc.proto.AddTopicFilter.newBuilder()
.putAllTopicFilter(Maps.transformValues(addTopicFilter,
e -> InboxSubInfo.newBuilder().putAllTopicFilters(e).build()))
.build());
}最后会被kvStoreClient执行,:
java
kvStoreClient.execute(range.leader, KVRangeRWRequest.newBuilder()
.setReqId(request.getReqId())
.setVer(range.ver)
.setKvRangeId(range.id)
.setRwCoProc(DistServiceRWCoProcInput
.newBuilder()
.setUpdateRequest(request)
.build()
.toByteString())
.build())
.thenApply……执行订阅
这后面嵌套KV组件很深,我推测是会走到这:DistWorkerCoProc#mutate
java
public Supplier<ByteString> mutate(ByteString input, IKVReader reader, IKVWriter writer) {
CodedInputStream cis = input.newCodedInput();
cis.enableAliasing(true);
DistServiceRWCoProcInput coProcInput = DistServiceRWCoProcInput.parseFrom(cis);
log.trace("Receive rw co-proc request\n{}", coProcInput);
UpdateRequest request = coProcInput.getUpdateRequest();
Set<String> touchedTenants = Sets.newHashSet();
Set<ScopedTopic> touchedTopics = Sets.newHashSet();
ByteString output = DistServiceRWCoProcOutput.newBuilder()
.setUpdateReply(batchUpdate(request, reader, writer, touchedTenants, touchedTopics))
.build().toByteString();
return () -> {
touchedTopics.forEach(routeCache::invalidate);
touchedTenants.forEach(routeCache::touch);
return output;
};
}然后batchUpdate -> addTopicFilter 写入KV组件,然后routeCache::invalidate/touch触发缓存重建,让订阅更新。
订阅缓存和订阅树
订阅缓存就是SubscriptionCache,每次重新加载时会去执行一次match操作。match这块还没太看懂,我不确定是订阅时触发match,还是发布时触发match。
订阅树没有特别的地方,就是TreeNode+Matcher的经典设计。不过这里会涉及KV的start、end指针,需要时间看。订阅树的结构都在bifromq-dist/bifromq-dist-rpc-definition
总结
- 代码没有用Spring,代价就是需要写容器、写配置、构造方法注入的参数非常之多,有点影响可读性。好处是不依赖框架容易集成,也不受制于框架bug或者框架更新之类。
- Java用的是17版,这个好评。1.8马上就被淘汰了,11只是个过渡提升不大,不如一口气17,以后可以用很久。
- RPC通信用的grpc-proto3,我可以学习一些用法,看看百度会怎么用。
- 大量使用了CompletableFuture,这让我学到了很多。代码逻辑会很清晰,执行效率更高
- KV设计是百度IoT的核心价值,代码很值得学习
- 有个想吐槽的地方是,里面有大量拼写错误,比如
TrieNode的sortecChildList实际想写sortedChildList,看得出来开源得比较匆忙……



粤公网安备44030602007943号