Netty ChannelFuture 异步监听

1. 前言

本节主要讲解 ChannelFuture ,它的作用是用来保存 Channel 异步操作的结果,可以看作是一个异步操作结果的占位符。

2. 概念

在 Netty 中所有的 IO 操作都是异步的,不能立刻得到 IO 操作的执行结果,但是可以通过注册一个监听器来监听其执行结果。在 Java 的并发编程当中可以通过 Future 来进行异步结果的监听,但是在 Netty 当中是通过 ChannelFuture 来实现异步结果的监听。通过注册一个监听的方式进行监听,当操作执行成功或者失败时监听会自动触发注册的监听事件。

3. 应用场景

ChannelFture 在开发当中经常需要用到,可以用来监听客户端连接服务端的结果反馈,Netty 是异步操作,无法知道什么时候执行完成,因此可以通过 ChannelFuture 来进行执行结果的监听。在 Netty 当中 Bind 、Write 、Connect 等操作会简单的返回一个 ChannelFuture。

4. 核心方法

序号 方法 描述
1 addListener 注册监听器,当操作已完成 (isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器
2 removeListener 移除监听器
3 sync 等待异步操作执行完毕
4 await 等待异步操作执行完毕
5 isDone 判断当前操作是否完成
6 isSuccess 判断已完成的当前操作是否成功
7 isCancellable 判断已完成的当前操作是否被取消
8 cause 获取已完成的当前操作失败的原因

sync () 和 await () 都是等待异步操作执行完成,那么它们有什么区别呢?

  1. sync () 会抛出异常,建议使用 sync ();
  2. await () 不会抛出异常,主线程无法捕捉子线程执行抛出的异常。

5. 深入了解 ChannelFuture

5.1 生命周期说明

Future 可以通过四个核心方法来判断任务的执行情况。

状态 说明
isDone() 任务是否执行完成,无论成功还是失败
isSuccess() 任务是否执行采购
isCancelled() 任务是否被取消
cause() 获取执行异常信息

执行过程状态的改变说明

当一个异步任务操作开始的时候,一个新的 future 对象就会被创建。在开始的时候该 future 是处于未完成的状态,也就是说,isDone ()=false、isSuccess ()=false、isCancelled ()=false;只要该任务中任何一种状态结束了,无论是说成功、失败、或者被取消,那么整个 Future 就会被标记为已完成。注意的是,如果执行失败那么 cause () 方法会返回异常信息的内容。

图片描述

实例:

ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",80);
channelFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if(future.isDone()){
            if(future.isSuccess()){
                System.out.println("执行成功...");
            }else if(future.isCancelled()){
                System.out.println("任务被取消...");
            }else if(future.cause()!=null){
                System.out.println("执行出错:"+future.cause().getMessage());
            }
        }
    }
});

5.2 ChannelFuture 父接口说明

ChannelFuture 的类继承结构,具体如下所示:

public interface ChannelFuture extends Future<Void> {
    
}
public interface Future<V> extends java.util.concurrent.Future<V> {
    
}

通过上面的继承关系,我们可以清晰的知道 ChannelFuture 其实最顶层的接口是来自 java 并发包的 Future,java 并发包下的 Future 需要手工检查执行结果是否已经完成,非常的繁琐,因此 Netty 把它进行了封装和完善,变成了自动的监听,用起来变的非常的简单。

java 并发包下的 Future 主要存在以下几个缺陷:

  1. 只允许手动通过 get () 来检查对应的操作是否已经完成,它是堵塞直到子线程完成执行并且返回结果;
  2. 只有 isDone () 方法判断一个异步操作是否完成,但是对于完成的定义过于模糊,JDK 文档指出正常终止、抛出异常、用户取消都会使 isDone () 方法返回真。并不能很好的区分到底是哪种状态。

get () 方法是堵塞的,必须等待子线程执行完成才能往下执行。

实例:

//1.定义一个子线程,实现 Callable 接口
public class ThreadTest implements Callable<Integer>{
    @Override
    public Integer call(){
	    //打印
    	System.out.println(">>>>>>>>子线程休眠之前");
	    //休眠5秒
	    Thread.sleep(5000);
    	//打印
    	System.out.println(">>>>>>>>子线程休眠之后");
        return 1;
	}
}
//2.调用子线程处理
public static void main(String[] args){
    ThreadTest t=new ThreadTest();
    FutureTask<Integer> future=new FutureTask<Integer>(t);
    //2.1.开始执行子线程
    new Thread(future).start();
    
  	//2.2.手工返回结果
    int result=future.get();
    System.out.println(">>>>>>>>执行结果:"+result);
    //2.3.操作数据库
    userDao.updateStatus("1");
}

执行结果:

>>>>>>>>子线程休眠之前
>>>>>>>>子线程休眠之后
>>>>>>>>执行结果:1

结论总结:

  1. 说明了 Java 并发包的 Future 要想获取异步执行结果,必须手工调用 get () 方法,此时虽然能获取执行结果,但是无法知道执行结果是成功还是失败;
  2. 使用 get () 获取执行结果,但是 get () 后面的业务则被堵塞,直到后面执行完毕才会往下执行,失去了异步操作提高执行效率的意义了。

6. ChannelFuture 原理

6.1 线程堵塞

思考:sync () 和 await () 方法如何同步等待执行完成并获取执行结果的呢?

源码分析如下所示:

private short waiters;//计数器

@Override
public Promise<V> await() throws InterruptedException {
    //1.判断是否执行完成,如果执行完成则返回
    if (isDone()) {
        return this;
    }

    //2.线程是否已经中断,如果中断则抛异常
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    //3.检查死锁
    checkDeadLock();

    //4.同步代码块->while循环不断的监听执行结果
    synchronized (this) {
        while (!isDone()) {
            incWaiters();//waiters递增
            try {
                wait();//JDK 的 Object 方法,线程等待【核心】
            } finally {
                decWaiters();//waiters 递减
            }
        }
    }
    return this;
}

//递增函数
private void incWaiters() {
    if (waiters == Short.MAX_VALUE) {
        throw new IllegalStateException("too many waiters: " + this);
    }
    ++waiters;
}

//递减函数
private void decWaiters() {
    --waiters;
}

通过以上代码,我们发现 await () 的核心其实就是调用 Object 的 wait () 方法进行线程休眠,普通的 Java 多线程知识点。

6.2 线程唤醒

思考:当前线程休眠了,那么什么时候进行唤醒呢?

源码分析如下所示:

@Override
public Promise<V> setSuccess(V result) {
    //1.setSuccess0 赋值操作
    if (setSuccess0(result)) {
        //2.通知执行监听器
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

private boolean setSuccess0(V result) {
    //继续进入方法
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        
        //继续进入方法
        checkNotifyWaiters();
        return true;
    }
    return false;
}

private synchronized void checkNotifyWaiters() {
    if (waiters > 0) {
        //核心:唤醒之前休眠的线程
        notifyAll();
    }
}

源码分析总结:

  1. 堵塞的核心是通过 Object.wait () 方法进行休眠当前线程,普通的 Java 多线程知识;
  2. 执行完成之后给不同状态(setSuccess、setFailure)赋值的时候唤醒休眠的线程;
  3. 唤醒线程之后调用监听器的方法 l.operationComplete(future);

7. 小结

通过本节的学习,我们需要掌握以下几个核心知识点:

  1. 掌握异步的概念,传统 I/O 是同步堵塞的,执行 I/O 操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,可以通过 Future 来监听异步执行的结果;
  2. ChannelFuture 的几种状态,以及它的值变化时机;
  3. ChannelFuture 的堵塞和唤醒源码分析。