
Combining flows: merge, zip, and combine

 2 years ago
source link: https://kt.academy/article/cc-flow-combine


This is a chapter from the book Kotlin Coroutines. You can find Early Access on LeanPub.

Let's talk about combining two flows into one. There are a few ways to do this. The simplest is to merge the elements from the two flows into one, with no modifications, regardless of which flow they originate from. To do that, we use the merge top-level function.

import kotlinx.coroutines.flow.*
suspend fun main() {
    val ints: Flow<Int> = flowOf(1, 2, 3)
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    val together: Flow<Number> = merge(ints, doubles)
    print(together.toList()) 
    // [1, 2, 3, 0.1, 0.2, 0.3]
    // or [1, 0.1, 0.2, 0.3, 2, 3]
    // or [0.1, 1, 2, 3, 0.2, 0.3]
    // or any other combination
}

merge.png

Importantly, when we use merge, elements from one flow do not wait for the other flow. For instance, in the example below, elements from the first flow are delayed, but this does not stop the elements from the second flow.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
suspend fun main() {
    val ints: Flow<Int> = flowOf(1, 2, 3)
        .onEach { delay(1000) }
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    val together: Flow<Number> = merge(ints, doubles)
    together.collect { println(it) }
}
// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3

We use merge when we have multiple sources of events that should lead to the same actions.

fun listenForMessages() {
    merge(userSentMessages, messagesNotifications)
        .onEach { displayMessage(it) }
        .launchIn(scope)
}
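
The snippet above depends on flows and a scope defined elsewhere. As a minimal runnable sketch of the same idea, the version below uses hypothetical stand-ins: two small flowOf sources replace the real event streams, and adding to a list replaces displayMessage. The order in which elements from the two sources interleave is not guaranteed, so the result is sorted before printing.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the two event sources from the snippet above.
val userSentMessages: Flow<String> = flowOf("Hi!", "How are you?")
val messagesNotifications: Flow<String> = flowOf("New message from Bob")

// Collects everything that would be displayed, whichever source it came from.
suspend fun collectDisplayedMessages(): List<String> {
    val displayed = mutableListOf<String>()
    merge(userSentMessages, messagesNotifications)
        .onEach { displayed.add(it) } // stands in for displayMessage(it)
        .collect()
    return displayed
}

suspend fun main() {
    // Interleaving between the two sources may vary, so we sort for a stable print.
    println(collectDisplayedMessages().sorted())
    // [Hi!, How are you?, New message from Bob]
}
```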

The next function is zip, which makes pairs from both flows. We also need to specify a function that decides how elements are paired (transformed into the single value that will be emitted in the new flow). Each element can be part of only one pair, and it needs to wait for its pair. Elements left without a pair are lost, so when one of the zipped flows completes, the resulting flow completes as well (and the other flow is cancelled).

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}
// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3

zip.png
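
To see the pairing rule without any timing involved, here is a small sketch that zips the same values with no delays and gathers the result into a list. The function name zipLetters is introduced only for this illustration. The fourth number never finds a partner, so it does not appear in the output.

```kotlin
import kotlinx.coroutines.flow.*

// Zips three letters with four numbers; the unpaired 4 is dropped.
suspend fun zipLetters(): List<String> =
    flowOf("A", "B", "C")
        .zip(flowOf(1, 2, 3, 4)) { letter, number -> "${letter}_$number" }
        .toList()

suspend fun main() {
    println(zipLetters()) // [A_1, B_2, C_3]
}
```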

The zip function reminds me of the polonaise, a traditional Polish dance. In one of its figures, two lines of people are first separated and then form pairs, joining one by one as they arrive.

polonaise_dance.jpeg

The last important function for combining two flows is combine. Just like zip, it forms pairs from elements, and it also has to wait for the slower flow to produce the first pair. However, the similarities end there. When we use combine, every new element replaces its predecessor. Once the first pair has been formed, each new element produces a new pair together with the latest element from the other flow.

combine.png

Notice that zip needs complete pairs, so it closes as soon as either flow closes. combine does not have this limitation, so it keeps emitting until both flows are closed.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}
// (1 sec)
// B_1
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4

combine is typically used when we need to actively observe two sources of changes. If you want elements to be emitted on every change, you can add initial values to each combined flow (to have the initial pair).

userUpdateFlow.onStart { emit(currentUser) }
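
As a rough illustration of the effect, the sketch below gives each flow an initial value with onStart, so the first pair is formed immediately and every later element produces an emission. The function name combineWithInitialValues and the placeholder initial values "X" and 0 are assumptions made only for this example. The listed output relies on the delays shown, so treat it as what to expect under normal scheduling rather than a strict guarantee.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun combineWithInitialValues(): List<String> {
    val letters = flowOf("A", "B")
        .onEach { delay(400) }
        .onStart { emit("X") } // placeholder initial value
    val numbers = flowOf(1, 2)
        .onEach { delay(1000) }
        .onStart { emit(0) } // placeholder initial value
    return letters
        .combine(numbers) { letter, number -> "${letter}_$number" }
        .toList()
}

suspend fun main() {
    println(combineWithInitialValues())
    // [X_0, A_0, B_0, B_1, B_2]
}
```

Without the two onStart calls, nothing would be emitted until the slower flow produced its first element, and A would never appear in any pair.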

A typical use case is when a view needs to react to changes in either of two observable elements. For example, when a notification badge depends on both the current state of the user and the notifications, we might observe both and combine their changes to update the view.

userStateFlow
    .combine(notificationsFlow) { userState, notifications ->
        updateNotificationBadge(userState, notifications)
    }
    .collect()
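
To make this runnable outside an app, here is a minimal sketch that models both sources as MutableStateFlow. The UserState class and the badgeText function are hypothetical, introduced only for illustration, and printing stands in for updating a view. A StateFlow always holds a current value, so the first pair is available immediately.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical model type, introduced only for this sketch.
data class UserState(val loggedIn: Boolean)

// Hypothetical pure function that decides what the badge should show.
fun badgeText(userState: UserState, notifications: List<String>): String =
    if (userState.loggedIn && notifications.isNotEmpty()) {
        notifications.size.toString()
    } else {
        ""
    }

suspend fun main() {
    val userStateFlow = MutableStateFlow(UserState(loggedIn = true))
    val notificationsFlow = MutableStateFlow(listOf("Welcome!"))

    val badge: Flow<String> = userStateFlow
        .combine(notificationsFlow) { userState, notifications ->
            badgeText(userState, notifications)
        }

    // A state flow never completes, so we take only the first combined value.
    println(badge.first()) // 1
}
```

Keeping the decision in a pure function such as badgeText means the combining logic can be checked without running any flows.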
