Netty EventLoop 事件循环机制

1. 前言

前面几节主要讲解了 Netty 的几种线程模型,基本上都是理论上的东西,那么 Netty 是如何去实现这些线程模型的呢?答案:核心是 EventLoop,今天我们主要介绍 EventLoop 是如何来实现线程模型的。

2. 什么是 EventLoop

源码:

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();
}

public interface EventLoopGroup extends EventExecutorGroup {
    
}

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    
}

通过上面的简单源码,我们发现 EventLoopGroup 就是一个线程池,它是继承 Java 并发包下的定时线程池,而 EventLoop 则是线程池里面的一个子线程。

通过源码查看它们之间的关系,具体如下所示:

public interface EventLoopGroup extends EventExecutorGroup {
    EventLoop next();//返回线程组里面的一个线程
}

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();//关联该线程所属的线程组
}

通过以上简单的分析,我们需要掌握的知识点是,Netty 是通过线程池去实现 Reactor 线程模型的,而线程池并不是使用 Java 内置的线程池,而是继承它们并且进行了一定的扩展。就是 EventLoopGroup 和 EventLoop。

3. EventLoop 的架构图

EventLoop 整体的类 UML 关系图还是比较复杂的,Netty 底层主要是以 NIO 为主,因此核心掌握 NioEventLoop 和 NioEventLoopGroup 两个类的结构即可。

NioEventLoopGroup 关系图:
图片描述

以上的架构图,我们主要关心 NioEventLoopGroup->MultithreadEventLoopGroup->EventLoopGroup,其实这种是比较典型的接口、抽象类、实现类的模式。

NioEventLoop 关系图
图片描述

以上的架构图,我们主要关心的是 NioEventLoop->SingleThreadEventLoop->EventLoop,它也是典型的接口、抽象类、实现类的模式。

4. EventLoop 的核心原理

下面,我们将通过源码的方式介绍 EventLoop 在 Netty 当中是如何运行的。
首先,我们需要了解 EventLoop 三个核心步骤,如下图所示:
图片描述

4.1 EventLoop 的核心作用

EventLoop 的核心作用是,一定客户端连接进来,则服务端给其分配一个 Channel(连接通道),并且会给 Channel 分配一个 EventLoop 和 ChannelPipeline。其中,EventLoop 主要负责该 Channel 相关的业务逻辑处理的,ChannelPipeline 则负责管理业务逻辑(双向链表)。ChannelPipleline 下一个章节会详细讲解。

EventLoop 的核心功能是处理 Channel 相关的业务逻辑,它里面其实是一个死循环,重复做着 3 个事件,分别是

  1. 监控端口;
  2. 处理端口事件,将其分发;
  3. 处理队列事件。

核心结论:每个 EventLoop 可以被绑定到多个 Channel 身上,但是一个 Channel 有且仅有一个 EventLoop 与之进行对应。

4.2 NioEventLoop 初始化流程

这里,我们将通过核心源码来梳理一下 NioEventLoopGroup 的初始化流程,也就是线程池的初始化。

实例:

NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
    .group(bossGroup, workerGroup);
//省略其它代码

第一步: 进入 NioEventLoopGroup 构造函数

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    public NioEventLoopGroup() {
        //构造函数一直跟进
        this(0);
    }

    public NioEventLoopGroup(
        int nThreads, Executor executor, 
        SelectorProvider selectorProvider, 
        SelectStrategyFactory selectStrategyFactory) {

        //调用父类的构造函数,点进去,查看源码
        super(nThreads, executor, 
              new Object[]{selectorProvider, selectStrategyFactory, 
                           RejectedExecutionHandlers.reject()});

	}
}

第二步: MultithreadEventLoopGroup 构造函数

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //调用父类的构造函数,点进去,查看源码
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
}

第三步: MultithreadEventExecutorGroup 构造函数

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    
   	//构造函数
	protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        
        //点击跟进
        this(nThreads, 
             (Executor)(threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory)), 
             args);
    }
    
    //核心,在这里进行 “线程组” 初始化工作
    private final EventExecutor[] children;
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        //省略了其它代码,只保留核心部分
        
        //1.创建一个数组,长度是 nThreads
		this.children = new EventExecutor[nThreads];
        //2.数组初始化值
        for(int i = 0; i < nThreads; ++i) {
            //通过 this.newChild() 来创建具体“线程”
            this.children[i] = this.newChild((Executor)executor, args);
        }
    }
    
    //newChild 是一个抽象类,由子类去进行实现
    protected abstract EventExecutor newChild(Executor var1, Object... var2) throws Exception;
}

第四步: 由子类 NioEventLoopGroup 去实现 newChild () 抽象方法

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
	protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    	//创建一个 NioEventLoop
        return new NioEventLoop(this, executor, 
                                (SelectorProvider)args[0], 	
                                ((SelectStrategyFactory)args[1]).newSelectStrategy(), 
                                (RejectedExecutionHandler)args[2]);
    } 
}
	

到这里,我们终于看到 NioEventLoop 的身影了,在 newChild () 去进行初始化工作。

4.3 NioEventLoop 执行流程

上面讲解了 NioEventLoop 的初始化流程,那么它到底在什么时候开始执行的呢?

源码入口:

serverBootstrap.bind(80);

第一步: 抽象类 AbstractBootstrap

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    
    public ChannelFuture bind(int inetPort) {
        return this.bind(new InetSocketAddress(inetPort));
    }
    
    public ChannelFuture bind(SocketAddress localAddress) {
        this.validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        } else {
            //继续跟进
            return this.doBind(localAddress);
        }
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        //继续跟进
        final ChannelFuture regFuture = this.initAndRegister();
    }
    
    final ChannelFuture initAndRegister() {
        //继续跟进
        this.init(channel);
    }
    
    //抽象方法
    abstract void init(Channel var1) throws Exception;
}

第二步: 实现类 ServerBootstrap

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    void init(Channel channel) throws Exception {
        
        //1.把 ChannelHandler 添加到 ChannelPipeline 里,组成一条双向业务链表
        p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
            public void initChannel(Channel ch) throws Exception {
                //1.1.管道
                final ChannelPipeline pipeline = ch.pipeline();
                //1.2.添加到管道
                ChannelHandler handler = ServerBootstrap.this.config.handler();
                if (handler != null) {
                    pipeline.addLast(new ChannelHandler[]{handler});
                }
				//1.3.执行线程池的 “execute()”,核心入口
                ch.eventLoop().execute(new Runnable() {
                    public void run() {
                        pipeline.addLast(
                            new ChannelHandler[]{
                                new ServerBootstrap.ServerBootstrapAcceptor(
                                    currentChildGroup, 
                                    currentChildHandler, 
                                    currentChildOptions, 
                                    currentChildAttrs)
                            }
                        );
                    }
                });
            }
        }});
    }
}

这里是在 init () 方法里面进行一序列的初始化工作,并且执行上面初始化好的 NioEventLoop 的 execute () 方法。

第三步: 执行 SingleThreadEventExecutor 的 execute () 方法

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    
	public void execute(Runnable task) {
        //是否是当前线程
		boolean inEventLoop = this.inEventLoop();
		if (inEventLoop) {
            //如果是当前线程,则添加任务到队列
			this.addTask(task);
		} else {
            //如果不是当前线程,则先启动线程
			this.startThread();
            //把任务添加到任务队列
			this.addTask(task);
            //如果线程已经关闭并且该任务已经被移除了
			if (this.isShutdown() && this.removeTask(task)) {
                //执行拒绝策略
				reject();
			}
         }
    }
    
    private void startThread() {
        this.doStartThread();
    }
    
    private void doStartThread() {
        this.executor.execute(new Runnable() {
            public void run() {
                //执行 run() 方法
                SingleThreadEventExecutor.this.run();
            }
        });
    }
    
    //抽象方法
    protected abstract void run();
}

第四步: 子类 NioEventLoop 实现抽象方法 run (),这里是 run () 方法是一个死循环,并且执行三个核心事件,分别是 “监听端口”、“处理端口事件”、“处理队列事件”。

public final class NioEventLoop extends SingleThreadEventLoop {
    protected void run() {
        while(true) {
            //省略
        }
    }
}

run () 方法里面核心执行了 this.processSelectedKeys()this.runAllTasks()

4.4 核心总结

每次执行 execute () 时,都是向队列里面添加任务,当第一次添加时则先启动线程,并且执行子类 NioEventLoop 的 run () 方法。而该 run () 是整个 EventLoop 的核心,主要的核心功能如下:

  1. this.select(),该方法是一个堵塞方法,主要是监听客户端事件;
  2. this.processSelectedKeys(),该方法获取监听到的客户端事件,并将其进行分发;
  3. this.runAllTasks(),执行队列里面的任务。

5. 小结

本节主要是讲解 EventLoop,它是 Netty 的线程模型的核心,Netty 已经帮我们进行了高度的封装,不需要懂得其原理也不会影响 Netty 的使用,但是如果能了解其核心原理,可以让我们更加深刻的理解 Netty 是如何运转的。