Programming Android with Kotlin Achieving Structured Concurrency with Coroutines
Table of Contents
- 1. Coroutine
- 2. Channel
- 3. Flow
- 3.1. How to make a flow from callbacked api?
- 3.2. How to transform a stream of values concurrently?
- 3.3. How to implement a
bufferTimeout
operator? - 3.4. A
try/catch
statement inside a flow builder might catch downstream exceptions – which includes exceptions raised during the collection of the flow. - 3.5. When to use
try/catch
with the flow? - 3.6. How to handle exceptions inside the flow?
- 3.7. How to build a flow with exceptions?
- 3.8. How to implement a
mapWithRetry
operator? - 3.9. How to share emitted values among several collectors?
- 3.10. Why are there two methods to emit values exposed by
MutableSharedFlow
? - 3.11. How to
replay
values for new collectors?
- start
1. Coroutine
Conceptually, a job
is a cancellable thing with a life-cycle that culminates in its completion.
async
returns a Deferred
instance, which is a specialized Job
with a return value.
A CoroutineScope
is an object that plays the role of the parent in structured concurrency–its purpose is to manage and monitor the coroutines you create inside it.
interface CoroutineScope { val coroutineContext: CoroutineContext }
In other words, a CoroutineScope
is a container for a CoroutineContext
.
In practice, you'll most often use a special context element to control which thread, or which thread poll, will execute your coroutine(s).
Cancellation: designed or failure
Make callback suspendable with suspendCancellableCoroutine
:
suspend fun Call.await() = suspendCancellableCoroutine<ResponseBody?> { continuation -> continuation.invokeOnCancellation { cancel() } enqueue(object: Callback { override fun onResponse(call: Call, response: Response) { continuation.resume(response.body()) } override fun onFailure(call: Call, e: IOException) { continuation.resumeWithException(e) } }) }
Catch the unhandled coroutine exceptions with CoroutineExceptionHandler
:
fun main() = runBlocking { val ceh = CoroutineExceptionHandler { _, exception -> println("Caught $exception with suppressed ${exception.suppressed.contentToString()}") } val scope = CoroutineScope(coroutineContext + ceh + Job()) val job = scope.launch { try { throw AssertionError() } finally { withContext(NonCancellable) { println("Throwing exception from finally") throw ArithmeticException() // This exception is added to the `suppressed` list of the previous exception. } } } job.join() }
Stop exceptions' propagation with SupervisorJob
:
fun main() = runBlocking { val ceh = CoroutineExceptionHandler { _, exception -> println("Caught $exception with suppressed ${exception.suppressed.contentToString()}") } val supervisor = SupervisorJob() val scope = CoroutineScope(coroutineContext + supervisor) with(scope) { // upon could be replaced with supervisorScope val firstChild = launch(ceh) { println("First child is failing") throw AssertionError("First child is cancelled") } val secondChild = launch { firstChild.join() delay(10) println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active") } // wait until the second child completes secondChild.join() } }
withContext
could be replaced with coroutineScope
Use runCatching
instead of try/catch
:
scope.launch { val result = runCatching { regularFunctionWhichCanThrowException() } if (result.isSuccess) { // no exception was thrown } else { // exception was thrown } }
- If the failure of a child should also cancel other children, use a regular scope. Otherwise, use a supervisor scope.
async
exposes exceptions, which can be caught by wrapping theawait
call in atry/catch
. On the other hand,launch
treats uncaught exceptions as unhandled, which can be handled using a CEH.
Unknown concepts or API:
Job.join()
: wait the job completeyield
: give a chance to suspend unsuspendable block
2. Channel
2.1. What is a Channel
?
A Channel
can be see just like that: a queue with suspending functions send
and receive
.
2.2. How to use a Channel
without buffer?
fun main() = runBlocking { val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x * x) channel.close() } for (y in channel) println(y) }
2.3. Which Channel
flavors does coroutine provide?
- RENDEZVOUS: 0
- UNLIMITED
- CONFLATED: 1, just recevie the last value
- BUFFERED: when over capacity, receive start
3. Flow
3.1. How to make a flow from callbacked api?
fun messageFlow() = callbackFlow<Message> { // 1. Instantiate the "callback." In this case, it's an observer. val cb = object : Callback { override fun onMessage(message: Message) { trySend(message) } override fun onClosed() { channel.close() } override fun onFailure(t: Throwable) { cancel(CancellationException("Failure", t)) } } // 2. Register that callback using the available api. addCallback(cb) // 3. Listen for close event using `awaitClose`, and provide a relevant action // to take in this case. Most probably, you'll have to unregister the callback. awaitClose { removeCallback(cb) } }
3.2. How to transform a stream of values concurrently?
suspend fun transform(loc: Location): Content = withContext(Dispatchers.IO) { // do some work } fun main() = runBlocking { val contentFlow = locationsFlow.map { flow { emit(transform(it)) } }.flattenMerge(4) val contents = contentFlow.toList() }
3.3. How to implement a bufferTimeout
operator?
fun <T> Flow<T>.bufferTimeout(timeoutMillis: Long, capacity: Int = 16): Flow<List<T>> = flow { require(capacity > 0) { "Buffer capacity must be positive" } require(timeoutMillis > 0) { "Timeout must be positive" } coroutineScope { // Consume the upstram flow and send them over a channel. val channel = produceIn(this) val buffer = mutableListOf<T>() val ticker = ticker(timeoutMillis) suspend fun flush() { if (buffer.isNotEmpty()) { emit(buffer.toList()) buffer.clear() } } try { whileSelect { channel.onReceive { buffer.add(it) if (buffer.size == capacity) { flush() } true } ticker.onReceive { flush() true } } catch (e: ClosedReceiveChannelException) { // When the upstream flow collection completes, the coroutine started with `produceIn` // will still attempt to read from that flow, and a `ClosedReceiveChannelException` will // be raised. So we catch that exception, and we know that we should flush the content // of the buffer. flush() } finnaly { channel.cancel() ticker.cancel() } } } } suspend fun main() { val flow = (1..100).asFlow().onEach { delay(10) } val startTime = System.currentTimeMillis() flow.bufferTimeout(50, 10).collect { println("${System.currentTimeMillis() - startTime} ms: $it") } }
3.4. A try/catch
statement inside a flow builder might catch downstream exceptions – which includes exceptions raised during the collection of the flow.
3.5. When to use try/catch
with the flow?
The try/catch
block should only be used to surround the collector, to handle exceptions raised from the collector itself, or (possibly, although it’s not ideal) to handle exceptions raised from the flow.
try { aFlow.collect { // ... } } catch (e: Exception) { // ... }
3.6. How to handle exceptions inside the flow?
Use catch
operator.
3.7. How to build a flow with exceptions?
data class Image(val url: String) sealed calss Result data class Success(val image: Image) : Result() data class Error(val url: String) : Result() suspend fun fetchImage(url: String): Image { // simulate network request delay(10) // simulate an exception thrown by the network request if (url.contains("error")) { throw IOException("Network error") } return Image(url) } suspend fun fetchImageWithResult(url: String): Result { println("fetchImageWithResult: $url") return try { Success(fetchImage(url)) } catch (e: IOException) { Error(url) } } fun main() = runBlocking { val urls = flowOf( "https://www.google.com", "https://www.google.com/error", "https://www.google.com" ) urls.map { url -> fetchImageWithResult(url) }.collect { println("Result: $it") } }
3.8. How to implement a mapWithRetry
operator?
fun <T, R: Any> Flow<T>.mapWithRetry( action: suspend (T) -> R, predicate: suspend (R, attempt: Int) -> Boolean ) = map { data -> var attempt = 0 var shallRetry: Boolean var lastValue: R? = null do { val tr = action(data) shallRetry = predicate(tr, ++attempt) if (!shallRetry) { lastValue = tr } } while (shallRetry) return@map lastValue } fun main = runBlocking { val flow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) flow.mapWithRetry( action = { value -> println("action: $value") value }, predicate = { value, attempt -> value % 2 == 0 && attempt < 3 } ).collect { println("collect: $it") } }
3.9. How to share emitted values among several collectors?
SharedFlow
3.10. Why are there two methods to emit values exposed by MutableSharedFlow
?
默认情况下,当 MutableSharedFlow
使用 emit
发射一个值时,它会暂停直到所有订阅者开始处理这个值。
然而,有时这并不是你想做的。你会发现你必须从非暂停的代码中发射值的情况。因此,出现=了tryEmit=,它试图立即发射一个值,如果成功则返回真,否则返回假。
3.11. How to replay
values for new collectors?
一个 replay > 0
的共享流在内部使用一个缓存,其工作方式与通道类似。例如,如果你创建了一个 replay = 3
的共享流,前三个 emit
调用就不会暂停。在这种情况下,=emit= 和 tryEmit
做的是完全一样的事情:它们向缓存中添加一个新的值。
默认情况下,当重放缓存已满时,=emit= 会暂停,直到所有订阅者开始处理缓存中最旧的值。至于 =tryEmit=,它返回 =false=,因为它不能将值添加到缓存中。如果你不自己跟踪那第四个值,这个值就会丢失。
这种行为(当重放缓存已满时)可以被改变。你也可以选择丢弃缓存中最老的值或正在被添加到缓存中的值。在这两种情况下,=emit= 都不暂停,=tryEmit= 返回 =true=。因此,在缓冲区溢出时有三种可能的行为:暂停、放弃最旧的、放弃最新的。
MutableSharedFlow(replay = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST)
BufferOverflow
is an enum with three possible values: SUSPEND
, DROP_OLDEST
, and DROP_LATEST
. If you don’t specify a value for onBufferOverflow
, SUSPEND
is the default strategy.
Buffer values