虽然C#转眼间已经发展到了10.0版本,但有些在 C#5.0 引入并随.NET Framework 4.5一同发布的特性比如async和await我仍不是很清楚,但这些新特性在一些人工智能生成的代码中使用的越来越多,因为它们确实能够提高异步编程时代码的清晰度,所以最近几天好好研究了一下它的原理。

异步编程可以解决线程因为等待独占式任务而导致的阻塞,但异步编程一直是一个难点,如何正确实现异步模式却不容易,因为它涉及到诸如状态管理、异常管理、同步上下文、异步任务组合等等诸多问题。C#中有很多异步编程模型,最早的有APM (Asynchronous Programming Model) 、EAP(Event-Based Asynchronous Pattern)以及建立在EAP基础上的BackgroundWorker等等,这些异步编程模型在场景比较简单时可能可以满足要求,但是在多任务组合、取消、异常管理以及易用度上随着问题的复杂而变得难以管理。于是TAP(Task-Based Asynchronous Pattern)异步编程模型出现了,它极大的解决了这些痛点。

TAP引入后,在语言层面上C#引入了async和await关键字,它使得能够以同步的方式编写异步代码。在尝试理解async和await的原理之前,我阅读了一些书籍,比如《C# in Depth,Fourth Edition》、《C# 12 in a Nutshell》,前一本很详细介绍了async和await,但后一本我更推荐,本文主要内容来自于这两本书的相关章节。

本文首先简单快速介绍C#中以前的那些异步编程模型,比如上面提到的APM、EAP、BackgroundWorker以及并行库(Task Parallel Liberary,TPL)等,随后介绍TAP这种新式的异步编程模式。TAP基于Task,所以在介绍TAP之前简单介绍了C#中的线程模型,如Thread、ThreadPool、Parallel Liberay以及Task。有了Task相关的基础,进而介绍了async和await如何在task的基础上来简化异步编程,最后介绍了async和await背后的原理。

旧有的异步编程模型


.NET Framework提供了几种异步编程模型,它在一定程度上减轻了异步编程的难度。

APM


最早的异步模型是APM(Asynchronous Programming Model),它在.NET Framework 1.x中就已存在。通过使用一对以BeginXXX和EndXXX开头的方法和一个名为IAsyncResult的接口来实现。比如System.IO.Stream类有Read方法,它的签名如下:

public int Read (byte[] buffer, int offset, int size);

这是一个同步阻塞方法。它对应的APM版本是以下两个方法:

public IAsyncResult BeginRead (byte[] buffer, int offset, int size,AsyncCallback callback, object state);
public int EndRead (IAsyncResult asyncResult);

首先通过调用BeginRead方法来发起一个操作,该操作返回一个IAsyncResult对象,这个对象可以用来进行后续的异步操作。当操作完成或抛出异常,AsyncCallback委托会被回调:

public delegate void AsyncCallback (IAsyncResult ar);

在该回调方法中调用EndRead方法,可以获取操作结果或者重新抛出异常。示例代码如下:

private void ReadFile()
{
	FileStream fs = new FileStream("test.txt", FileMode.Open);
	byte[] buffer = new byte[1024];
	IAsyncResult result = fs.BeginRead(buffer, 0, buffer.Length, ReadCallback, fs);
}
static void ReadCallback(IAsyncResult result)
{
	FileStream fs = (FileStream)result.AsyncState;
	int bytesRead = fs.EndRead(result);
	Console.WriteLine($"Read {bytesRead} bytes.");
	fs.Close();
}

APM方式需要开发人员手动管理回调和状态,它的缺点是代码比较复杂和难以维护,尤其是在处理多个异步调用时。比如上述例子,因为它只是一个demo,看起来似乎也没那么的复杂,如果“使用FileStream的BeginRead方法,完整读取一个文本文件的内容”,完整的代码是下面这样的:

// 保存读取状态的类
class ReadState
{
	public FileStream FileStream { get; set; }
	public byte[] Buffer { get; set; }
	public StringBuilder ContentBuilder { get; set; }
	public TaskCompletionSource<string> TaskCompletionSource { get; set; }
	public bool ManagesFileStream { get; set; }
}

// 使用BeginRead实现的异步文件读取方法
static Task<string> ReadFileAsync(string filePath)
{
	var tcs = new TaskCompletionSource<string>();
	FileStream fileStream = null;
	try
	{
		fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
		var state = new ReadState
		{
			FileStream = fileStream,
			Buffer = new byte[4096],
			ContentBuilder = new StringBuilder(),
			TaskCompletionSource = tcs,
			ManagesFileStream = true // 标记文件流由状态对象管理
		};

		// 开始异步读取
		BeginReadAsync(state);
	}
	catch (Exception ex)
	{
		fileStream?.Dispose();
		tcs.SetException(ex);
	}
	return tcs.Task;
}


static void BeginReadAsync(ReadState state)
{
	try
	{
		IAsyncResult asyncResult = state.FileStream.BeginRead(state.Buffer, 0, state.Buffer.Length, ReadFileAsyncCallback, state);
	}
	catch (Exception ex)
	{
		// 处理BeginRead可能抛出的异常
		state.TaskCompletionSource.SetException(ex);

		// 发生异常时清理资源
		if (state.ManagesFileStream)
		{
			state.FileStream.Dispose();
			state.ManagesFileStream = false;
		}
	}
}

// 异步读取回调方法
static void ReadFileAsyncCallback(IAsyncResult asyncResult)
{
	var state = (ReadState)asyncResult.AsyncState;
	bool readComplete = false;
	try
	{
		int bytesRead = state.FileStream.EndRead(asyncResult);
		if (bytesRead > 0)
		{
			// 转换并追加读取的内容
			string chunk = Encoding.UTF8.GetString(state.Buffer, 0, bytesRead);
			state.ContentBuilder.Append(chunk);
			// 继续读取下一块
			BeginReadAsync(state);
		}
		else
		{
			// 读取完成
			readComplete = true;
			// 读取完成
			state.TaskCompletionSource.SetResult(state.ContentBuilder.ToString());
		}
	}
	catch (Exception ex)
	{
		state.TaskCompletionSource.SetException(ex);
		readComplete = true;
	}
	finally
	{
		// 仅在读取完成或发生异常时关闭文件流
		if (readComplete && state.ManagesFileStream)
		{
			state.FileStream.Dispose();
			state.ManagesFileStream = false;
		}
	}
}

这样看来,还简单吗?

上面的例子其实也展示了如何使用TaskCompletionSouce来封装BeginRead和EndRead方法。这个跟使用Task.Factory.FromAsync适配方法如出一辙,该方法能够将APM方法对转换为Task。在方法内部,当异步操作完成或者发生异常时,它使用TaskCompletionSouce对象来通知Task。比如上述的BeginRead和EndRead方法,可以通过下面的方法来包装成一个Task:

private void ReadFileV2()
{
	FileStream stream = new FileStream("test.txt", FileMode.Open);
	byte[] buffer = new byte[1024];
	Task<int> readChunk = Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
	Console.WriteLine($"Read {readChunk.Result} bytes.");
	stream.Close();
}

获得了读取的字节数之后,通过buffer和字节数,可以将二进数据转为文本。

这种方式还是过于复杂,.NET 后面的版本中FileStream提供了ReadAsync方法,可以极大简化上述操作。

EAP


从.NET 2.0开始提供了事件驱动的异步模型EAP(Event-Based Asynchronous Pattern),相比APM它更简单,特别是在UI编程场景中。然而在.NET中提供这种实现的类型并不多,比如WebClient类,它的方法如下:

public byte[] DownloadData (Uri address); // Synchronous version
public void DownloadDataAsync (Uri address);
public void DownloadDataAsync (Uri address, object userToken);
public event DownloadDataCompletedEventHandler DownloadDataCompleted;
public void CancelAsync (object userState); // Cancels an operation
public bool IsBusy { get; } // Indicates if still running

*Async方法发起了一个异步操作。当操作完成时,*Completed事件将会被调用,回调事件的参数中包含了以下信息:

  • 一个用来表示操作是否被取消的标记 (如果调用CancelAsync的话)
  • 一个Error对象用来表示是否有异常抛出
  • 调用Async方法时,可选的userToken对象

EAP类型也提供了表示进度的事件,当事件状态发生变化时,会触发进度报告回调:

public event DownloadProgressChangedEventHandler DownloadProgressChanged;

一个简单的下载例子如下:

static async Task Main(string[] args)
{
	string downloadUrl = "https://dotnetcli.blob.core.windows.net/dotnet/Sdk/6.0.402/dotnet-sdk-6.0.402-win-x64.zip";
	string savePath = @"C:\Temp\dotnet-sdk.zip";

	try
	{
		Console.WriteLine("开始异步下载文件...");
		await DownloadFileAsync(downloadUrl, savePath);
		Console.WriteLine("下载完成!");
	}
	catch (Exception ex)
	{
		Console.WriteLine($"下载过程中发生错误: {ex.Message}");
	}

	Console.WriteLine("按任意键退出...");
	Console.ReadKey();
}

static async Task DownloadFileAsync(string url, string filePath)
{
	using (WebClient client = new WebClient())
	{
		// 注册下载进度变更事件
		client.DownloadProgressChanged += (sender, e) =>
		{
			Console.WriteLine($"下载进度: {e.ProgressPercentage}% - 已下载 {e.BytesReceived / 1024} KB / 总计 {e.TotalBytesToReceive / 1024} KB");
		};

		// 注册下载完成事件
		client.DownloadFileCompleted += (sender, e) =>
		{
			if (e.Error != null)
			{
				Console.WriteLine($"下载失败: {e.Error.Message}");
			}
			else if (e.Cancelled)
			{
				Console.WriteLine("下载已取消");
			}
			else
			{
				Console.WriteLine("文件下载成功");
			}
		};
		// 执行异步下载
		await client.DownloadFileTaskAsync(new Uri(url), filePath);
	}
}

实现EAP模式需要大量的模板代码,因此不够优雅。

BackgroundWorker


BackgroundWorker在System.ComponentModel命名空间下,它是一个通用的EAP模式实现类。它允许一下UI客户端程序启动一个工作线程,并报告执行完成或百分比进度,且无需显示捕获同步上下文。

var worker = new BackgroundWorker { WorkerSupportsCancellation = true };
worker.DoWork += (sender, args) =>
{ 
        // This runs on a worker thread
	if (args.Cancel) return;
	Thread.Sleep(1000);
	args.Result = 123;
};
worker.RunWorkerCompleted += (sender, args) =>
{ 
        // Runs on UI thread
        // We can safely update UI controls here...
	if (args.Cancelled)
		Console.WriteLine("Cancelled");
	else if (args.Error != null)
		Console.WriteLine("Error: " + args.Error.Message);
	else
		Console.WriteLine("Result is: " + args.Result);
};
worker.RunWorkerAsync(); // Captures sync context and starts operation

RunWorkerAsync方法发起了一个异步操作,它在线程池的工作线程上发起了DoWorker事件,它会捕获同步上下文,当操作完成或出错时,通过同步上下文触发RunWorkerCompleted事件,这与Task里面的Continuation类似。

BackgroundWorker是一种粗粒度(Coarse grained)的并发控制,DoWork事件完全运行在工作线程,如果希望在DoWork中更新UI控件而不是报告完成进度,则需要使用Dispatcher.BeginInvoke或其它类似的方法。

TPL


在 C# 里,任务并行库(Task Parallel Liberary,TPL)是进行异步编程的重要工具,它以任务Task为核心来抽象多线程操作,大大降低了开发人员的编程难度。TPL在.NET 4.0的时候推出,它的核心就是Task类,是一种对线程的更高级别的抽象。下面就逐个介绍线程,线程池和Task以及相关概念。

Thread


在C#中,线程(Thread)基本就是对Windows线程的简单包装。线程运行于操作系统进程中,后者为程序运行提供了一个隔离的环境。如果一个进程中只有一个线程运行,那它就是单线程应用程序。在多线程程序中,一个进程中会运行多个线程,它们共享相同的执行环境(特别是内存)。一个线程可以在后台获取数据,另外一个UI线程可以在数据获取完成之后进行展示,这就是数据的状态共享。

客户端应用程序(控制台、WPF、UAP或者Windows Form)在序启动时,操作系统会自动创建一个单独的名为UI的主线程。这个单一的线程会存在于应用程序的整个生命周期,用户程序可以通过调用Thread类的相关方法来创建和管理多个线程。

可以通过构造函数来实例化一个线程。

using System;
using System.Threading;
Thread t = new Thread (WriteY); // Kick off a new thread
t.Start(); // running WriteY()
// Simultaneously, do something on the main thread.
for (int i = 0; i < 1000; i++) Console.Write ("x");
void WriteY()
{
for (int i = 0; i < 1000; i++) Console.Write ("y");
}

调用Start可以启动一个线程,Join和Sleep可以等待线程运行结束或者暂停当前线程一段时间。这里有两个特别的方法: Thread.Sleep(0)和Thread.Yield()。调用Thread.Sleep(0)时,当前线程会立即放弃自己的 CPU 时间片,告诉操作系统 “我现在不使用 CPU 了,请重新调度”。但它只会让给相同优先级或更高优先级的线程。如果没有其他符合条件的线程,当前线程会立即恢复执行。而Thread.Yield()则会尝试将当前线程的 CPU 时间片让给同一处理器核心(same processor)上的其他线程(仅针对多核系统)。如果没有其他线程需要执行,当前线程会立即继续执行

当需要在繁忙的循环中让出 CPU,允许其他线程(尤其是高优先级线程)执行时使用Thread.Sleep(0),而在多核系统中,当某个线程需要频繁让出 CPU 给同一核心上的其他线程时使用Thread.Yield(),例如在实现自旋锁(SpinLock)时。在现代 C# 开发中,更推荐使用Task.Yield()await Task.Delay(0),它们基于 TPL(任务并行库),能更好地与异步编程模型集成,避免直接操作线程带来的复杂性。

I/O密集型任务和计算密集型任务


如果一个操作大部分时间都在等待某个事件发生,那么它就是I/O密集型(I/O-bound)操作,例如下载一个网页或者简单的调用Console.ReadLine,I/O密集型操作通常会涉及到输入输出操作。与之相对的是,如果一个操作大部分时间都在执行大量的CPU操作,那么它就是计算密集型任务。

I/O密集型操作通常要么以同步的方式等待在当前线程上直到操作完成(如Console.ReadLine,Thread.Sleep或Thread.Join),要么发起一个异步操作后返回,待操作完成之后触发一个回调。同步的I/O密集型操作的大部分时间都花费在阻塞线程上,比如:

while (DateTime.Now < nextStartTime)
Thread.Sleep (100);

这种方式非常耗费CPU资源,它将一个I/O密集型的操作转变为了一个计算密集型操作。

手动创建线程也会遇到变量的状态共享问题,要处理变量的跨线程操作,通常需要加锁。

参数传递


可以在调用Thread.Start方法时,给线程传递数据,比如:

Thread t = new Thread (Print);
t.Start ("Hello from t!");
void Print (object messageObj)
{
    string message = (string) messageObj; // We need to cast here
    Console.WriteLine (message);
}

这是因为Thread的构造函数有如下重载:

public delegate void ThreadStart();
public delegate void ParameterizedThreadStart (object obj);

另外,也可以通过Lambda表达式来捕获参数。但是,需要特别留意一些对捕获参数进行修改的特殊情境,比如:

for (int i = 0; i < 10; i++)
    new Thread (() => Console.Write (i)).Start();

以上代码的输出是不确定的,一个典型的输出结果如下:

47444568910

这是因为在整个for循环的生命周期中,只有一个变量i,它引用的都是同一个内存地址。因此,每个线程调用Console.Write时,这个变量i的值都可能会被改变。可以使用一个临时变量来保存i的值。

for (int i = 0; i < 10; i++)
{
	int tmp = i;
	new Thread(() => Console.Write(tmp)).Start();
}

异常处理


异常处理也是线程中需要注意的地方,考虑一下代码:

try
{
    new Thread (Go).Start();
}
catch (Exception ex)
{
    // We'll never get here!
    Console.WriteLine ("Exception!");
}

void Go() { throw null; } // Throws a NullReferenceException

上述代码中try/catch根本不会起作用,因为异常是在新创建的线程中抛出的,而不是在调用线程中抛出。修改方法如下:

new Thread(Go).Start();
void Go()
{
	try
	{
		...
	throw null; // The NullReferenceException will get caught below
			...
	}
		catch (Exception ex)
		{
			// Typically log the exception and/or signal another thread
			// that we've come unstuck
			...
	}
}

这就表示我们需要在所有的线程中方法的入口都添加异常处理逻辑。当然,在WPF或者Windows Form中,可以注册一个全局的异常事件,比如Application.DispatcherUnhandledException 和 Application.ThreadException。当在消息管道中抛出任何非处理异常时,都会触发事件,可以在这些事件中记录详细的日志信息,以便于故障排查。

前台线程和后台线程


在 C# 中,线程分为前台线程(Foreground Thread)和后台线程(Background Thread),它们的主要区别在于对应用程序生命周期的影响。默认情况下,显示创建的线程都是前台线程,只要有一个前台线程还在运行,应用程序就不会退出。即使主线程(Main)结束,其它前台线程仍会继续执行,比如用户界面线程以及一些执行关键业务逻辑的线程。而后台线程则是当所有前台线程结束时,后台线程会被立即终止,无论其是否执行完毕,比如后台定时任务(如心跳检测)或一些非关键的辅助工作(比如清理缓存)。

需要注意的是,后台线程被终止时,不会执行finally块或释放非托管资源(如文件句柄),可能导致资源泄漏。因此,关键操作建议使用前台线程。大多数应用程序应以前台线程处理核心逻辑,后台线程处理辅助任务,避免意外终止重要操作。

同步上下文


SynchronizationContext抽象类实现了一般性的线程封送(thread marshaling)功能。对于富客户端应用的API,比如WPF,UWP或Winform都定义并实例化了SynchronizationContext的子类。当运行在UI线程上时,可以通过SynchronizationContext.Current来获取当前的同步上下文。通过捕获这个属性就可以从工作线程将数据”提交“到UI控件上,比如:

partial class MyWindow : Window
{
	SynchronizationContext _uiSyncContext;
	public MyWindow()
	{
		InitializeComponent();
		// Capture the synchronization context for the current UI thread:
		_uiSyncContext = SynchronizationContext.Current;
		new Thread(Work).Start();
	}

	void Work()
	{
		Thread.Sleep(5000); // Simulate time-consuming task
		UpdateMessage("The answer");
	}
	void UpdateMessage(string message)
	{
		// Marshal the delegate to the UI thread:
		_uiSyncContext.Post(_ => txtMessage.Text = message, null);
	}
}

调用Post方法相当于调用Dispatcher或Control控件的BeginInvoke方法,当然还有个Send方法,它等同于调用Invoke方法。

线程池


当创建一个线程时,都需要耗费大概几百毫秒来创建新的局部变量栈。而线程池通过预先创建一个可回收线程的池子来降低这一开销。线程池对于开发高性能的并行程序和细粒度的并发都是非常必要的。它可以支持运行一些短暂的操作而不会受到线程启动开销的影响。当然线程池线程也有一些限制:

  • 不能设置线程池线程的名称,这会增加代码调试的难度。
  • 线程池线程总是后台线程。
  • 阻塞线程池线程会影响性能。

可以通过 Thread.CurrentThread.IsThreadPoolThread 来判断当前执行操作的线程是否是线程池线程。

最简单的使用线程池线程的方式是执行Task.Run操作:

Task.Run (() => Console.WriteLine ("Hello from the thread pool"));

在.NET Framework 4.0之前,使用ThreadPool.QueueUserWorkItem方法:

ThreadPool.QueueUserWorkItem (notUsed => Console.WriteLine ("Hello"));

在.NET中,一些服务或组件隐式使用了线程池:

  • ASP.NET Core和Web API应用
  • System.Timers.Timer以及System.Threading.Timer
  • TPL
  • BackgroundWorker类

Task


线程(Thread)是创建并发的底层工具,它有一些局限:

  • 线程在启动时虽然很容易传递参数,但在Join的时候却很难从中获取”返回值“。一般需要定义一些共享的字段。此外,捕获和处理线程中抛出的异常也非常麻烦。
  • 很难在线程结束的时候告知线程继续执行一些操作。通常只能调用Join方法,然而Join是一个阻塞的方法,它会导致主线程被阻塞。

这些局限性会影响细粒度并发的实现。简单来说,就是很难将一些小的并发组合成大的并发操作,从而导致需要依赖更多的手动同步处理,比如使用锁 locking,信号发送singaling等,这很容易造成问题。

直接操作线程也会对线程产生影响,因为创建线程会有内存和时间方面的开销,如果需要运行大量并发的I/O密集型操作,则基于线程的方法仅仅在线程本身开销方面就会消耗成百上千兆的内存。

相比线程,Task是一种高层次的抽象,它表示一个并发的操作,而该操作并不一定依赖线程来完成。并且Task支持组合(compositional),可以将它们通过延续(continuation)操作串联起来。它可以使用线程池来减少启动延迟。配合使用TaskCompletionSource,采用回调的方式避免多个线程同时等待I/O密集型操作。

Task是在.NET Framework 4.0中作为TPL (Task Parallel Library) 库的一部分引入的,随后进一步的通过awaiter进行了改进,使得在常见的并发场景中发挥了越来越大的作用,Task类型也是C#异步功能的基础类型。

从.NET Framework 4.5开始,启动一个基于线程的Task的最简单方法是使用Task的静态Run方法,调用时只需要传入一个Action委托(在.NET Framework 4.0中,可以通过Task.Factory.StartNew实现相同的功能,相当于Task.Run的快捷方式):

Task.Run (() => Console.WriteLine ("Foo"));

Run方法默认会使用线程池线程,也就是后台线程,这意味着如果主线程结束,所有的Task都会结束。上面的语句相当于:

new Thread (() => Console.WriteLine ("Foo")).Start();

与Thread不同,Task.Run返回一个Task对象,可以通过该返回对象来检查Status属性来判断状态,或者调用Wait来等待,比如:

Task task = Task.Run (() =>
{
    Thread.Sleep (2000);
    Console.WriteLine ("Foo");
});
Console.WriteLine (task.IsCompleted); // False
task.Wait(); // Blocks until task is complete

默认情况下,Task是在线程池线程上运行的,这适合那些短时运行的计算密集型任务。如果要执行长时间阻塞的操作,可以用如下参数来避免Task在线程池线程上运行:

Task task = Task.Factory.StartNew (() => ..., TaskCreationOptions.LongRunning);

Task有一个泛型子类Task<TResult>,它允许Task返回一个值:

Task<int> task = Task.Run (() => { Console.WriteLine ("Foo"); return 3; });

通过查询Result属性,可以获取Task产生的返回值,如果Task还没执行完毕,则调用Result会阻塞当前线程直至Task结束。

int result = task.Result; // Blocks if not already finished
Console.WriteLine (result); // 3

在下面的代码中,新建一个task来计算2~3000000之间的素数个数:

Task<int> primeNumerTask = Task.Run(() => Enumerable.Range(2, 3000000).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
Console.WriteLine("Task running....");
Console.WriteLine("The answer is " + primeNumerTask.Result);

代码输出结果如下:

Task running....
The answer is 216816

代码首先会输出“Task running....”,若干秒后输出最后的结果“216816”。

与Thread不同,Task可以很方便的对异常进行传递(propagate),因此如果在Task中抛出一个未处理异常,那么在调用Wait方法或者访问Result属性时,就会自动重新抛出异常。

比如下面代码:

Task task = Task.Run(() => { throw null; });
try
{
	task.Wait();
}
catch (AggregateException aex)
{
	if (aex.InnerException is NullReferenceException)
		Console.WriteLine("Null!");
	else
		throw;
}

使用Task的IsFaulted和IsCanceled属性可以在不抛出异常的情况下检测出错的任务,如果这两个属性都返回了false,则说明没有错误发生。如果IsCancel为true则说明任务抛出了OperationCanceledException;如果IsFaulted为true,则说明任务抛出了其它类型的异常,通过访问Exception可以了解异常的详情。

Continuations


当一个Task执行结束时,可以让它继续(延续)执行其它操作,这是通过回调来实现的。给Task任务附加延续有两种方法:第一种方法特别重要,它就是C#中异步函数的实现方式,那就是通过GetAwaiter方法获取一个awaiter对象,然后注册它的OnCompleted方法,比如:

Task<int> primeNumberTask = Task.Run (() =>
Enumerable.Range (2, 3000000).Count (n =>
Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));
var awaiter = primeNumberTask.GetAwaiter();
awaiter.OnCompleted (() =>
{
    int result = awaiter.GetResult();
    Console.WriteLine (result); // Writes result
});

调用GetAwaiter方法会返回一个awaiter对象,它的OnCompleted方法告诉先导Task当它执行完毕时,执行该委托操作。awaiter对象它有OnComplete和GetResult方法,以及一个名为IsCompleted的属性。

如果先导Task抛出异常,那么在后续的代码中调用awaiter.GetResult()的时候就会重新抛出这些异常。相比调用GetResult方法,也可以通过直接访问Result属性来得到前面方法的执行结果。GetResult方法的好处在于它能够重新抛出前面方法抛出的异常,而不是将异常包装到AggregatException中,这种方式能够保持catch块中代码的简单和清晰。

如果存在同步上下文,那么OnComplete能够自动捕获这个上下文,将延续提交到这个上下文中。这在客户端程序中非常有用,因为它能将延续的操作放回到UI线程中运行。

但在编写类库代码的时候,可能不需要这一特性。因为开销较大的UI线程切换应当在程序离开程序库的时候发生一次,而不是出现在方法调用之间发生,可以通过ConfigureAwait方法来避免这一行为:

var awaiter = primeNumberTask.ConfigureAwait (false).GetAwaiter();

如果没有同步上下文或者使用了ConfigureAwait(false),那么延续代码通常会运行在先导任务运行的线程上,从而避免不必要的开销。

另外一种附加延续的方法是调用Task的ContinueWith方法:

primeNumberTask.ContinueWith (antecedent =>
{
    int result = antecedent.Result;
    Console.WriteLine (result); 
});

ContinueWith本身会返回一个Task对象,这使得可以很方便添加更多的延续。但当操作失败的时候,必须直接处理AggregateException异常。另外,如果要将延续封送到UI程序上,还需要通过TaskSchedulers参数来设置。在非UI上下文中,如果需要延续任务和先导任务在同一个线程中执行,则必须指定TaskContinuationOptions.ExecuteSynchronously,否则后续操作会在线程池线程上执行。ContinueWith在TPL中很有用。

TaskCompletionSource


使用Task.Run能创建一个运行在线程池线程上的任务,另外一种创建Task的方式是使用TaskCompletionSource。TaskCompletionSource允许我们创建任何在将来完成的操作,它的原理是生成一个Task,这个Task可以由我们手动指定它的行为,比如它的返回值、何时操作完成或何时失败。要使用TaskCompletionSource很简单,只需要实例化这个类。它提供了一个Task属性来返回一个Task,从而允许等待或者附加延续操作。可以通过以下方法来控制生成Task的各种行为:

public class TaskCompletionSource<TResult>
{
    public void SetResult (TResult result);
    public void SetException (Exception exception);
    public void SetCanceled();
    public bool TrySetResult (TResult result);
    public bool TrySetException (Exception exception);
    public bool TrySetCanceled();
    public bool TrySetCanceled (CancellationToken cancellationToken);
    ...
}

调用这些方法会给Task发生信号,将它的状态变更为完成、错误或取消。这些方法只能调用一次,调用多次会抛出异常,而以Try开头的方法则会返回false。

比如下面这个代码在等待5秒之后打印42:

var tcs = new TaskCompletionSource<int>();
new Thread (() => { 
    Thread.Sleep (5000);
    tcs.SetResult (42); })
{ 
    IsBackground = true
}
.Start();
Task<int> task = tcs.Task; 
Console.WriteLine (task.Result); // 42

使用TaskCompletionSource方法,可以定义一个Run方法:

Task<TResult> Run<TResult> (Func<TResult> function)
{
var tcs = new TaskCompletionSource<TResult>();
new Thread (() =>
{
    try { 
            tcs.SetResult (function()); 
         }
        catch (Exception ex) 
         { 
           tcs.SetException (ex); 
         }
}).Start();
      return tcs.Task;
}
...
Task<int> task = Run (() => { Thread.Sleep (5000); return 42; });

以上方法相当于Task.Factory.StartNew附带TaskCreationOptions.LongRunning参数。

TaskCompletionSource的真正作用是创建一个不绑定线程的任务。比如要创建一个等待5秒钟,然后返回数字42的任务。可以通过Timer类而不是线程来实现这一功能。Timer类可以在x毫秒之后触发某个操作。

Task<int> GetAnswerToLife()
{
    var tcs = new TaskCompletionSource<int>();
    // Create a timer that fires once in 5000 ms:
    var timer = new System.Timers.Timer (5000) { AutoReset = false };
    timer.Elapsed += delegate { 
                                                timer.Dispose(); 
                                                tcs.SetResult (42); 
                                               };
    timer.Start();
    return tcs.Task;
}

这个方法返回一个在5秒后执行的Task,并返回结果42,可以在这个任务后面附加其它延续,而这些操作不会阻塞任何线程。

var awaiter = GetAnswerToLife().GetAwaiter();
awaiter.OnCompleted (() => Console.WriteLine (awaiter.GetResult()));

还可以使用TaskCompletionSource来实现一个通用的Delay方法:

Task Delay (int milliseconds)
{
    var tcs = new TaskCompletionSource<object>();
    var timer = new System.Timers.Timer (milliseconds) { AutoReset = false };
    timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult (null); };
    timer.Start();
    return tcs.Task;
}

现在,之前代码改写如下:

Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42));

使用TaskCompletionSource来创建一个不依赖线程参与的Task有个好处是可以实现只在后续延续操作开始执行时才让线程参与操作,比如下面这个操作:

for (int i = 0; i < 10000; i++)
Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42));

Timers使用线程池线程来实现回调操作。因此上述代码在5秒之后线程池会接收到10000个TaskCompletionSource.SetResult(null)请求。如果请求超出了CPU能处理的速度,则这些请求操作会入队然后以一定的并行度来进行处理。

Task.Delay


前面我们实现了一个Delay方法,在Task上也提供了一个静态的Delay方法,比如:

Task.Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42));

或者

Task.Delay (5000).ContinueWith (ant => Console.WriteLine (42));

Task.Delay是Thread.Sleep的异步版本。

取消操作


当启动一个并行操作后,通常应该提供一个取消选项允许用户进行取消操作。在C#中有一个名为CancellationToken的类,它的签名如下:

class CancellationToken
{
	public bool IsCancellationRequested { get; private set; }
	public void Cancel() { IsCancellationRequested = true; }
	public void ThrowIfCancellationRequested()
	{
		if (IsCancellationRequested)
			throw new OperationCanceledException();
	}
}

通常可以编写一个异步方法,以该类型作为参数:

async Task Foo(CancellationToken cancellationToken)
{
	for (int i = 0; i < 10; i++)
	{
		Console.WriteLine(i);
		await Task.Delay(1000);
		cancellationToken.ThrowIfCancellationRequested();
	}
}

要产生一个CancellationToken,通常会实例化一个CancellationTokenSource对象(这个对象可以共享给其它对异步方法有取消权的对象),然后获取Token属性:

var cancelSource = new CancellationTokenSource();
Task foo = Foo (cancelSource.Token);
...
... (sometime later)
cancelSource.Cancel();

CLR中提供的异步方法都支持取消,比如Delay,比如将上述Foo方法改造一下,添加一个CancellationToken参数,以支持取消Task.Delay:

async Task Foo(CancellationToken cancellationToken)
{
	for (int i = 0; i < 10; i++)
	{
		Console.WriteLine(i);
		await Task.Delay(1000, cancellationToken);
	}
}

现在不需要调用ThrowIfCancellationRequested方法,因为Delay方法本身会这么做。

异步编程与延续


Task非常适合异步编程,因为它支持延续。在前面的代码中使用TaskCompletionSource实现了一个Delay,这是实现底层I/O密集异步操作的标准方式。

对于计算密集型方法,可以使用Task.Run来发起一个线程绑定的并行操作。只需要简单的将返回的Task对象返回给调用者就可以创建一个异步方法。异步编程的目标,我们希望将异步放在底层方法上,这样对于客户端应用程序高层的方法调用可以依旧发生在UI线程中而不需要考虑线程安全问题。比如下面这个计算素数的方法:

int GetPrimesCount (int start, int count)
{
    return  ParallelEnumerable.Range (start, count).Count (n =>Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0));
}

这个方法虽然使用了并行,但是它仍是一个同步的方法,它需要运行一段时间。比如下面这个方法调用:

void DisplayPrimeCounts()
{
    for (int i = 0; i < 10; i++)
    {
        Console.WriteLine (GetPrimesCount (i*1000000 + 2, 1000000) +" primes between " + (i*1000000) + " and " + ((i+1)*1000000-1));
    }
    Console.WriteLine ("Done!");
}

输出结果如下:

78498 primes between 0 and 999999
70435 primes between 1000000 and 1999999
67883 primes between 2000000 and 2999999
66330 primes between 3000000 and 3999999
65367 primes between 4000000 and 4999999
64336 primes between 5000000 and 5999999
63799 primes between 6000000 and 6999999
63129 primes between 7000000 and 7999999
62712 primes between 8000000 and 8999999
62090 primes between 9000000 and 9999999
Done!

我们可以看到,调用图如下:DisplayPrimeCounts方法调用GetPrimesCount方法。在DisplayPrimeCounts方法中使用Console.WriteLine输出计算结束信息。在现实代码中,通常这里会涉及到更新UI界面的操作。我们可以发起一个Task来运行DisplayPrimeCounts方法来实现一个粗粒度( coarse-grained)的并发操作:

Task.Run (() => DisplayPrimeCounts());

这个就是在调用路径的高层实现的异步。而要实现一个细粒度(fine-grained)的并发操作,通常需要将异步操作放在最底层的方法调用上。比如将GetPrimesCount改写为异步版本:

Task<int> GetPrimesCountAsync (int start, int count)
{
return Task.Run (() =>
ParallelEnumerable.Range (start, count).Count (n =>
Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0)));
}

语言层面支持的重要


现在必须修改DisplayPrimeCounts方法,使得它能够调用GetPrimesCountAsync异步方法。这就是C#中await和async关键字的作用,如果没有这两关键字,那么实现起来可能就不是我们想要达成的结果,比如:

for (int i = 0; i < 10; i++)
{
    var awaiter = GetPrimesCountAsync (i*1000000 + 2, 1000000).GetAwaiter();
    awaiter.OnCompleted (() => Console.WriteLine (awaiter.GetResult() + " primes between... "));
}
Console.WriteLine ("Done");

上面这个方法中,会立即执行10次循环,这10次计算素数的操作会并发执行,然后打印"Done",执行结果是:

Done
67883 primes between... 
62090 primes between... 
62712 primes between... 
66330 primes between... 
78498 primes between... 
64336 primes between... 
63129 primes between... 
65367 primes between... 
70435 primes between... 
63799 primes between... 

这显然不是我们想要的结果。通常让多个Task并发执行并不是我们想要的结果,因为每个Task它已经是在并发执行了,让多个Task并发执行只会让花费更多的时间来看到第一个结果。在这个例子中,需要的是将这些Task串行化,即首先运行第一个Task,紧接着执行第二个Task,直到结束。比如访问Web页面,在执行一个HTTP请求之前必须要执行一下DNS查找。

为了得到上述结果,我们需要改写DisplayPrimeCounts方法,让它串行化执行,修改后的递归执行的方法如下:

void DisplayPrimesCount()
{
	DisplayPrimesCount(0);
}

void DisplayPrimesCount(int i)
{
	var awaiter = GetPrimesCountAsync(i * 1000000 + 2, 1000000).GetAwaiter();
	awaiter.OnCompleted(() => {
	Console.WriteLine(awaiter.GetResult() + " primes between...");
	if (++i<10)
	{
		DisplayPrimesCountV3(i);
	}else
	{
		Console.WriteLine("Done");
	}
	});
}

现在调用方法,输出如下结果:

78498 primes between...
70435 primes between...
67883 primes between...
66330 primes between...
65367 primes between...
64336 primes between...
63799 primes between...
63129 primes between...
62712 primes between...
62090 primes between...
Done

基本就是我们想要的结果。

现在假如要将DisplayPrimeCounts方法改为异步返回一个Task对象。要实现这个操作就必须要依赖前面介绍的TaskCompletionSource了。

Task DisplayPrimeCountsAsync()
{
	var machine = new PrimesStateMachine();
	machine.DisplayPrimeCountsFrom(0);
	return machine.Task;
}

class PrimesStateMachine
{
	TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
	public Task Task { get { return _tcs.Task; } }
	public void DisplayPrimeCountsFrom(int i)
	{
		var awaiter = GetPrimesCountAsync(i * 1000000 + 2, 1000000).GetAwaiter();
		awaiter.OnCompleted(() =>
	   {
		   Console.WriteLine(awaiter.GetResult());
		   if (++i < 10) DisplayPrimeCountsFrom(i);
		   else { Console.WriteLine("Done"); _tcs.SetResult(null); }
	   });
	}
}

有了异步函数,只需要添加几个关键字,就能将普通的DisplayPrimeCounts方法改为DisplayPrimeCountsAsync方法,而不用手动生成状态机对象然后通过TaskCompletionSource来生成。使用async、await关键字改写如下:

async Task DisplayPrimeCountsAsync()
{
    for (int i = 0; i < 10; i++)
        Console.WriteLine (await GetPrimesCountAsync (i*1000000 + 2, 1000000) +" primes between " + (i*1000000) + " and " + ((i+1)*1000000-1));
    Console.WriteLine ("Done!");
}

这就是语言层面对异步函数支持的威力,它简化了异步方法的编写。上述方法输出如下:

78498 primes between 0 and 999999
70435 primes between 1000000 and 1999999
67883 primes between 2000000 and 2999999
66330 primes between 3000000 and 3999999
65367 primes between 4000000 and 4999999
64336 primes between 5000000 and 5999999
63799 primes between 6000000 and 6999999
63129 primes between 7000000 and 7999999
62712 primes between 8000000 and 8999999
62090 primes between 9000000 and 9999999
Done!

异步函数


async和await关键字可以使得在编写异步代码时能像编写同步代码那样简单自然。

awaiting


await关键字简化了在一个Task后面附加其它延续Task,比如下面的表达式:

var result = await expression;
statement(s);

相当于:

var awaiter = expression.GetAwaiter();
awaiter.OnCompleted (() =>
{
    var result = awaiter.GetResult();
    statement(s);
});

比如前面的方法:

Task<int> GetPrimesCountAsync (int start, int count)
{
    return Task.Run (() =>
        ParallelEnumerable.Range (start, count).Count (n =>
        Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));
}

使用await关键字,可以像下面这样调用:

int result = await GetPrimesCountAsync (2, 1000000);
Console.WriteLine (result);

为了DisplayPrimesCount方法通过编译,需要在方法前面添加async关键字。

async void DisplayPrimesCount()
{
    int result = await GetPrimesCountAsync (2, 1000000);
    Console.WriteLine (result);
}

async修饰符指示编译器方法内部的await是一个关键字而不是一个修饰符。async只能修饰那些返回Task的方法或者lambda表达式。被async修饰的方法称为异步方法(asynchronous functions),因为这些方法本身是异步执行的。

当遇到await表达式时,执行流程会返回到调用者,这与迭代器中的yield return类似。但在返回给调用者之前,运行时会在await的方法后面附加一些延续,以保证当await的方法执行完成之后,流程会跳转到await语句之后的方法继续执行。如果await的任务失败,则会抛出异常,否则返回值会赋值给await表达式。上面的DisplayPrimesCount相当于以下方法:

void DisplayPrimesCount()
{
    var awaiter = GetPrimesCountAsync (2, 1000000).GetAwaiter();
    awaiter.OnCompleted (() =>
    {
        int result = awaiter.GetResult();
        Console.WriteLine (result);
    });
}

捕获本地变量


await表达式可以放在方法内部的任何位置,比如可以放在循环内部:

async void DisplayPrimeCounts()
{
    for (int i = 0; i < 10; i++)
    Console.WriteLine (await GetPrimesCountAsync (i*1000000+2, 1000000));
}

在第一次执行GetPrimesCountAsync方法时,由于有await表达式,因此执行点返回给调用者。当方法完成或出错时,执行点会从停止之处恢复执行,流同时保留本地变量以及循环计数i的值。

在UI线程上等待处理


如果在一个Button的回调事件上执行耗时的同步方法,那么UI界面将会阻塞,因为这些耗时的方法在UI线程上执行。

class TestUI : Window
{
	Button _button = new Button { Content = "Go" };
	TextBlock _results = new TextBlock();
	public TestUI()
	{
		var panel = new StackPanel();
		panel.Children.Add(_button);
		panel.Children.Add(_results);
		Content = panel;
		_button.Click += (sender, args) => Go();
	}
	void Go()
	{
		for (int i = 1; i < 5; i++)
			_results.Text += GetPrimesCount(i * 1000000, 1000000) +
			" primes between " + (i * 1000000) + " and " + ((i + 1) * 1000000 - 1) +
			Environment.NewLine;
	}
	int GetPrimesCount(int start, int count)
	{
		return ParallelEnumerable.Range(start, count).Count(n =>
	  Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0));
	}
}

现在,将Button的回调事件改成异步方法:

Task<int> GetPrimesCountAsync(int start, int count)
{
	return Task.Run(() =>
   ParallelEnumerable.Range(start, count).Count(n =>
 Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
}

async void Go()
{
	_button.IsEnabled = false;
	for (int i = 1; i < 5; i++)
		_results.Text += await GetPrimesCountAsync(i * 1000000, 1000000) +
		" primes between " + (i * 1000000) + " and " + ((i + 1) * 1000000 - 1) +
		Environment.NewLine;
	_button.IsEnabled = true;
}

现在,在Go方法中,调用了异步的GetPrimesCountAsync,该方法会运行在工作线程上,而Go方法其余部分都在UI线程上执行。

再举另外一个例子,假设点击按钮之后,需要从几个网页上下载内容,并统计这些内容的总长度。System.Net的WebClinet方法提供了一个异步的DownloadDataTaskAsync方法,它异步下载网页内容,并发挥一个二进制数组Task<byte[]>,通过使用await,我们可以获取byte[]数组:

async void Go()
{
	_button.IsEnabled = false;
	string[] urls = "www.albahari.com www.oreilly.com www.linqpad.net".Split();
	int totalLength = 0;
	try
	{
		foreach (string url in urls)
		{
			var uri = new Uri("http://" + url);
			byte[] data = await new WebClient().DownloadDataTaskAsync(uri);
			_results.Text += "Length of " + url + " is " + data.Length +
			Environment.NewLine;
			totalLength += data.Length;
		}
		_results.Text += "Total length: " + totalLength;
	}
	catch (WebException ex)
	{
		_results.Text += "Error: " + ex.Message;
	}
	finally { _button.IsEnabled = true; }
}

可以看到,将同步方法改造为异步方法非常简单,整体表面上与编写同步方法一样包括catch和finally块,但它是异步执行的。

小结


总结一下asyn和await,那就是:async 和 await 是 C# 中用于异步编程的两个关键字。它们使得编写异步代码几乎和编写同步代码一样简单和直观。异步编程的核心思想是避免阻塞线程,从而提高应用程序的响应性和可伸缩性。

async 关键字:

  • 用于修饰方法、Lambda 表达式或匿名方法。
  • 表明该方法是一个异步方法,它可能包含一个或多个 await 表达式。
  • async 方法的返回类型通常是 Task、Task<TResult> 或 void (主要用于事件处理程序,应谨慎使用)。
    • Task: 表示一个不返回值的异步操作。
    • Task<TResult>: 表示一个返回类型为 TResult 的异步操作。
    • void: 表示一个“即发即弃”的异步操作。如果 async void 方法内部发生异常,该异常通常难以捕获,可能导致应用程序崩溃。

await 关键字:

  • 只能在 async 方法内部使用。
  • 用于暂停 async 方法的执行,直到其等待的异步操作 (Task 或 Task<TResult>) 完成。
  • 在等待期间,它不会阻塞调用线程。相反,控制权会返回给调用方,允许调用线程继续执行其他工作。
  • 当等待的异步操作完成后,方法会从 await 表达式处恢复执行。
  • 如果等待的是 Task<TResult>,await 表达式的结果就是 TResult 类型的值。如果等待的是 Task,await 表达式没有结果 (可以看作是 void)。
async/await 的目标是让你能够编写非阻塞的代码,同时保持代码的逻辑结构清晰易懂。当一个 async 方法遇到 await 时,它会告诉运行时:“这个操作可能需要一些时间,在我等待的时候,你可以先去做别的事情。操作完成后再回来继续执行我剩下的代码。”

async/await 主要用于以下场景:

I/O 密集型操作 (I/O-bound operations):

  • 网络请求: 调用 Web API、下载文件、发送邮件等 (如上例)。
  • 文件系统操作: 异步读取或写入文件。
  • 数据库操作: 异步执行数据库查询或命令。
  • 在这些场景下,线程大部分时间都在等待外部资源响应,使用 async/await 可以释放线程去处理其他请求或任务,提高系统的吞吐量和可伸缩性。

CPU 密集型操作 (CPU-bound operations) - 在后台线程执行:

  • 对于需要大量计算的操作(如图像处理、复杂算法),直接在 UI 线程或请求处理线程上执行会导致界面卡顿或请求超时。
  • 虽然 async/await 本身不创建新线程,但可以与 Task.Run 结合使用,将 CPU 密集型工作卸载到线程池中的后台线程执行,然后 await 其结果。这样可以保持 UI 或请求处理线程的响应性。

提高 UI 应用程序的响应性:

  • 在桌面应用 (WPF, Windows Forms) 或移动应用 (Xamarin, MAUI) 中,长时间运行的操作如果直接在 UI 线程执行,会导致界面冻结,用户无法进行任何操作。
  • 使用 async/await 可以确保耗时操作(如网络请求、文件读写)在后台进行,UI 线程保持空闲,可以响应用户输入和更新界面。
  • 当异步操作完成后,await 会确保后续代码(通常是更新 UI 的代码)在正确的 UI 线程上执行(通过 SynchronizationContext)。

提高服务器端应用程序的可伸缩性:

  • 在 ASP.NET (Core) Web API 或 Web 应用中,当处理一个请求时,如果遇到 I/O 操作(如数据库查询、调用其他服务),同步等待会阻塞请求处理线程。
  • 如果并发请求很多,线程池中的线程很快会被耗尽,导致后续请求排队等待,系统吞吐量下降。
  • 使用 async/await,在等待 I/O 操作时,请求处理线程会被释放回线程池,可以去处理其他传入的请求。当 I/O 操作完成后,会从线程池中获取一个线程来继续处理该请求的剩余部分。这大大提高了服务器处理并发请求的能力。

async/await 并非魔法,它依赖于编译器和 .NET 运行时的一些巧妙机制:

  1. 编译器转换 (状态机):

    • 当你编写一个 async 方法时,编译器会将其转换为一个状态机 (State Machine)。这实际上是一个由编译器生成的类。

    • 这个状态机类负责管理异步方法的执行流程,包括:

      • 保存方法的局部变量。
      • 记录当前执行到的位置(状态)。
      • 处理 await 表达式:当遇到 await 时,如果操作未完成,状态机会设置一个回调(continuation),使得操作完成后能够继续执行状态机的下一部分。然后方法返回。
      • 当异步操作完成并调用回调时,状态机会恢复方法的状态,并从上次暂停的地方继续执行。
    • 简单来说,编译器会将你的线性代码逻辑拆分成多个部分,每个 await 点都是一个潜在的暂停和恢复点。

  2. TaskTask<TResult>:

    • 这两个类型是异步编程模型 (TAP - Task-based Asynchronous Pattern) 的核心。
    • Task 对象表示一个正在进行的异步操作。它有以下关键属性:
      • Status: (例如 Running, RanToCompletion, Faulted, Canceled)
      • IsCompleted: 操作是否已完成。
      • Exception: 如果操作失败,这里会包含异常信息。
    • Task<TResult> 继承自 Task,并额外包含一个 Result 属性,用于在操作成功完成后获取其返回值。如果尝试在任务未完成时访问 Result,当前线程会阻塞直到任务完成。await 则可以非阻塞地获取这个结果。
  3. Awaiter 对象:

    • await 关键字实际上可以用于任何具有 "awaitable" 模式的对象,不仅仅是 TaskTask<TResult>
    • 一个对象是 "awaitable" 的,如果它有一个 GetAwaiter() 方法,该方法返回一个 "awaiter" 对象。
    • "awaiter" 对象需要实现 INotifyCompletion (或 ICriticalNotifyCompletion) 接口,并具有以下成员:
      • IsCompleted (bool 属性): 指示等待的操作是否已经完成。
      • GetResult() (方法): 获取操作的结果。如果操作未成功完成,它应该抛出异常。
      • OnCompleted(Action continuation) (方法): 注册一个回调,当操作完成时执行该回调。
    • TaskTask<TResult> 都内置了对这种模式的支持。await task 实际上是编译器转换为类似以下逻辑:
      var awaiter = task.GetAwaiter();
      if (!awaiter.IsCompleted)
      {
          // 保存状态,注册回调
          awaiter.OnCompleted(() => {
              // 恢复状态
              var result = awaiter.GetResult(); // 获取结果或抛出异常
              // ... 继续执行 async 方法的剩余部分 ...
          });
          return; // 方法返回,不阻塞
      }
      // 如果已完成,则直接获取结果并继续
      var result_if_completed = awaiter.GetResult();
      // ... 继续执行 async 方法的剩余部分 ...
  4. 同步上下文 (SynchronizationContext):

    • SynchronizationContext 是一个抽象类,它提供了一种将工作单元(方法调用)封送到特定上下文(通常是特定线程)的方式。
    • 它的作用是确保 await 之后代码的“延续部分 (continuation)”在正确的线程上执行。
    • UI 应用程序 (WPF, WinForms, MAUI): 通常有一个与 UI 线程关联的 SynchronizationContext。当你在 UI 线程启动一个异步操作并 await 它时,await 默认会捕获当前的 SynchronizationContext。当异步操作完成后,延续部分会被发回到这个 SynchronizationContext,从而确保更新 UI 的代码在 UI 线程上执行,避免跨线程访问 UI 控件的问题。
    • ASP.NET Core: 在 ASP.NET Core 中,默认情况下没有 SynchronizationContext。延续部分通常会在线程池线程上执行。这对于服务器端应用是合适的,因为它避免了不必要的线程切换开销。
    • 控制行为: 你可以使用 ConfigureAwait(false) 来告诉 await 不要捕获并恢复同步上下文。在编写库代码时,通常建议使用 ConfigureAwait(false),除非库明确需要回到原始上下文。
     
  5. 线程池:

    • 对于 I/O 绑定的异步操作,当操作在等待时(例如等待网络响应),原始线程并不会被阻塞,而是可以返回到线程池(如果是线程池线程)或继续执行其他任务(如果是 UI 线程)。
    • 当 I/O 操作完成时,通常是由操作系统通知 .NET 运行时,然后运行时会从线程池中调度一个线程来执行异步方法的延续部分(除非有 SynchronizationContext 要求在特定线程上执行)。
    • 对于通过 Task.Run 包装的 CPU 密集型操作,Task.Run 本身就会将工作提交给线程池。

总结实现原理:

async/await 是一个编译器技巧,它将异步方法转换为一个状态机。这个状态机与 Task 对象和 awaitable 模式协同工作,以非阻塞的方式管理异步操作的流程。SynchronizationContext 确保了在特定环境下(如 UI 应用)代码的延续部分能在正确的线程上恢复执行。这一切使得开发者能够以近乎同步的编码风格编写出高效、响应迅速的异步应用程序。

下面的代码演示了手动生成async/await代码的逻辑实现:

public class SystemTaskAsyncSimulator
{
    // 模拟一个耗时的I/O操作,例如从网络获取数据
    // 这个方法现在返回一个标准的 Task<string>
    public Task<string> FetchDataAsync(string url, int delayMilliseconds)
    {
        Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] FetchDataAsync: 开始从 {url} 获取数据 (模拟耗时 {delayMilliseconds}ms)...");

        // TaskCompletionSource<TResult> 是创建和控制 Task 的好方法,
        // 特别是当 Task 的完成是由外部事件(如I/O完成)触发时。
        var tcs = new TaskCompletionSource<string>();

        // 使用线程池线程来模拟异步工作,避免阻塞当前线程
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] FetchDataAsync (工作线程): 正在处理 {url}...");
                Thread.Sleep(delayMilliseconds); // 模拟网络延迟或I/O操作

                if (url.Contains("error"))
                {
                    Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] FetchDataAsync (工作线程): 模拟 {url} 发生错误。");
                    throw new InvalidOperationException($"获取 '{url}' 时发生模拟错误。");
                }

                string result = $"来自 {url} 的数据 (使用 System.Task)";
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] FetchDataAsync (工作线程): {url} 数据获取成功。");
                tcs.SetResult(result); // 成功完成 Task,并设置结果
            }
            catch (Exception ex)
            {
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] FetchDataAsync (工作线程): {url} 发生异常。");
                tcs.SetException(ex); // 使 Task 进入 Faulted 状态,并设置异常
            }
        });

        Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] FetchDataAsync: 已为 {url} 返回 Task,操作正在后台进行。");
        return tcs.Task; // 返回由 TaskCompletionSource 控制的 Task
    }

    // 这个方法手动模拟了 C# 编译器为 async 方法生成的状态机。
    // 它演示了如何与 System.Task 和 TaskAwaiter 交互。
    public Task<string> ProcessDataAsync_ManualStateMachine(string url1, string url2)
    {
        // 这个 TaskCompletionSource 用于控制 ProcessDataAsync_ManualStateMachine 方法本身返回的 Task
        var overallTaskCompletionSource = new TaskCompletionSource<string>();

        Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine: 开始执行...");

        // --- 步骤 1: 模拟 "await FetchDataAsync(url1, 2000);" ---
        Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine: 准备获取第一个URL的数据: {url1}");
        Task<string> task1 = FetchDataAsync(url1, 2000);
        TaskAwaiter<string> awaiter1 = task1.GetAwaiter(); // 获取 Task 的 Awaiter

        // 定义第一个 await 完成后的延续操作 (continuation)
        Action continuationAfterFirstAwait = () =>
        {
            string data1;
            try
            {
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 1): 第一个Task ({url1}) 完成,尝试获取结果...");
                data1 = awaiter1.GetResult(); // 如果 task1 失败,这里会抛出异常
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 1): 获取到 data1: '{data1}'");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 1): 获取 data1 时发生错误: {ex.Message}");
                overallTaskCompletionSource.SetException(ex); // 将错误传递给外部调用者
                return; // 中断后续流程
            }

            // --- 步骤 2: 模拟 "await FetchDataAsync(url2, 1000);" ---
            Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 1): 准备获取第二个URL的数据: {url2}");
            Task<string> task2 = FetchDataAsync(url2, 1000);
            TaskAwaiter<string> awaiter2 = task2.GetAwaiter();

            // 定义第二个 await 完成后的延续操作
            Action continuationAfterSecondAwait = () =>
            {
                string data2;
                try
                {
                    Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 2): 第二个Task ({url2}) 完成,尝试获取结果...");
                    data2 = awaiter2.GetResult();
                    Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 2): 获取到 data2: '{data2}'");
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 2): 获取 data2 时发生错误: {ex.Message}");
                    overallTaskCompletionSource.SetException(ex);
                    return;
                }

                // 所有 "await" 操作都已完成,现在可以处理最终结果
                string finalProcessedResult = $"合并结果: [{data1}] 和 [{data2}]";
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 2): 所有数据处理完毕。最终结果: {finalProcessedResult}");
                overallTaskCompletionSource.SetResult(finalProcessedResult); // 完成整个方法的 Task
            };

            // 检查第二个 Task 是否已经完成
            if (awaiter2.IsCompleted)
            {
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 1): task2 ({url2}) 已同步完成或在检查前完成。立即执行Continuation 2。");
                continuationAfterSecondAwait(); // 如果已完成,则直接执行
            }
            else
            {
                Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine (Continuation 1): task2 ({url2}) 尚未完成。注册Continuation 2。");
                // 如果未完成,则注册回调。
                // OnCompleted 会确保 continuationAfterSecondAwait 在 task2 完成时被调用。
                // 它默认会捕获当前的 SynchronizationContext (如果存在)。
                awaiter2.OnCompleted(continuationAfterSecondAwait);
            }
        };

        // 检查第一个 Task 是否已经完成
        if (awaiter1.IsCompleted)
        {
            Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine: task1 ({url1}) 已同步完成或在检查前完成。立即执行Continuation 1。");
            continuationAfterFirstAwait(); // 如果已完成,则直接执行后续步骤
        }
        else
        {
            Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine: task1 ({url1}) 尚未完成。注册Continuation 1。");
            awaiter1.OnCompleted(continuationAfterFirstAwait); // 注册回调
        }

        // 模拟的 "async" 方法立即返回其代表整体操作的 Task
        Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] ProcessDataAsync_ManualStateMachine: 返回 overallTaskCompletionSource.Task,方法主体可能尚未完成。");
        return overallTaskCompletionSource.Task;
    }
}

public class Program
{
    // 将 Main 方法声明为 async Task,这样我们就可以在其中使用 await
    public static async Task Main(string[] args)
    {
        Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] Main: 开始使用 System.Task 模拟 async/await...");
        var simulator = new SystemTaskAsyncSimulator();

        Console.WriteLine($"\n[线程: {Environment.CurrentManagedThreadId}] Main: === 测试成功场景 ===");
        Task<string> resultTaskSuccess = simulator.ProcessDataAsync_ManualStateMachine("http://example.com/dataA", "http://example.com/dataB");

        Console.WriteLine($"\n[线程: {Environment.CurrentManagedThreadId}] Main: === 测试失败场景 (第二个URL将出错) ===");
        Task<string> resultTaskError = simulator.ProcessDataAsync_ManualStateMachine("http://example.com/dataC", "http://example.com/error");

        Console.WriteLine($"\n[线程: {Environment.CurrentManagedThreadId}] Main: ProcessDataAsync_ManualStateMachine 方法已调用两次并返回。Main线程现在可以执行其他操作。");

        // 模拟 Main 线程在等待异步操作完成时执行其他工作
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] Main: 正在执行其他工作... ({i + 1}/3)");
            await Task.Delay(250); // 使用 await Task.Delay 实现非阻塞等待
        }

        Console.WriteLine($"\n[线程: {Environment.CurrentManagedThreadId}] Main: 现在尝试 'await' 获取成功场景的结果...");
        try
        {
            // 这里我们使用真正的 await 来等待我们手动模拟的异步方法返回的 Task
            string finalResult1 = await resultTaskSuccess;
            Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] Main: === 成功场景结果 ===\n最终结果: {finalResult1}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] Main: === 成功场景发生意外错误 ===\n异常: {ex.GetType().Name} - {ex.Message}");
        }

        Console.WriteLine($"\n[线程: {Environment.CurrentManagedThreadId}] Main: 现在尝试 'await' 获取失败场景的结果...");
        try
        {
            string finalResult2 = await resultTaskError;
            // 如果上面的 await 成功,这行不应该被执行
            Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] Main: === 失败场景结果 (理论上不应到达这里) ===\n最终结果: {finalResult2}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"[线程: {Environment.CurrentManagedThreadId}] Main: === 失败场景按预期捕获到错误 ===\n异常: {ex.GetType().Name} - {ex.Message}");
        }

        Console.WriteLine($"\n[线程: {Environment.CurrentManagedThreadId}] Main: 演示结束。");
    }
}

/* 预期的输出模式 (线程ID和某些顺序可能变化):

[线程: 1] Main: 开始使用 System.Task 模拟 async/await...

[线程: 1] Main: === 测试成功场景 ===
[线程: 1] ProcessDataAsync_ManualStateMachine: 开始执行...
[线程: 1] ProcessDataAsync_ManualStateMachine: 准备获取第一个URL的数据: http://example.com/dataA
[线程: 1] FetchDataAsync: 开始从 http://example.com/dataA 获取数据 (模拟耗时 2000ms)...
[线程: 1] FetchDataAsync: 已为 http://example.com/dataA 返回 Task,操作正在后台进行。
[线程: 1] ProcessDataAsync_ManualStateMachine: task1 (http://example.com/dataA) 尚未完成。注册Continuation 1。
[线程: 1] ProcessDataAsync_ManualStateMachine: 返回 overallTaskCompletionSource.Task,方法主体可能尚未完成。
[线程: X] FetchDataAsync (工作线程): 正在处理 http://example.com/dataA...

[线程: 1] Main: === 测试失败场景 (第二个URL将出错) ===
[线程: 1] ProcessDataAsync_ManualStateMachine: 开始执行...
[线程: 1] ProcessDataAsync_ManualStateMachine: 准备获取第一个URL的数据: http://example.com/dataC
[线程: 1] FetchDataAsync: 开始从 http://example.com/dataC 获取数据 (模拟耗时 2000ms)...
[线程: 1] FetchDataAsync: 已为 http://example.com/dataC 返回 Task,操作正在后台进行。
[线程: 1] ProcessDataAsync_ManualStateMachine: task1 (http://example.com/dataC) 尚未完成。注册Continuation 1。
[线程: 1] ProcessDataAsync_ManualStateMachine: 返回 overallTaskCompletionSource.Task,方法主体可能尚未完成。
[线程: Y] FetchDataAsync (工作线程): 正在处理 http://example.com/dataC...

[线程: 1] Main: ProcessDataAsync_ManualStateMachine 方法已调用两次并返回。Main线程现在可以执行其他操作。
[线程: 1] Main: 正在执行其他工作... (1/3)
[线程: 1] Main: 正在执行其他工作... (2/3)
[线程: 1] Main: 正在执行其他工作... (3/3)

[线程: X] FetchDataAsync (工作线程): http://example.com/dataA 数据获取成功。
[线程: X] ProcessDataAsync_ManualStateMachine (Continuation 1): 第一个Task (http://example.com/dataA) 完成,尝试获取结果...
[线程: X] ProcessDataAsync_ManualStateMachine (Continuation 1): 获取到 data1: '来自 http://example.com/dataA 的数据 (使用 System.Task)'
[线程: X] ProcessDataAsync_ManualStateMachine (Continuation 1): 准备获取第二个URL的数据: http://example.com/dataB
[线程: X] FetchDataAsync: 开始从 http://example.com/dataB 获取数据 (模拟耗时 1000ms)...
[线程: X] FetchDataAsync: 已为 http://example.com/dataB 返回 Task,操作正在后台进行。
[线程: X] ProcessDataAsync_ManualStateMachine (Continuation 1): task2 (http://example.com/dataB) 尚未完成。注册Continuation 2。
[线程: Z] FetchDataAsync (工作线程): 正在处理 http://example.com/dataB...

[线程: Y] FetchDataAsync (工作线程): http://example.com/dataC 数据获取成功。
[线程: Y] ProcessDataAsync_ManualStateMachine (Continuation 1): 第一个Task (http://example.com/dataC) 完成,尝试获取结果...
[线程: Y] ProcessDataAsync_ManualStateMachine (Continuation 1): 获取到 data1: '来自 http://example.com/dataC 的数据 (使用 System.Task)'
[线程: Y] ProcessDataAsync_ManualStateMachine (Continuation 1): 准备获取第二个URL的数据: http://example.com/error
[线程: Y] FetchDataAsync: 开始从 http://example.com/error 获取数据 (模拟耗时 1000ms)...
[线程: Y] FetchDataAsync: 已为 http://example.com/error 返回 Task,操作正在后台进行。
[线程: Y] ProcessDataAsync_ManualStateMachine (Continuation 1): task2 (http://example.com/error) 尚未完成。注册Continuation 2。
[线程: W] FetchDataAsync (工作线程): 正在处理 http://example.com/error...

[线程: 1] Main: 现在尝试 'await' 获取成功场景的结果...
[线程: Z] FetchDataAsync (工作线程): http://example.com/dataB 数据获取成功。
[线程: Z] ProcessDataAsync_ManualStateMachine (Continuation 2): 第二个Task (http://example.com/dataB) 完成,尝试获取结果...
[线程: Z] ProcessDataAsync_ManualStateMachine (Continuation 2): 获取到 data2: '来自 http://example.com/dataB 的数据 (使用 System.Task)'
[线程: Z] ProcessDataAsync_ManualStateMachine (Continuation 2): 所有数据处理完毕。最终结果: 合并结果: [来自 http://example.com/dataA 的数据 (使用 System.Task)] 和 [来自 http://example.com/dataB 的数据 (使用 System.Task)]
[线程: Z] Main: === 成功场景结果 ===
最终结果: 合并结果: [来自 http://example.com/dataA 的数据 (使用 System.Task)] 和 [来自 http://example.com/dataB 的数据 (使用 System.Task)]

[线程: 1] Main: 现在尝试 'await' 获取失败场景的结果...
[线程: W] FetchDataAsync (工作线程): 模拟 http://example.com/error 发生错误。
[线程: W] FetchDataAsync (工作线程): http://example.com/error 发生异常。
[线程: W] ProcessDataAsync_ManualStateMachine (Continuation 2): 第二个Task (http://example.com/error) 完成,尝试获取结果...
[线程: W] ProcessDataAsync_ManualStateMachine (Continuation 2): 获取 data2 时发生错误: 获取 'http://example.com/error' 时发生模拟错误。
[线程: W] Main: === 失败场景按预期捕获到错误 ===
异常: InvalidOperationException - 获取 'http://example.com/error' 时发生模拟错误。

[线程: 1] Main: 演示结束。
*/

TAP异步模式


有了以上关于task以及async和await的知识,就能够实现TAP(Task-Based Asynchronous Pattern)异步编程模式了。TAP方法一般定义如下:

  • 方法返回一个Task或者Task<TResult>
  • 方法有Asyn后缀
  • 方法有以CancellationToken重载
  • 方法在发起异步操作之后会立即返回给调用者,即它的初始同步阶段非常短小。
  • 对于I/O密集型操作不绑定线程。

按照之前的介绍,很容易使用C#的异步函数来实现TAP方法。

任务组合


任务的组合有Task.WhenAny和Task.WhenAll,现在假设有以下三个Task异步方法:

async Task<int> Delay1() { await Task.Delay (1000); return 1; }
async Task<int> Delay2() { await Task.Delay (2000); return 2; }
async Task<int> Delay3() { await Task.Delay (3000); return 3; }

WhenAny表示只要有任何一个方法完成,就返回那个那个完成的Task,比如如下代码:

Task<int> winningTask = await Task.WhenAny (Delay1(), Delay2(), Delay3());
Console.WriteLine ("Done");
Console.WriteLine (winningTask.Result); // 1

很明显,第一个方法最先结束,所以返回的结果是1。

因为任何重新抛出的异常不会被AggregateException包装,所以可以一步到位:

int answer = await await Task.WhenAny (Delay1(), Delay2(), Delay3());

WhenAny可以在原本不支持超时和取消的操作中添加超时和取消功能,比如:

Task<string> task = SomeAsyncFunc();
Task winner = await (Task.WhenAny (task, Task.Delay(5000)));
if (winner != task) throw new TimeoutException();
string result = await task; // Unwrap result/re-throw

WhenAll返回一个任务,该任务仅当参数中所有任务全部完成时才完成。比如下面的任务在3秒后都结束。

await Task.WhenAll (Delay1(), Delay2(), Delay3());

下面语句与WhenAll类似:

Task task1 = Delay1(), task2 = Delay2(), task3 = Delay3();
await task1; await task2; await task3;

唯一的区别在于,下面的方法当task1失败时,后续的await task2/task3不会被执行,并且任务异常都会被忽略。

相反,只有所有任务都结束或者失败时,Task.WhenAll返回的task才会结束。如果有多个任务都失败了,则异常信息会被放到task的AggregateException里面。如果等待该组合任务的话,则只会抛出第一个异常。

Task task1 = Task.Run (() => { throw null; } );
Task task2 = Task.Run (() => { throw null; } );
Task all = Task.WhenAll (task1, task2);
try { await all; }
catch
{
    Console.WriteLine (all.Exception.InnerExceptions.Count); // 2
}

对于一系列Task<TResult>任务,调用Task.WhenAll会返回一个Task<TResult[]>,即所有任务的结果组合:

Task<int> task1 = Task.Run (() => 1);
Task<int> task2 = Task.Run (() => 2);
int[] results = await Task.WhenAll (task1, task2); // { 1, 2 }

比如下面的代码:

async Task<int> GetTotalSize(string[] uris)
{
	IEnumerable<Task<byte[]>> downloadTasks = uris.Select(uri => new WebClient().DownloadDataTaskAsync(uri));
	byte[][] contents = await Task.WhenAll(downloadTasks);
	return contents.Sum(c => c.Length);
}

上述代码不是很高效,因为我们只能在所有任务都完成之后再处理字节数组。如果在下载完成之后马上将字节转为其长度,则效率会更高。修改如下:

async Task<int> GetTotalSize(string[] uris)
{
	IEnumerable<Task<int>> downloadTasks = uris.Select(async uri => (await new WebClient().DownloadDataTaskAsync(uri)).Length);
	int[] contentLengths = await Task.WhenAll(downloadTasks);
	return contentLengths.Sum();
}

有了以上两个操作符,我们可以自定义一些组合器,比如:

async static Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
	Task winner = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false);
	if (winner != task) throw new TimeoutException();
	return await task.ConfigureAwait(false); // Unwrap result/re-throw
}

上面这个方法,可以给任何一个Task添加超时。需要注意因为这是一个类库方法,我们不需要与外部共享状态,所以使用了ConfigureAwait(false)来避免潜在的返回UI上下文执行的问题。

上面这个代码还可以继续优化,当Task的执行时间在超时时间之前结束,我们可以提前取消Delay,避免不必要的等待时间:

async static Task<TResult> WithTimeout<TResult>(this Task<TResult> task,TimeSpan timeout)
{
	var cancelSource = new CancellationTokenSource();
	var delay = Task.Delay(timeout, cancelSource.Token);
	Task winner = await Task.WhenAny(task, delay).ConfigureAwait(false);
	if (winner == task)
		cancelSource.Cancel();
	else
		throw new TimeoutException();
	return await task.ConfigureAwait(false); // Unwrap result/re-throw
}

另外,下面这个方法还可以提前取消不需要的Task:

static Task<TResult> WithCancellation<TResult>(this Task<TResult> task,CancellationToken cancelToken)
{
	var tcs = new TaskCompletionSource<TResult>();
	var reg = cancelToken.Register(() => tcs.TrySetCanceled());
	task.ContinueWith(ant =>
   {
	   reg.Dispose();
	   if (ant.IsCanceled)
		   tcs.TrySetCanceled();
	   else if (ant.IsFaulted)
		   tcs.TrySetException(ant.Exception.InnerExceptions);
	   else
		   tcs.TrySetResult(ant.Result);
   });
	return tcs.Task;
}

下面这个方法类似WhenAll,不同之处在于,如果任何一个Task失败,则整个结果立即返回:

async Task<TResult[]> WhenAllOrError<TResult>(params Task<TResult>[] tasks)
{
	var killJoy = new TaskCompletionSource<TResult[]>();
	foreach (var task in tasks)
		task.ContinueWith(ant =>
	   {
		   if (ant.IsCanceled)
			   killJoy.TrySetCanceled();
		   else if (ant.IsFaulted)
			   killJoy.TrySetException(ant.Exception.InnerExceptions);
	   });
	return await await Task.WhenAny(killJoy.Task, Task.WhenAll(tasks)).ConfigureAwait(false);
}

 

参考