Channel 类位于 System.Threading.Channels 命名空间,提供了一组同步数据结构,用于异步在生产者和消费者之间传递数据。该库针对 .NET Standard 并适用于所有.NET实现。在程序设计中,Channels 是生产者/消费者概念模型的一种实现。
简而言之,它可以被视为一个内存消息队列。
⼩标题:示例 1
以下是一个简单的示例说明如何使用 Channel 类创建生产者-消费者模型:
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消费者接收到: {item}");
}
});
await Task.WhenAll(producer, consumer);
}
在此示例中,我们创建了一个无界的通道,然后创建了两个任务,分别是生产者和消费者。生产者每秒生成一个数字,然后将其写入通道。消费者从通道中读取数据并打印。当生产者完成写入后,调用 channel.Writer.Complete() 来通知消费者没有更多的数据可读。
⼩标题:示例 2
使用 Channel.CreateBounded
static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<int>(5); // 创建一个容量为5的有界通道
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"生产者生成了: {i}");
await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消费者接收到: {item}");
await Task.Delay(2000); // 模拟消费者需要一些时间来处理数据
}
});
await Task.WhenAll(producer, consumer);
}
在本示例中,我们创建了一个容量为5的有界通道。生产者每秒生成一个数字,然后将其写入通道。消费者从通道中读取数据并打印出来。消费者处理数据的速度比生产者慢,所以当通道满时,生产者的 WriteAsync 操作将会阻塞,直到消费者读取了一些数据,使得通道有空间可用。
⼩标题:示例 3
以下示例展示了如何在多个生产者和消费者之间共享一个通道:
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var producer1 = Produce(channel.Writer, id: 1);
var producer2 = Produce(channel.Writer, id: 2);
var consumer1 = Consume(channel.Reader, id: 1);
var consumer2 = Consume(channel.Reader, id: 2);
await Task.WhenAll(producer1, producer2, consumer1, consumer2);
}
static async Task Produce(ChannelWriter<int> writer, int id)
{
for (int i = 0; i < 10; i++)
{
await writer.WriteAsync(i);
Console.WriteLine($"生产者{id}生成了: {i}");
await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
}
writer.Complete();
}
static async Task Consume(ChannelReader<int> reader, int id)
{
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"消费者{id}接收到: {item}");
await Task.Delay(2000); // 模拟消费者需要一些时间来处理数据
}
}
在这个示例中,我们创建了两个生产者和两个消费者,它们共享同一个通道。这是一个非常重要的使用模式,因为在消息队列中,通常会有多个生产者和多个消费者。通过控制生产者的生产速度,来控制数据入队的量。还可以通过控制消费者的数量来控制数据的处理速度,从而调节系统的流量,实现消峰填谷。
⼩标题:总结
Channel 类是 .NET CORE 3.0 后新增的类,为我们提供了便利的生产者/消费者模式实现方案。相当于是一个进程内的内存队列,它没有持久化,是纯内存操作,性能非常非常高。在面临高并发时,可以为系统提供高吞吐量。当然,代价是一定的内存开销和一些实时性的牺牲。
⼩标题:关注我的公众号一起玩转技术