netty 4.x
rocketmq NettyServer#doOpen 添加 ChannelHandler 时,使用了 ChannelPipeline#addLast(EventExecutorGroup, ChannelHandler...) 方法,这个方法在每个 ChannelHandler 执行时都会使用 EventExecutorGroup
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 就是这个 addLast 方法
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 拿到当前 ChannelHandler 指定的线程池
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
// 因为这个线程池不是 eventLoopGroupSelector 的,所以会走这边
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
然后执行完这个方法就返回了,所以又会去执行 NioByteUnsafe#read 里面的 do while,重新执行上面的 1.2.3.4。然后又去执行 ByteToMessageDecoder#channelRead。 假设第一次 ByteToMessageDecoder#channelRead 还没执行,第二次 ByteToMessageDecoder#channelRead 也有可能会先执行了吧?
第二行复制错了是 rocketmq NettyRemotingServer#start 最后一段 重新执行上面的 1.2.3.4 写错了,是 重新执行上面的 2.3.4 第一次写有点紧张,sorry
打扰大家了,已经看到为什么了
DefaultChannelPipeline#childExecutor
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
应该一个 channel 会指定一个 SingleThreadEventExecutor
1
mazai 2019-12-02 17:21:17 +08:00
netty 是不存在多线程的问题的,因为一个 Channel 注册的时候只会只会注册到一个 EventLoop 上( SingleThreadEventExecutor 本质上就是 EventLoop ),在注册的时候会判断当前执行注册的线程是不是 EventLoop 所在的线程,如果是就注册,否则就使用 EventLoop 所在的线程发起一个任务执行注册。一个 EventLoop 上会注册多个 Channel,所以在 EventLoop 维护的 Channel 生命周期中只会使用一个线程执行,因此也就不存在多线程的问题!
注册的部分可以看 AbstractUnsafe.register(); |