一、背景
分布式压测1台主控机,15台从机,每台测试机1W长连接客户端,进行混合场景MQTT稳定性PubSub测试,发现服务器在跑了一天以后被重启了。
(1)排除GC问题
之前的PET测试中,发现偶尔会有长时间的Full GC,导致Service Mesh探针几次没有响应而强行重启了服务器(那个是由于OpenTracing批量kafka消费的时候没有通过try-resource来释放资源造成堆内存泄漏)。因此看了下GC,发现Full GC耗时很短,次数很少,而且堆内存并没有爆炸。
(2)发现内存增长有问题
通过观察由Prometheus提供数据、Grafana绘制的图像,发现内存以1MB/分钟的速度几乎是线性增长到8G,然后由于达到了k8s的pod限制,被强行重启了。这种不正常的现象明显就是有内存泄漏,那么只能是堆外内存泄漏了。
通过ELK查询ERROR日志,发现被Netty抽样出了一个错:
java
i.n.u.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:113)
io.netty.handler.codec.ByteToMessageDecoder.expandCumulation(ByteToMessageDecoder.java:529)
io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:89)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
********
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)ByteBuf在被垃圾回收器回收的时候,没有调用Release释放掉所有的引用。
二、深入分析原因
Netty的版本是4.1.31.Final。
这个时候我怀疑之前一直纠结的MqttMessage要不要回收的问题:
java
public class MqttHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//todo can check "send other packet before CONNECT packet" at here to replace "already offline" implement.
MqttMessage mqttMessage = (MqttMessage) msg;
if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) {
}
// todo check whether it will cause memory leak
// cannot release count refer because of async mqtt processor
/*
finally {
ReferenceCountUtil.release(mqttMessage);
}
*/
}
}我看到很多其他开源Broker都做了这条回收语句,但我总是认为,为什么这个不是Java GC自动回收的呢?所以就注释掉了。
另外一个原因是,很多开源Broker都是默认在Netty线程里面去同步调用业务执行,我是用了单独的业务线程池,每个MQTT报文都不在Netty线程里,如果在这里进行了释放,业务线程获得的MqttMesage的payload就会有异常。
Java NIO的Buffer
Java常用的网络I/O模型就三个:
最早的阻塞网络I/O:BIO(Blocking I/O),jdk1.4以前用的就是这个,服务端和客户端线程数1:1,同步读写,简单可靠,但很明显效率低,服务端有瓶颈。
然后出现了非阻塞网络I/O:NIO(Non-blocking I/O),由数据缓冲区Buffer、通道Channel、多路复用器Selector组成。通过Selector轮询Channel的状态是否是可读写,如果可以就进行读写。
最新的就是异步网络I/O:AIO(Asynchronous I/O),jdk7以上支持,通过注册回调去实现读写,也就是每个Channel注册一个回调方法,当有读写的时候就会自动调用回调方法去读写。
NIO有两类著名Buffer,DirectByteBuffer,HeapByteBuffer。HeapByteBuffer在堆上分配,会被GC回收。而DirectByteBuffer直接利用系统接口进行内存申请,一旦不回收,就会引起OOM。
Netty的Buffer和释放引用计数场景
同样的,Netty使用NIO模型,在io.netty.buffer包内封装了PooledByteBuf抽象类,以及主要的两个实现类PooledDirectByteBuf、PooledHeapByteBuf。和名字一样,多了池化功能,降低分配开销。
Netty通过引用计数的方式来管理ByteBuf,当数据传递到ChannelHandler了之后,则由对应消费处理链(chain)的最后一个Handler在channelRead()方法里负责释放Bytebuf的引用计数。
java
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
...
ctx.fireChannelRead(buf);
}事实上,不仅仅是Bytebuf需要这样手动释放,你如果用了编解码器,也是需要释放的,比如我们这里的:
java
MqttMessage mqttMessage = (MqttMessage) msg就是进行了编码,需要我们自己去释放。当然,你可以考虑继承io.netty.channel.SimpleChannelInboundHandler,实现一个channelRead0的方法,由这个抽象类去帮你做释放:
java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (this.acceptInboundMessage(msg)) {
this.channelRead0(ctx, msg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (this.autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}注意我们这里并不适合这样去做!因为我们的业务线程是分离的,不能去用SimpleChannelInboundHandler在业务线程还没处理的时候把它释放掉了,所以只能用channelRead并且自己释放。
Netty没有释放引用计数会怎么样
DirectBuffer在堆上分配对象,并用long指向一块用户态的内存区域(也就是所谓的native memory),这样I/O时只会进行一次拷贝(堆外内存-内核),避免了HeapBuffer在I/O时发生2次内存拷贝(堆-堆外-内核)的开销。
由于Netty的引用计数是自己的设计,而不是JVM的设计,JVM进行GC的时候只会判断是否可达,而不会判断Netty的引用计数是否为0,如果这样的Buffer对象被回收掉,不能再回到池中去,那么这个对象指向的直接内存就无法再次使用了,也就是造成了内存泄漏。
Netty提供了一个抽样方法,默认会抽样1%的buffer去检测GC回收时是否引用计数也为0,从而判断是否有内存泄漏。一共有4个策略:
- Disabled:关闭抽样检测
- Simple:抽样1%判断是否有内存泄漏,默认会采取这个策略
- Advanced:打印泄漏buffer的详细信息
- Paranoid:检测全部Buffer,会影响性能。
所以,我们的问题就是由于没有释放引用计数造成内存泄漏了。
三、问题解决和测试检验
1、问题解决
【方法一】
首先需要解决问题,既然业务线程没有释放,那么一个方法是,发给业务线程的是一个堆上分配的Java对象,也就是把MqttMessage的数据赋值给一个new出来的对象,然后再发给java线程。然后再在channelRead里面去释放掉引用计数:
java
try {
CustomMqttMessage msg = CustomMqttMessage.builder.xxx.build();
xxxExecutor.submit(()-> xxx.process(msg));
} finally {
ReferenceCountUtil.release(mqttMessage);
}【方法二】
这样的方法不喜欢,因为多了一次复制拷贝。我们还可以利用AOP去在业务线程结束后释放掉它:
java
@Aspect
@Component
@Slf4j
public class NettyAspect {
@Pointcut("execution(public * xxxx.mqtt.processor.MqttProcessor.processRequest(..))")
public void process() {
// pointcut definition
}
@Around("process()")
public Object aroundProcess(ProceedingJoinPoint pjp) throws Throwable {
Object res = null;
MqttMessage mqttMessage = null;
Object[] args = pjp.getArgs();
try {
if (args != null && args.length > 0) {
mqttMessage = (MqttMessage) args[1];
}
res = pjp.proceed();
} finally {
if (mqttMessage != null) {
ReferenceCountUtil.release(mqttMessage);
}
}
return res;
}
}这样没有数据拷贝,同时释放掉了Netty的引用计数。
2、测试检验
我们需要一个指标去测试。Netty的“io.netty.util.internal.PlatformDependent”的
java
private static final AtomicLong DIRECT_MEMORY_COUNTER是一个不错的指标,但是它是private的,我们拿不到。
于是可以写一个反射去拿到:
java
@Slf4j
@Service
public class NettyDirectMemoryReporterImpl implements NettyDirectMemoryReporter {
private AtomicLong directMemory;
@PostConstruct
public void init() {
Field field = ReflectionUtils.findField(PlatformDependent.class, "DIRECT_MEMORY_COUNTER");
if (field != null) {
try {
field.setAccessible(true);
directMemory = (AtomicLong) field.get(PlatformDependent.class);
log.info("netty direct memory will be metric.");
} catch (Exception e){
log.error("cannot get direct memory!");
} finally {
field.setAccessible(false);
}
} else {
log.error("cannot reflect PlatformDependent class!");
}
}
@Override
public long getNettyDirectMemory() {
return directMemory.longValue();
}
}在这里先设置了Accessible为true,但非常不安全,于是拿到地址后赋给本地private对象,再把Accessible设置回false。
然后提供一个public方法给外部调用,外部只能获取,不能修改,避免外部直接修改这个值造成Netty逻辑异常。
最后用prometheus暴露出去
java
@Bean
public Gauge nettyDirectMemoryCounterGauge(PrometheusMeterRegistry registry, NettyDirectMemoryReporter nettyDirectMemoryReporter) {
Gauge gauge = Gauge.builder(METRIC_NETTY_DIRECT_MEMORY, nettyDirectMemoryReporter, NettyDirectMemoryReporter::getNettyDirectMemory)
.tags(TAG_METRIC_TYPE, REALTIME)
.description("netty direct memory count")
.register(registry);
return gauge;
}可以通过
shell
curl http://localhost:8080/actuator/metrics/{监测变量名}观察metric数据输出是否正常。
把它部署到PET性能测试环境继续跑,再次对比观察。
(1)DIRECT_MEMORY_COUNTER
旧的一直往上涨到100+MB(图上没有),新的从启动开始一直保持70MB(图中的2台机器),没有增长:

(2)内存变化
新的内存不再线性增长,而是平稳增长(因为堆分配和热点代码),甚至还有下降(因为GC):

跑了一天之后也没有掉线和报错了。



粤公网安备44030602007943号