13

Reactive Streams on Kotlin: SharedFlow and StateFlow

 3 years ago
source link: https://www.raywenderlich.com/22030171-reactive-streams-on-kotlin-sharedflow-and-stateflow
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
Home Android & Kotlin Tutorials

Reactive Streams on Kotlin: SharedFlow and StateFlow

In this tutorial, you’ll learn about reactive streams in Kotlin and build an app using two types of streams: SharedFlow and StateFlow.

By Ricardo Costeira Jul 5 2021 · Article (30 mins) · Intermediate

5/5 6 Ratings

Version

Event streams have become standard on Android. For years, RxJava has been the standard for reactive streams. Now, Kotlin provides its own reactive streams implementation, called Flow. Like RxJava, Kotlin Flow can create — and react to — streams of data. Also like RxJava, the event streams can come from cold or hot publishers. The difference between the two is simple: Although cold streams emit events only if there are any subscribers, hot streams can emit new events even without having any subscribers reacting to them. In this tutorial, you’ll learn about Flow’s hot stream implementations, called SharedFlow and StateFlow. More specifically, you’ll learn:

  • What a SharedFlow is.
  • What a StateFlow is and how it relates to SharedFlow.
  • How these hot stream flows compare to RxJava, Channels and LiveData.
  • How you can use them on Android.

You might ask yourself: “Why use Kotlin’s SharedFlow and StateFlow over RxJava though?” Although RxJava gets the job done well, some like to describe it as “using a bazooka to kill an ant”. In other words, although the framework works, it’s fairly easy to get carried away with all its capabilities. Doing so can lead to overly complex solutions and code that’s hard to understand. Kotlin Flow provides more direct and specific implementations for reactive streams.

Note: This tutorial assumes you have solid Android development knowledge. If not, you can check out our Beginning Android Development with Kotlin series first.

You also need to be familiar with at least the basics of Kotlin coroutines and flow. For coroutines, you can check out our Kotlin Coroutines Tutorial for Android: Getting Started and Kotlin Coroutines Tutorial for Android: Advanced tutorials. For Flow, you can look at our Kotlin Flow for Android: Getting Started tutorial.

Getting Started

Download the project materials by clicking the Download Materials button at the top or bottom of this tutorial and open the starter project.

You’ll work on an app called CryptoStonks5000. This app has two screens: The first screen shows the user a few cryptocurrencies, while the second shows the price progression for a cryptocurrency in the past 24 hours.

To learn about shared flows and state flows, you’ll:

  1. Implement an event stream with SharedFlow that emits events shared between screens.
  2. Refactor CryptoStonks5000 to use StateFlow to handle view state.

The project follows a Clean Architecture approach and MVVM pattern.

Build and run the project just to make sure everything is working. After that, it’s time to learn about shared flows!

SharedFlow

Before getting into the code, you need to at least be aware of what a SharedFlow is.

A shared flow is, at its core, a Flow. But it has two main differences from the standard Flow implementation. It:

  • Emits events even if you don’t call collect() on it. After all, it is a hot stream implementation.
  • Can have multiple subscribers.

Notice the term “subscribers” used here instead of “collectors” like you would see with a regular Flow. This change in naming is because shared flows never complete. In other words, when you call Flow.collect() on a shared flow, you’re not collecting all its events. Instead, you’re subscribing to the events that get emitted while that subscription exists.

Although this also means that calls to Flow.collect() on shared flows don’t complete normally, the subscription can still be canceled. As you might expect, this cancellation happens by canceling the coroutine.

Note: Flow-truncating operators such as Flow.take(count: Int) can force a shared flow to complete.

With that out of the way, it’s time to code.

Handling Shared Events

You’ll implement a fake price notification system to mimic coin value variations. It has to be a fake one because the real thing’s just too volatile. :]

Users should be aware of these variations no matter which screen they’re in. To make that possible, you’ll create a shared flow in a ViewModel shared by all screens.

In the presentation package, find and open CoinsSharedViewModel.kt.

To start, you need to know how to create a shared flow. Well, it’s your lucky day, because you’re about to create two in a row! Add this code at the top of the class:

private val _sharedViewEffects = MutableSharedFlow<SharedViewEffects>() // 1

val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2

In this code:

  1. You call MutableSharedFlow. This creates a mutable shared flow that emits events of type SharedViewEffects, which is a simple sealed class to model the possible events. Note that this is a private property. You’ll use this one internally to emit events while exposing an immutable shared flow to make them visible externally.
  2. You create the public immutable shared flow mentioned above by calling asSharedFlow() on the mutable shared flow. This way, the immutable exposed property always reflects the value of the mutable private one.

Having these two properties is a good practice. Not only does it give you the freedom to emit whatever you want internally through _sharedViewEffects, but it also makes it so external code can only react to those emissions by subscribing to sharedViewEffects. As such, the subscribing code has no power to change the shared flow, which is a neat way of forcing a robust design and separation of concerns and avoiding mutability bugs.

Event Emission With SharedFlow

OK, you have your flows. Now, you need to emit something with them: price variations. CoinsSharedViewModel calls getPriceVariations() in its init block, but the method doesn’t do anything yet.

Add this code to getPriceVariations():

viewModelScope.launch { // 1
  for (i in 1..100) { // 2
    delay(5000) // 3
    _sharedViewEffects.emit(SharedViewEffects.PriceVariation(i)) // 4
  }
}

This code does a few different things. It:

  1. Launches a coroutine.
  2. Runs a for loop from one to 100 inclusive.
  3. Delays the coroutine for five seconds. delay() checks for cancellation, so it’ll stop the loop if the job gets canceled.
  4. Calls emit on the mutable shared flow, passing it an instance of PriceVariation, which is an event from SharedViewEffects.

That emit(value: T) is one of the two event emission methods you can call on a shared flow. The alternative is to use tryEmit(value: T).

The difference between the two is that emit is a suspending function, while tryEmit isn’t. This small difference results in a huge behavioral contrast between the two methods. To explain this, though, you need to dive deep into shared flow’s replay cache and buffering. Buckle up!

Replay and Buffering

MutableSharedFlow() accepts three parameters:

public fun <T> MutableSharedFlow(
  replay: Int = 0, // 1
  extraBufferCapacity: Int = 0, // 2
  onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND // 3
): MutableSharedFlow<T>

Here’s what they’re used for:

  1. replay: The number of values replayed to new subscribers. It can’t be negative and it defaults to zero.
  2. extraBufferCapacity: The number of values buffered. It can’t be negative and it defaults to zero. The sum of this value plus replay comprises the total buffer of the shared flow.
  3. onBufferOverflow: Action to take when buffer overflow is reached. It can have three values: BufferOverflow.SUSPEND, BufferOverflow.DROP_OLDEST or BufferOverflow.DROP_LATEST. It defaults to BufferOverflow.SUSPEND.

Default Behavior

This can get quite tricky to understand, so here’s a short animation of a possible interaction with a shared flow built with the default values. Assume the shared flow uses emit(value: T).

Going step by step:

  1. This shared flow has three events and two subscribers. The first event is emitted when there are no subscribers yet, so it gets lost forever.
  2. By the time the shared flow emits the second event, it already has one subscriber, which gets said event.
  3. Before reaching the third event, another subscriber appears, but the first one gets suspended and remains like that until reaching the event. This means emit() won’t be able to deliver the third event to that subscriber. When this happens, the shared flow has two options: It either buffers the event and emits it to the suspended subscriber when it resumes or it reaches buffer overflow if there’s not enough buffer left for the event.
  4. In this case, there’s a total buffer of zero — replay + extraBufferCapacity. In other words, buffer overflow. Because onBufferOverflow is set with BufferOverflow.SUSPEND, the flow will suspend until it can deliver the event to all subscribers.
  5. When the subscriber resumes, so does the stream, delivering the event to all subscribers and carrying on its work.
Note: The shared flow specification forbids you from using anything other than onBufferOverflow = BufferOverflow.SUSPEND when the total buffer value amounts to zero. Because tryEmit(value: T) doesn’t suspend, it won’t work if you use it with the default replay and extraBufferCapacity values. In other words, the only way to emit events with tryEmit(value: T) is by having, at least, a total buffer of one.

With Replay

OK, that wasn’t so bad. What happens if there’s a buffer, though? Here’s an example with replay = 1:

Breaking it down:

  1. When the shared flow reaches the first event without any active subscribers, it doesn’t suspend anymore. With replay = 1, there’s now a total buffer size of one. As such, the flow buffers the first event and keeps going.
  2. When it reaches the second event, there’s no more room in the buffer, so it suspends.
  3. The flow remains suspended until the subscriber resumes. As soon as it does, it gets the buffered first event, along with the latest second event. The shared flow resumes, and the first event disappears forever because the second one now takes its place in the replay cache.
  4. Before reaching the third event, a new subscriber appears. Due to replay, it also gets a copy of the latest event.
  5. When the flow finally reaches the third event, both subscribers get a copy of it.
  6. The shared flow buffers this third event while discarding the previous one. Later, when a third subscriber shows up, it also gets a copy of the third event.

With extraBufferCapacity and onBufferOverflow

The process is similar with extraBufferCapacity, but without the replay-like behavior. This third example shows a shared flow with both extraBufferCapacity = 1 and onBufferOverflow = BufferOverflow.DROP_OLDEST:

SharedFlow with extraBufferCapacity = 1 and onBufferOverflow = DROP_LATEST

In this example:

  1. The behavior is the same at first: With a suspended subscriber and a total buffer size of one, the shared flow buffers the first event.
  2. The different behavior starts on the second event emission. With onBufferOverflow = BufferOverflow.DROP_OLDEST, the shared flow drops the first event, buffers the second one and carries on. Also, notice how the second subscriber does not get a copy of the buffered event: Remember, this shared flow has extraBufferCapacity = 1, but replay = 0.
  3. The flow eventually reaches the third event, which the active subscriber receives. The flow then buffers this event, dropping the previous one.
  4. Shortly after, the suspended subscriber resumes, triggering the shared flow to emit the buffered event to it and cleaning up the buffer.

Subscribing to Event Emissions

OK, good job getting this far! You now know how to create a shared flow and customize its behavior. There’s only one thing left to do, which is to subscribe to a shared flow.

In the code, go to the coinhistory package inside presentation and open CoinHistoryFragment.kt. At the top of the class, declare and initialize the shared ViewModel:

private val sharedViewModel: CoinsSharedViewModel by activityViewModels { CoinsSharedViewModelFactory }

You want the shared flow to emit no matter which screen you’re in, so you can’t bind this ViewModel to this specific Fragment. Instead, you want it bound to the Activity so it survives when you go from one Fragment to another. That’s why the code uses the by activityViewModels delegate. As for CoinsSharedViewModelFactory, don’t worry about it: Every ViewModel factory in the app is already prepared to properly inject any dependencies.

Collecting the SharedFlow

Now that you have the shared ViewModel, you can use it. Locate subscribeToSharedViewEffects(). Subscribe to the shared flow here by adding the following code:

viewLifecycleOwner.lifecycleScope.launchWhenStarted { // 1
  sharedViewModel.sharedViewEffects.collect { // 2
    when (it) {
      // 3
      is SharedViewEffects.PriceVariation -> notifyOfPriceVariation(it.variation)
    }
  }
}

This code has a few important details:

  1. The coroutine is scoped to the View instead of the Fragment. This ensures the coroutine is alive only while the View is alive, even if the Fragment outlives it. The code creates the coroutine with launchWhenStarted, instead of the most common launch. This way, the coroutine launches only when the lifecycle is at least in the STARTED state, suspends when it’s at least in the STOPPED state and gets canceled when the scope is destroyed. Using launch here can lead to potential crashes, as the coroutine will keep processing events even in the background.
  2. As you can see, subscribing to a shared flow is the same as subscribing to a regular flow. The code calls collect() on the SharedFlow to subscribe to new events.
  3. The subscriber reacts to the shared flow event.

Keep in mind at all times that even using launchWhenStarted, the shared flow will keep emitting events without subscribers. As such, you always need to consider the wasted resources. In this case, the event emission code is fairly harmless. But things can get heavy, especially if you turn cold flows into hot ones using something like shareIn.

Note: Turning cold flows into hot ones is out of scope for this tutorial — truth be told, it deserves a tutorial on its own. If you’re interested, check out the last section of the tutorial for references about the topic.

Applying the Stream Data to the View

Back in the code, you can see notifyOfPriceVariation() doesn’t exist yet. Add it as well:

private fun notifyOfPriceVariation(variation: Int) {
  val message = getString(R.string.price_variation_message, variation)
  showSnackbar(message)
}

Easy-peasy. Build and run the app. Now, when you go to the coin history screen, you’ll see some periodical Snackbar messages at the bottom. The shared flow will only start emitting when you go to that screen, though. Even if the CoinsSharedViewModel instance is bound to the Activity, it’s only created when you first visit the coin history screen.

You want all screens to be aware of price changes, so this isn’t ideal. To fix it, do the exact same changes in CoinListFragment:

  1. Create the CoinsSharedViewModel instance in the same way.
  2. Add the code to subscribeToSharedViewEffects().
  3. Create notifyOfPriceVariation().

Build and run the app. You’ll now see the periodical Snackbar messages in CoinListFragment as well. As you switch screens, you’ll see that the messages always show the next event and not the previous ones. MutableSharedFlow() in CoinsSharedViewModel is using the default parameters. But feel free to play around with it to see how it affects the shared flow!

SharedFlow and Channels

Like shared flows, channels represent hot streams. But this doesn’t mean shared flow will replace the channels API — not entirely, at least. :]

SharedFlow is designed to completely replace BroadcastChannel. Not only is SharedFlow simpler and faster to use, but it’s a lot more versatile than BroadcastChannel. Keep in mind, though, that other elements from the channels API can and should still be used when it makes sense to do so.

StateFlow

A state flow is structured like a shared flow. This is because StateFlow is nothing more than a specialization of SharedFlow. In fact, you can create a shared flow that behaves exactly like a state flow:

val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(InitialState()) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

The code above creates a shared flow that emits the latest value only to any new subscribers. Due to that distinctUntilChanged at the bottom, it’ll only emit any value if it’s different from the previous one. This is exactly what a state flow does, which makes it great for holding and handling state.

Handling App State

There are simpler ways of creating state flows though, which you’ll use now. Expand the coinlist package and, inside, open CoinListFragmentViewModel.kt. This simple ViewModel uses LiveData to expose a view state class to CoinListFragment. The state class itself is also fairly simple, and it has default values to match the initial view state:

data class CoinListFragmentViewState(
    val loading: Boolean = true,
    val coins: List<UiCoin> = emptyList()
)

The Fragment then uses the current state to update the view by observing the LiveData:

// Code in CoinListFragment.kt
private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewModel.viewState.observe(viewLifecycleOwner) { updateUi(it, adapter) }
}

Start the refactoring by changing MutableLiveData to a MutableStateFlow. So in CoinListFragmentViewModel, go from:

private val _viewState = MutableLiveData(CoinListFragmentViewState())
private val _viewState = MutableStateFlow(CoinListFragmentViewState())

Make sure to include the necessary import for MutableStateFlow. This is how you create a mutable state flow. Unlike shared flows, state flows require an initial value or, in other words, an initial state. But because state flow is a specific implementation of shared flow, there’s no way for you to customize things like replay or extraBufferCapacity. Regardless, the generic rules and constraints for shared flows still apply.

Next, update the immutable LiveData accordingly, from:

val viewState: LiveData<CoinListFragmentViewState> get() = _viewState
val viewState: StateFlow<CoinListFragmentViewState> get() = _viewState

Of course, you could also do:

val viewState = _viewState.asStateFlow()

Add the import for StateFlow. Be it a shared flow or a state flow, you can create an immutable one with both options. The advantage of using asStateFlow() or asSharedFlow() is that you get the extra safety of explicitly creating an immutable version of the flow. This avoids things like creating another mutable version by mistake.

Event Emission With StateFlow

A difference worth noting between shared and state flows is event emission. You can still use emit and tryEmit with state flow, but … don’t. :]

Instead, you should do:

mutableState.value = newState

The reason is that updates to value are always conflated, which means that even if you update it faster than subscribers can consume it, they’ll get the most recent value only. One thing to keep in mind is that whatever you assign to value has to be a completely different object from whatever was there before. For instance, take this code:

data class State(
  var name: String = "",
  var age: Int = -1
)

val mutableState = MutableStateFlow<State>(State())

// ...

// newState and mutableState.value will reference the same object
val newState = mutableState.value 

// Reference is the same, so this is also changing mutableState.value!
newState.name = "Marc"

mutableState.value = newState

In this case, the state flow won’t emit the new value. Because the referenced object is the same, the equality comparison will return true, so the flow will assume it’s the same state.

To make this work, you need to use immutable objects. For example:

data class State(
  val name: String = "",
  val age: Int = -1
)

val mutableState = MutableStateFlow<State>(State())

// ...

mutableState.value = State(name = "Marc")

This way, the state flow will properly emit a state update. Immutability saves the day once again. :]

Back at the code, the cool thing about replacing LiveData with StateFlow is that both of them use a property called value, so nothing changes there.

There’s one last change to make in CoinListFragmentViewModel, inside the requestCoinList() method. You can now update that if condition at the beginning to:

if (viewState.value.coins.isNotEmpty()) return

You don’t need the ? anymore, because value won’t be null. Also, you invert the condition by using isNotEmpty() instead of isNullOrEmpty() and by dropping ! at the beginning. This makes the code a little more readable.

If you try to build the app, you get an error on CoinListFragment stating that there’s an unresolved reference to observe. StateFlow doesn’t have an observe method, so you need to refactor that as well.

Subscribing to State Updates

Open CoinListFragment.kt. Find observeViewStateUpdates() and update it to:

private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewLifecycleOwner.lifecycleScope.launchWhenStarted {
    viewModel.viewState.collect { updateUi(it, adapter) }
  }
}

This code is much like what you did with SharedFlow in the sense that the same logic applies. Despite this, you might worry about the state flow emitting values when the app is in the background. But you don’t need to. It’s true that, because it’s scoped to viewModelScope, it’ll still emit even without any subscribers as long as the ViewModel exists. Regardless, state flow emissions are lightweight operations: It’s just updating the value and notifying all subscribers. Plus, you probably do want the app to show you the latest UI state when it comes to the foreground.

Build and run the app. Everything should work as before because you just refactored the code. Good job on using StateFlow!

StateFlow and Channels

Like SharedFlow can replace BroadcastChannel completely, StateFlow can replace ConflatedBroadcastChannel completely. There are a couple reasons for this. StateFlow is simpler and more efficient than ConflatedBroadcastChannel. It also has better distinction between mutability and immutability with MutableStateFlow and StateFlow.

Hot Flows, RxJava and LiveData

You’re now aware of how both SharedFlow and StateFlow work. But are they even useful on Android?

Although they might not bring anything “new”, they provide more direct and efficient alternatives to the table. For instance, wherever you’d use an RxJava‘s PublishSubject, you can use a SharedFlow. Or wherever you’d use a BehaviorSubject, you can probably use a StateFlow. In fact, if hot event emission is not an issue, StateFlow can even easily replace LiveData.

Note: You can also convert SharedFlow and StateFlow objects in to LiveData as well with the AndroidX lifecycle-livedata-ktx library. The library provides an extension method asLiveData() that allows you to convert the flow and expose it as LiveData for consumption in your view. For more details, see the StateFlow, Flow, and LiveData section of the Android Developers StateFlow and SharedFlow article.

So, putting it in simpler terms:

  • If you have some kind of state management, you can use StateFlow.
  • Whenever you have some event stream going on, where it’s not a problem if events aren’t handled by all possible subscribers or past events might not be handled at all, you can use SharedFlow.

Challenge: Using SharedFlow To Handle Screen Events

Congratulations! This concludes the tutorial. If you want to go the extra mile, you can also handle the screen-specific events modeled in the CoinListFragmentViewEffects and CoinHistoryFragmentViewEffects classes using shared flows. These are events that should be handled exactly once, which means a simple Channel would fit better — remember that shared flows drop events when there are no subscribers. Still, you can do it with shared flows for practice. If you’re curious, the final project in the project materials has a sample implementation.

Where to Go From Here?

If you want to learn more about StateFlow and SharedFlow, you can check their corresponding documentation pages here and here, respectively.

You can also find some information specific to shared and state flows usage on Android in the Android developers page.

If you’re curious about turning cold flows into hot ones, you can check out the following articles:

Finally, if the challenge section piqued your interest in using Channels to handle screen events, you can check out this interesting article on Medium.

I hope you enjoyed this tutorial. If you have any questions, tips or comments, feel free to join the discussion below. :]

raywenderlich.com Weekly

The raywenderlich.com newsletter is the easiest way to stay up-to-date on everything you need to know as a mobile developer.

Get a weekly digest of our tutorials and courses, and receive a free in-depth email course as a bonus!

Average Rating

5/5

Add a rating for this content

Sign in to add a rating
6 ratings

Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK