【.Net实用方法总结】 整理并总结.NET 中的 System.IO.Pipelines(管道)
🐋作者简介:博主是一位.Net开发者,同时也是RPA和低代码平台的践行者。
🐬个人主页:会敲键盘的肘子
🐰系列专栏:.Net实用方法总结
🦀专栏简介:博主针对.Net开发和C站问答过程中遇到的问题进行总结,形成本专栏,希望可以帮助到您解决问题。
🐶座右铭:总有一天你所坚持的会反过来拥抱你。
🌈写在前面:
System.IO.Pipelines 是一个库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。
System.IO.Pipelines
已构建为:
- 具有高性能的流数据分析功能。
- 减少代码复杂性。
👉本文关键字:System.IO.Pipelines、文件系统、方法示例、管道
文章目录
- 1️⃣ 概述
- 2️⃣ System.IO.Pipelines 解决什么问题
- ♈ 引入问题
- 3️⃣ 管道
- ♈ 基本介绍
- ♊ 基本用法
- 4️⃣ 反压和流量控制
- 5️⃣ PipeScheduler
- 6️⃣ 管道重置
- ♈ 场景需求
- ♊ PipeReader
- ⭐ 读取流数据方案
- ⭐ 读取单条消息
- ⭐ 读取多条消息
- ⭐ 取消
- ⭐ PipeReader 常见问题
- ⭐ 有问题的代码
- ❌数据丢失
- ❌ 无限循环
- ❌ 应用程序无响应
- ❌ 内存不足 (OOM)
- ❌ 内存损坏
- ♋ PipeWrite
- ⭐ 取消
- ⭐ PipeWriter 常见问题
- ♎ PipeWrite和PipeReader的建议
- 7️⃣ IDuplexPipe
- ♈ 基本介绍
- ♊ 流
- ⭐ 示例
1️⃣ 概述
文件和流 I/O(输入/输出)是指在存储媒介中传入或传出数据。 在 .NET 中,System.IO
命名空间包含允许以异步方式和同步方式对数据流和文件进行读取和写入操作的类型。 这些命名空间还包含对文件执行压缩和解压缩的类型,以及通过管道和串行端口启用通信的类型。
文件是一个由字节组成的有序的命名集合,它具有永久存储。 在处理文件时,你将处理目录路径、磁盘存储、文件和目录名称。 相反,流是一个字节序列,可用于对后备存储进行读取和写入操作,后备存储可以是多个存储媒介之一(例如,磁盘或内存)。 正如存在除磁盘之外的多种后备存储一样,也存在除文件流之外的多种流(如网络、内存和管道流)。
-
文件和目录
- File - 提供用于创建、复制、删除、移动和打开文件的静态方法,并可帮助创建 FileStream 对象。
- FileInfo - 提供用于创建、复制、删除、移动和打开文件的实例方法,并可帮助创建 FileStream 对象。
- Directory - 提供用于创建、移动和枚举目录和子目录的静态方法。
- DirectoryInfo - 提供用于创建、移动和枚举目录和子目录的实例方法。
- Path - 提供用于以跨平台的方式处理目录字符串的方法和属性。
-
流
- FileStream - 用于对文件进行读取和写入操作。
- MemoryStream - 用于作为后备存储对内存进行读取和写入操作。
- BufferedStream - 用于改进读取和写入操作的性能。
-
读取器和编写器
- BinaryReader 和 BinaryWriter - 用于将基元数据类型作为二进制值进行读取和写入。
- StreamReader 和 StreamWriter - 用于通过使用编码值在字符和字节之间来回转换来读取和写入字符。
- StringReader 和 StringWriter - 用于从字符串读取字符以及将字符写入字符串中。
- TextReader 和 TextWriter - 用作其他读取器和编写器(读取和写入字符和字符串,而不是二进制数据)的抽象基类。
2️⃣ System.IO.Pipelines 解决什么问题
♈ 引入问题
下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 '\n'
分隔):
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
前面的代码有几个问题:
- 单次调用
ReadAsync
可能无法接收整条消息(行尾)。 - 忽略了
stream.ReadAsync
的结果。stream.ReadAsync
返回读取的数据量。 - 它不能处理在单个
ReadAsync
调用中读取多行的情况。 - 它为每次读取分配一个
byte
数组。
要解决上述问题,需要进行以下更改:
-
缓冲传入的数据,直到找到新行。
-
分析缓冲区中返回的所有行。
-
该行可能大于 1KB(1024 字节)。 此代码需要调整输入缓冲区的大小,直到找到分隔符后,才能在缓冲区内容纳完整行。
- 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
- 压缩用于读取行的缓冲区,以减少空余。
-
请考虑使用缓冲池来避免重复分配内存。
-
下面的代码解决了其中一些问题:
async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) { // Calculate the amount of bytes remaining in the buffer. var bytesRemaining = buffer.Length - bytesBuffered; if (bytesRemaining == 0) { // Double the buffer size and copy the previously buffered data into the new buffer. var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); // Return the old buffer to the pool. ArrayPool<byte>.Shared.Return(buffer); buffer = newBuffer; bytesRemaining = buffer.Length - bytesBuffered; } var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); if (bytesRead == 0) { // EOF break; } // Keep track of the amount of buffered bytes. bytesBuffered += bytesRead; var linePosition = -1; do { // Look for a EOL in the buffered data. linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // Calculate the length of the line based on the offset. var lineLength = linePosition - bytesConsumed; // Process the line. ProcessLine(buffer, bytesConsumed, lineLength); // Move the bytesConsumed to skip past the line consumed (including \n). bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }
前面的代码很复杂,不能解决所识别的所有问题。 高性能网络通常意味着编写复杂的代码以使性能最大化。
System.IO.Pipelines
的设计目的是使编写此类代码更容易。
System.IO.Pipelines
已构建为:
- 具有高性能的流数据分析功能。
- 减少代码复杂性。
3️⃣ 管道
♈ 基本介绍
Pipe 类可用于创建 PipeWriter/PipeReader
对。 写入 PipeWriter
的所有数据都可用于 PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
我们已在上篇文章中介绍了Pipe 类的用法,大家可以去查看本专栏之前的文章。
♊ 基本用法
示例
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
上述代码有两个循环:
FillPipeAsync
从Socket
读取并写入PipeWriter
。ReadPipeAsync
从PipeReader
读取并分析传入的行。
没有分配显式缓冲区。 所有缓冲区管理都委托给 PipeReader
和 PipeWriter
实现。 委派缓冲区管理使使用代码更容易集中关注业务逻辑。
在第一个循环中:
- 调用 PipeWriter.GetMemory(Int32) 从基础编写器获取内存。
- 调用 PipeWriter.Advance(Int32) 以告知
PipeWriter
有多少数据已写入缓冲区。 - 调用 PipeWriter.FlushAsync 以使数据可用于
PipeReader
。
在第二个循环中,PipeReader
使用由 PipeWriter
写入的缓冲区。 缓冲区来自套接字。 对 PipeReader.ReadAsync
的调用:
- 返回包含两条重要信息的 ReadResult:
- 以
ReadOnlySequence<byte>
形式读取的数据。 - 布尔值
IsCompleted
,指示是否已到达数据结尾 (EOF)。
- 以
找到行尾 (EOL) 分隔符并分析该行后:
- 该逻辑处理缓冲区以跳过已处理的内容。
- 调用
PipeReader.AdvanceTo
以告知PipeReader
已消耗和检查了多少数据。
读取器和编写器循环通过调用 Complete
结束。 Complete
使基础管道释放其分配的内存。
4️⃣ 反压和流量控制
理想情况下,读取和分析可协同工作:
- 读取线程使用来自网络的数据并将其放入缓冲区。
- 分析线程负责构造适当的数据结构。
通常,分析所花费的时间比仅从网络复制数据块所用时间更长:
- 读取线程领先于分析线程。
- 读取线程必须减缓或分配更多内存来存储用于分析线程的数据。
为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。
为解决上述问题,Pipe
提供了两个设置来控制数据流:
-
PauseWriterThreshold:确定在调用 FlushAsync 暂停之前应缓冲多少数据。
-
ResumeWriterThreshold:确定在恢复对
PipeWriter.FlushAsync
的调用之前,读取器必须观察多少数据。
PipeWriter.FlushAsync:
- 当
Pipe
中的数据量超过PauseWriterThreshold
时,返回不完整的ValueTask<FlushResult>
。 - 低于
ResumeWriterThreshold
时,返回完整的ValueTask<FlushResult>
。
使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。
示例
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
5️⃣ PipeScheduler
通常在使用 async
和 await
时,异步代码会在 TaskScheduler 或当前 SynchronizationContext 上恢复。
在执行 I/O 时,对执行 I/O 的位置进行细粒度控制非常重要。 此控件允许高效利用 CPU 缓存。 高效的缓存对于 Web 服务器等高性能应用至关重要。 PipeScheduler 提供对异步回调运行位置的控制。 默认情况下:
- 使用当前的 SynchronizationContext。
- 如果没有
SynchronizationContext
,它将使用线程池运行回调。
示例
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool 是 PipeScheduler 实现,用于对线程池的回调进行排队。 PipeScheduler.ThreadPool
是默认选项,通常也是最佳选项。 PipeScheduler.Inline 可能会导致意外后果,如死锁。
6️⃣ 管道重置
♈ 场景需求
通常重用 Pipe
对象即可重置。 要重置管道,请在完成 PipeReader
和 PipeWriter
时调用 PipeReaderReset。
♊ PipeReader
PipeReader 代表调用方管理内存。 在调用 PipeReader.ReadAsync 之后始终调用 PipeReader.AdvanceTo。 这使 PipeReader
知道调用方何时用完内存,以便可以对其进行跟踪。 从 PipeReader.ReadAsync
返回的 ReadOnlySequence<byte>
仅在调用 PipeReader.AdvanceTo
之前有效。 调用 PipeReader.AdvanceTo
后,不能使用 ReadOnlySequence<byte>
。
PipeReader.AdvanceTo
采用两个 SequencePosition 参数:
- 第一个参数确定消耗的内存量。
- 第二个参数确定观察到的缓冲区数。
将数据标记为“已使用”意味着管道可以将内存返回到底层缓冲池。 将数据标记为“已观察”可控制对 PipeReader.ReadAsync
的下一个调用的操作。 将所有内容都标记为“已观察”意味着下次对 PipeReader.ReadAsync
的调用将不会返回,直到有更多数据写入管道。 任何其他值都将使对 PipeReader.ReadAsync
的下一次调用立即返回并包含已观察到的和未观察到的数据,但不是已被使用的数据。
⭐ 读取流数据方案
尝试读取流数据时会出现以下几种典型模式:
- 给定数据流时,分析单条消息。
- 给定数据流时,分析所有可用消息。
以下示例使用 TryParseLines
方法分析来自 ReadOnlySequence<byte>
的消息。 TryParseLines
分析单条消息并更新输入缓冲区,以从缓冲区中剪裁已分析的消息。 TryParseLines
不是 .NET 的一部分,它是在以下部分中使用的用户编写的方法。
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
⭐ 读取单条消息
下面的代码从 PipeReader
读取一条消息并将其返回给调用方。
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
前面的代码:
- 分析单条消息。
- 更新已使用的
SequencePosition
并检查SequencePosition
以指向已剪裁的输入缓冲区的开始。
因为 TryParseLines
从输入缓冲区中删除了已分析的消息,所以更新了两个 SequencePosition
参数。 通常,分析来自缓冲区的单条消息时,检查的位置应为以下位置之一:
- 消息的结尾。
- 如果未找到消息,则返回接收缓冲区的结尾。
单条消息案例最有可能出现错误。 将错误的值传递给“已检查”可能会导致内存不足异常或无限循环 。 有关详细信息,请参阅本文中的 PipeReader 常见问题部分。
⭐ 读取多条消息
以下代码从 PipeReader
读取所有消息,并在每条消息上调用 ProcessMessageAsync
。
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
⭐ 取消
PipeReader.ReadAsync
:
- 支持传递 CancellationToken。
- 如果在读取挂起期间取消了
CancellationToken
,则会引发 OperationCanceledException。 - 支持通过 PipeReader.CancelPendingRead 取消当前读取操作的方法,这样可以避免引发异常。 调用
PipeReader.CancelPendingRead
将导致对PipeReader.ReadAsync
的当前或下次调用返回 ReadResult,并将IsCanceled
设置为true
。 这对于以非破坏性和非异常的方式停止现有的读取循环非常有用。
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
⭐ PipeReader 常见问题
- 将错误的值传递给
consumed
或examined
可能会导致读取已读取的数据。 - 传递
buffer.End
作为检查对象可能会导致以下问题:- 数据停止
- 如果数据未使用,可能最终会出现内存不足 (OOM) 异常。 例如,当一次处理来自缓冲区的单条消息时,可能会出现
PipeReader.AdvanceTo(position, buffer.End)
。
- 将错误的值传递给
consumed
或examined
可能会导致无限循环。 例如,如果buffer.Start
没有更改,则PipeReader.AdvanceTo(buffer.Start)
将导致在下一个对PipeReader.ReadAsync
的调用在新数据到来之前立即返回。 - 将错误的值传递给
consumed
或examined
可能会导致无限缓冲(最终导致 OOM)。 - 在调用
PipeReader.AdvanceTo
之后使用ReadOnlySequence<byte>
可能会导致内存损坏(在释放之后使用)。 - 未能调用
PipeReader.Complete/CompleteAsync
可能会导致内存泄漏。 - 在处理缓冲区之前检查 ReadResult.IsCompleted 并退出读取逻辑会导致数据丢失。 循环退出条件应基于
ReadResult.Buffer.IsEmpty
和ReadResult.IsCompleted
。 如果错误执行此操作,可能会导致无限循环。
⭐ 有问题的代码
❌数据丢失
当 IsCompleted
被设置为 true
时,ReadResult
可能会返回最后一段数据。 在退出读循环之前不读取该数据将导致数据丢失。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
❌ 无限循环
如果 Result.IsCompleted
是 true
,则以下逻辑可能会导致无限循环,但缓冲区中永远不会有完整的消息。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
下面是另一段具有相同问题的代码。 该代码在检查 ReadResult.IsCompleted
之前检查非空缓冲区。 由于该代码位于 else if
中,如果缓冲区中没有完整的消息,它将永远循环。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
❌ 应用程序无响应
在分析单条消息时,如果无条件调用 PipeReader.AdvanceTo
而 buffer.End
位于 examined
位置,则可能导致应用程序变为无响应。 对 PipeReader.AdvanceTo
的下次调用将在以下情况下返回:
- 有更多数据写入管道。
- 以及之前未检查过新数据。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
❌ 内存不足 (OOM)
在满足以下条件的情况下,以下代码将保持缓冲,直到发生 OutOfMemoryException:
- 没有最大消息大小。
- 从
PipeReader
返回的数据不会生成完整的消息。 例如,它不会生成完整的消息,因为另一端正在编写一条大消息(例如,一条为 4GB 的消息)。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
❌ 内存损坏
当写入读取缓冲区的帮助程序时,应在调用 Advance
之前复制任何返回的有效负载。 下面的示例将返回 Pipe
已丢弃的内存,并可能将其重新用于下一个操作(读/写)。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
♋ PipeWrite
PipeWriter 管理用于代表调用方写入的缓冲区。 PipeWriter
可实现 IBufferWriter
。 IBufferWriter<byte>
使得无需额外的缓冲区副本就可以访问缓冲区来执行写入操作。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
之前的代码:
- 使用 GetMemory 从
PipeWriter
请求至少 5 个字节的缓冲区。 - 将 ASCII 字符串
"Hello"
的字节写入返回的Memory<byte>
。 - 调用 Advance 以指示写入缓冲区的字节数。
- 刷新
PipeWriter
,以便将字节发送到基础设备。
以前的写入方法使用 PipeWriter
提供的缓冲区。 它可能还使用了 PipeWriter.WriteAsync,该项执行以下操作:
- 将现有缓冲区复制到
PipeWriter
。 - 根据需要调用
GetSpan``Advance
,然后调用 FlushAsync。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
⭐ 取消
FlushAsync 支持传递 CancellationToken。 如果令牌在刷新挂起时被取消,则传递 CancellationToken
将导致 OperationCanceledException
。 PipeWriter.FlushAsync
支持通过 PipeWriter.CancelPendingFlush 取消当前刷新操作而不引发异常的方法。 调用 PipeWriter.CancelPendingFlush
将导致对 PipeWriter.FlushAsync
或 PipeWriter.WriteAsync
的当前或下次调用返回 FlushResult,并将 IsCanceled
设置为 true
。 这对于以非破坏性和非异常的方式停止暂停刷新非常有用。
⭐ PipeWriter 常见问题
-
GetSpan 和 GetMemory 返回至少具有请求内存量的缓冲区。 请勿假设确切的缓冲区大小 。
-
无法保证连续的调用将返回相同的缓冲区或相同大小的缓冲区。
-
在调用 Advance 之后,必须请求一个新的缓冲区来继续写入更多数据。 不能写入先前获得的缓冲区。
-
如果未完成对
FlushAsync
的调用,则调用GetMemory
或GetSpan
将不安全。 -
如果未刷新数据,则调用
Complete
或CompleteAsync
可能导致内存损坏。
♎ PipeWrite和PipeReader的建议
以下提示将帮助你成功使用 System.IO.Pipelines 类:
- 始终完成 PipeReader 和 PipeWriter(包括适用时的例外情况)。
- 在调用 PipeReader.ReadAsync 之后始终调用 PipeReader.AdvanceTo。
- 写入时定期
await
PipeWriter.FlushAsync,并始终检查 FlushResult.IsCompleted。 如果IsCompleted
为true
,则中止写入,因为这表示读取器已完成,不再关心所写入的内容。 - 在写入希望
PipeReader
有权访问的内容后调用 PipeWriter.FlushAsync。 - 如果读取器在
FlushAsync
完成之前无法启动,请勿调用FlushAsync
,因为这可能会导致死锁。 - 确保只有一个上下文“拥有”
PipeReader
或PipeWriter
或访问它们。 这些类型不是线程安全的。 - 调用
AdvanceTo
或完成PipeReader
后,切勿访问 ReadResult.Buffer。
7️⃣ IDuplexPipe
♈ 基本介绍
IDuplexPipe 是支持读写的类型的协定。 例如,网络连接将由 IDuplexPipe
表示。
与包含 PipeReader
和 PipeWriter
的 Pipe
不同,IDuplexPipe
表示全双工连接的一侧。 这意味着写入 PipeWriter
的内容不会从 PipeReader
中读取。
♊ 流
在读取或写入流数据时,通常使用反序列化程序读取数据,并使用序列化程序写入数据。 大多数读取和写入流 API 都有一个 Stream
参数。 为了更轻松地与这些现有 API 集成,PipeReader
和 PipeWriter
公开了一个 AsStream 方法。 AsStream 返回围绕 PipeReader
或 PipeWriter
的 Stream
实现。
⭐ 示例
可使用给定了 Stream 对象和可选的相应创建选项的静态 Create
方法创建 PipeReader
和 PipeWriter
实例。
StreamPipeReaderOptions 允许使用以下参数控制 PipeReader
实例的创建:
- StreamPipeReaderOptions.BufferSize 是从池中租用内存时使用的最小缓冲区大小(以字节为单位),默认值为
4096
。 - StreamPipeReaderOptions.LeaveOpen 标志确定在
PipeReader
完成之后基础流是否保持打开状态,默认值为false
。 - StreamPipeReaderOptions.MinimumReadSize 表示分配新缓冲区之前缓冲区中剩余字节的阈值,默认值为
1024
。 - StreamPipeReaderOptions.Pool 是分配内存时使用的
MemoryPool<byte>
,默认值为null
。
StreamPipeWriterOptions 允许使用以下参数控制 PipeWriter
实例的创建:
- StreamPipeWriterOptions.LeaveOpen 标志确定在
PipeWriter
完成之后基础流是否保持打开状态,默认值为false
。 - StreamPipeWriterOptions.MinimumBufferSize 表示从 Pool 租用内存时要使用的最小缓冲区大小,默认值为
4096
。 - StreamPipeWriterOptions.Pool 是分配内存时使用的
MemoryPool<byte>
,默认值为null
。
重要
使用
Create
方法创建PipeReader
和PipeWriter
实例时,需要考虑Stream
对象的生存期。 如果在读取器或编写器使用该方法完成操作后,你需要访问流,则需要在创建选项上将LeaveOpen
标志设置为true
。 否则,流将关闭。
以下代码演示了使用 Create
方法从流中创建 PipeReader
和 PipeWriter
实例。
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
应用程序使用 StreamReader 以流形式读取 lorem-ipsum.txt 文件,并且必须以空白行结尾。 FileStream 传递给 PipeReader.Create,后者实例化 PipeReader
对象。 然后,控制台应用程序使用 Console.OpenStandardOutput() 将其标准输出流传递到 PipeWriter.Create。 示例支持取消。
⭐写在结尾:
文章中出现的任何错误请大家批评指出,一定及时修改。
希望写在这里的小伙伴能给个三连支持!