【Kotlin】协程的基础使用

【Kotlin】协程的基础使用

本文介绍Kotlin协程挂起和恢复的原理

文章后半部分源码和介绍来自Kotlin官方网站

协程简介

协程是一种并发设计模式,您可以在 Android 平台上使用它来简化异步执行的代码。协程 是在 1.3 版中添加到 Kotlin 的,基于既定的从其他语言转换成的概念。

Android 上,协程有助于管理长时间运行的任务,如果管理不当,这些任务可能会阻塞主线程并导致应用无响应。使用协程的专业开发者中有超过 50% 的人反映使用协程提高了工作效率。本主题介绍如何使用 Kotlin 协程解决以下问题,从而让您能够编写出更清晰、更简洁的应用代码。

协程和线程

线程

  • 线程是操作系统级别的概念
  • 我们开发者通过编程语言(Thread.java)创建的线程,本质还是操作系统内核线程的映射
  • JVM 中的线程与内核线程的存在映射关系,有“一对一”,“一对多”,“M对N”。* JVM 在不同操作系统中的具体实现会有差别,“一对一”是主流
  • 一般情况下,我们说的线程,都是内核线程,线程之间的切换,调度,都由操作系统负责
  • 线程也会消耗操作系统资源,但比进程轻量得多
  • 线程,是抢占式的,它们之间能共享内存资源,进程不行
  • 线程共享资源导致了多线程同步问题
  • 有的编程语言会自己实现一套线程库,从而能在一个内核线程中实现多线程效果,早期 JVM 的“绿色线程” 就是这么做的,这种线程被称为“用户线程”

协程

  • 协程不是操作系统级别的概念,无需操作系统支持
  • 协程有点像上面提到的“绿色线程”,一个线程上可以运行成千上万个协程
  • 协程是用户态的(userlevel),内核对协程无感知
  • 协程是协作式的,由开发者管理,不需要操作系统进行调度和切换,也没有抢占式的消耗,因此它更加高效
  • 协程它底层基于状态机实现,多协程之间共用一个实例,资源开销极小,因此它更加轻量
  • 协程本质还是运行于线程之上,它通过协程调度器,可以运行到不同的线程上

项目使用实例

最常见的使用方式,在 ViewModel 或者 Controller 里写业务逻辑,在 Activity 里调用,这样就可以在IO线程执行网络请求,拿到结果后自动切换到主线程更新UI。

// viewModel或者controller里获取数据逻辑
// 使用suspend限制在协程里使用;withContext切换调度器,指定在IO线程执行下面的任务
suspend fun getUserName() = withContext(Dispatchers.IO) {
    debugLog("thread name: ${Thread.currentThread().name}")
    ServiceCreator.createService<UserService>()
        .getUserName("2cd1e3c5ee3cda5a")
        .execute()
        .body()
}

// Activity调用处
override fun onCreate(savedInstanceState: Bundle?){
    // 最直接的声明方法,在主线程执行下面的逻辑
    lifeCycleScope.launch {
        // 相当于get这一半是在IO线程执行
        //拿到结果后的变量赋值这一半操作由调度器自动切换到主线程来执行了
        val userName = mViewModel.getUserName()
        infoLog("userName: $userName")
        binding.tvUserName.text = userName
    }
}

API介绍

四个基础概念

  • suspend function。即挂起函数,delay() 就是协程库提供的一个用于实现非阻塞式延时的挂起函数
  • CoroutineScope。即协程作用域,GlobalScope 是 CoroutineScope 的一个实现类,用于指定协程的作用范围,可用于管理多个协程的生命周期,所有协程都需要通过 CoroutineScope 来启动
  • CoroutineContext。即协程上下文,包含多种类型的配置参数。Dispatchers.IO 就是 CoroutineContext 这个抽象概念的一种实现,用于指定协程的运行载体,即用于指定协程要运行在哪类线程上
  • CoroutineBuilder。即协程构建器,协程在 CoroutineScope 的上下文中通过 launch、async 等协程构建器来进行声明并启动。launch、async 均被声明为 CoroutineScope 的扩展方法

Kotlin 协程(Coroutines)提供了一套丰富的 API 方法,用于简化异步编程。以下是一些常用的 API 方法及其简要说明:

启动

launch方法签名:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy){
        LazyStandaloneCoroutine(newContext, block) 
    }else{
        StandaloneCoroutine(newContext, active = true)
    }
    coroutine.start(start, coroutine, block)
    return coroutine
}

start参数代表启动方式:

CoroutineStart.DEFAULT:协程创建后,立即开始调度,但 有可能在执行前被取消。在调度前如果协程被取消,其将直接进入取消响应的状态。 CoroutineStart.LAZY:只要协程被需要时(主动调用该协程的 start、 join、 await等函数时 ), 才会开始调度,如果调度前就被取消,协程将直接进入异常结束状态。 CoroutineStart.ATOMIC:协程创建后,立即开始调度, 协程执行到第一个挂起点之前不响应取消。其将调度和执行两个步骤合二为一,就像它的名字一样,其保证调度和执行是原子操作,因此协程也 一定会执行。 CoroutineStart.UNDISPATCHED:协程创建后,立即在当前线程中执行,直到遇到第一个真正挂起的点。是立即执行,因此协程 一定会执行。

context上下文参数:

Job:工作空间。用于启动or取消协程。

Dispatchers为调度器。用于指定协程的执行线程。 Default:默认调度器 ,适合处理后台计算,其是一个 CPU 密集型任务调度器。 IO:IO 调度器,适合执行 IO 相关操作,其是 IO 密集型任务调度器。 Main:UI 调度器,根据平台不同会被初始化为对应的 UI 线程的调度器, 在Android 平台上它会将协程调度到 UI 事件循环中执行,即通常在 主线程上执行。 Unconfined:“无所谓”调度器,不要求协程执行在特定线程上。 CoroutineExceptionHandler:全局异常捕获(只能在根协程配置)。

CoroutineName:协程名称。

协程上下文就是CoroutineContext,其中可以用加和函数plus()来连接使用,比如:

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job + handler

这里的+就是加和函数,如上所写就是让CoroutineContext具备主线程+工作空间job,和CoroutineExceptionHandler的能力。

作用域

  • 顶级作用域:GlobalScope–> 全局范围,不会自动结束执行,无法取消。
  • 协同作用域:coroutineScope –> 抛出异常会取消父协程
  • 主从作用域:supervisorScope –> 抛出异常,不会取消父协程

三种作用域真正常用的其实只有主从作用域,谁也不想让自己写的协程挂了导致app崩溃吧。但实际使用过程中,由于没有作用域的概念,往往会用到顶级作用域和协同作用域,协程挂了导致app崩溃,然后再去解决异常。

常用的主从作用域有下面这些:

  • MainScope :主线程的作用域,全局范围,可以取消。
  • lifecycleScope : 生命周期范围,用于activity等有生命周期的组件,在Desroyed的时候会自动结束。
  • viewModelScope :ViewModel范围,用于ViewModel中,在ViewModel被回收时会自动结束。

主从作用域启动的协程,崩溃后不会影响其他协程执行。

以MainScope为例,在构建上下文时,加入了SupervisorJob(),SupervisorJob()是一个工作空间,它会在子协程抛出异常时,会将异常控制在子协程内部,不往上传递,不会影响父协程的执行。

线程切换

还是以launch方法签名为入口:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy){
        LazyStandaloneCoroutine(newContext, block) 
    }else{
        StandaloneCoroutine(newContext, active = true)
    }
    coroutine.start(start, coroutine, block)
    return coroutine
}

追进start方法:

    /**
     * Starts this coroutine with the given code [block] and [start] strategy.
     * This function shall be invoked at most once on this coroutine.
     * 
     * - [DEFAULT] uses [startCoroutineCancellable].
     * - [ATOMIC] uses [startCoroutine].
     * - [UNDISPATCHED] uses [startCoroutineUndispatched].
     * - [LAZY] does nothing.
     */
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }

可以看到模式启动模式下,使用的是 startCoroutineCancellable ,最终会调用到 resumeCancellableWith 方法,在 resumeCancellableWith 方法中,会判断当前上下文是否需要重新分发,如果需要就将上下文中提取新的Dispathers赋给dispatcher,否则就在当前线程直接执行。

inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        // 判断当前上下文是否需要重新分发,如果需要就将上下文中提取新的Dispathers赋给dispatcher,否则就在当前线程直接执行
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

在不同的JVM平台上,Dispatcher.Main 调度器的执行位置取决于 具体的UI框架 。以下是主要情况:

  • Android平台上会调度到Android的主线程(UI线程)执行,这是通过Handler(Looper.getMainLooper())实现的
  • JavaFX平台会调度到JavaFX的Application线程执行,这是通过Platform.runLater()实现的
  • Swing平台会调度到Swing的Event Dispatch Thread (EDT)执行,这是通过SwingUtilities.invokeLater()实现的
  • 其他情况则会回退到单线程执行器

除了主调度器之外,其他几个切换也类似此流程,比如Dispatchers.Default是 创建了一个默认的线程池 ,而Dispatchers.IO也是沿用的线程池,只是对线程数量做了限制罢了。

IOS平台

在iOS平台上,Kotlin协程的线程切换主要通过以下方式实现:

  1. Main Dispatcher(主线程调度器): 使用DispatchQueue.main来调度到主线程执行 这是通过Kotlin/Native与iOS的GCD(Grand Central Dispatch)集成实现的 协程会被调度到主队列(Main Queue)执行,确保UI操作在主线程进行
  2. Default Dispatcher(默认调度器): 使用后台线程池执行任务 在iOS上,这通常是通过GCD的全局队列(Global Queue)实现的 使用DispatchQueue.global()来获取后台队列
  3. IO Dispatcher(IO调度器): 专门用于IO密集型操作 同样基于GCD实现,但使用不同的队列优先级 使用DispatchQueue.global(qos: .utility)或DispatchQueue.global(qos: .background)来执行IO操作

简化api表达:

// Main Dispatcher实现
internal class MainDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DispatchQueue.main.async {
            block.run()
        }
    }
}

// Default Dispatcher实现
internal class DefaultDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DispatchQueue.global().async {
            block.run()
        }
    }
}

流程图如下:

常用api

协程构建器

用于启动协程的主要方法。

  • launch
    启动一个不会返回结果的协程(Job 类型)。
    GlobalScope.launch {
        // 协程代码
    }
    
  • async
    启动一个会返回结果的协程(Deferred 类型),结果可以通过 await() 获取。
    val deferred = GlobalScope.async {
        // 协程代码
        "Result"
    }
    val result = deferred.await()
    
  • runBlocking
    阻塞当前线程,直到协程执行完毕。通常用于测试或主函数中。
    runBlocking {
        // 协程代码
    }
    

协程上下文与调度器

用于控制协程的执行线程或上下文。

  • Dispatchers.Default
    用于 CPU 密集型任务的默认线程池。
    launch(Dispatchers.Default) {
        // 在后台线程执行
    }
    
  • Dispatchers.IO
    用于 IO 密集型任务的线程池。
    launch(Dispatchers.IO) {
        // 执行 IO 操作
    }
    
  • Dispatchers.Main
    用于在主线程(如 Android 的 UI 线程)执行任务。
    launch(Dispatchers.Main) {
        // 更新 UI
    }
    
  • Dispatchers.Unconfined
    不限制协程的执行线程,根据调用点决定。
    launch(Dispatchers.Unconfined) {
        // 不限制线程
    }
    
  • withContext
    切换协程的上下文。
    withContext(Dispatchers.IO) {
        // 在 IO 线程执行
    }
    

协程作用域

用于管理协程的生命周期。

  • GlobalScope
    全局作用域,协程的生命周期与应用程序一致。
    GlobalScope.launch {
        // 全局协程
    }
    
  • CoroutineScope
    自定义作用域,通常与 lifecycleScopeviewModelScope 结合使用。
    val scope = CoroutineScope(Dispatchers.Main)
    scope.launch {
        // 协程代码
    }
    
  • lifecycleScope(Android)
    Lifecycle 绑定的作用域,协程在 Lifecycle 销毁时自动取消。
    lifecycleScope.launch {
        // 协程代码
    }
    
  • viewModelScope(Android)
    ViewModel 绑定的作用域,协程在 ViewModel 销毁时自动取消。
    viewModelScope.launch {
        // 协程代码
    }
    

协程取消与超时

用于控制协程的执行时间或取消协程。

  • cancel()
    取消协程。
    val job = launch {
        // 协程代码
    }
    job.cancel()
    
  • isActive
    检查协程是否仍处于活动状态。
    if (isActive) {
        // 协程仍在运行
    }
    
  • withTimeout
    设置协程的超时时间,超时后抛出 TimeoutCancellationException
    withTimeout(1000) {
        // 协程代码
    }
    
  • withTimeoutOrNull
    设置协程的超时时间,超时后返回 null 而不是抛出异常。
    val result = withTimeoutOrNull(1000) {
        // 协程代码
    }
    

协程挂起函数

用于在协程中挂起执行。

  • delay
    挂起协程一段时间。
    delay(1000) // 挂起 1 秒
    
  • yield
    挂起当前协程,让出执行权给其他协程。
    yield()
    

协程异常处理

用于处理协程中的异常。

  • try-catch
    捕获协程中的异常。
    try {
        // 协程代码
    } catch (e: Exception) {
        // 处理异常
    }
    
  • CoroutineExceptionHandler
    全局异常处理器。
    val handler = CoroutineExceptionHandler { _, exception ->
        // 处理异常
    }
    launch(handler) {
        // 协程代码
    }
    

协程组合与并发

用于处理多个协程的组合与并发。

  • awaitAll
    等待多个 Deferred 完成并返回结果列表。
    val deferred1 = async { 1 }
    val deferred2 = async { 2 }
    val results = awaitAll(deferred1, deferred2)
    
  • supervisorScope
    创建一个子作用域,子协程的失败不会影响其他子协程。
    supervisorScope {
        launch {
            // 子协程 1
        }
        launch {
            // 子协程 2
        }
    }
    
  • coroutineScope
    创建一个子作用域,子协程的失败会传播到父协程。
    coroutineScope {
        launch {
            // 子协程 1
        }
        launch {
            // 子协程 2
        }
    }
    

协程间的通信Channel

编写具有共享可变状态的代码非常困难且容易出错(例如在使用回调的解决方案中)。更简单的方法是通过通信而不是使用公共可变状态来共享信息。协程可以通过通道相互通信。

通道是允许数据在协程之间传递的通信原语。 一个协程可以向通道发送一些信息,而另一个协程可以从该通道接收该信息

使用方法

发送(生产)信息的协程通常称为生产者,接收(消费)信息的协程称为消费者。一个或多个协程可以向同一个通道发送信息,一个或多个协程也可以从该通道接收数据。

当多个协程从同一个通道接收信息时,每个元素仅由其中一个消费者处理一次。 一旦元素被处理,它将立即从通道中移除。

可以将通道视为元素集合,或者更准确地说,队列这种数据结构,其中元素被添加到一端并从另一端接收。但是,有一个重要的区别:与集合不同,即使在其同步版本中,通道也可以暂停 send()和receive()操作。当通道为空或满时会发生这种情况。如果通道大小有上限,则通道可能会满。

Channel由三个不同的接口表示:SendChannel、ReceiveChannel和Channel,其中后者扩展了前两个。您通常会创建一个通道并将其作为SendChannel实例提供给生产者,以便只有他们可以向该通道发送信息。

您将通道作为ReceiveChannel实例提供给消费者,以便只有他们可以从中接收信息。send和receive方法都声明为suspend:

interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

生产者可以关闭一个通道来表明没有更多的元素到来。

库中定义了几种类型的通道。它们的区别在于内部可以存储多少个元素以及是否send()可以暂停调用。对于所有通道类型,receive()调用的行为都类似:如果通道不为空,则接收一个元素;否则,调用将被暂停。

创建通道时,请指定其类型或缓冲区大小(如果需要缓冲):

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

默认情况下,会创建一个“Rendezvous”通道。

在以下任务中,您将创建一个“Rendezvous”通道、两个生产者协程和一个消费者协程:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch {
        channel.send("B1")
        log("B done")
    }
    launch {
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

Flow API

Flow API 是 Kotlin 协程库中的一部分,主要用于处理数据流。

  • flow
    创建一个冷流(Cold Flow)。
    val flow = flow {
        emit(1)
        emit(2)
    }
    
  • collect
    收集流中的数据。
    flow.collect { value ->
        // 处理数据
    }
    
  • map
    对流中的数据进行转换。
    flow.map { value -> value * 2 }
    
  • filter
    过滤流中的数据。
    flow.filter { value -> value > 1 }
    
  • flatMapConcat
    将流中的每个值映射为一个新流,并按顺序连接。
    flow.flatMapConcat { value -> flowOf(value, value * 2) }
    
  • zip
    将两个流合并为一个流。
    val flow1 = flowOf(1, 2)
    val flow2 = flowOf("A", "B")
    flow1.zip(flow2) { a, b -> "$a$b" }
    

关于更多Flow的基础和进阶使用,此前也写过更详细的一篇文章。

Kotlin Flow全面总结