Kotlin 协程

Kotlin 协程是在 Kotlin 1.3 版本中引入的,但是协程并不是 Kotlin 语言所特有的。

协程(Coroutines)是一种并发设计模式,使用它可以在 Android 平台上简化异步执行的代码。可以将它简单地理解成一种轻量级的线程,协程和线程属于一个层级的概念。要知道,之前所学习的线程是非常重量级的,需要依靠操作系统的调度才能实现不同线程之间的切换。而使用协程却可以仅在编程语言的层面就能实现不同协程之间的切换,从而大大提升了并发编程的运行效率。

协程允许我们在单线程模式下模拟多线程编程的效果,使用协程可以写出上下两行看似同步却异步执行的代码,代码执行时的挂起与恢复完全是由编程语言来控制的,和操作系统无关。这就是协程的非阻塞时挂起。这种特性使得高并发程序的运行效率得到了极大地提升。

一些处理异步的方式:

  • 线程:常用,简便。
    • 资源消耗、数量受限、平台支持(JS 无线程)、调试使用不便。
  • 回调:层次调用。
    • 多重嵌套、错误处理麻烦。
  • Futures、Promise:
  • Rx 响应式扩展:可观察流的设计思想,优雅便捷。
    • 学习曲线、API、行为难估。
  • 协程:易学、易用。
    • 学习成本。(想用的好还是需要一定的学习)

为什么要使用协程?

  • 轻量、高效。
  • 简单、好用。
  • 可以用看起来同步的代码写出实质上异步的操作。(将后台任务,函数标记为 suspend,然后用 withoutcontent 切换线程。)
    • 耗时函数自动后台,从而提高性能。
    • 线程的【自动切回】

Kotlin for Java 的协程并不属于广义的协程。他是一个框架,线程框架,基于 Java 的线程框架,是上层封装,归根结底只是为开发者提供的一种线程 API,可以更方便的使用 Java 中的线程。(协程、Executor、RxJava。它们切线程,也是靠 Handler。)
不管是 Kotlin 还是 Groovy,不管上层怎么做,但最后都是基于 JVM,所以最后都是线程。
协程是一种在程序中处理并发任务的方案,也是这种方案的一个组件。

并发;宏观性的概念。一起做,并且是一件事情,不一定是同时做的。
并行;微观。同一时间,同一时刻。
协程不存在线程,协程里没有并行任务,线程可以,多线程同时工作。
协程不需要在意并发和并行的问题和概念,但一些问题还是跑不掉。

线程的问题,协程依然有,但这是线程的问题,不是协程的问题。
学协程的时候不用关注线程安全的问题,还有并发、并行的问题。

活跃的线程是 GC Root,AsyncTask(API 30 中已被废弃) 除了持有外部引用时,更因为内部有活跃的线程,而 new OnClickListener 没有,所以不会内存泄漏。

协程泄露,本质就是一种线程泄漏,也就是一个内存泄漏。不用的协程要取消掉,通过 job.cancle()。Job 对象就是用来取消的。

1
2
3
4
5
6
7
val scope = MainScope

Scope.lauch{

}

Scope.cancle()

协程的基本用法

添加依赖

首先要添加依赖库:

1
2
3
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.1"
// 此依赖库是 Android 项目才会用到的,纯 Kotlin 程序其实用不到。
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1"

GlobalScope.launch 函数

最简单的开启一个协程的方式:使用 GlobalScope.launch 函数,它可以创建一个协程的作用域。并且创建的永远是顶层协程,这一点和线程比较像,因为线程也没有层级这一说,永远是顶层的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun main(){
// GlobalScope.launch 函数每次创建的都是一个顶层协程,这种协程当应用程序运行结束时也会跟着一起结束,所以,代码块中的代码还没来得及运行,应用程序就结束了。解决的办法是:让程序延迟一段时间再结束。
GlobalScope.launch {
println("codes run in coroutine scope")
// delay 函数可以让当前协程延迟指定时间后再运行。
// delay 函数是一个非阻塞式的挂起函数,它只会挂起当前协程,并不会影响其它协程的运行。
// delay 函数只能在协程的作用域或其他挂起函数中调用。
// ----------------------------------------------------------------------
// 而 Thread.sleep() 会阻塞当前的线程,这样运行在该线程下的所有协程都会被阻塞。
delay(1500)
println("codes run in coroutine scope finished") // 不会运行
}
// 如果代码块中的代码不能在 1 秒钟内运行结束,那么就会被强制中断。
Thread.sleep(1000)
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
fun testLaunch() {
val time:Long = measureTimeMillis {
// launch 是异步的,不阻塞线程的。(同一线程的不同协程)
// 两个 GlobalScope.launch 是并行的,
// 至于先打印哪个结果则由 CPU 调度决定,不由我们代码层面决定。
GlobalScope.launch {
Thread.sleep(1000) // 阻塞的是当前的协程块
println("1.${Thread.currentThread()}")
}
GlobalScope.launch {
Thread.sleep(1000)
println("2.${Thread.currentThread()}")
}
println("3,${Thread.currentThread()}")
// 由于代码函数生命周期的缘故,这里执行完代码块,jvm 就销毁了函数栈,
// 所以需要等待一下,才能看到 launch 的异步代码块效果。
Thread.sleep(2200)
}
println("函数总耗时:$time")

// 打印结果:执行了 Thread.sleep(2200)
// 3,Thread[main,5,main]
// 2.Thread[DefaultDispatcher-worker-2,5,main]
// 1.Thread[DefaultDispatcher-worker-1,5,main]
// 函数总耗时:2262

// 打印结果:没执行 Thread.sleep(2200)
// 3,Thread[main,5,main]
// 函数总耗时:72
}

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class TestActivity :AppCompatActivity(){
private val TAG = "TAG_TestActivity"

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_test)

// GlobalScope.launch 函数创建的是一个顶层协程,在实际开发中很少使用。
GlobalScope.launch {
Log.d(TAG,"当前线程1:${Thread.currentThread()}")
Log.d(TAG,"当前线程2:${Thread.currentThread().name}")
// launch 中代码段执行在子线程中
}

// 通过设置 Dispatchers 参数值来指定线程
GlobalScope.launch(Dispatchers.IO) {
Log.d(TAG,"当前线程3:${Thread.currentThread().name}")
// launch 中代码执行在 I/O 线程
}
// 打印结果:
// 当前线程3:DefaultDispatcher-worker-2
// 当前线程1:Thread[DefaultDispatcher-worker-1,5,main]
// 当前线程2:DefaultDispatcher-worker-1


// 当协程在处理一个耗时任务时,如果在任务结束之前 Activity 被销毁,也需要取消协程的任务。
// launch 方法返回一个 Job 对象,调用其 cancel 方法即可。
// var job = GlobalScope.launch(Dispatchers.IO) {
// Log.d(TAG,"当前线程:${Thread.currentThread().name}")
// }
// job.cancel()
}
}

runBlocking 函数

借助 runBlocking 函数可以让应用程序在协程中所有代码都运行完了之后再结束。

注意:此函数通常只应该在测试环境下使用,在正式环境中使用容易产生一些性能上的问题。

1
2
3
4
5
6
7
8
9
fun main(){
// runBlocking 同样会创建一个协程的作用域,
// 但是它可以保证协程作用域的内的所有协程和子协程没有全部执行完之前一直阻塞当前线程。
runBlocking {
println("codes run in coroutine scope")
delay(1500)
println("codes run in coroutine scope finished")
}
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun testRunBlocking() {
val time:Long = measureTimeMillis {
runBlocking {
println("1.${Thread.currentThread()}")
delay(500)
println("2.${Thread.currentThread()}")
}
println("3,${Thread.currentThread()}")
}
println("函数总耗时:$time")

// 执行结果:
// 1.Thread[main,5,main]
// 2.Thread[main,5,main]
// 3,Thread[main,5,main]
// 函数总耗时:575
}

launch 函数

使用 launch 函数可以创建多个协程,它必须在协程的作用域中才能调用,其次,它会在当前协程的作用域下创建子协程。子协程的特点是如果外层作用域的协程结束了,该作用域下的所有子协程也会一同结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
fun main(){
runBlocking {
launch {
println("launch1")
delay(1000)
println("launch1 finished")
}
launch {
println("launch2")
delay(1000)
println("launch2 finished")
}
}

// 运行结果:
// launch1
// launch2
// launch1 finished
// launch2 finished

// 测试性能
test()
}

fun test(){
val start = System.currentTimeMillis()
runBlocking {
// 循环创建了 10 万个协程
repeat(100000){
launch {
println(".")
}
}
}
// 查看消耗时间
val end = System.currentTimeMillis()
println(end - start)

// 运行结果:433 毫秒
}

suspend 关键字

在 launch 函数中编写的代码是拥有协程作用域的,但是提取到一个单独的函数中就没有协程作用域了。

这时可以使用 suspend 关键字,它可以将任意函数声明成挂起函数,而挂起函数之间都是可以相互调用的。

suspend;并不是用来切线程的

关键作用;标记,或者说提醒

  • 标记;线程切回时,切回到哪。
  • 提醒;提示需要特别对待,说明此函数是个耗时函数,需要在协程里面。
1
2
3
4
suspend fun printDot(){
println(".")
delay(1000)
}

coroutineScope 函数

但 suspend 关键字无法提供协程作用域,也就无法调用像 launch 函数。这时可使用 coroutineScope 函数,它也是一个挂起函数,因此可以在任何其他挂起函数中调用。它的特点是会继承外部的协程作用域并创建一个子作用域。

1
2
3
4
5
6
7
8
9
10
11
12
fun main(){
runBlocking {
printDot()
}
}

suspend fun printDot() = coroutineScope{
launch {
println(".")
delay(1000)
}
}

coroutineScope 函数和 runBlocking 函数还有点类似,它可以保证其作用域内的所有代码和子协程在全部执行完之前,会一直阻塞当前协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fun main(){
// 创建了一个协程作用域
runBlocking {
// 创建了一个子协程作用域
coroutineScope {
// 创建了一个子协程
launch {
for (i in 1..10){
println(i)
delay(1000)
}
}
}
println("coroutineScope finished")
}
println("runBlocking finished")

// 运行结果:
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
// coroutineScope finished
// runBlocking finished
}

但是,coroutineScope 函数只会阻塞当前协程,既不影响其它协程,也不影响任何线程,因此是不会造成任何性能上的问题的。而 runBlocking 函数由于会阻塞当前线程,而我们又恰好在主线程中调用它的话,那么就有可能会导致界面卡死的情况,所以不太推荐在实际项目中使用。

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class TestActivity :AppCompatActivity(){
private val TAG = "TAG_TestActivity"

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_test)

val job = Job()
// CoroutineScope 是一个函数,他会返回一个 CoroutineScope 对象,
// 有了 CoroutineScope 对象之后就可以调用 launch 方法来创建协程了。
// 然后使用 async 函数就可以获取协程的执行结果了
CoroutineScope(job).launch {
// 逻辑处理
Log.d(TAG,"当前线程:${Thread.currentThread().name}")
}
job.cancel()
}
}

CoroutineScope 与 coroutineScope 的区别?

###withTimeout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
fun testTimeOut() = runBlocking{
val time:Long = measureTimeMillis {
// 超时自动取消内部协程,抛异常。
withTimeout(3000){
launch {
repeat(200){
println("${Thread.currentThread()}$it")
delay(300)
}
}
println("外部:${Thread.currentThread()}")
}
withTimeoutOrNull(1000){

}
}

// 打印结果:
// 外部:Thread[main,5,main]
// Thread[main,5,main],0
// Thread[main,5,main],1
// Thread[main,5,main],2
// Thread[main,5,main],3
// Thread[main,5,main],4
// Thread[main,5,main],5
// Thread[main,5,main],6
// Thread[main,5,main],7
// Thread[main,5,main],8
// Thread[main,5,main],9
// Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 3000 ms
// at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:126)
// at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:92)
// at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:491)
// at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:270)
// at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)
// at java.lang.Thread.run(Thread.java:748)
}

更多的作用域构造器

上面学习了几种作用域构造器,它们都可以用于创建一个新的协程作用域。不过,GlobalScope.launch 和 runBlocking 函数是可以在任意地方调用的,coroutineScope 函数可以在协程作用域或挂起函数中调用,而 launch 函数只能在协程作用域中调用。

Job 对象

其中,GlobalScope.launch 由于每次创建的都是顶层协程,一般也不太建议使用,因为它管理起来成本太高了,当 Activity 关闭时,需要逐个调用所有已创建协程的 cancel() 方法。除非非常明确就是要创建顶层函数。

1
2
3
4
5
6
7
8
9
10
/**
* 取消协程的办法
*/
fun main(){
// 不管是 GlobalScope.launch 函数还是 launch 函数,它们都会返回一个 Job 对象。
val job = GlobalScope.launch {
//处理具体逻辑
}
job.cancel()
}

实际项目中比较常用的写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
fun main(){
// 创建 Job 对象并传入到 CoroutineScope() 函数中。
val job = Job()
// CoroutineScope() 函数会返回一个 CoroutineScope 对象,便可以调用它的 launch 函数来创建一个协程了。
val scope = CoroutineScope(job)
// 这样创建的协程都会被关联在 Job 对象的作用域下面,只需要调用一次 cancel(),
// 就可以将同一作用域内的所有协程全部取消,从而大大降低了协程管理的成本。
scope.launch {
// 处理具体的逻辑
}
job.cancel()
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
fun testJob() = runBlocking{
// runBlocking 保证内部协程执行完毕才结束函数栈
val time:Long = measureTimeMillis {
val l1:Job = launch {
println("L1: ${Thread.currentThread()}")
}
val l2:Job = launch {
println("L2: ${Thread.currentThread()}")
}
// Deferred 其实就是实现了 Job 接口的子接口
val a2:Deferred<Unit> = async {
repeat(5){
println("A2: ${Thread.currentThread()}, $it")
delay(200)
}
}
// a2.cancelAndJoin() // 取消协程,并返回主协程。
// a2.join() // 将此协程加入到父协程,协程结束才会走下面的。
val a1:Deferred<Unit> = async {
println("A1: ${Thread.currentThread()}")
// 取消 A2
// a2.cancel()
}
// a1.join()
println("外部:${Thread.currentThread()}")
}
println("函数总耗时:$time")

// 执行结果:
// 外部:Thread[main,5,main]
// 函数总耗时:7
// L1: Thread[main,5,main]
// L2: Thread[main,5,main]
// A2: Thread[main,5,main], 0
// A1: Thread[main,5,main]
// A2: Thread[main,5,main], 1
// A2: Thread[main,5,main], 2
// A2: Thread[main,5,main], 3
// A2: Thread[main,5,main], 4

// 执行结果:执行了 a2.join()
// L1: Thread[main,5,main]
// L2: Thread[main,5,main]
// A2: Thread[main,5,main], 0
// A2: Thread[main,5,main], 1
// A2: Thread[main,5,main], 2
// A2: Thread[main,5,main], 3
// A2: Thread[main,5,main], 4
// 外部:Thread[main,5,main]
// 函数总耗时:1024
// A1: Thread[main,5,main]

// 执行结果:执行了 a2.cancelAndJoin()
// L1: Thread[main,5,main]
// L2: Thread[main,5,main]
// 外部:Thread[main,5,main]
// 函数总耗时:14
// A1: Thread[main,5,main]

// 执行结果:执行了 a2.cancel()
// 外部:Thread[main,5,main]
// 函数总耗时:7
// L1: Thread[main,5,main]
// L2: Thread[main,5,main]
// A2: Thread[main,5,main], 0
// A1: Thread[main,5,main]

// 执行结果:执行了 a1.join()
// L1: Thread[main,5,main]
// L2: Thread[main,5,main]
// A2: Thread[main,5,main], 0
// A1: Thread[main,5,main]
// 外部:Thread[main,5,main]
// 函数总耗时:14
// A2: Thread[main,5,main], 1
// A2: Thread[main,5,main], 2
// A2: Thread[main,5,main], 3
// A2: Thread[main,5,main], 4

// 执行结果:执行了 a2.join() 和 a1.join()
// L1: Thread[main,5,main]
// L2: Thread[main,5,main]
// A2: Thread[main,5,main], 0
// A2: Thread[main,5,main], 1
// A2: Thread[main,5,main], 2
// A2: Thread[main,5,main], 3
// A2: Thread[main,5,main], 4
// A1: Thread[main,5,main]
// 外部:Thread[main,5,main]
// 函数总耗时:1030
}

async 函数

当调用 launch 函数时可以创建一个新的协程,但 launch 函数只能用于执行一段逻辑,却不能获取执行的结果,因为它的返回值永远是一个 Job 对象。这时,可使用 async 函数实现。

async 函数必须在协程作用域当中调用,它会创建一个新的子协程并返回一个 Deferred 对象(一个封装的数据类型),并通过此对象的 await() 即可。

1
2
3
4
5
6
7
8
fun main(){
runBlocking {
val result = async {
5 + 5
}.await()
println(result)
}
}

在调用 async 函数后,代码块中的代码会立刻开始执行。当调用 await() 时,如果代码块中的代码还没执行完,那么 await() 会将当前协程阻塞住,直到可以获得 async 函数的执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun main(){
runBlocking {
val start = System.currentTimeMillis()
val result = async {
delay(1000)
5 + 5
}.await()
val result2 = async {
delay(1000)
4 + 6
}.await()
println("result id ${result + result2}.")
val end = System.currentTimeMillis()
println("cost ${end - start} ms.")

// 运行结果:
// result id 20.
// cost 2021 ms. // 耗时 2 秒,说明确实是串行。
}
}

将上面代码改为并行关系,从而提升运行效率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun main(){
runBlocking {
val start = System.currentTimeMillis()
val result = async {
delay(1000)
5 + 5
}
val result2 = async {
delay(1000)
4 + 6
}
// 仅在需要用到 async 函数的执行结果时才调用 await() 获取。
println("result id ${result.await() + result2.await()}.")
val end = System.currentTimeMillis()
println("cost ${end - start} ms.")

// 运行结果:
// result id 20.
// cost 1021 ms. // 耗时 1 秒,说明确实是并行。
}
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
fun testAwait() = runBlocking{
val time:Long = measureTimeMillis {
val a2:Deferred<Int> = async {
println("a2: ${Thread.currentThread()}")
delay(2000)
200
}
// a2.join()
val a1:Deferred<Int> = async {
getA1()
}
// a1.join()
println("结果: ${Thread.currentThread()} -- ${a2.await()} ${a1.await()}")
}
println("函数总耗时: $time")

// 执行结果:
// a2: Thread[main,5,main]
// a1: Thread[main,5,main]
// 结果: Thread[main,5,main] -- 200 100
// 函数总耗时: 2014


// 执行结果:执行了 a2.join() 和 a1.join()
// a2: Thread[main,5,main]
// a1: Thread[main,5,main]
// 结果: Thread[main,5,main] -- 200 100
// 函数总耗时: 3021
}

/**
* 挂起函数
*/
suspend fun getA1():Int {
println("a1: ${Thread.currentThread()}")
delay(1000)
return 100
}

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class TestActivity : AppCompatActivity() {
private val TAG = "TAG_TestActivity"

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_test)

val job = Job()
// CoroutineScope(job).launch {
// // 逻辑处理
// val result = async {
// // try-catch 要包裹在 async 开启的协程作用域中,
// // 在协程作用域外层是无法捕获到协程异常的,这是因为已经超出了协程作用域的范围。
// try {
// // 模拟耗时操作
// delay(3000)
// "运算结果:" + 3/0
// } catch (e: Exception) {
// "结果异常"
// }
// }.await()
// Log.d(TAG, result)
// }
// 日志打印:
// 结果异常

// CoroutineScope(job).launch {
// val startTime = System.currentTimeMillis()
// val result = async {
// delay(2000)
// "操作成功"
// }.await()
// // await 方法会阻塞当前协程,直到获取执行结果,
// // 2 秒后 result 执行结束,开始执行 result2
//
// val result2 = async {
// delay(1000)
// "获取成功"
// }.await()
// Log.d(TAG, "执行结果:$result-$result2")
// val endTime = System.currentTimeMillis()
// Log.d(TAG, "执行时间:${endTime-startTime}")
// }
// 日志打印:
// 执行结果:操作成功-获取成功
// 执行时间:3015

CoroutineScope(job).launch {
val startTime = System.currentTimeMillis()
val result = async {
delay(2000)
"操作成功"
}

val result2 = async {
delay(1000)
"获取成功"
}
// 可以只在用到执行结果的时候调用 await 方法,
// 这样就可以让 result 和 result2 同时执行,相当于并行的关系了。
// 在实际项目中常有需要合并不同接口执行结果的需求,这时就可采用这种方式来提高运行效率。
Log.d(TAG, "执行结果:${result.await()}-${result2.await()}")
val endTime = System.currentTimeMillis()
Log.d(TAG, "执行时间:${endTime-startTime}")
}
// 日志打印:
// 执行结果:操作成功-获取成功
// 执行时间:2008

// job.cancel()
}
}

withContext 函数

一个比较特殊的作用域构造器,它是一个挂起函数,大体可将它理解成 async 函数的一种简化版写法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main(){
runBlocking {
// val result1 = async { 5 + 5 }.await()

// 等价于上面代码,唯一不同的是,强制需要指定一个线程参数。
// ------------------------------------------------------------
// 调用 withContext 函数后会立即执行代码块中的代码,同时将当前协程阻塞住。
// 当执行结束后,会将最后一行的执行结果作为返回值。
val result = withContext(Dispatchers.Default){
5 + 5
}
println(result)
}
}

线程参数的意义在于:虽然不需要像传统编程那样开启多线程来执行并发任务,只需要在一个线程下开启多个协程即可。但是,这并不意味着就不再需要开启线程了,比如 Android 中执行耗时任务需要在子线程当中,那我们在主线程中开启的协程就无法胜任了。这时,便可以通过线程参数来为协程指定一个具体的运行线程。

线程参数主要有以下 3 种值可选:

  • Dispatchers.Default:使用一种默认低并发的线程策略,当要执行的代码属于计算密集型任务时,开启过高的并发反而可能会影响任务的运行效率,此时就可以使用 Dispatchers.Default。
  • Dispatchers.IO:使用一种较高并发的线程策略,当要执行的代码大多数时间是在阻塞和等待中,比如执行网络请求时,为了能够支持更高的并发数量,此时可使用 Dispatchers.IO。
  • Dispatchers.Main:表示不会开启子线程,并且这个值只能在 Android 项目中使用,会在 Android 主线程中执行代码。

在学习协程之前,如果需要在 UI 上实现显示网络请求结果的功能,那么要在开启 Thread 之后将结果回调,并使用 runOnUiThread 方法切换主线程。而通过线程参数则能够更简便的处理此问题。

事实上,以上所学的协程作用域构造器中,除了 coroutineScope 函数之外,其它函数都是可以指定这样一个线程参数的,只不过 withContext 函数是强制要求指定的。

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class TestActivity : AppCompatActivity() {
private val TAG = "TAG_TestActivity"

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_test)

val job = Job()
// 在 CoroutineScope(job).launch 开启的协程中,可以做许多网络请求、I/O 读写等耗时操作,
// 为了便于代码阅读和扩展,通常会将部分代码抽取到单独的方法中,如 loadData()
CoroutineScope(job).launch(Dispatchers.Main) {
// 使用协程可以更优雅地实现异步任务,即使程序多次切换线程,也不需要像使用线程一样层层嵌套。
val result = getResult()
showUI(result)
val result1 = getResult()
showUI(result1)
}

// job.cancel()
}

// delay 是一个挂起函数,withContext 函数也是一个挂起函数
// 而挂起函数必须放在协程作用域或者另一个挂起函数中执行(所以挂起函数最终肯定是在协程中执行),
// 因此这里需要加上 suspend 关键字,
// suspend 关键字在 Kotlin 协程中仅仅起到提醒作用,比如提醒其他成员该方法是一个耗时操作需要在协程中执行。
private suspend fun getResult() =
withContext(Dispatchers.IO){
Log.d(TAG,"当前线程 Dispatchers.IO:${Thread.currentThread().name}")
delay(2000)
"操作成功"
}

private fun showUI(string: String){
Log.d(TAG,"当前线程 Dispatchers.Main:${Thread.currentThread().name}")
Log.d(TAG,string)
// 日志打印:
// 当前线程 Dispatchers.IO:DefaultDispatcher-worker-1
// 当前线程 Dispatchers.Main:main
// 操作成功
// 当前线程 Dispatchers.IO:DefaultDispatcher-worker-1
// 当前线程 Dispatchers.Main:main
// 操作成功
}
}

简化回调的写法

协程的主要用途是可以大幅度地提升并发编程的运行效率。但实际上,Kotlin 中的协程还可以对传统回调的写法进行优化,从而让代码变得更加简洁。

简化 HttpURLConnection 网络请求的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 回调机制基本上是依靠匿名类来实现的,但这种写法通常比较繁琐,
* 比如下面的代码,有多少个地方发起请求,就需要编写多少次这样的匿名类实现。
*/
HttpUtil.sendHttpRequest(address, object : HttpCallbackListener{
override fun onFinish(response: String) {
// 得到服务器返回的具体内容
showResponse(response)
}

override fun onError(e: Exception) {
// 在这里对异常情况进行处理
}
})

/**
* 在过去,可能确实没什么更加简单的写法了。但在 Kotlin 中,可借助 suspendCoroutine 函数来继续简化。
* suspendCoroutine 函数必须在协程作用域或挂起函数中才能调用,它接收一个 Lambda 表达式参数,主要作用是将当* 前协程立即挂起,然后在一个普通的线程中执行 Lambda 表达式中的代码,Lambda 表达式的参数列表上会传入一个
* Continuation 参数,调用它的 resume() 或 resumeWithException() 可以让协程恢复执行,
* -----------------------------------------------------------------------------------------
* 定义一个挂起函数 request()
*/
suspend fun request(address:String):String{
return suspendCoroutine { continuation ->
HttpUtil.sendHttpRequest(address,object:HttpCallbackListener{
override fun onFinish(response: String) {
continuation.resume(response)
}

override fun onError(e: Exception) {
continuation.resumeWithException(e)
}
})
}
}

/**
* 获取百度首页的响应数据
* 也是一个挂起函数,因此当调用 request() 时,当前的协程就会被立刻挂起,然后一直等待网络请求成功或失败后,当前协程才能恢复运行。这样即使不使用回调的写法,也能获得异步网络请求的响应数据,而如果请求失败,则会直接进入 catch 语句当中。
*/
suspend fun getBaiduResponse(){
try {
val response = request("https://www.baidu.com")
// 对服务器相应的数据进行处理
}catch (e:Exception){
// 对异常情况进行处理
}
}

原理解析

CoroutineScope(job).launch 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

launch 函数三个参数分别为:

  • CoroutineContext:含义是附属到协程的上下文中
  • CoroutineStart:含义是启动器默认使用 CoroutineStart.DEFAULT
  • 用 suspend 修饰的 CoroutineScope 高阶函数:实质上就是要执行的协程代码块

在 launch 函数中通过 newCoroutineContext 函数创建新的协程上下文:

1
2
3
4
5
6
7
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = foldCopies(coroutineContext, context, true)
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}

通过 CoroutineScope 的扩展函数将 CoroutineContext 添加进来,之后会调用 coroutine.start 函数启动协程:

1
2
3
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}

start 函数最终会调用 startCoroutineCancellable 函数:

1
2
3
4
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

这里创建了一个未被拦截的 Continuation,Continuation 有延续、续集的意思,这里可以理解为当协程挂起时会将代码分割成若干个 Continuation。当协程挂起时,执行结束后会通过一个 Continuation 来告诉协程应从哪个地方继续执行。resumeCancellableWith 最终又会执行 BaseContinuationImpl 的 resumeWith 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}

上述代码通过 invokeSuspend 函数来执行 suspend 中的代码段,如果代码段中执行了挂起方法就会直接返回,挂起函数最终会通过 complete 函数进行恢复。


备注

参考资料

第一行代码(第3版)

官方文档

官方中文翻译站

《Android Jetpack开发 原理解析与应用实战》

相关文章

Retroft 使用

传送门GitHub

欢迎关注微信公众号:非也缘也