Kotlin 协程是在 Kotlin 1.3 版本中引入的,但是协程并不是 Kotlin 语言所特有的。
协程(Coroutines)是一种并发设计模式,使用它可以在 Android 平台上简化异步执行的代码。可以将它简单地理解成一种轻量级的线程,协程和线程属于一个层级的概念。要知道,之前所学习的线程是非常重量级的,需要依靠操作系统的调度才能实现不同线程之间的切换。而使用协程却可以仅在编程语言的层面就能实现不同协程之间的切换,从而大大提升了并发编程的运行效率。
协程允许我们在单线程模式下模拟多线程编程的效果,使用协程可以写出上下两行看似同步却异步执行的代码,代码执行时的挂起与恢复完全是由编程语言来控制的,和操作系统无关。这就是协程的非阻塞时挂起。这种特性使得高并发程序的运行效率得到了极大地提升。
一些处理异步的方式:
线程:常用,简便。
资源消耗、数量受限、平台支持(JS 无线程)、调试使用不便。
回调:层次调用。
Futures、Promise:
Rx 响应式扩展:可观察流的设计思想,优雅便捷。
协程:易学、易用。
为什么要使用协程?
轻量、高效。
简单、好用。
可以用看起来同步的代码写出实质上异步的操作。(将后台任务,函数标记为 suspend,然后用 withoutcontent 切换线程。)
耗时函数自动后台,从而提高性能。
线程的【自动切回】
Kotlin for Java 的协程并不属于广义的协程。他是一个框架,线程框架,基于 Java 的线程框架,是上层封装,归根结底只是为开发者提供的一种线程 API,可以更方便的使用 Java 中的线程。(协程、Executor、RxJava。它们切线程,也是靠 Handler。) 不管是 Kotlin 还是 Groovy,不管上层怎么做,但最后都是基于 JVM,所以最后都是线程。 协程是一种在程序中处理并发任务的方案,也是这种方案的一个组件。
并发;宏观性的概念。一起做,并且是一件事情,不一定是同时做的。 并行;微观。同一时间,同一时刻。 协程不存在线程,协程里没有并行任务,线程可以,多线程同时工作。 协程不需要在意并发和并行的问题和概念,但一些问题还是跑不掉。
线程的问题,协程依然有,但这是线程的问题,不是协程的问题。 学协程的时候不用关注线程安全的问题,还有并发、并行的问题。
活跃的线程是 GC Root,AsyncTask(API 30 中已被废弃) 除了持有外部引用时,更因为内部有活跃的线程,而 new OnClickListener 没有,所以不会内存泄漏。
协程泄露,本质就是一种线程泄漏,也就是一个内存泄漏。不用的协程要取消掉,通过 job.cancle()。Job 对象就是用来取消的。
1 2 3 4 5 6 7 val scope = MainScopeScope.lauch{ } Scope.cancle()
协程的基本用法 添加依赖 首先要添加依赖库:
1 2 3 implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.1" implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1"
GlobalScope.launch 函数 最简单的开启一个协程的方式:使用 GlobalScope.launch 函数,它可以创建一个协程的作用域。并且创建的永远是顶层协程,这一点和线程比较像,因为线程也没有层级这一说,永远是顶层的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 fun main () { GlobalScope.launch { println("codes run in coroutine scope" ) delay(1500 ) println("codes run in coroutine scope finished" ) } Thread.sleep(1000 ) }
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 fun testLaunch () { val time:Long = measureTimeMillis { GlobalScope.launch { Thread.sleep(1000 ) println("1.${Thread.currentThread()} " ) } GlobalScope.launch { Thread.sleep(1000 ) println("2.${Thread.currentThread()} " ) } println("3,${Thread.currentThread()} " ) Thread.sleep(2200 ) } println("函数总耗时:$time " ) }
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class TestActivity :AppCompatActivity (){ private val TAG = "TAG_TestActivity" override fun onCreate (savedInstanceState: Bundle ?) { super .onCreate(savedInstanceState) setContentView(R.layout.activity_test) GlobalScope.launch { Log.d(TAG,"当前线程1:${Thread.currentThread()} " ) Log.d(TAG,"当前线程2:${Thread.currentThread().name} " ) } GlobalScope.launch(Dispatchers.IO) { Log.d(TAG,"当前线程3:${Thread.currentThread().name} " ) } } }
runBlocking 函数 借助 runBlocking 函数可以让应用程序在协程中所有代码都运行完了之后再结束。
注意:此函数通常只应该在测试环境下使用,在正式环境中使用容易产生一些性能上的问题。
1 2 3 4 5 6 7 8 9 fun main () { runBlocking { println("codes run in coroutine scope" ) delay(1500 ) println("codes run in coroutine scope finished" ) } }
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 fun testRunBlocking () { val time:Long = measureTimeMillis { runBlocking { println("1.${Thread.currentThread()} " ) delay(500 ) println("2.${Thread.currentThread()} " ) } println("3,${Thread.currentThread()} " ) } println("函数总耗时:$time " ) }
launch 函数 使用 launch 函数可以创建多个协程,它必须在协程的作用域中才能调用,其次,它会在当前协程的作用域下创建子协程。子协程的特点是如果外层作用域的协程结束了,该作用域下的所有子协程也会一同结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 fun main () { runBlocking { launch { println("launch1" ) delay(1000 ) println("launch1 finished" ) } launch { println("launch2" ) delay(1000 ) println("launch2 finished" ) } } test() } fun test () { val start = System.currentTimeMillis() runBlocking { repeat(100000 ){ launch { println("." ) } } } val end = System.currentTimeMillis() println(end - start) }
suspend 关键字 在 launch 函数中编写的代码是拥有协程作用域的,但是提取到一个单独的函数中就没有协程作用域了。
这时可以使用 suspend 关键字,它可以将任意函数声明成挂起函数,而挂起函数之间都是可以相互调用的。
suspend;并不是用来切线程的
关键作用;标记,或者说提醒
标记;线程切回时,切回到哪。
提醒;提示需要特别对待,说明此函数是个耗时函数,需要在协程里面。
1 2 3 4 suspend fun printDot () { println("." ) delay(1000 ) }
coroutineScope 函数 但 suspend 关键字无法提供协程作用域,也就无法调用像 launch 函数。这时可使用 coroutineScope 函数,它也是一个挂起函数,因此可以在任何其他挂起函数中调用。它的特点是会继承外部的协程作用域并创建一个子作用域。
1 2 3 4 5 6 7 8 9 10 11 12 fun main () { runBlocking { printDot() } } suspend fun printDot () = coroutineScope{ launch { println("." ) delay(1000 ) } }
coroutineScope 函数和 runBlocking 函数还有点类似,它可以保证其作用域内的所有代码和子协程在全部执行完之前,会一直阻塞当前协程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 fun main () { runBlocking { coroutineScope { launch { for (i in 1. .10 ){ println(i) delay(1000 ) } } } println("coroutineScope finished" ) } println("runBlocking finished" ) }
但是,coroutineScope 函数只会阻塞当前协程,既不影响其它协程,也不影响任何线程,因此是不会造成任何性能上的问题的。而 runBlocking 函数由于会阻塞当前线程,而我们又恰好在主线程中调用它的话,那么就有可能会导致界面卡死的情况,所以不太推荐在实际项目中使用。
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class TestActivity :AppCompatActivity (){ private val TAG = "TAG_TestActivity" override fun onCreate (savedInstanceState: Bundle ?) { super .onCreate(savedInstanceState) setContentView(R.layout.activity_test) val job = Job() CoroutineScope(job).launch { Log.d(TAG,"当前线程:${Thread.currentThread().name} " ) } job.cancel() } }
CoroutineScope 与 coroutineScope 的区别?
###withTimeout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 fun testTimeOut () = runBlocking{ val time:Long = measureTimeMillis { withTimeout(3000 ){ launch { repeat(200 ){ println("${Thread.currentThread()} ,$it " ) delay(300 ) } } println("外部:${Thread.currentThread()} " ) } withTimeoutOrNull(1000 ){ } } }
更多的作用域构造器 上面学习了几种作用域构造器,它们都可以用于创建一个新的协程作用域。不过,GlobalScope.launch 和 runBlocking 函数是可以在任意地方调用的,coroutineScope 函数可以在协程作用域或挂起函数中调用,而 launch 函数只能在协程作用域中调用。
Job 对象 其中,GlobalScope.launch 由于每次创建的都是顶层协程,一般也不太建议使用,因为它管理起来成本太高了,当 Activity 关闭时,需要逐个调用所有已创建协程的 cancel() 方法。除非非常明确就是要创建顶层函数。
1 2 3 4 5 6 7 8 9 10 fun main () { val job = GlobalScope.launch { } job.cancel() }
实际项目中比较常用的写法如下:
1 2 3 4 5 6 7 8 9 10 11 12 fun main () { val job = Job() val scope = CoroutineScope(job) scope.launch { } job.cancel() }
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 fun testJob () = runBlocking{ val time:Long = measureTimeMillis { val l1:Job = launch { println("L1: ${Thread.currentThread()} " ) } val l2:Job = launch { println("L2: ${Thread.currentThread()} " ) } val a2:Deferred<Unit > = async { repeat(5 ){ println("A2: ${Thread.currentThread()} , $it " ) delay(200 ) } } val a1:Deferred<Unit > = async { println("A1: ${Thread.currentThread()} " ) } println("外部:${Thread.currentThread()} " ) } println("函数总耗时:$time " ) }
async 函数 当调用 launch 函数时可以创建一个新的协程,但 launch 函数只能用于执行一段逻辑,却不能获取执行的结果,因为它的返回值永远是一个 Job 对象。这时,可使用 async 函数实现。
async 函数必须在协程作用域当中调用,它会创建一个新的子协程并返回一个 Deferred 对象(一个封装的数据类型),并通过此对象的 await() 即可。
1 2 3 4 5 6 7 8 fun main () { runBlocking { val result = async { 5 + 5 }.await() println(result) } }
在调用 async 函数后,代码块中的代码会立刻开始执行。当调用 await() 时,如果代码块中的代码还没执行完,那么 await() 会将当前协程阻塞住,直到可以获得 async 函数的执行结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 fun main () { runBlocking { val start = System.currentTimeMillis() val result = async { delay(1000 ) 5 + 5 }.await() val result2 = async { delay(1000 ) 4 + 6 }.await() println("result id ${result + result2} ." ) val end = System.currentTimeMillis() println("cost ${end - start} ms." ) } }
将上面代码改为并行关系,从而提升运行效率:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 fun main () { runBlocking { val start = System.currentTimeMillis() val result = async { delay(1000 ) 5 + 5 } val result2 = async { delay(1000 ) 4 + 6 } println("result id ${result.await() + result2.await()} ." ) val end = System.currentTimeMillis() println("cost ${end - start} ms." ) } }
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 fun testAwait () = runBlocking{ val time:Long = measureTimeMillis { val a2:Deferred<Int > = async { println("a2: ${Thread.currentThread()} " ) delay(2000 ) 200 } val a1:Deferred<Int > = async { getA1() } println("结果: ${Thread.currentThread()} -- ${a2.await()} ${a1.await()} " ) } println("函数总耗时: $time " ) } suspend fun getA1 () :Int { println("a1: ${Thread.currentThread()} " ) delay(1000 ) return 100 }
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 class TestActivity : AppCompatActivity () { private val TAG = "TAG_TestActivity" override fun onCreate (savedInstanceState: Bundle ?) { super .onCreate(savedInstanceState) setContentView(R.layout.activity_test) val job = Job() CoroutineScope(job).launch { val startTime = System.currentTimeMillis() val result = async { delay(2000 ) "操作成功" } val result2 = async { delay(1000 ) "获取成功" } Log.d(TAG, "执行结果:${result.await()} -${result2.await()} " ) val endTime = System.currentTimeMillis() Log.d(TAG, "执行时间:${endTime-startTime} " ) } } }
withContext 函数 一个比较特殊的作用域构造器,它是一个挂起函数,大体可将它理解成 async 函数的一种简化版写法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 fun main () { runBlocking { val result = withContext(Dispatchers.Default){ 5 + 5 } println(result) } }
线程参数的意义在于:虽然不需要像传统编程那样开启多线程来执行并发任务,只需要在一个线程下开启多个协程即可。但是,这并不意味着就不再需要开启线程了,比如 Android 中执行耗时任务需要在子线程当中,那我们在主线程中开启的协程就无法胜任了。这时,便可以通过线程参数来为协程指定一个具体的运行线程。
线程参数主要有以下 3 种值可选:
Dispatchers.Default:使用一种默认低并发的线程策略,当要执行的代码属于计算密集型任务时,开启过高的并发反而可能会影响任务的运行效率,此时就可以使用 Dispatchers.Default。
Dispatchers.IO:使用一种较高并发的线程策略,当要执行的代码大多数时间是在阻塞和等待中,比如执行网络请求时,为了能够支持更高的并发数量,此时可使用 Dispatchers.IO。
Dispatchers.Main:表示不会开启子线程,并且这个值只能在 Android 项目中使用,会在 Android 主线程中执行代码。
在学习协程之前,如果需要在 UI 上实现显示网络请求结果的功能,那么要在开启 Thread 之后将结果回调,并使用 runOnUiThread 方法切换主线程。而通过线程参数则能够更简便的处理此问题。
事实上,以上所学的协程作用域构造器中,除了 coroutineScope 函数之外,其它函数都是可以指定这样一个线程参数的,只不过 withContext 函数是强制要求指定的。
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class TestActivity : AppCompatActivity () { private val TAG = "TAG_TestActivity" override fun onCreate (savedInstanceState: Bundle ?) { super .onCreate(savedInstanceState) setContentView(R.layout.activity_test) val job = Job() CoroutineScope(job).launch(Dispatchers.Main) { val result = getResult() showUI(result) val result1 = getResult() showUI(result1) } } private suspend fun getResult () = withContext(Dispatchers.IO){ Log.d(TAG,"当前线程 Dispatchers.IO:${Thread.currentThread().name} " ) delay(2000 ) "操作成功" } private fun showUI (string: String ) { Log.d(TAG,"当前线程 Dispatchers.Main:${Thread.currentThread().name} " ) Log.d(TAG,string) } }
简化回调的写法 协程的主要用途是可以大幅度地提升并发编程的运行效率。但实际上,Kotlin 中的协程还可以对传统回调的写法进行优化,从而让代码变得更加简洁。
简化 HttpURLConnection 网络请求的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 HttpUtil.sendHttpRequest(address, object : HttpCallbackListener{ override fun onFinish (response: String ) { showResponse(response) } override fun onError (e: Exception ) { } }) suspend fun request (address:String ) :String{ return suspendCoroutine { continuation -> HttpUtil.sendHttpRequest(address,object :HttpCallbackListener{ override fun onFinish (response: String ) { continuation.resume(response) } override fun onError (e: Exception ) { continuation.resumeWithException(e) } }) } } suspend fun getBaiduResponse () { try { val response = request("https://www.baidu.com" ) }catch (e:Exception){ } }
原理解析 CoroutineScope(job).launch 方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 public fun CoroutineScope.launch ( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope .() -> Unit ) : Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true ) coroutine.start(start, coroutine, block) return coroutine }
launch 函数三个参数分别为:
CoroutineContext:含义是附属到协程的上下文中
CoroutineStart:含义是启动器默认使用 CoroutineStart.DEFAULT
用 suspend 修饰的 CoroutineScope 高阶函数:实质上就是要执行的协程代码块
在 launch 函数中通过 newCoroutineContext 函数创建新的协程上下文:
1 2 3 4 5 6 7 @ExperimentalCoroutinesApi public actual fun CoroutineScope.newCoroutineContext (context: CoroutineContext ) : CoroutineContext { val combined = foldCopies(coroutineContext, context, true ) val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null ) debug + Dispatchers.Default else debug }
通过 CoroutineScope 的扩展函数将 CoroutineContext 添加进来,之后会调用 coroutine.start 函数启动协程:
1 2 3 public fun <R> start (start: CoroutineStart , receiver: R , block: suspend R .() -> T ) { start(block, receiver, this ) }
start 函数最终会调用 startCoroutineCancellable 函数:
1 2 3 4 @InternalCoroutinesApi public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit )) }
这里创建了一个未被拦截的 Continuation,Continuation 有延续、续集的意思,这里可以理解为当协程挂起时会将代码分割成若干个 Continuation。当协程挂起时,执行结束后会通过一个 Continuation 来告诉协程应从哪个地方继续执行。resumeCancellableWith 最终又会执行 BaseContinuationImpl 的 resumeWith 函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final override fun resumeWith (result: Result <Any ?>) { var current = this var param = result while (true ) { probeCoroutineResumed(current) with(current) { val completion = completion!! val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() if (completion is BaseContinuationImpl) { current = completion param = outcome } else { completion.resumeWith(outcome) return } } } }
上述代码通过 invokeSuspend 函数来执行 suspend 中的代码段,如果代码段中执行了挂起方法就会直接返回,挂起函数最终会通过 complete 函数进行恢复。
备注 参考资料 :
第一行代码(第3版)
官方文档
官方中文翻译站
《Android Jetpack开发 原理解析与应用实战》
相关文章 :
Retroft 使用
传送门 :GitHub
欢迎关注微信公众号:非也缘也