5

研究c#异步操作async await状态机的总结 - yi念之间

 1 year ago
source link: https://www.cnblogs.com/wucy/p/17137128.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前言#

    前一段时间得闲的时候优化了一下我之前的轮子[DotNetCoreRpc]小框架,其中主要的优化点主要是关于RPC异步契约调用的相关逻辑。在此过程中进一步了解了关于async和await异步操作相关的知识点,加深了异步操作的理解,因此总结一下。关于async和await每个人都有自己的理解,甚至关于异步和同步亦或者关于异步和多线程每个人也都有自己的理解。因此,如果本文涉及到个人观点与您的观点不一致的时候请勿喷。结论固然重要,但是在这个过程中的引发的思考也很重要。

async await是语法糖#

大家应该都比较清楚async和await这对关键字是一组语法糖,关于语法糖大家可以理解为,编码过程中写了一个关键字,但是编译的时候会把它编译成别的东西,主要是用来提升开发效率。比如我有一段关于async和await相关的代码,如下所示

var taskOne = await TaskOne();
Console.WriteLine(taskOne);

Console.ReadLine();
   
static async Task<string> TaskOne()
{
    var httpResponse = await ClassFactory.Client.GetAsync("https://www.cnblogs.com");
    var content = await httpResponse.Content.ReadAsStringAsync();
    return content;
}

public class ClassFactory
{
    public static HttpClient Client = new HttpClient();
}

这段代码是基于c#顶级语句声明的,它是缺省Main方法的,不过在编译的时候编译器会帮我们补齐Main方法,因为执行的时候JIT需要Main方法作为执行入口。关于如何查看编译后的代码。我经常使用的是两个工具,分别是ILSpydnSpy。这俩工具的区别在于ILSpy生成的代码更清晰,dnSpy生成的源码是可以直接调试的。需要注意的是如果使用的是ILSpy如果查看语法糖本质的话,需要在ILSpy上选择比语法糖版本低的版本,比如c# async和await关键字是在c# 5.0版本中引入的,所以我们这里我们在ILSpy里需要选择c#4.0或以下版本,入下图所示

2042116-20230220131731253-356038580.png

如果使用的是dnSpy的话,需要在调试-->选项-->反编译器中设置相关选项,如下所示

2042116-20230220135428086-394020064.png

这样就可以看到编译后生成的代码了。

生成的状态机#

围绕上面的示例我这里使用的Debug模式下编译生成的dll使用的ILSpy进行反编译,因为这里我需要让编译的源码看起来更清晰一点,而不是调试。如下所示首先看Main方法

//因为我们上面代码var taskOne = await TaskOne()
//使用了await语法糖,所以被替换成了状态机调用
[AsyncStateMachine(typeof(<<Main>$>d__0))]
[DebuggerStepThrough]
private static Task <Main>$(string[] args)
{
	//创建状态机实例
	<<Main>$>d__0 stateMachine = new <<Main>$>d__0();
	stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
	stateMachine.args = args;
	//设置状态-1
	stateMachine.<>1__state = -1;
	//启动状态机
	stateMachine.<>t__builder.Start(ref stateMachine);
	return stateMachine.<>t__builder.Task;
}

//这是系统默认帮我们生成的static void Main的入口方法
[SpecialName]
[DebuggerStepThrough]
private static void <Main>(string[] args)
{
	//同步调用<Main>$方法
	<Main>$(args).GetAwaiter().GetResult();
}

上面的代码就是编译器为我们生成的Main方法,通过这里我们可以得到两条信息

  • 顶级语句编译器会帮我们生成固定的入口函数格式,即static void Main这种标准格式
  • 编译器遇到await关键字则会编译出一段状态机相关的代码,把我们的逻辑放到编译的状态机类里

通过上面我们可以看到<<Main>$>d__0 这个类是编译器帮我们生成的,我们可以看一下生成的代码

[CompilerGenerated]
private sealed class <<Main>$>d__0 : IAsyncStateMachine
{
	public int <>1__state;
	public AsyncTaskMethodBuilder <>t__builder;
	public string[] args;
	private string <taskOne>5__1;
	private string <>s__2;

	[System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })]
	private TaskAwaiter<string> <>u__1;

	private void MoveNext()
	{
		int num = <>1__state;
		try
		{
			TaskAwaiter<string> awaiter;
			//num的值来自<>1__state,由于在创建状态机的时候传递的是-1所以一定会走到这个逻辑
			if (num != 0)
			{
				//调用TaskOne方法,也就是上面我们写的业务方法
				//这个方法返回的是TaskAwaiter<>实例,以为我们TaskOne方法是异步方法
				awaiter = <<Main>$>g__TaskOne|0_0().GetAwaiter();
				//判断任务是否执行完成
				if (!awaiter.IsCompleted)
				{
					num = (<>1__state = 0);
					<>u__1 = awaiter;
					<<Main>$>d__0 stateMachine = this;
					<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
					return;
				}
			}
			else
			{
				awaiter = <>u__1;
				<>u__1 = default(TaskAwaiter<string>);
				num = (<>1__state = -1);
			}
			//调用GetResult()方法获取异步执行结果
			<>s__2 = awaiter.GetResult();
			<taskOne>5__1 = <>s__2;
			<>s__2 = null;
			//这里对应我们上面的输出调用TaskOne方法的结果
			Console.WriteLine(<taskOne>5__1);
			Console.ReadLine();
		}
		catch (Exception exception)
		{
			<>1__state = -2;
			<taskOne>5__1 = null;
			<>t__builder.SetException(exception);
			return;
		}
		<>1__state = -2;
		<taskOne>5__1 = null;
		<>t__builder.SetResult();
	}

	void IAsyncStateMachine.MoveNext()
	{
		this.MoveNext();
	}

	[DebuggerHidden]
	private void SetStateMachine([System.Runtime.CompilerServices.Nullable(1)] IAsyncStateMachine stateMachine)
	{
	}

	void IAsyncStateMachine.SetStateMachine([System.Runtime.CompilerServices.Nullable(1)] IAsyncStateMachine stateMachine)
	{
		this.SetStateMachine(stateMachine);
	}
}

这里的代码可以看到编译器生成的代码,其实这就是对应上面我们写的代码

var taskOne = await TaskOne();
Console.WriteLine(taskOne);
Console.ReadLine();

因为我们使用了await关键字,所以它帮我们生成了IAsyncStateMachine类,里面的核心逻辑咱们待会在介绍,因为今天的主题TaskOne方法还没介绍完成呢,TaskOne生成的代码如下所示

//TaskOne方法编译时生成的代码
[CompilerGenerated]
private sealed class <<<Main>$>g__TaskOne|0_0>d : IAsyncStateMachine
{
	public int <>1__state;
	public AsyncTaskMethodBuilder<string> <>t__builder;
	private HttpResponseMessage <httpResponse>5__1;
	private string <content>5__2;
	private HttpResponseMessage <>s__3;
	private string <>s__4;

	[System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })]
	private TaskAwaiter<HttpResponseMessage> <>u__1;

	[System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })]
	private TaskAwaiter<string> <>u__2;

	private void MoveNext()
	{
		int num = <>1__state;
		string result;
		try
		{
			//因为我们使用了两次await所以这里会有两个TaskAwaiter<>实例
			//var httpResponse = await ClassFactory.Client.GetAsync("https://www.cnblogs.com");
                        //var content = await httpResponse.Content.ReadAsStringAsync();
			TaskAwaiter<string> awaiter;
			TaskAwaiter<HttpResponseMessage> awaiter2;
			if (num != 0)
			{
				if (num == 1)
				{
					awaiter = <>u__2;
					<>u__2 = default(TaskAwaiter<string>);
					num = (<>1__state = -1);
					goto IL_0100;
				}
				//这段逻辑针对的是我们手写的这段代码
				//await ClassFactory.Client.GetAsync("https://www.cnblogs.com")
				awaiter2 = ClassFactory.Client.GetAsync("https://www.cnblogs.com").GetAwaiter();
                                //判断任务是否完成
				if (!awaiter2.IsCompleted)
				{
					num = (<>1__state = 0);
					<>u__1 = awaiter2;
					<<<Main>$>g__TaskOne|0_0>d stateMachine = this;
					<>t__builder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
					return;
				}
			}
			else
			{
				awaiter2 = <>u__1;
				<>u__1 = default(TaskAwaiter<HttpResponseMessage>);
				num = (<>1__state = -1);
			}
			//同步获取HttpResponseMessage结果实例
			<>s__3 = awaiter2.GetResult();
			<httpResponse>5__1 = <>s__3;
			<>s__3 = null;
			//这段代码对应生成的则是await httpResponse.Content.ReadAsStringAsync()
			awaiter = <httpResponse>5__1.Content.ReadAsStringAsync().GetAwaiter();
			if (!awaiter.IsCompleted)
			{
				num = (<>1__state = 1);
				<>u__2 = awaiter;
				<<<Main>$>g__TaskOne|0_0>d stateMachine = this;
				<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
				return;
			}
			goto IL_0100;
			IL_0100:
			//同步获取httpResponse.Content.ReadAsStringAsync()放的结果
			<>s__4 = awaiter.GetResult();
			<content>5__2 = <>s__4;
			<>s__4 = null;
			result = <content>5__2;
		}
		catch (Exception exception)
		{
			<>1__state = -2;
			<httpResponse>5__1 = null;
			<content>5__2 = null;
			<>t__builder.SetException(exception);
			return;
		}
		<>1__state = -2;
		<httpResponse>5__1 = null;
		<content>5__2 = null;
		//调用AsyncTaskMethodBuilder<>方法放置httpResponse.Content.ReadAsStringAsync()结果
		<>t__builder.SetResult(result);
	}

	void IAsyncStateMachine.MoveNext()
	{
		this.MoveNext();
	}

	[DebuggerHidden]
	private void SetStateMachine([System.Runtime.CompilerServices.Nullable(1)] IAsyncStateMachine stateMachine)
	{
	}

	void IAsyncStateMachine.SetStateMachine([System.Runtime.CompilerServices.Nullable(1)] IAsyncStateMachine stateMachine)
	{
		this.SetStateMachine(stateMachine);
	}
}

到这里为止,这些方法就是编译器帮我们生成的代码,也就是这些代码就在生成好的dll里的。

启动状态机#

接下来我们分析一下状态机的调用过程,回到上面的stateMachine.<>t__builder.Start(ref stateMachine)这段状态机启动代码,我们跟进去看一下里面的逻辑

[MethodImpl(MethodImplOptions.AggressiveInlining)]
[DebuggerStepThrough]
public void Start<[Nullable(0)] TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
	//调用了AsyncMethodBuilderCore的Start方法并传递状态机实例
	//即<<Main>$>d__0 stateMachine = new <<Main>$>d__0()实例
	AsyncMethodBuilderCore.Start(ref stateMachine);
}

//AsyncMethodBuilderCore的Start方法
[DebuggerStepThrough]
public static void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
	if (stateMachine == null)
	{
		ThrowHelper.ThrowArgumentNullException(ExceptionArgument.stateMachine);
	}
	//获取当前线程实例
	Thread currentThread = Thread.CurrentThread;
	//获取当前执行上下文
	ExecutionContext executionContext = currentThread._executionContext;
	//获取当前同步上下文
	SynchronizationContext synchronizationContext = currentThread._synchronizationContext;
	try
	{
		//调用状态机的MoveNext方法
		stateMachine.MoveNext();
	}
	finally
	{
		//执行完MoveNext之后
		//还原SynchronizationContext同步上下文到当前实例
		if (synchronizationContext != currentThread._synchronizationContext)
		{
			currentThread._synchronizationContext = synchronizationContext;
		}
		//还原ExecutionContext执行上下文到当前实例
		ExecutionContext executionContext2 = currentThread._executionContext;
		if (executionContext != executionContext2)
		{
                        //执行完成之后把执行上下文装载到当前线程
			ExecutionContext.RestoreChangedContextToThread(currentThread, executionContext, executionContext2);
		}
	}
}

执行完异步任务之后,会判断SynchronizationContext同步上下文环境和ExecutionContext执行上下文环境,保证异步异步之后的可以操作UI线程上的控件,或者异步的后续操作和之前的操作处在相同的执行上线文中。

题外话:ExecutionContext 是一个用于传递状态和环境信息的类,它可以在不同的执行上下文之间传递状态。执行上下文表示代码执行的环境,包括线程、应用程序域、安全上下文和调用上下文等。ExecutionContext 对象包含当前线程上下文的所有信息,如当前线程的安全上下文、逻辑执行上下文、同步上下文和物理执行上下文等。它提供了方法,可以将当前的执行上下文复制到另一个线程中,或者在异步操作之间保存和还原执行上下文。在异步编程中,使用 ExecutionContext 可以确保代码在正确的上下文中运行,并且传递必要的状态和环境信息。

SynchronizationContext 是一个用于同步执行上下文和处理 UI 线程消息循环的抽象类。它可以将回调方法派发到正确的线程中执行,避免了跨线程访问的问题,并提高了应用程序的响应性和可靠性。在异步编程中,可以使用 SynchronizationContext.Current 属性获取当前线程的同步上下文,并使用同步上下文的 Post 或 Send 方法将回调方法派发到正确的线程中执行。

由于调用stateMachine.<>t__builder.Start(ref stateMachine)传递的是new <<Main>$>d__0()实例,所以这里核心就是在调用生成的状态机IAsyncStateMachine实例,即我们上面的<<Main>$>d__0类的MoveNext()方法

void IAsyncStateMachine.MoveNext()
{
	this.MoveNext();
}

由上面的代码可知,本质是调用的私有的MoveNext()方法,即会执行我们真实逻辑的那个方法。由于编译器生成的状态机代码的逻辑是大致相同的,所以我们直接来看,我们业务具体落实的代码即<<<Main>$>g__TaskOne|0_0>d状态机类里的,私有的那个MoveNext方法代码

AsyncTaskMethodBuilder<string> <>t__builder;
TaskAwaiter<HttpResponseMessage> awaiter2;
if (num != 0)
{
	if (num == 1)
	{}

        //ClassFactory.Client.GetAsyn()方法生成的逻辑
	awaiter2 = ClassFactory.Client.GetAsync("https://www.cnblogs.com").GetAwaiter();
	if (!awaiter2.IsCompleted)
	{
		num = (<>1__state = 0);
		<>u__1 = awaiter2;
		<<<Main>$>g__TaskOne|0_0>d stateMachine = this;
		<>t__builder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
		return;
	}
	//同步获取异步结果
	<>s__4 = awaiter.GetResult();
}
else
{}

TaskAwaiter<string> awaiter;
//httpResponse.Content.ReadAsStringAsync()方法生成的逻辑
awaiter = <httpResponse>5__1.Content.ReadAsStringAsync().GetAwaiter();
//判断任务是否完成
if (!awaiter.IsCompleted)
{
	num = (<>1__state = 1);
	<>u__2 = awaiter;
	<<<Main>$>g__TaskOne|0_0>d stateMachine = this;
	<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
	return;
}
//同步获取异步结果,并将返回值装载
result= awaiter.GetResult();
<>t__builder.SetResult(result);

当然这里我们省了里面的很多逻辑,为了让结构看起来更清晰一点。
通过上面的它生成的结构来看,我们写代码的时候一个方法里的每个await都会被生成一个TaskAwaiter逻辑,根据当前异步状态IsCompleted判断任务是否完成,来执行下一步操作。如果任务未完成IsCompleted为false则调用AsyncTaskMethodBuilder实例的AwaitUnsafeOnCompleted方法,如果异步已完成则直接获取异步结果,进行下一步。

执行异步任务#

通过上面的逻辑我们可以看到,如果异步任务没有完成则调用了AsyncTaskMethodBuilder实例的AwaitUnsafeOnCompleted方法。接下来我们就看下AwaitUnsafeOnCompleted方法的实现

public void AwaitUnsafeOnCompleted<[Nullable(0)] TAwaiter, [Nullable(0)] TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
	//调用AwaitUnsafeOnCompleted方法
	AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine, ref m_task);
}

internal static void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine, [NotNull] ref Task<TResult> taskField) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
	//创建IAsyncStateMachineBox实例
	IAsyncStateMachineBox stateMachineBox = GetStateMachineBox(ref stateMachine, ref taskField);
	//调用AwaitUnsafeOnCompleted()方法
	AwaitUnsafeOnCompleted(ref awaiter, stateMachineBox);
}

internal static void AwaitUnsafeOnCompleted<TAwaiter>(ref TAwaiter awaiter, IAsyncStateMachineBox box) where TAwaiter : ICriticalNotifyCompletion
{
	//判断awaiter实例类型
	if (default(TAwaiter) != null && awaiter is ITaskAwaiter)
	{
		//获取TaskAwaiter实例的m_task属性即Task类型
		TaskAwaiter.UnsafeOnCompletedInternal(Unsafe.As<TAwaiter, TaskAwaiter>(ref awaiter).m_task, box, true);
		return;
	}
	if (default(TAwaiter) != null && awaiter is IConfiguredTaskAwaiter)
	{
		//与上面逻辑一致m_task属性即Task类型本质他们都在操作Task
		ref ConfiguredTaskAwaitable.ConfiguredTaskAwaiter reference = ref Unsafe.As<TAwaiter, ConfiguredTaskAwaitable.ConfiguredTaskAwaiter>(ref awaiter);
		TaskAwaiter.UnsafeOnCompletedInternal(reference.m_task, box, reference.m_continueOnCapturedContext);
		return;
	}
	if (default(TAwaiter) != null && awaiter is IStateMachineBoxAwareAwaiter)
	{
		try
		{
			//调用IStateMachineBoxAwareAwaiter实例的AwaitUnsafeOnCompleted方法
			((IStateMachineBoxAwareAwaiter)(object)awaiter).AwaitUnsafeOnCompleted(box);
			return;
		}
		catch (Exception exception)
		{
			System.Threading.Tasks.Task.ThrowAsync(exception, null);
			return;
		}
	}
	try
	{
		//调用ICriticalNotifyCompletion实例的UnsafeOnCompleted方法
		awaiter.UnsafeOnCompleted(box.MoveNextAction);
	}
	catch (Exception exception2)
	{
		System.Threading.Tasks.Task.ThrowAsync(exception2, null);
	}
}

通过这个方法我们可以看到传递进来的TAwaiter都是ICriticalNotifyCompletion的实现类,所以他们的行为存在一致性,只是具体的实现动作根据不同的实现类型来判断。

  • 如果是ITaskAwaiter类的话直接调用TaskAwaiter.UnsafeOnCompletedInternal()方法,传递了TaskAwaiter.m_task属性,这是一个Task类型的属性
  • 如果是IConfiguredTaskAwaiter类型的话,也是调用了TaskAwaiter.UnsafeOnCompletedInternal()方法,传递了ConfiguredTaskAwaiter.m_task属性,这也是一个Task类型的属性
  • 如果是IStateMachineBoxAwareAwaiter类型的话,调用IStateMachineBoxAwareAwaiter.AwaitUnsafeOnCompleted()方法,传递的是当前的IAsyncStateMachineBox状态机盒子实例,具体实现咱们待会看
  • 如果上面的条件都不满足的话,则调用ICriticalNotifyCompletion.UnsafeOnCompleted()方法,传递的是IAsyncStateMachineBox.MoveNextAction方法,IAsyncStateMachineBox实现类包装了IAsyncStateMachine实现类,这里的stateMachineBox.MoveNextAction本质是在执行IAsyncStateMachine的MoveNext的方法,即我们状态机里我们自己写的业务逻辑。

我们首先来看一下StateMachineBoxAwareAwaiter.AwaitUnsafeOnCompleted()方法,找到一个实现类。因为它的实现类有好几个,比如ConfiguredValueTaskAwaiterValueTaskAwaiterYieldAwaitable等,这里咱们选择有类型的ConfiguredValueTaskAwaiter实现类,看一下AwaitUnsafeOnCompleted方法

void IStateMachineBoxAwareAwaiter.AwaitUnsafeOnCompleted(IAsyncStateMachineBox box)
{
	object? obj = _value._obj;
	Debug.Assert(obj == null || obj is Task || obj is IValueTaskSource);

	if (obj is Task t)
	{
		//如果是Task类型的话会调用TaskAwaiter.UnsafeOnCompletedInternal方法,也是上面咱们多次提到的
		TaskAwaiter.UnsafeOnCompletedInternal(t, box, _value._continueOnCapturedContext);
	}
	else if (obj != null)
	{
		Unsafe.As<IValueTaskSource>(obj).OnCompleted(ThreadPool.s_invokeAsyncStateMachineBox, box, _value._token,
			_value._continueOnCapturedContext ? ValueTaskSourceOnCompletedFlags.UseSchedulingContext : ValueTaskSourceOnCompletedFlags.None);
	}
	else
	{
		//兜底的方法也是TaskAwaiter.UnsafeOnCompletedInternal
		TaskAwaiter.UnsafeOnCompletedInternal(Task.CompletedTask, box, _value._continueOnCapturedContext);
	}
}

可以看到ConfiguredValueTaskAwaiter.AwaitUnsafeOnCompleted()方法最终也是执行到了TaskAwaiter.UnsafeOnCompletedInternal()方法,这个咱们上面已经多次提到了。接下里咱们再来看一下ICriticalNotifyCompletion.UnsafeOnCompleted()方法里的实现是啥,咱们找到它的一个常用的实现类,也是咱们上面状态机帮咱们生成的TaskAwaiter<>类里的实现

public void UnsafeOnCompleted(Action continuation)
{
	TaskAwaiter.OnCompletedInternal(m_task, continuation, true, false);
}
//TaskAwaiter的OnCompletedInternal方法
internal static void OnCompletedInternal(Task task, Action continuation, bool continueOnCapturedContext, bool flowExecutionContext)
{
	ArgumentNullException.ThrowIfNull(continuation, "continuation");
	if (TplEventSource.Log.IsEnabled() || Task.s_asyncDebuggingEnabled)
	{
		continuation = OutputWaitEtwEvents(task, continuation);
	}
	//这里调用了Task的SetContinuationForAwait方法
	task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext);
}

咱们看到了这里调用的是Task的SetContinuationForAwait方法,上面我们提到的AwaitUnsafeOnCompleted方法里直接调用了TaskAwaiterUnsafeOnCompletedInternal方法,咱们可以来看一下里面的实现

internal static void UnsafeOnCompletedInternal(Task task, IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
{
	if (TplEventSource.Log.IsEnabled() || Task.s_asyncDebuggingEnabled)
	{
		//默认情况下我们是没有去监听EventSource发布的时间消息
		//如果你开启了EventSource日志的监听则会走到这里
		task.SetContinuationForAwait(OutputWaitEtwEvents(task, stateMachineBox.MoveNextAction), continueOnCapturedContext, false);
	}
	else
	{
		task.UnsafeSetContinuationForAwait(stateMachineBox, continueOnCapturedContext);
	}
}

因为默认是没有开启EventSource的监听,所以上面的两个TplEventSource.Log.IsEnabled相关的逻辑执行不到,如果代码里坚挺了相关的EventSource则会执行这段逻辑。SetContinuationForAwait方法和UnsafeSetContinuationForAwait方法逻辑是一致的,只是因为如果开启了EventSource的监听会发布事件消息,其中包装了关于异步信息的事件相关。所以我们可以直接来看UnsafeSetContinuationForAwait方法实现

internal void UnsafeSetContinuationForAwait(IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
{
	if (continueOnCapturedContext)
	{
	    //winform wpf等ui线程包含同步上下文SynchronizationContext相关的信息
		//如果存在则直接在SynchronizationContext同步上线文中的Post方法把异步结果在ui线程中完成回调执行
		SynchronizationContext current = SynchronizationContext.Current;
		if (current != null && current.GetType() != typeof(SynchronizationContext))
		{
			SynchronizationContextAwaitTaskContinuation synchronizationContextAwaitTaskContinuation = new SynchronizationContextAwaitTaskContinuation(current, stateMachineBox.MoveNextAction, false);
			if (!AddTaskContinuation(synchronizationContextAwaitTaskContinuation, false))
			{
				synchronizationContextAwaitTaskContinuation.Run(this, false);
			}
			return;
		}
		//判断是否包含内部任务调度器,如果不是默认的TaskScheduler.Default调度策略,也就是ThreadPoolTaskScheduler的方式执行MoveNext
		//则使用TaskSchedulerAwaitTaskContinuation的Run方法执行MoveNext
		TaskScheduler internalCurrent = TaskScheduler.InternalCurrent;
		if (internalCurrent != null && internalCurrent != TaskScheduler.Default)
		{
			TaskSchedulerAwaitTaskContinuation taskSchedulerAwaitTaskContinuation = new TaskSchedulerAwaitTaskContinuation(internalCurrent, stateMachineBox.MoveNextAction, false);
			if (!AddTaskContinuation(taskSchedulerAwaitTaskContinuation, false))
			{
				taskSchedulerAwaitTaskContinuation.Run(this, false);
			}
			return;
		}
	}
	//执行兜底逻辑使用线程池执行
	if (!AddTaskContinuation(stateMachineBox, false))
	{
		ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, true);
	}
}

上面我们提到过IAsyncStateMachineBox实现类包装了IAsyncStateMachine实现类,它的stateMachineBox.MoveNextAction本质是在执行AsyncStateMachine的MoveNext的方法,即我们状态机里的自己的业务逻辑。根据上面的逻辑我们来大致总结一下相关的执行策略

  • 如果包含SynchronizationContext同步上下文,也就是winform wpf等ui线程,则直接在SynchronizationContext同步上线文中的Post方法把异步结果在ui线程中完成回调执行,里面的核心方法咱们待会会看到
  • 如果TaskScheduler调度器不是默认的ThreadPoolTaskScheduler调度器,则使用自定义的TaskScheduler来执行MoveNext方法,统一里面的核心方法咱们待来看
  • 兜底的逻辑则是使用线程池来执行,即使用ThreadPool的UnsafeQueueUserWorkItemInternal方法

好了上面留下了两个核心的方法,没有展示相关的实现,首先咱们来看下TaskSchedulerAwaitTaskContinuation的Run方法,这个方法适用于存在同步上下文的场景,来看下它的核心逻辑

internal sealed override void Run(Task task, bool canInlineContinuationTask)
{
	//判断当前线程同步上下文是否和传递的同步上下文一致,则直接执行,说明当前线程可以直接使用异步结果
	if (canInlineContinuationTask && m_syncContext == SynchronizationContext.Current)
	{
		RunCallback(AwaitTaskContinuation.GetInvokeActionCallback(), m_action, ref Task.t_currentTask);
		return;
	}
    //如果不是同一个同步上下文则执行PostAction委托
	RunCallback(PostAction, this, ref Task.t_currentTask);
}

private static void PostAction(object state)
{
	//通过传递的state来捕获执行回调的同步上下文,这里使用的SynchronizationContext的非阻塞的Post方法来执行后续逻辑
	SynchronizationContextAwaitTaskContinuation synchronizationContextAwaitTaskContinuation = (SynchronizationContextAwaitTaskContinuation)state;
	synchronizationContextAwaitTaskContinuation.m_syncContext.Post(s_postCallback, synchronizationContextAwaitTaskContinuation.m_action);
}

protected void RunCallback(ContextCallback callback, object state, ref Task currentTask)
{
        //捕获执行上下文,异步执行完成之后在执行上下文中执行后续逻辑
	ExecutionContext capturedContext = m_capturedContext;
	if (capturedContext == null)
	{
		//核心逻辑就是再行上面的委托即AwaitTaskContinuation.GetInvokeActionCallback方法或PostAction方法
		callback(state);
	}
	else
	{
		ExecutionContext.RunInternal(capturedContext, callback, state);
	}
}

上面的方法省略了一些逻辑,为了让逻辑看起来更清晰,我们可以看到里面的逻辑,即在同步上下文SynchronizationContext中执行异步的回调的结果。如果当前线程就包含同步上下文则直接执行,如果不是则使用之前传递进来的同步上下文来执行。执行的时候会尝试捕获执行上下文。咱们还说到了如果TaskScheduler调度器不是默认的ThreadPoolTaskScheduler调度器,则使用自定义的TaskScheduler来执行MoveNext方法,来看下里面的核心实现

internal sealed override void Run(Task ignored, bool canInlineContinuationTask)
{
	//如果当前的scheduler策略是TaskScheduler.Default即默认的ThreadPoolTaskScheduler
	//则直接使用默认策略调度任务
	if (m_scheduler == TaskScheduler.Default)
	{
		base.Run(ignored, canInlineContinuationTask);
		return;
	}
	//如果不是默认策略则使用,我们定义的TaskScheduler
	Task task = CreateTask(delegate(object state)
	{
		try
		{
			((Action)state)();
		}
		catch (Exception exception)
		{
			Task.ThrowAsync(exception, null);
		}
	}, m_action, m_scheduler);//这里的m_scheduler指的是自定义的TaskScheduler
    bool flag = canInlineContinuationTask && (TaskScheduler.InternalCurrent == m_scheduler || Thread.CurrentThread.IsThreadPoolThread);
	//或者是task其他形式的策略执行
	if (flag)
	{
		TaskContinuation.InlineIfPossibleOrElseQueue(task, false);
		return;
	}
	try
	{
		task.ScheduleAndStart(false);
	}
	catch (TaskSchedulerException)
	{
	}
}

这个逻辑看起来比较清晰,即根据Task的执行策略TaskScheduler判断如何执行任务,比如默认的ThreadPoolTaskScheduler策略,或其他策略,比如单线程策略或者自定义的等等。
上面的执行过程可以总结为以下两点

  • 是否是Task调度,否则执行默认的ThreadPool.UnsafeQueueUserWorkItemInternal()执行。如果是TaskScheduler则判断是哪一种策略,比如是默认的ThreadPoolTaskScheduler或是其它策略亦或是自定义策略等。
  • 是否包含同步上下文SynchronizationContext,比如UI线程,大家都知道修改界面控件需要在UI线程上才能执行,但是await操作可能存在线程切换如果await的结果需要在UI展示需要同步上下文保证异步的结果在UI线程中执行。

线程池和Task关联#

如果任务需要执行中,我们总得想办法把结果给相应的Task实例,这样我们才能在执行完成之后把得到对应的执行状态或者执行结果在相关的Task中体现出来,方便我们判断Task是否执行完成或者获取相关的执行结果,在ThreadPoolWorkQueue中有相关的逻辑具体在DispatchWorkItem方法中

private static void DispatchWorkItem(object workItem, Thread currentThread)
{
	//判断在线程池中自行的任务书否是Task任务
	Task task = workItem as Task;
	if (task != null)
	{
		task.ExecuteFromThreadPool(currentThread);
	}
	else
	{
		Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
	}
}

ThreadPool里的线程执行了Task的ExecuteWithThreadLocal的方法,核心执行方法在Task的ExecuteWithThreadLocal,这样的话执行相关的结果就可以体现在Task实例中,比如Task的IsCompleted属性判断是否执行完成,或者Task<TResult>的GetResultf方法获取结果等等。

Task的FromResult#

这里需要注意的是Task.FromResult<TResult>(TResult)这个方法,相信大家经常用到,如果你的执行结果需要包装成Task<TResult>总会用到这个方法。它的意思是创建一个Task<TResult>,并以指定结果成功完成。,也就是Task<TResult>的IsCompleted属性为true,这个结论可以在dotnet apiTask.FromResult(TResult)文档中看到,因为我们只需要把我们已有的结果包装成Task所以不涉及到复杂的执行,这也意味着在生成状态机的时候MoveNext方法里的逻辑判断IsCompleted时候代表任务是直接完成的,会直接通过GetResult()获取到结果,不需要AwaitUnsafeOnCompleted去根据执行策略执行

private void MoveNext()
{
	int num = <>1__state;
	try
	{
		TaskAwaiter<string> awaiter;
		if (num != 0)
		{
			awaiter = Task.FromResult("Hello World").GetAwaiter();
			//这里的IsCompleted会为true不会执行相关的执行策略
			if (!awaiter.IsCompleted)
			{
				<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
				return;
			}
		}
		else
		{
		}
		<>s__2 = awaiter.GetResult();
	}
	catch (Exception exception)
	{
	}
	<>t__builder.SetResult();
}

总结#

    本文主要是展示了近期对async和await生成的状态机的研究,大概了解了相关的执行过程。由于异步编程涉及到的东西比较多,而且相当复杂,足够写一本书。所以本文设计到的不过是一些皮毛,也由于本人能力有限理解的不一定对,还望谅解。通过本文大家知道async和await是语法糖,会生成状态机相关代码,让我们来总结一下

  • 首先async和await是语法糖,会生成状态机类并填充我们编写的业务代码相关
  • 如果是未完成任务也就是IsCompleted为false则会执行相关的逻辑去执行任务
    • 是否是Task调度,否则执行默认的ThreadPool.UnsafeQueueUserWorkItemInternal()执行。如果是TaskScheduler则判断是哪一种策略,比如是默认的ThreadPoolTaskScheduler或是其它策略亦或是自定义策略等。
    • 是否包含同步上下文SynchronizationContext,比如UI线程,大家都知道修改界面控件需要在UI线程上才能执行,但是await操作可能存在线程切换如果await的结果需要在UI展示需要同步上下文保证异步的结果在UI线程中执行。
  • 需要注意的是Task.FromResult<TResult>(TResult)这个方法,它的意思是创建一个Task<TResult>,并以指定结果成功完成。,也就是Task<TResult>的IsCompleted属性为true

结论只涉及到了async和await语法糖生成的状态机相关,不涉及到关于异步或者同步相关的知识点,因为说到这些话题就变得很大了,还望谅解。

最近看到许多关于裁员跳槽甚至是换行的,每个人都有自己的生活,都有自己的处境,所以有些行为我们要换位思考,理解他们选择生活的方式,每个人能得到自己想要的,能开心就好,毕竟精力有限,为了最想要的总要舍弃一些。

👇欢迎扫码关注我的公众号👇
2042116-20200622133425514-1420050576.png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK