協程(又名纖程),輕量級線程(建立在線程基礎上,屬於用戶態調用),非阻塞式編程(像同步編寫一樣),在用戶態內進行任務調度,避免與內核態過多交互問題,提高程序快速響應。協程使用掛起當前上下文替代阻塞,被掛起后的協程可以去運行其它active task,即協程可以被復用,相比於線程,減少了線程資源的大量浪費。
備注
掛起:保存當前運行狀態,釋放資源,此時協程可去做其它工作,可充分利用資源
阻塞:占用資源未釋放,等待狀態
基本使用:
fun runAsync()= runBlocking {
val time = measureTimeMillis {//系統函數統計時間
val one = async { doSomethingUsefulOne() }//異步調用,返回結果
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")//等待異步執行完成(await調用會掛起當前線程,等待執行結果完成后,通過調用resume恢復掛起前狀態)
}
println("Completed in $time ms")
}
//協程coroutines 調用的方法需要用suspend修飾,告訴編譯器此函數可以被掛起
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
這里面沒有使用異步+回調,直接像寫同步代碼一樣,簡潔
launch 異步執行沒有返回結果,產生Job對象用於cancel,join處理
fun cancelCoroutine() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
線程之間切換,使用withContext
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun jumpCor(){//創建單線程coroutines
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
協程必須關聯CoroutineScope以便於管理追蹤,方法內創建Scope
suspend fun showSomeData() = coroutineScope {//此處coroutineScope屬於out scope的child scop
val data = async(Dispatchers.IO) { // IO task io線程調用操作
// ... load some UI data for the Main thread ...
}
withContext(Dispatchers.Main){//UI task UI更新
val result = data.await()
// display(result)
}
}
協程上下文環境,CoroutineScope,CoroutineContext
每個協程運行需要在指定Scope內才能使用協程相關方法delay,asyc,launch,創建CoroutineScope ,runBlocking函數內部會創建CoroutineScope,系統提供GlobalScope,MainScope等輔助類創建Scope
也可以通過CoroutineContext和Job創建自己的CoroutineScope
fun sampleCreateCorountine(){
//create corountine scope
//自定義CoroutineScope
val coroutineContext = Dispatchers.Default
val job = Job()
val coroutineScope = CoroutineScope(coroutineContext + job)
//創建child scope
coroutineScope.launch {
}
//創建全局Scope
GlobalScope.launch (Dispatchers.Default+CoroutineName("global background thread")){
}
//創建主線程分發處理Scope
MainScope().launch {
}
}
類內部定義協程
1,直接繼承CoroutineScope
class SomethingWithLifecycle : CoroutineScope {
// 使用job來管理你的SomethingWithLifecycle的所有子協程
private val job = Job()
override val coroutineContext: CoroutineContext
get() = Dispatchers.Main + job
fun destory(){//退出取消
job.cancel()
}
}
2,直接使用已定義Scope
class CorMyActivity : AppCompatActivity(), CoroutineScope by MainScope() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
showSomeData()
}
/**
* Note how coroutine builders are scoped: if activity is destroyed or any of the launched coroutines
in this method throws an exception, then all nested coroutines are cancelled.
*/
fun showSomeData() = launch {
// <- extension on current activity, launched in the main thread
// ... here we can use suspending functions or coroutine builders with other dispatchers
// draw(data) // draw in the main thread
}
override fun onDestroy() {
super.onDestroy()
cancel()
}
}
Dispatchers,協程分發器:
fun dispatchTask()= runBlocking<Unit> {
// it inherits the context (and thus dispatcher) from the CoroutineScope that it is being launched from.
launch { // context of the parent, main runBlocking coroutine
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
//執行coroutine是在調用者的線程,但是當在coroutine中第一個掛起之后,后面所在的線程將完全取決於
// 調用掛起方法的線程(如delay一般是由kotlinx.coroutines.DefaultExecutor中的線程調用)
//Unconfined在掛起后在delay的調用線程DefaultExecutor執行
launch(context = Dispatchers.Unconfined) { // not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
// coroutines are launched in GlobalScope,uses shared background pool of threads
//uses the same dispatcher as GlobalScope.launch
//Dispatchers.Default 處理cup密集型任務,線程數為cpu內核數,最少為2,Dispatchers.IO 處理阻塞性IO,socket密集度任務,數量隨任務多少變化,默認最大數量64
launch(context = Dispatchers.Default) { // will get dispatched to DefaultDispatcher
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
//creates a thread for the coroutine to run
launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
}
suspend 是如何工作的?
Kotlin 使用堆棧幀來管理要運行哪個函數以及所有局部變量。暫停協程時,
會復制並保存當前的堆棧幀以供稍后使用。恢復協程時,調度器會將堆棧幀從其保存位置復制回來,然后函數再次開始運行
協程間通信之channel
協程之間通過channel進行數據傳遞,生產者->消費者模式


例:
fun channelTest()= runBlocking { val channel = Channel<Int>() launch {//生產數據 for (x in 1..5) channel.send(x * x) channel.close() //關閉停止 } // 循環接收直到channnel close for (y in channel) println(y) println("Done!") }
生產者每生產一個數據就發送到channel里,消費者等待接收數據,
channel分類:
SendChannel:創建的producers類型屬於sendChannel實例
ReceiveChannel:創建的consumers類型屬於receiveChannel實例
Channel:繼承SendChannel和ReceiveChannel即可send,又可以receive數據
channel類型:
Unlimited channel:容量無限制,producer不斷生產數據,可能會產生OutOfMemoryException,consumer接收數據時,如果channel內數據為空則會掛起
Buffered channel:指定 channel size,當生產者的數據達到buffer size大小則send會掛起,直到channel內數據量小於size才能繼續生產數據
Rendezvous:是bufferred channel size=0,當producer生成數據send時如果沒有consumer接受,則producer會掛起直到consumer取走數據,才繼續send下一個數據,即實現同步傳遞數據功能
Conflated channel:producer不停地send數據,后面的數據會覆蓋前面已經存在的數據,consumer始終取到最新的數據
val rendezvousChannel = Channel<String>()//同步傳遞 val bufferedChannel = Channel<String>(10)//指定size pool val conflatedChannel = Channel<String>(Channel.CONFLATED)//channel內數據實時更新 val unlimitedChannel = Channel<String>(Channel.UNLIMITED)//無容量限制
協程結合Architecture ViewModel使用
class NewsViewModel: ViewModel() {
private val mApi:WebServer
init {
mApi = WebServer()
}
val dataNews: MutableLiveData<DataResource<NewsDataRsp>> by lazy {
// MutableLiveData<DataResource<NewsDataRsp>>().also {
// loadNewsData(minId=null)
// }
MutableLiveData<DataResource<NewsDataRsp>>()
}
fun loadNewsData(pageIndex:Int =1,countItem:Int = 20,minId:String?=null){
runCoroutine(dataNews){
val mp = mutableMapOf("encode" to "ywjh","source" to "app","sys" to "android","banner" to "banner",
"limit" to countItem.toString(),"version" to "7002000")
if(pageIndex>1 && false==minId.isNullOrEmpty()){
mp.put("min_id",minId)
}
val response = mApi.commonDataSourceApi.getNewsData(mp).execute()
return@runCoroutine response.body()!!
}
}
fun fetchNews(pageIndex:Int =1,countItem:Int = 20,minId:String){
val mp = mutableMapOf("encode" to "ywjh","source" to "app","sys" to "android","banner" to "banner",
"limit" to countItem.toString(),"version" to "7002000")
if(pageIndex>1 && false==minId.isNullOrEmpty()){
mp.put("min_id",minId)
}
val cor = CoroutineScope(Dispatchers.IO)
cor.launch {
try {
val response = mApi.commonDataSourceApi.getNewsData(mp).execute()
dataNews.postValue(DataResource(DataResource.Status.COMPLETED, response.body(), null))
} catch (exception: Exception) {
dataNews.postValue(DataResource(DataResource.Status.COMPLETED, null, exception))
}
}
}
suspend fun simpleGetData(pageIndex:Int =1,countItem:Int = 20,minId:String) = withContext(Dispatchers.IO) {
val mp = mutableMapOf("encode" to "ywjh","source" to "app","sys" to "android","banner" to "banner",
"limit" to countItem.toString(),"version" to "7002000")
if(pageIndex>1 && false==minId.isNullOrEmpty()){
mp.put("min_id",minId)
}
try {
val response = mApi.commonDataSourceApi.getNewsData(mp).execute()
dataNews.postValue(DataResource(DataResource.Status.COMPLETED, response.body(), null))
} catch (exception: Exception) {
dataNews.postValue(DataResource(DataResource.Status.COMPLETED, null, exception))
}
}
private fun <T> runCoroutine(correspondenceLiveData: MutableLiveData<DataResource<T>>, block: suspend () -> T) {
correspondenceLiveData.value = DataResource(DataResource.Status.LOADING, null, null)
GlobalScope.launch(Dispatchers.IO) {
try {
val result = block()
correspondenceLiveData.postValue(DataResource(DataResource.Status.COMPLETED, result, null))
} catch (exception: Exception) {
// val error = ErrorConverter.convertError(exception)
correspondenceLiveData.postValue(DataResource(DataResource.Status.COMPLETED, null, exception))
}
}
}
}
