How can I wait until everything is done in an observable Rx sequence after unsub...
source link: https://www.codesd.com/item/how-can-i-wait-until-everything-is-done-in-an-observable-rx-sequence-after-unsubscription.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
How can I wait until everything is done in an observable Rx sequence after unsubscription?
Introduction
In my WPF C# .NET application I use the reactive extensions (Rx) to subscribe to events and I often have to reload something from the DB to get the values I need to update the UI, because the event objects often only contains IDs and some meta data.
I use the Rx scheduling to load the data in the background and update the UI on the dispatcher. I have made some bad experience with mixing "Task.Run" inside of a Rx sequence (when using "SelectMany" the order is no longer guaranteed and it is hard to control the scheduling in UnitTests). See also: Executing TPL code in a reactive pipeline and controlling execution via test scheduler
My problem
If I shutdown my app (or close a tab) I want to unsubscribe and then await the DB call (which is called from a Rx "Select") that still can be running after "subscription.Dispose". Until now I haven't found any good utility or easy way to do that.
Questions
Is there any framework support to await everything still running in a Rx chain?
If not, do you have any good ideas how to make a easy to use utility?
Are there any good alternative ways to achieve the same?
Example
public async Task AwaitEverythingInARxChain()
{
// In real life this is a hot observable event sequence which never completes
IObservable<int> eventSource = Enumerable.Range(1, int.MaxValue).ToObservable();
IDisposable subscription = eventSource
// Load data in the background
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
// Update UI on the dispatcher
.ObserveOn(DispatcherScheduler.Current)
.SubscribeOn(Scheduler.Default) // In real life the source produces the event values on a background thread.
.Subscribe(loadedData => UpdateUi(loadedData));
Thread.Sleep(TimeSpan.FromSeconds(10));
// In real life I want to cancel (unsubscribe) here because the user has closed the Application or closed the tab and return a task which completes when everything is done.
// Unsubscribe just guarantees that no "OnNext" is called anymore, but it doesn't wait until all operations in the sequence are finished (for example "LoadFromDatabase(id)" can still be runnig here.
subscription.Dispose();
await ?; // I need to await here, so that i can be sure that no "LoadFromDatabase(id)" is running anymore.
ShutDownDatabase();
}
What I already tried (and didn't worked)
- Using the "Finally" operator to set the result of a TaskCompletionSource. The problem with this approach: Finally gets called directly after unsubscribing and "LoadFromDatabase" can still be running
UPDATE: Example with console output and TakeUntil
public async Task Main()
{
Observable
.Timer(TimeSpan.FromSeconds(5.0))
.Subscribe(x =>
{
Console.WriteLine("Cancel started");
_shuttingDown.OnNext(Unit.Default);
});
await AwaitEverythingInARxChain();
Console.WriteLine("Cancel finished");
ShutDownDatabase();
Thread.Sleep(TimeSpan.FromSeconds(3));
}
private Subject<Unit> _shuttingDown = new Subject<Unit>();
public async Task AwaitEverythingInARxChain()
{
IObservable<int> eventSource = Observable.Range(0, 10);
await eventSource
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
.ObserveOn(Scheduler.Default)
.TakeUntil(_shuttingDown)
.Do(loadedData => UpdateUi(loadedData));
}
public int LoadFromDatabase(int x)
{
Console.WriteLine("Start LoadFromDatabase: " + x);
Thread.Sleep(1000);
Console.WriteLine("Finished LoadFromDatabase: " + x);
return x;
}
public void UpdateUi(int x)
{
Console.WriteLine("UpdateUi: " + x);
}
public void ShutDownDatabase()
{
Console.WriteLine("ShutDownDatabase");
}
Output (actual):
Start LoadFromDatabase: 0
Finished LoadFromDatabase: 0
Start LoadFromDatabase: 1
UpdateUi: 0
Finished LoadFromDatabase: 1
Start LoadFromDatabase: 2
UpdateUi: 1
Finished LoadFromDatabase: 2
Start LoadFromDatabase: 3
UpdateUi: 2
Finished LoadFromDatabase: 3
Start LoadFromDatabase: 4
UpdateUi: 3
Cancel started
Cancel finished
ShutDownDatabase
Finished LoadFromDatabase: 4
Start LoadFromDatabase: 5
Finished LoadFromDatabase: 5
Start LoadFromDatabase: 6
Finished LoadFromDatabase: 6
Start LoadFromDatabase: 7
Expected: I want to have a guarantee that following are the last Outputs:
Cancel finished
ShutDownDatabase
I finally found a solution myself. You can use TakeWhile to achive it. TakeUntil does not work, because the main observable sequence immediately completes when the second observable sequence produces the first value.
Here is a example of the working solution:
public async Task Main_Solution()
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
Observable
.Timer(TimeSpan.FromSeconds(4))
.Subscribe(x =>
{
Console.WriteLine("Cancel startedthread='{0}'", Thread.CurrentThread.ManagedThreadId);
cancellationTokenSource.Cancel();
});
await AwaitEverythingInARxChain(cancellationTokenSource.Token);
Console.WriteLine("Cancel finished thread='{0}'", Thread.CurrentThread.ManagedThreadId);
ShutDownDatabase();
Thread.Sleep(TimeSpan.FromSeconds(10));
}
public async Task AwaitEverythingInARxChain(CancellationToken token)
{
IObservable<int> eventSource = Observable.Range(0, 10);
await eventSource
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
.TakeWhile(_ => !token.IsCancellationRequested)
.ObserveOn(Scheduler.Default) // Dispatcher in real life
.Do(loadedData => UpdateUi(loadedData)).LastOrDefaultAsync();
}
public int LoadFromDatabase(int x)
{
Console.WriteLine("Start LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(3));
Console.WriteLine("Finished LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
return x;
}
public void UpdateUi(int x)
{
Console.WriteLine("UpdateUi: '{0}' thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
}
public void ShutDownDatabase()
{
Console.WriteLine("ShutDownDatabase thread='{0}'", Thread.CurrentThread.ManagedThreadId);
}
And the output:
Start LoadFromDatabase: 0 thread='9'
Finished LoadFromDatabase: 0 thread='9'
Start LoadFromDatabase: 1 thread='9'
UpdateUi: '0' thread='10'
Cancel startedthread='4'
Finished LoadFromDatabase: 1 thread='9'
Cancel finished thread='10'
ShutDownDatabase thread='10'
Note that "ShutDownDatabase" is the last output (as expected). It waits until "LoadFromDatabase" is finished for the second value, even if its produced value is not further processed. This is exactly what I want.
Recommend
-
76
GitHub is where people build software. More than 28 million people use GitHub to discover, fork, and contribute to over 79 million projects.
-
5
You’ll probably have to wait until 2022 for the Mini-LED MacBook Air Better screens will lead to a thinner and lighter desig...
-
4
Thought you loved Python? Wait until you meet Rust How a niche phenomenon became StackOverflow’s most beloved language...
-
2
Unimpressed with 5G? Wait until iPhone 14.
-
6
Google might make us wait until 2023 for a Pixel foldable By Arol Wright Published 6 hours ago It's r...
-
2
Wait until the last responsible moment to add structure to your Blazor UIJune 22, 2022 · 4 minute read · Tags: blazorA few years...
-
10
Can't wait for Breath of the Wild 2? This spinoff will tide you over until 2023
-
3
HomePod 2 wait times stretch out until March News
-
8
This Is Why You Can’t Wait Until Later
-
7
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK