Kotlin 中的序列(Sequences)

今天一起来探索 Kotlin 中的序列。序列(Sequences)实际上是对应 Java8 中的 Stream 的翻版。从之前文章可以了解到 Kotlin 定义了很多操作集合的 API,没错这些函数照样适用于序列(Sequences),而且序列操作在性能方面优于集合操作。而且通过之前函数式 API 的源码中可以看出它们会创建很多中间集合,每个操作符都会开辟内存来存储中间数据集,然而这些在序列中就不需要。

1. 为什么需要 Sequences

我们一般在 Kotlin 中处理数据集都是集合,以及使用集合中一些函数式操作符 API,我们很少去将数据集转换成序列再去做集合操作。这是因为我们一般接触的数据量级比较小,使用集合和序列没什么差别,让我们一起来看个例子,你就会明白使用序列的意义了。

//不使用Sequences序列,使用普通的集合操作
fun computeRunTime(action: (() -> Unit)?) {
  val startTime = System.currentTimeMillis()
  action?.invoke()
  println("the code run time is ${System.currentTimeMillis() - startTime}")
}

fun main(args: Array<String>) = computeRunTime {
  (0..10000000)
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .count { it < 10 }
        .run {
            println("by using list way, result is : $this")
        }
}

运行结果:

图片描述

//转化成Sequences序列,使用序列操作
fun computeRunTime(action: (() -> Unit)?) {
    val startTime = System.currentTimeMillis()
    action?.invoke()
    println("the code run time is ${System.currentTimeMillis() - startTime}")
}

fun main(args: Array<String>) = computeRunTime {
    (0..10000000)
        .asSequence()
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .count { it < 10 }
        .run {
            println("by using sequences way, result is : $this")
        }
}

运行结果:

图片描述
通过以上同一个功能实现,使用普通集合操作和转化成序列后再做操作的运行时间差距不仅一点点,也就对应着两种实现方式在数据集量级比较大的情况下,性能差异也是很大的。这样应该知道为什么我们需要使用 Sequences 序列了吧。

2. 什么是 Sequences

序列操作又被称之为惰性集合操作,Sequences 序列接口强大在于其操作的实现方式。序列中的元素求值都是惰性的,所以可以更加高效使用序列来对数据集中的元素进行链式操作 (映射、过滤、变换等), 而不需要像普通集合那样,每进行一次数据操作,都必须要开辟新的内存来存储中间结果,而实际上绝大多数的数据集合操作的需求关注点在于最后的结果而不是中间的过程,

序列是在 Kotlin 中操作数据集的另一种选择,它和 Java8 中新增的 Stream 很像,在 Java8 中我们可以把一个数据集合转换成 Stream,然后再对 Stream 进行数据操作 (映射、过滤、变换等),序列 (Sequences) 可以说是用于优化集合在一些特殊场景下的工具。但是它不是用来替代集合,准确来说它起到是一个互补的作用。

序列操作分为两大类:

1、中间操作

序列的中间操作始终都是惰性的,一次中间操作返回的都是一个序列 (Sequences),产生的新序列内部知道如何变换原始序列中的元素。怎样说明序列的中间操作是惰性的呢?一起来看个例子:

fun main(args: Array<String>) {
    (0..6)
        .asSequence()
        .map {//map返回是Sequence<T>,故它属于中间操作
            println("map: $it")
            return@map it + 1
        }
        .filter {//filter返回是Sequence<T>,故它属于中间操作
            println("filter: $it")
            return@filter it % 2 == 0
        }
}

运行结果:

图片描述

以上例子只有中间操作没有末端操作,通过运行结果发现 map、filter 中并没有输出任何提示,这也就意味着 map 和 filter 的操作被延迟了,它们只有在获取结果的时候 (也即是末端操作被调用的时候) 才会输出提示

2、末端操作

序列的末端操作会执行原来中间操作的所有延迟计算,一次末端操作返回的是一个结果,返回的结果可以是集合、数字、或者从其他对象集合变换得到任意对象。上述例子加上末端操作:

fun main(args: Array<String>) {
    (0..6)
        .asSequence()
        .map {//map返回是Sequence<T>,故它属于中间操作
            println("map: $it")
            return@map it + 1
        }
        .filter {//filter返回是Sequence<T>,故它属于中间操作
            println("filter: $it")
            return@filter it % 2 == 0
        }
        .count {//count返回是Int,返回的是一个结果,故它属于末端操作
            it < 6
        }
        .run {
            println("result is $this");
        }
}

运行结果

图片描述

注意:判别是否是中间操作还是末端操作很简单,只需要看操作符 API 函数返回值的类型,如果返回的是一个 Sequence 那么这就是一个中间操作,如果返回的是一个具体的结果类型,比如 Int,Boolean, 或者其他任意对象,那么它就是一个末端操作

3. 怎么创建 Sequences

创建序列(Sequences)的方法主要有:

1、使用 Iterable 的扩展函数 asSequence 来创建

//定义声明
public fun <T> Iterable<T>.asSequence(): Sequence<T> {
    return Sequence { this.iterator() }
}
//调用实现
list.asSequence()

2、使用 generateSequence 函数生成一个序列

//定义声明
@kotlin.internal.LowPriorityInOverloadResolution
public fun <T : Any> generateSequence(seed: T?, nextFunction: (T) -> T?): Sequence<T> =
    if (seed == null)
        EmptySequence
    else
        GeneratorSequence({ seed }, nextFunction)

//调用实现,seed是序列的起始值,nextFunction迭代函数操作
val naturalNumbers = generateSequence(0) { it + 1 } //使用迭代器生成一个自然数序列
  • 3、使用序列 (Sequence) 的扩展函数 constrainOnce 生成一次性使用的序列。
//定义声明
public fun <T> Sequence<T>.constrainOnce(): Sequence<T> {
    // as? does not work in js
    //return this as? ConstrainedOnceSequence<T> ?: ConstrainedOnceSequence(this)
    return if (this is ConstrainedOnceSequence<T>) this else ConstrainedOnceSequence(this)
}
//调用实现
val naturalNumbers = generateSequence(0) { it + 1 }
val naturalNumbersOnce = naturalNumbers.constrainOnce()

注意:只能迭代一次,如果超出一次则会抛出 IllegalStateException (“This sequence can be consumed only once.”) 异常。

4. Sequences 操作和集合操作性能对比

关于序列性能对比,主要在以下几个场景下进行对比,通过性能对比你就清楚在什么场景下该使用普通集合操作还是序列操作。

1、同样数据操作在数据量级比较大情况下。

使用 Sequences 序列

fun computeRunTime(action: (() -> Unit)?) {
    val startTime = System.currentTimeMillis()
    action?.invoke()
    println("the code run time is ${System.currentTimeMillis() - startTime} ms")
}

fun main(args: Array<String>) = computeRunTime {
    (0..10000000)//10000000数据量级
        .asSequence()
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .count { it < 100 }
        .run {
            println("by using sequence result is $this")
        }
}

运行结果:

图片描述

不使用 Sequences 序列

fun computeRunTime(action: (() -> Unit)?) {
    val startTime = System.currentTimeMillis()
    action?.invoke()
    println("the code run time is ${System.currentTimeMillis() - startTime} ms")
}

fun main(args: Array<String>) = computeRunTime {
    (0..10000000)//10000000数据量级
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .count { it < 100 }
        .run {
            println("by using sequence result is $this")
        }
}

运行结果:

图片描述

2、同样数据操作在数据量级比较小情况下。

使用 Sequences 序列

fun computeRunTime(action: (() -> Unit)?) {
    val startTime = System.currentTimeMillis()
    action?.invoke()
    println("the code run time is ${System.currentTimeMillis() - startTime} ms")
}

fun main(args: Array<String>) = computeRunTime {
    (0..1000)//1000数据量级
        .asSequence()
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .count { it < 100 }
        .run {
            println("by using sequence result is $this")
        }
}

运行结果:

图片描述

不使用 Sequences 序列

fun computeRunTime(action: (() -> Unit)?) {
    val startTime = System.currentTimeMillis()
    action?.invoke()
    println("the code run time is ${System.currentTimeMillis() - startTime} ms")
}

fun main(args: Array<String>) = computeRunTime {
    (0..1000)//1000数据量级
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .count { it < 100 }
        .run {
            println("by using list result is $this")
        }
}

运行结果:

图片描述

通过以上性能对比发现,在数据量级比较大情况下使用 Sequences 序列性能会比普通数据集合更优;但是在数据量级比较小情况下使用 Sequences 序列性能反而会比普通数据集合更差。关于选择序列还是集合,记得前面翻译了一篇国外的博客,里面有详细的阐述。博客地址

5. Sequences 性能优化的原理

看到上面性能的对比,相信此刻的你迫不及待想要知道序列 (Sequences) 内部性能优化的原理吧,那么我们一起来看下序列内部的原理。来个例子

fun main(args: Array<String>){
    (0..10)
        .asSequence()
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .count { it < 6 }
        .run {
            println("by using sequence result is $this")
        }
}

1、基本原理描述

序列操作 :基本原理是惰性求值,也就是说在进行中间操作的时候,是不会产生中间数据结果的,只有等到进行末端操作的时候才会进行求值。也就是上述例子中 0~10 中的每个数据元素都是先执行 map 操作,接着马上执行 filter 操作。然后下一个元素也是先执行 map 操作,接着马上执行 filter 操作。然而普通集合是所有元素都完执行 map 后的数据存起来,然后从存储数据集中又所有的元素执行 filter 操作存起来的原理。

集合普通操作 :针对每一次操作都会产生新的中间结果,也就是上述例子中的 map 操作完后会把原始数据集循环遍历一次得到最新的数据集存放在新的集合中,然后进行 filter 操作,遍历上一次 map 新集合中数据元素,最后得到最新的数据集又存在一个新的集合中。

2、原理图解

//使用序列
fun main(args: Array<String>){
    (0..100)
        .asSequence()
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .find { it > 3 }
}
//使用普通集合
fun main(args: Array<String>){
    (0..100)
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .find { it > 3 }
}

通过以上的原理转化图,会发现使用序列会逐个元素进行操作,在进行末端操作 find 获得结果之前提早去除一些不必要的操作,以及 find 找到一个符合条件元素后,后续众多元素操作都可以省去,从而达到优化的目的。而集合普通操作,无论是哪个元素都得默认经过所有的操作。其实有些操作在获得结果之前是没有必要执行的以及可以在获得结果之前,就能感知该操作是否符合条件,如果不符合条件提前摒弃,避免不必要操作带来性能的损失。

6. Sequences 原理源码完全解析

//使用序列
fun main(args: Array<String>){
    (0..100)
        .asSequence()
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .find { it > 3 }
}
//使用普通集合
fun main(args: Array<String>){
    (0..100)
        .map { it + 1 }
        .filter { it % 2 == 0 }
        .find { it > 3 }
}

通过 decompile 上述例子的源码会发现,普通集合操作会针对每个操作都会生成一个 while 循环,并且每次都会创建新的集合保存中间结果。而使用序列则不会,它们内部会无论进行多少中间操作都是共享同一个迭代器中的数据,想知道共享同一个迭代器中的数据的原理吗?请接着看内部源码实现。

6.1 使用集合普通操作反编译源码

 public static final void main(@NotNull String[] args) {
      Intrinsics.checkParameterIsNotNull(args, "args");
      byte var1 = 0;
      Iterable $receiver$iv = (Iterable)(new IntRange(var1, 100));
      //创建新的集合存储map后中间结果
      Collection destination$iv$iv = (Collection)(new ArrayList(CollectionsKt.collectionSizeOrDefault($receiver$iv, 10)));
      Iterator var4 = $receiver$iv.iterator();

      int it;
      //对应map操作符生成一个while循环
      while(var4.hasNext()) {
         it = ((IntIterator)var4).nextInt();
         Integer var11 = it + 1;
         //将map变换的元素加入到新集合中
         destination$iv$iv.add(var11);
      }

      $receiver$iv = (Iterable)((List)destination$iv$iv);
      //创建新的集合存储filter后中间结果
      destination$iv$iv = (Collection)(new ArrayList());
      var4 = $receiver$iv.iterator();//拿到map后新集合中的迭代器
      //对应filter操作符生成一个while循环
      while(var4.hasNext()) {
         Object element$iv$iv = var4.next();
         int it = ((Number)element$iv$iv).intValue();
         if (it % 2 == 0) {
          //将filter过滤的元素加入到新集合中
            destination$iv$iv.add(element$iv$iv);
         }
      }

      $receiver$iv = (Iterable)((List)destination$iv$iv);
      Iterator var13 = $receiver$iv.iterator();//拿到filter后新集合中的迭代器
      
      //对应find操作符生成一个while循环,最后末端操作只需要遍历filter后新集合中的迭代器,取出符合条件数据即可。
      while(var13.hasNext()) {
         Object var14 = var13.next();
         it = ((Number)var14).intValue();
         if (it > 3) {
            break;
         }
      }
   }

6.2 使用序列 (Sequences) 惰性操作反编译源码

1、整个序列操作源码

 public static final void main(@NotNull String[] args) {
      Intrinsics.checkParameterIsNotNull(args, "args");
      byte var1 = 0;
      //利用Sequence扩展函数实现了fitler和map中间操作,最后返回一个Sequence对象。
      Sequence var7 = SequencesKt.filter(SequencesKt.map(CollectionsKt.asSequence((Iterable)(new IntRange(var1, 100))), (Function1)null.INSTANCE), (Function1)null.INSTANCE);
      //取出经过中间操作产生的序列中的迭代器,可以发现进行map、filter中间操作共享了同一个迭代器中数据,每次操作都会产生新的迭代器对象,但是数据是和原来传入迭代器中数据共享,最后进行末端操作的时候只需要遍历这个迭代器中符合条件元素即可。
      Iterator var3 = var7.iterator();
      //对应find操作符生成一个while循环,最后末端操作只需要遍历filter后新集合中的迭代器,取出符合条件数据即可。
      while(var3.hasNext()) {
         Object var4 = var3.next();
         int it = ((Number)var4).intValue();
         if (it > 3) {
            break;
         }
      }

   }

2、抽出其中这段关键 code,继续深入:

SequencesKt.filter(SequencesKt.map(CollectionsKt.asSequence((Iterable)(new IntRange(var1, 100))), (Function1)null.INSTANCE), (Function1)null.INSTANCE);

3、把这段代码转化分解成三个部分:

//第一部分
val collectionSequence = CollectionsKt.asSequence((Iterable)(new IntRange(var1, 100)))
//第二部分
val mapSequence = SequencesKt.map(collectionSequence, (Function1)null.INSTANCE)
//第三部分
val filterSequence = SequencesKt.filter(mapSequence, (Function1)null.INSTANCE)

4、解释第一部分代码:

第一部分反编译的源码很简单,主要是调用 Iterable 中扩展函数将原始数据集转换成 Sequence 对象。

public fun <T> Iterable<T>.asSequence(): Sequence<T> {
    return Sequence { this.iterator() }//传入外部Iterable<T>中的迭代器对象
}

更深入一层:

@kotlin.internal.InlineOnly
public inline fun <T> Sequence(crossinline iterator: () -> Iterator<T>): Sequence<T> = object : Sequence<T> {
    override fun iterator(): Iterator<T> = iterator()
}

通过外部传入的集合中的迭代器方法返回迭代器对象,通过一个对象表达式实例化一个 Sequence,Sequence 是一个接口,内部有个 iterator () 抽象函数返回一个迭代器对象,然后把传入迭代器对象作为 Sequence 内部的迭代器,也就是相当于给迭代器加了 Sequence 序列的外壳,核心迭代器还是由外部传入的迭代器对象,有点偷梁换柱的概念。

5、解释第二部分的代码:

通过第一部分,成功将普通集合转换成序列 Sequence,然后现在进行 map 操作,实际上调用了 Sequence 扩展函数 map 来实现的

val mapSequence = SequencesKt.map(collectionSequence, (Function1)null.INSTANCE)

进入 map 扩展函数:

public fun <T, R> Sequence<T>.map(transform: (T) -> R): Sequence<R> {
    return TransformingSequence(this, transform)
}

会发现内部会返回一个 TransformingSequence 对象,该对象构造器接收一个 Sequence 类型对象,和一个 transform 的 lambda 表达式,最后返回一个 Sequence 类型对象。我们先暂时解析到这,后面会更加介绍。

6、解释第三部分的代码:

通过第二部分,进行 map 操作后,然后返回的还是 Sequence 对象,最后再把这个对象进行 filter 操作,filter 也还是 Sequence 的扩展函数,最后返回还是一个 Sequence 对象。

val filterSequence = SequencesKt.filter(mapSequence, (Function1)null.INSTANCE)

进入 filter 扩展函数:

public fun <T> Sequence<T>.filter(predicate: (T) -> Boolean): Sequence<T> {
    return FilteringSequence(this, true, predicate)
}

会发现内部会返回一个 FilteringSequence 对象,该对象构造器接收一个 Sequence 类型对象,和一个 predicate 的 lambda 表达式,最后返回一个 Sequence 类型对象。我们先暂时解析到这,后面会更加介绍。

  • 7、Sequences 源码整体结构介绍

代码结构图 :图中标注的都是一个个对应各个操作符类,它们都实现 Sequence 接口

图片描述

首先,Sequence 是一个接口,里面只有一个抽象函数,一个返回迭代器对象的函数,可以把它当做一个迭代器对象外壳。

public interface Sequence<out T> {
    /**
     * Returns an [Iterator] that returns the values from the sequence.
     *
     * Throws an exception if the sequence is constrained to be iterated once and `iterator` is invoked the second time.
     */
    public operator fun iterator(): Iterator<T>
}

Sequence 核心类 UML 类图

这里只画出了某几个常用操作符的类图

图片描述

注意:通过上面的 UML 类关系图可以得到,共享同一个迭代器中的数据的原理实际上就是利用 Java 设计模式中的状态模式 (面向对象的多态原理) 来实现的,首先通过 Iterable 的 iterator () 返回的迭代器对象去实例化 Sequence,然后外部调用不同的操作符,这些操作符对应着相应的扩展函数,扩展函数内部针对每个不同操作返回实现 Sequence 接口的子类对象,而这些子类又根据不同操作的实现,更改了接口中 iterator () 抽象函数迭代器的实现,返回一个新的迭代器对象,但是迭代的数据则来源于原始迭代器中。

8、接着上面 TransformingSequence、FilteringSequence 继续解析.

通过以上对 Sequences 整体结构深入分析,那么接着 TransformingSequence、FilteringSequence 继续解析就非常简单了。我们就以 TransformingSequence 为例:

//实现了Sequence<R>接口,重写了iterator()方法,重写迭代器的实现
internal class TransformingSequence<T, R>
constructor(private val sequence: Sequence<T>, private val transformer: (T) -> R) : Sequence<R> {
    override fun iterator(): Iterator<R> = object : Iterator<R> {//根据传入的迭代器对象中的数据,加以操作变换后,构造出一个新的迭代器对象。
        val iterator = sequence.iterator()//取得传入Sequence中的迭代器对象
        override fun next(): R {
            return transformer(iterator.next())//将原来的迭代器中数据元素做了transformer转化传入,共享同一个迭代器中的数据。
        }

        override fun hasNext(): Boolean {
            return iterator.hasNext()
        }
    }

    internal fun <E> flatten(iterator: (R) -> Iterator<E>): Sequence<E> {
        return FlatteningSequence<T, R, E>(sequence, transformer, iterator)
    }
}

9、源码分析总结

序列内部的实现原理是采用状态设计模式,根据不同的操作符的扩展函数,实例化对应的 Sequence 子类对象,每个子类对象重写了 Sequence 接口中的 iterator () 抽象方法,内部实现根据传入的迭代器对象中的数据元素,加以变换、过滤、合并等操作,返回一个新的迭代器对象。这就能解释为什么序列中工作原理是逐个元素执行不同的操作,而不是像普通集合所有元素先执行 A 操作,再所有元素执行 B 操作。这是因为序列内部始终维护着一个迭代器,当一个元素被迭代的时候,就需要依次执行 A,B,C 各个操作后,如果此时没有末端操作,那么值将会存储在 C 的迭代器中,依次执行,等待原始集合中共享的数据被迭代完毕,或者不满足某些条件终止迭代,最后取出 C 迭代器中的数据即可。

7. 总结

到这里有关 Kotlin 中的序列就阐述完毕了,我们从为什么需要序列到如何使用序列以及最后序列本质是什么多个角度来分析了 Kotlin 中的序列,相信大家对 Kotlin 序列有了很深的印象,下篇文章我们就进入 Kotlin 中的集合和序列很像。