I read that usage of Globalscope
is highly discouraged, here.
I have a simple use-case. For every kafka message (let's say a list of Ids) that I receive I have to split it and invoke a rest service simultaneously and wait for it to be done and proceed with other synchronous tasks. There is nothing else is in that application that requires coroutine. In this case, Can I just get away with it?
Note: This is not an android application. It's just a kafka stream processor running on server side. It's an ephemeral, stateless, containerized (Docker) application running in Kubernetes (Buzzword-compliant if you will)
You should scope your concurrency appropriately using structured concurrency. Your coroutines can leak if you don't do this. In your case, scoping them to the processing of a single message seems appropriate.
Here's an example:
/* I don't know Kafka, but let's pretend this function gets
* called when you receive a new message
*/
suspend fun onMessage(msg: Message) {
val ids: List<Int> = msg.getIds()
val jobs = ids.map { id ->
GlobalScope.launch { restService.post(id) }
}
jobs.joinAll()
}
If one of the calls to restService.post(id)
fails with an exception, the example will immediately rethrow the exception, and all the jobs that hasn't completed yet will leak. They will continue to execute (potentially indefinitely), and if they fail, you won't know about it.
To solve this, you need to scope your coroutines. Here's the same example without the leak:
suspend fun onMessage(msg: Message) = coroutineScope {
val ids: List<Int> = msg.getIds()
ids.forEach { id ->
// launch is called on "this", which is the coroutineScope.
launch { restService.post(id) }
}
}
In this case, if one of the calls to restService.post(id)
fails, then all other non-completed coroutines inside the coroutine scope will get cancelled. When you leave the scope, you can be sure that you haven't leaked any coroutines.
Also, because coroutineScope
will wait until all child-coroutines are done, you can drop the jobs.joinAll()
call.
Side note:
A convention when writing a function that start some coroutines, is to let the caller decide the coroutine scope using the receiver parameter. Doing this with the onMessage
function could look like this:
fun CoroutineScope.onMessage(msg: Message): List<Job> {
val ids: List<Int> = msg.getIds()
return ids.map { id ->
// launch is called on "this", which is the coroutineScope.
launch { restService.post(id) }
}
}