前言
基于MqttWk v1.0.7。
BrokerServer是MqttWk从MainLaucher启动之后执行的第一个类。
start()
java
public void start() throws Exception {
LOGGER.info("Initializing {} MQTT Broker ...", "[" + brokerProperties.getId() + "]");
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
channelIdMap = new HashMap<>();
bossGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
workerGroup = brokerProperties.getUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
if (brokerProperties.getSslEnabled()) {
KeyStore keyStore = KeyStore.getInstance("PKCS12");
InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("keystore/server.pfx");
keyStore.load(inputStream, brokerProperties.getSslPassword().toCharArray());
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
kmf.init(keyStore, brokerProperties.getSslPassword().toCharArray());
sslContext = SslContextBuilder.forServer(kmf).build();
}
mqttServer();
if (brokerProperties.getWebsocketEnabled()) {
websocketServer();
LOGGER.info("MQTT Broker {} is up and running. Open Port: {} WebSocketPort: {}", "[" + brokerProperties.getId() + "]", brokerProperties.getPort(), brokerProperties.getWebsocketPort());
} else {
LOGGER.info("MQTT Broker {} is up and running. Open Port: {} ", "[" + brokerProperties.getId() + "]", brokerProperties.getPort());
}
}1)ChannelGroup和ChannelIdMap是Bean,在全局任意位置可以注入,这里是第一次初始化。
2)bossGroup和workerGroup是netty的基本概念,Windows上使用Poll,Linux上可以开启Epoll。
3)然后根据具体的情况选择是否需要开启SSL,如果需要,则去构建SSL上下文结构。PKCS12是密钥文件类型,SunX509是最基础的JDK方式,兼容性高,速度较慢;你可以自己编写openSSL的方式。
4)mqttServer是TCP方式,websocketServer是websocket方式,两种都支持SSL,但不加密和加密是不能共存的。举个例子,开启了TCP+SSL,则不能直接通过TCP访问了,这一块不是太方便,可以自己修改成4种方式同时支持。
IoCBean
java
@IocBean(name = "channelGroup")
public ChannelGroup getChannels() {
return this.channelGroup;
}
@IocBean(name = "channelIdMap")
public Map<String, ChannelId> getChannelIdMap() {
return this.channelIdMap;
}这是nutzboot的依赖注入方式,和Spring的Bean一样地去使用就好了。注入采用注解@Inject,和@Autowired/@Resources一样的效果。
mqttServer()
java
private void mqttServer() throws Exception {
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(brokerProperties.getUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// handler在初始化时就会执行
.handler(new LoggingHandler(LogLevel.INFO))
// childHandler会在客户端成功connect后才执行
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
// Netty提供的心跳检测
channelPipeline.addFirst("idle", new IdleStateHandler(0, 0, brokerProperties.getKeepAlive()));
// Netty提供的SSL处理
if (brokerProperties.getSslEnabled()) {
SSLEngine sslEngine = sslContext.newEngine(socketChannel.alloc());
sslEngine.setUseClientMode(false); // 服务端模式
sslEngine.setNeedClientAuth(false); // 不需要验证客户端
channelPipeline.addLast("ssl", new SslHandler(sslEngine));
}
channelPipeline.addLast("decoder", new MqttDecoder());
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
channelPipeline.addLast("broker", ioc.get(BrokerHandler.class));
}
})
.option(ChannelOption.SO_BACKLOG, brokerProperties.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, brokerProperties.getSoKeepAlive());
if (Strings.isNotBlank(brokerProperties.getHost())) {
channel = sb.bind(brokerProperties.getHost(), brokerProperties.getPort()).sync().channel();
} else {
channel = sb.bind(brokerProperties.getPort()).sync().channel();
}
}Netty的连接管理就是去配置handler,handler是一条链,所有的接入Netty发送的包都会经过整条链的处理。
加在最后的的是核心BrokerHandler,其他的都是一些TCP参数设置等等。
SSLEngine是开启SSL的,ClientAuth是表明需不需要双向认证,一般都不需要,然后设置成服务器模式即可。
最后通过bind方法绑定服务器域名和端口,就能够对外提供连接服务了。
websocketServer()
java
private void websocketServer() throws Exception {
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(brokerProperties.getUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// handler在初始化时就会执行
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
// Netty提供的心跳检测
channelPipeline.addFirst("idle", new IdleStateHandler(0, 0, brokerProperties.getKeepAlive()));
// Netty提供的SSL处理
if (brokerProperties.getSslEnabled()) {
SSLEngine sslEngine = sslContext.newEngine(socketChannel.alloc());
sslEngine.setUseClientMode(false); // 服务端模式
sslEngine.setNeedClientAuth(false); // 不需要验证客户端
channelPipeline.addLast("ssl", new SslHandler(sslEngine));
}
// 将请求和应答消息编码或解码为HTTP消息
channelPipeline.addLast("http-codec", new HttpServerCodec());
// 将HTTP消息的多个部分合成一条完整的HTTP消息
channelPipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
// 将HTTP消息进行压缩编码
channelPipeline.addLast("compressor ", new HttpContentCompressor());
channelPipeline.addLast("protocol", new WebSocketServerProtocolHandler(brokerProperties.getWebsocketPath(), "mqtt,mqttv3.1,mqttv3.1.1", true, 65536));
channelPipeline.addLast("mqttWebSocket", new MqttWebSocketCodec());
channelPipeline.addLast("decoder", new MqttDecoder());
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
channelPipeline.addLast("broker", ioc.get(BrokerHandler.class));
}
})
.option(ChannelOption.SO_BACKLOG, brokerProperties.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, brokerProperties.getSoKeepAlive());
if (Strings.isNotBlank(brokerProperties.getHost())) {
websocketChannel = sb.bind(brokerProperties.getHost(), brokerProperties.getWebsocketPort()).sync().channel();
} else {
websocketChannel = sb.bind(brokerProperties.getWebsocketPort()).sync().channel();
}
}websocket多了一些配置,主要是需要从Websocket转到netty的MQTT数据结构,其余都是重复代码。
stop()
java
public void stop() {
LOGGER.info("Shutdown {} MQTT Broker ...", "[" + brokerProperties.getId() + "]");
channelGroup = null;
channelIdMap = null;
bossGroup.shutdownGracefully();
bossGroup = null;
workerGroup.shutdownGracefully();
workerGroup = null;
channel.closeFuture().syncUninterruptibly();
channel = null;
websocketChannel.closeFuture().syncUninterruptibly();
websocketChannel = null;
LOGGER.info("MQTT Broker {} shutdown finish.", "[" + brokerProperties.getId() + "]");
}标准的关闭方法,会在服务器停止时被调用。通过这些BrokerServer的代码就能够直到代码写得有多清晰简洁了。



粤公网安备44030602007943号