Programming Android with Kotlin Achieving Structured Concurrency with Coroutines

Table of Contents

start
<2022-09-15 Thu 09:26>

1. Coroutine

Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.

async returns a Deferred instance, which is a specialized Job with a return value.

A CoroutineScope is an object that plays the role of the parent in structured concurrency–its purpose is to manage and monitor the coroutines you create inside it.

interface CoroutineScope {
  val coroutineContext: CoroutineContext
}

In other words, a CoroutineScope is a container for a CoroutineContext.

In practice, you'll most often use a special context element to control which thread, or which thread poll, will execute your coroutine(s).

Cancellation: designed or failure

Make callback suspendable with suspendCancellableCoroutine:

suspend fun Call.await() = suspendCancellableCoroutine<ResponseBody?> { continuation ->
  continuation.invokeOnCancellation {
    cancel()
  }

  enqueue(object: Callback {
    override fun onResponse(call: Call, response: Response) {
      continuation.resume(response.body())
    }

    override fun onFailure(call: Call, e: IOException) {
      continuation.resumeWithException(e)
    }
  })
}

Catch the unhandled coroutine exceptions with CoroutineExceptionHandler:

fun main() = runBlocking {
  val ceh = CoroutineExceptionHandler { _, exception ->
    println("Caught $exception with suppressed ${exception.suppressed.contentToString()}")
  }
  val scope = CoroutineScope(coroutineContext + ceh + Job())

  val job = scope.launch {
    try {
      throw AssertionError()
    } finally {
      withContext(NonCancellable) {
        println("Throwing exception from finally")
        throw ArithmeticException() // This exception is added to the `suppressed` list of the previous exception.
      }
    }
  }
  job.join()
}

Stop exceptions' propagation with SupervisorJob:

fun main() = runBlocking {
  val ceh = CoroutineExceptionHandler { _, exception ->
    println("Caught $exception with suppressed ${exception.suppressed.contentToString()}")
  }
  val supervisor = SupervisorJob()
  val scope = CoroutineScope(coroutineContext + supervisor)
  with(scope) {
    // upon could be replaced with supervisorScope
    val firstChild = launch(ceh) {
      println("First child is failing")
      throw AssertionError("First child is cancelled")
    }

    val secondChild = launch {
      firstChild.join()

      delay(10)
      println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
    }

    // wait until the second child completes
    secondChild.join()
  }
}

withContext could be replaced with coroutineScope

Use runCatching instead of try/catch:

scope.launch {
  val result = runCatching {
    regularFunctionWhichCanThrowException()
  }

  if (result.isSuccess) {
    // no exception was thrown
  } else {
    // exception was thrown
  }
}
  • If the failure of a child should also cancel other children, use a regular scope. Otherwise, use a supervisor scope.
  • async exposes exceptions, which can be caught by wrapping the await call in a try/catch. On the other hand, launch treats uncaught exceptions as unhandled, which can be handled using a CEH.

Unknown concepts or API:

  • Job.join(): wait the job complete
  • yield: give a chance to suspend unsuspendable block

2. Channel

2.1. What is a Channel?

A Channel can be see just like that: a queue with suspending functions send and receive.

2.2. How to use a Channel without buffer?

fun main() = runBlocking {
  val channel = Channel<Int>()
  launch {
    for (x in 1..5) channel.send(x * x)
    channel.close()
  }

  for (y in channel) println(y)
}

2.3. Which Channel flavors does coroutine provide?

  1. RENDEZVOUS: 0
  2. UNLIMITED
  3. CONFLATED: 1, just recevie the last value
  4. BUFFERED: when over capacity, receive start

3. Flow

3.1. How to make a flow from callbacked api?

fun messageFlow() = callbackFlow<Message> {
  // 1. Instantiate the "callback." In this case, it's an observer.
  val cb = object : Callback {
    override fun onMessage(message: Message) {
      trySend(message)
    }

    override fun onClosed() {
      channel.close()
    }

    override fun onFailure(t: Throwable) {
      cancel(CancellationException("Failure", t))
    }
  }
  // 2. Register that callback using the available api.
  addCallback(cb)
  // 3. Listen for close event using `awaitClose`, and provide a relevant action
  // to take in this case. Most probably, you'll have to unregister the callback.
  awaitClose { removeCallback(cb) }
}

3.2. How to transform a stream of values concurrently?

suspend fun transform(loc: Location): Content = withContext(Dispatchers.IO) {
  // do some work
}

fun main() = runBlocking {
  val contentFlow = locationsFlow.map {
    flow {
      emit(transform(it))
    }
  }.flattenMerge(4)

  val contents = contentFlow.toList()
}

3.3. How to implement a bufferTimeout operator?

fun <T> Flow<T>.bufferTimeout(timeoutMillis: Long, capacity: Int = 16): Flow<List<T>> = flow {
  require(capacity > 0) { "Buffer capacity must be positive" }
  require(timeoutMillis > 0) { "Timeout must be positive" }

  coroutineScope {
    // Consume the upstram flow and send them over a channel.
    val channel = produceIn(this)
    val buffer = mutableListOf<T>()
    val ticker = ticker(timeoutMillis)

    suspend fun flush() {
      if (buffer.isNotEmpty()) {
        emit(buffer.toList())
        buffer.clear()
      }
    }

    try {
      whileSelect {
        channel.onReceive {
          buffer.add(it)
          if (buffer.size == capacity) {
            flush()
          }
          true
        }
        ticker.onReceive {
          flush()
          true
        }
      } catch (e: ClosedReceiveChannelException) {
        // When  the  upstream  flow  collection  completes,  the  coroutine  started  with `produceIn`
        // will still attempt to read from that flow, and a `ClosedReceiveChannelException`  will
         // be  raised.  So  we  catch  that  exception, and we know that we should flush the content
        // of the buffer.
        flush()
      } finnaly {
        channel.cancel()
        ticker.cancel()
      }
    }
  }
}

suspend fun main() {
  val flow = (1..100).asFlow().onEach { delay(10) }
  val startTime = System.currentTimeMillis()
  flow.bufferTimeout(50, 10).collect {
    println("${System.currentTimeMillis() - startTime} ms: $it")
  }
}

3.4. A try/catch statement inside a flow builder might catch downstream exceptions – which includes exceptions raised during the collection of the flow.

3.5. When to use try/catch with the flow?

The try/catch block should only be used to surround the collector, to handle exceptions raised from the collector itself, or (possibly, although it’s not ideal) to handle exceptions raised from the flow.

try {
  aFlow.collect {
    // ...
  }
} catch (e: Exception) {
  // ...
}

3.6. How to handle exceptions inside the flow?

Use catch operator.

3.7. How to build a flow with exceptions?

data class Image(val url: String)

sealed calss Result
data class Success(val image: Image) : Result()
data class Error(val url: String) : Result()

suspend fun fetchImage(url: String): Image {
  // simulate network request
  delay(10)

  // simulate an exception thrown by the network request
  if (url.contains("error")) {
    throw IOException("Network error")
  }

  return Image(url)
}

suspend fun fetchImageWithResult(url: String): Result {
  println("fetchImageWithResult: $url")
  return try {
    Success(fetchImage(url))
  } catch (e: IOException) {
    Error(url)
  }
}

fun main() = runBlocking {
  val urls = flowOf(
    "https://www.google.com",
    "https://www.google.com/error",
    "https://www.google.com"
  )

  urls.map { url ->
    fetchImageWithResult(url)
  }.collect {
    println("Result: $it")
  }
}

3.8. How to implement a mapWithRetry operator?

fun <T, R: Any> Flow<T>.mapWithRetry(
  action: suspend (T) -> R,
  predicate: suspend (R, attempt: Int) -> Boolean
) = map { data ->
  var attempt = 0
  var shallRetry: Boolean
  var lastValue: R? = null

  do {
    val tr = action(data)
    shallRetry = predicate(tr, ++attempt)
    if (!shallRetry) {
      lastValue = tr
    }
  } while (shallRetry)
  return@map lastValue
}

fun main = runBlocking {
  val flow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  flow.mapWithRetry(
    action = { value ->
      println("action: $value")
      value
    },
    predicate = { value, attempt ->
      value % 2 == 0 && attempt < 3
    }
  ).collect {
    println("collect: $it")
  }
}

3.9. How to share emitted values among several collectors?

SharedFlow

3.10. Why are there two methods to emit values exposed by MutableSharedFlow?

默认情况下,当 MutableSharedFlow 使用 emit 发射一个值时,它会暂停直到所有订阅者开始处理这个值。

然而,有时这并不是你想做的。你会发现你必须从非暂停的代码中发射值的情况。因此,出现=了tryEmit=,它试图立即发射一个值,如果成功则返回真,否则返回假。

3.11. How to replay values for new collectors?

一个 replay > 0 的共享流在内部使用一个缓存,其工作方式与通道类似。例如,如果你创建了一个 replay = 3 的共享流,前三个 emit 调用就不会暂停。在这种情况下,=emit= 和 tryEmit 做的是完全一样的事情:它们向缓存中添加一个新的值。

默认情况下,当重放缓存已满时,=emit= 会暂停,直到所有订阅者开始处理缓存中最旧的值。至于 =tryEmit=,它返回 =false=,因为它不能将值添加到缓存中。如果你不自己跟踪那第四个值,这个值就会丢失。

这种行为(当重放缓存已满时)可以被改变。你也可以选择丢弃缓存中最老的值或正在被添加到缓存中的值。在这两种情况下,=emit= 都不暂停,=tryEmit= 返回 =true=。因此,在缓冲区溢出时有三种可能的行为:暂停、放弃最旧的、放弃最新的。

MutableSharedFlow(replay = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST)

BufferOverflow is an enum with three possible values: SUSPEND, DROP_OLDEST, and DROP_LATEST. If you don’t specify a value for onBufferOverflow, SUSPEND is the default strategy. Buffer values