Skip to content

MqttWk源码分析(二):BrokerServer

前言

基于MqttWk v1.0.7。

BrokerServer是MqttWk从MainLaucher启动之后执行的第一个类。

start()

java
public void start() throws Exception {
    LOGGER.info("Initializing {} MQTT Broker ...", "[" + brokerProperties.getId() + "]");
    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    channelIdMap = new HashMap<>();
    bossGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
    workerGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
    if (brokerProperties.getSslEnabled()) {
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("keystore/server.pfx");
        keyStore.load(inputStream, brokerProperties.getSslPassword().toCharArray());
        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
        kmf.init(keyStore, brokerProperties.getSslPassword().toCharArray());
        sslContext = SslContextBuilder.forServer(kmf).build();
    }
    mqttServer();
    if (brokerProperties.getWebsocketEnabled()) {
        websocketServer();
        LOGGER.info("MQTT Broker {} is up and running. Open Port: {} WebSocketPort: {}", "[" + brokerProperties.getId() + "]", brokerProperties.getPort(), brokerProperties.getWebsocketPort());
    } else {
        LOGGER.info("MQTT Broker {} is up and running. Open Port: {} ", "[" + brokerProperties.getId() + "]", brokerProperties.getPort());
    }
}

1)ChannelGroup和ChannelIdMap是Bean,在全局任意位置可以注入,这里是第一次初始化。

2)bossGroup和workerGroup是netty的基本概念,Windows上使用Poll,Linux上可以开启Epoll。

3)然后根据具体的情况选择是否需要开启SSL,如果需要,则去构建SSL上下文结构。PKCS12是密钥文件类型,SunX509是最基础的JDK方式,兼容性高,速度较慢;你可以自己编写openSSL的方式。

4)mqttServer是TCP方式,websocketServer是websocket方式,两种都支持SSL,但不加密和加密是不能共存的。举个例子,开启了TCP+SSL,则不能直接通过TCP访问了,这一块不是太方便,可以自己修改成4种方式同时支持。

IoCBean

java
@IocBean(name = "channelGroup")
public ChannelGroup getChannels() {
    return this.channelGroup;
}

@IocBean(name = "channelIdMap")
public Map<String, ChannelId> getChannelIdMap() {
    return this.channelIdMap;
}

这是nutzboot的依赖注入方式,和Spring的Bean一样地去使用就好了。注入采用注解@Inject,和@Autowired/@Resources一样的效果。

mqttServer()

java
private void mqttServer() throws Exception {
    ServerBootstrap sb = new ServerBootstrap();
    sb.group(bossGroup, workerGroup)
            .channel(brokerProperties.getUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            // handler在初始化时就会执行
            .handler(new LoggingHandler(LogLevel.INFO))
            // childHandler会在客户端成功connect后才执行
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline channelPipeline = socketChannel.pipeline();
                    // Netty提供的心跳检测
                    channelPipeline.addFirst("idle", new IdleStateHandler(0, 0, brokerProperties.getKeepAlive()));
                    // Netty提供的SSL处理
                    if (brokerProperties.getSslEnabled()) {
                        SSLEngine sslEngine = sslContext.newEngine(socketChannel.alloc());
                        sslEngine.setUseClientMode(false);        // 服务端模式
                        sslEngine.setNeedClientAuth(false);        // 不需要验证客户端
                        channelPipeline.addLast("ssl", new SslHandler(sslEngine));
                    }
                    channelPipeline.addLast("decoder", new MqttDecoder());
                    channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
                    channelPipeline.addLast("broker", ioc.get(BrokerHandler.class));
                }
            })
            .option(ChannelOption.SO_BACKLOG, brokerProperties.getSoBacklog())
            .childOption(ChannelOption.SO_KEEPALIVE, brokerProperties.getSoKeepAlive());
    if (Strings.isNotBlank(brokerProperties.getHost())) {
        channel = sb.bind(brokerProperties.getHost(), brokerProperties.getPort()).sync().channel();
    } else {
        channel = sb.bind(brokerProperties.getPort()).sync().channel();
    }
}

Netty的连接管理就是去配置handler,handler是一条链,所有的接入Netty发送的包都会经过整条链的处理。

加在最后的的是核心BrokerHandler,其他的都是一些TCP参数设置等等。

SSLEngine是开启SSL的,ClientAuth是表明需不需要双向认证,一般都不需要,然后设置成服务器模式即可。

最后通过bind方法绑定服务器域名和端口,就能够对外提供连接服务了。

websocketServer()

java
private void websocketServer() throws Exception {
    ServerBootstrap sb = new ServerBootstrap();
    sb.group(bossGroup, workerGroup)
            .channel(brokerProperties.getUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            // handler在初始化时就会执行
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline channelPipeline = socketChannel.pipeline();
                    // Netty提供的心跳检测
                    channelPipeline.addFirst("idle", new IdleStateHandler(0, 0, brokerProperties.getKeepAlive()));
                    // Netty提供的SSL处理
                    if (brokerProperties.getSslEnabled()) {
                        SSLEngine sslEngine = sslContext.newEngine(socketChannel.alloc());
                        sslEngine.setUseClientMode(false);        // 服务端模式
                        sslEngine.setNeedClientAuth(false);        // 不需要验证客户端
                        channelPipeline.addLast("ssl", new SslHandler(sslEngine));
                    }
                    // 将请求和应答消息编码或解码为HTTP消息
                    channelPipeline.addLast("http-codec", new HttpServerCodec());
                    // 将HTTP消息的多个部分合成一条完整的HTTP消息
                    channelPipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
                    // 将HTTP消息进行压缩编码
                    channelPipeline.addLast("compressor ", new HttpContentCompressor());
                    channelPipeline.addLast("protocol", new WebSocketServerProtocolHandler(brokerProperties.getWebsocketPath(), "mqtt,mqttv3.1,mqttv3.1.1", true, 65536));
                    channelPipeline.addLast("mqttWebSocket", new MqttWebSocketCodec());
                    channelPipeline.addLast("decoder", new MqttDecoder());
                    channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
                    channelPipeline.addLast("broker", ioc.get(BrokerHandler.class));
                }
            })
            .option(ChannelOption.SO_BACKLOG, brokerProperties.getSoBacklog())
            .childOption(ChannelOption.SO_KEEPALIVE, brokerProperties.getSoKeepAlive());
    if (Strings.isNotBlank(brokerProperties.getHost())) {
        websocketChannel = sb.bind(brokerProperties.getHost(), brokerProperties.getWebsocketPort()).sync().channel();
    } else {
        websocketChannel = sb.bind(brokerProperties.getWebsocketPort()).sync().channel();
    }
}

websocket多了一些配置,主要是需要从Websocket转到netty的MQTT数据结构,其余都是重复代码。

stop()

java
public void stop() {
    LOGGER.info("Shutdown {} MQTT Broker ...", "[" + brokerProperties.getId() + "]");
    channelGroup = null;
    channelIdMap = null;
    bossGroup.shutdownGracefully();
    bossGroup = null;
    workerGroup.shutdownGracefully();
    workerGroup = null;
    channel.closeFuture().syncUninterruptibly();
    channel = null;
    websocketChannel.closeFuture().syncUninterruptibly();
    websocketChannel = null;
    LOGGER.info("MQTT Broker {} shutdown finish.", "[" + brokerProperties.getId() + "]");
}

标准的关闭方法,会在服务器停止时被调用。通过这些BrokerServer的代码就能够直到代码写得有多清晰简洁了。

转载请注明出处https://bananaoven.com/articles/249.html | 香蕉微波炉
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。