Coroutines launch on default dispatcher when they should not · Issue #2003 · Kotlin/kotlinx.coroutines
source link: https://github.com/Kotlin/kotlinx.coroutines/issues/2003
Coroutines launch on default dispatcher when they should not #2003
Closed
maxpert opened this issue on May 9 · 19 comments
Comments
In one of our production services we have a fixed thread pool for handling blocking DB calls, created like this:

```kotlin
val executor = ThreadPoolExecutor(
    affinity, MAX_IO_THREADS, DEFAULT_TTL_SECONDS_THREAD, TimeUnit.SECONDS,
    SynchronousQueue()
) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}
val dispatcher = executor.asCoroutineDispatcher()
```

Notice the `SynchronousQueue`: once all threads are busy the executor rejects new tasks, and the coroutine is instead launched on the DefaultExecutor. This has multiple problems:

Here is a simple example to reproduce this issue locally:

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

val executor = ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, SynchronousQueue()) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}

fun main() {
    runBlocking(executor.asCoroutineDispatcher()) {
        for (i in 1..5) {
            launch {
                println(Thread.currentThread().name)
                Thread.sleep(1000)
            }
        }
    }
}
```

I believe the subsequent coroutines should be rejected; launching them by default on the DefaultExecutor defeats the purpose of the bounded pool.
radityagumay commented on May 11
Hi @maxpert
IMO, the subsequent coroutines should wait until there is a thread that can pick up the work. Take a look at my plain-thread example here.

Are your DB transactions thread-confined? What I mean by that: when a query is executed within an ongoing transaction on the current thread, it is considered part of that transaction and can proceed. But if the query is instead executed on a different thread, it is not considered part of that transaction and will block until the transaction on the other thread ends.

Given your example above, a quick fix would be to increase the size of the thread pool, to something like:

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

val executor = ThreadPoolExecutor(
    10, 10, 60L, TimeUnit.SECONDS, SynchronousQueue<Runnable>()
) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}

fun main() {
    runBlocking(executor.asCoroutineDispatcher()) {
        for (i in 1..5) {
            launch {
                println(Thread.currentThread().name)
                delay(1000)
            }
        }
    }
}
```

Not sure about applying the same with Mutex, since it is non-reentrant.
Hi @maxpert,
I replied to your concerns, but the answer isn't there. Does […]
What is your use case? What behavior do you need? |
Hi @fvasco,

So I've slept over this problem. Here are a couple of observations I have, and ideas I am working with, since for us it's a production issue right now. In the traditional Java world, the way you build back-pressure and avoid running out of memory is by using rejection techniques.

If you look at the problem statement, we are trying to build a back-pressure/rejection mechanism. This is typical for any production service/app. I guess the desired behavior is to have some construct in coroutines itself that limits the maximum number of parallel calls that can be active at a time in a context. I was shown a scenario by Roman Elizarov:

```kotlin
launch {
    val file = openFile()
    try {
        // ...
        delay(1000L)
        // ...
    } finally {
        file.close()
    }
}
```

Here, just rejecting might end up in a situation where the finally block is never invoked. The Dispatcher should be able to tell whether the coroutine being dispatched is a new coroutine or a resuming one. For new coroutines we can define one behavior (wait until a thread is available to execute); for resuming ones we can define a different behavior (maybe use […]).

But it's pretty obvious that the solution we are looking for is going to restrict the maximum number of parallel coroutines in a context. I am already reading the code to see if I can make a […]
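The classic JVM rejection behavior described above can be reproduced with a plain executor. A minimal stdlib sketch (function and variable names are mine, not from the thread): with a `SynchronousQueue` and a single worker, a second submission while the worker is busy surfaces as a `RejectedExecutionException` that the caller can use for load shedding.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Returns "rejected" when the saturated pool refuses the second task.
fun trySecondTask(): String {
    val pool = ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, SynchronousQueue())
    val release = CountDownLatch(1)
    pool.execute { release.await() }   // occupies the only thread
    val result = try {
        pool.execute { }               // no queue slot, no free thread
        "accepted"
    } catch (e: RejectedExecutionException) {
        "rejected"                     // back-pressure surfaces to the caller
    }
    release.countDown()
    pool.shutdown()
    return result
}

fun main() {
    println(trySecondTask())           // prints "rejected"
}
```

This is exactly the behavior a coroutine dispatcher built on such an executor does not expose today, which is the crux of the issue.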
TL;DR: use a Semaphore.

Your considerations apply to multi-threaded programming, but here they can lead to the wrong conclusion. Please consider this example:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        repeat(1_000) {
            launch {
                println("Hello")
                delay(1_000)
                println("world")
            }
        }
    }
}
```

This code executes a thousand coroutines on a single thread, so it is not possible to use threads to limit the parallelism. Instead you have to serialize coroutine execution, using a Mutex or a Semaphore. This version takes a lot more time and still uses a single thread:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

val semaphore = Semaphore(10)

fun main() {
    runBlocking {
        repeat(1_000) {
            launch {
                semaphore.withPermit {
                    println("Hello")
                    delay(1_000)
                    println("world")
                }
            }
        }
    }
}
```

For your use case, the chosen executor should have at least one thread per Semaphore permit; for this example you could consider […]
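The permit-based limiting above has a direct plain-JVM analogue. A minimal sketch using `java.util.concurrent.Semaphore` (the helper name and parameters are illustrative, not from the thread) that measures the peak number of tasks inside the guarded section and confirms it never exceeds the permit count:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Runs `tasks` jobs on a large pool but lets at most `limit` of them
// hold a permit at once; returns the observed peak concurrency.
fun peakConcurrency(tasks: Int, limit: Int): Int {
    val semaphore = Semaphore(limit)
    val active = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(50)
    repeat(tasks) {
        pool.execute {
            semaphore.acquire()
            try {
                val now = active.incrementAndGet()
                peak.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                Thread.sleep(1)        // simulated blocking work
            } finally {
                active.decrementAndGet()
                semaphore.release()
            }
        }
    }
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    return peak.get()
}

fun main() {
    println("peak concurrency = ${peakConcurrency(1_000, 10)}")
}
```

The coroutine `Semaphore` in the comment above gives the same bound without blocking threads while a coroutine waits for a permit.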
What you are implicitly implying is that the developer must ensure launch/async etc. are simply never invoked past the limit. Imagine enforcing that by hand across a codebase; IMHO this can be a potentially fatal trip-wire!
Agreed, I wanted a simple solution here too. I think it gets the job done really well. Right now I am doing something similar by throwing a custom exception.
@fvasco The proposed solution is definitely not a generic one, but I believe it does solve this particular issue.
Indeed, but you cannot really launch a coroutine with […]
We don't have much choice when the task is rejected by the executor. We cannot abandon a coroutine and not execute it, because it may leak resources, so in all such cases we run it on the default executor. Note that […]
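The "cannot abandon the task" concern has a plain-executor illustration: `ThreadPoolExecutor.CallerRunsPolicy` keeps rejected work alive by running it on the submitting thread, so `try`/`finally` cleanup still executes. A hedged sketch of the analogous JDK mechanism (this is not how kotlinx.coroutines implements its fallback; names are mine):

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Submits a task to a saturated pool; CallerRunsPolicy runs it on the
// submitting thread instead of dropping it, so `finally` still executes.
fun runWithFallback(): List<String> {
    val events = mutableListOf<String>()
    val release = CountDownLatch(1)
    val pool = ThreadPoolExecutor(
        1, 1, 60L, TimeUnit.SECONDS, SynchronousQueue(),
        ThreadPoolExecutor.CallerRunsPolicy()
    )
    pool.execute { release.await() }   // occupies the only worker
    pool.execute {                     // rejected -> runs inline on the caller
        try {
            events.add("work on ${Thread.currentThread().name}")
        } finally {
            events.add("cleanup")      // would be lost if the task were dropped
        }
    }
    release.countDown()
    pool.shutdown()
    return events
}

fun main() {
    runWithFallback().forEach(::println)
}
```

Falling back to another executor for rejected coroutines plays the same role: the work (and its cleanup) runs somewhere rather than being silently discarded.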
Hi @elizarov, if I am not wrong, please consider this example:

```kotlin
launch(myDispatcher) {
    val fos = FileOutputStream("bigdata")
    try {
        // myExecutor.close() -> Job cancelled
        dataFlow.collect { data ->
            fos.write(data) // myDispatcher
        }
    } finally {
        fos.close() // Default dispatcher
    }
}
```

The […]
@fvasco In fact, #2012 is a fix that makes sure that less work is shifted to the default executor than it is now. Now, if […] We could also start using some other fall-back dispatcher (like […]).

P.S. I've also updated the docs.
Hi @elizarov, what about adding a parameter:

```kotlin
fun Executor.asCoroutineDispatcher(
    rejectedTaskDispatcher: CoroutineDispatcher = Dispatchers.Default
): CoroutineDispatcher
```

Cancelled jobs would run on that dispatcher. This should not require a test rework, and it would allow a user to customize the […]
@fvasco This kind of customization is hard to pull off. It expands the API surface for a very rare use-case and makes it hard to freely convert between dispatchers and executors, because the conversion would have to preserve […]

I've changed the default fallback to […]
Hi @elizarov, my main concern is that #2012 is not useful for @maxpert's use case: it will be possible to create an unlimited number of active connections, and some of them will be cancelled randomly, except under a specific condition; so I proposed improving the documentation a little.

To limit the connection count using a thread pool, the connection handling has to be fully synchronous, but synchronous programming does not require a […]
Actually, cancelling random connections might be good as a load-limiting mechanism. I, for one, would not recommend doing it (or at least not relying on it as the sole mechanism to limit the number of connections), but it can be a helpful strategy. For example, routers used to use (and maybe still use) RED-derived algorithms that drop random packets when their queues grow, but before their queues overflow; this throttles down some of the connections going through them and avoids total exhaustion of buffer space (which would have forced them to drop all packets).
Hi @elizarov, I don't want to debate this, but a router handles IP packets, not TCP connections: it drops IP packets, and those are rather easy to recover, hopefully via another route. However, I agree with the proposed changes.
Will there be a patch version of coroutines that fixes this issue? Any proposed back-ports? I tried throwing a custom exception from the rejection policy in my executor:

```kotlin
ThreadPoolExecutor(
    16, 32, 60L, TimeUnit.SECONDS,
    SynchronousQueue(),
    ThreadFactory { runnable ->
        Thread(runnable).also {
            it.name = "io-pool-${it.id}"
            it.isDaemon = true
            it.priority = Thread.NORM_PRIORITY
        }
    },
    RejectedExecutionHandler { _, _ ->
        throw OverloadException("Too many IO requests")
    }
)
```
@maxpert We don't support custom exceptions thrown from […]