System.IO.Pipelines — a little-known tool for lovers of high performance

Original author: Pavel Romash
Hello, reader. Quite a lot of time has passed since the release of .NET Core 2.1. Such cool innovations as Span and Memory are already widely known: you can read, see and hear a lot about them. Unfortunately, the library called System.IO.Pipelines did not receive the same attention. Almost everything available on the topic is a single post that has been translated and copied across many resources. There should be more information about this technology so that it can be looked at from different angles.



Introduction


So, this library aims to speed up the processing of streaming data. It was originally created and used by the development team of Kestrel (a cross-platform web server for ASP.NET Core), but it is now available to mere mortals through a NuGet package.

Before we delve into the topic, we can think of the library mechanism as an improved analogue of MemoryStream. The problem with the original MemoryStream is an excessive number of copies, which is obvious if you remember that MemoryStream uses a private byte array as its buffer. For example, in the Read and Write methods you can clearly see the data being copied. Thus, for the object that we want to write to the stream, a copy is created in the internal buffer, and during reading, a copy of that internal copy is returned to the consumer. That does not sound like the most rational use of memory.
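The two copies described above can be observed with a small sketch. The class and method names here are illustrative only and are not part of any library; the point is that the stream keeps its own copy of the bytes, and reading produces yet another one.

```csharp
using System;
using System.IO;

public static class MemoryStreamCopyDemo
{
    // Writes a small array into a MemoryStream, mutates the source,
    // and reads the data back to show that the stream holds its own copy.
    public static (byte fromStream, byte fromSource) RoundTrip()
    {
        byte[] source = { 1, 2, 3, 4 };
        using var stream = new MemoryStream();

        // Copy #1: Write copies the bytes into the stream's private buffer.
        stream.Write(source, 0, source.Length);

        // Changing the source afterwards does not affect the stream.
        source[0] = 42;

        // Copy #2: Read copies from the private buffer into the caller's array.
        stream.Position = 0;
        var destination = new byte[source.Length];
        stream.Read(destination, 0, destination.Length);

        return (destination[0], source[0]);
    }

    public static void Main()
    {
        var (fromStream, fromSource) = RoundTrip();
        Console.WriteLine($"stream: {fromStream}, source: {fromSource}"); // stream: 1, source: 42
    }
}
```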

System.IO.Pipelines does not aim to replace all streams. It is an additional tool in the arsenal of a developer writing high-performance code. I suggest that you familiarize yourself with the basic methods and classes, see their implementation details and analyze basic examples.

Let's start with the internals and implementation details, looking at simple code fragments along the way. After that, it will become clear how it works and how it should be used. When working with System.IO.Pipelines, it is worth remembering the basic concept: all read and write operations should take place without additional allocations. But some methods that look attractive at first glance contradict this rule. As a result, the code that you are trying so hard to speed up starts allocating memory for new data again and again, loading the garbage collector.

The library's internals make the widest use of the latest language and runtime features: Span, Memory, object pools, ValueTask and so on. It is worth looking there at least for a great example of using these features in production.

At one time, some developers were not satisfied with the implementation of streams in C#, because one class was used for both reading and writing. But as they say, you cannot throw methods out of a class. Even if the stream doesn't support reading/writing/seeking, the CanRead, CanWrite and CanSeek properties are used. It looks like a small crutch. But now things become different.

To work with pipelines, 2 classes are used: PipeWriter and PipeReader. These classes contain approximately 50 lines of code each and are pseudo-facades (not the most classic incarnation of the pattern, since they hide a single class rather than many) for the Pipe class, which contains all the basic logic for working with data. This class has 5 public members: 2 constructors, 2 get-only properties (Reader and Writer) and the Reset() method, which resets internal fields to their initial state so that the instance can be reused. The remaining working methods are internal and are called through the pseudo-facades.

Let's get started with the Pipe class


An instance of the class occupies 320 bytes, which is quite a lot (almost a third of a kilobyte; 2 such objects would not fit in the memory of the Manchester Mark I). So allocating a huge number of instances is a bad idea. Moreover, the object is intended for long-term use. The use of pools supports this as well: the objects held in the pool live forever (for the default pool implementation).
Note that the class is marked as sealed and that it is thread safe: many sections of the code are critical sections wrapped in locks.

To start using this class you should create an instance of the Pipe class and obtain the PipeReader and PipeWriter objects using the properties mentioned.

Simple initialization
    var pipe = new Pipe();
    PipeWriter pipeWriter = pipe.Writer;
    PipeReader pipeReader = pipe.Reader;
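Since an instance is relatively heavy and meant to live long, one Pipe can be reused through Reset() instead of being recreated. Below is a minimal sketch under the assumption that Reset() is called only after both the reader and the writer have completed; the class and method names are illustrative. The write and read calls it uses are walked through in the following sections.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeReuseDemo
{
    // Span cannot live across an await, so the write is done in a sync helper.
    private static void WriteOneByte(PipeWriter writer, byte value)
    {
        Span<byte> span = writer.GetSpan(1);
        span[0] = value;
        writer.Advance(1);
    }

    // Runs two independent "sessions" over the same Pipe instance
    // and returns the total number of bytes read.
    public static async Task<long> RunTwoSessionsAsync()
    {
        var pipe = new Pipe();
        long totalRead = 0;

        for (byte session = 0; session < 2; session++)
        {
            WriteOneByte(pipe.Writer, session);
            await pipe.Writer.FlushAsync();
            pipe.Writer.Complete();

            ReadResult result = await pipe.Reader.ReadAsync();
            totalRead += result.Buffer.Length;
            pipe.Reader.AdvanceTo(result.Buffer.End);
            pipe.Reader.Complete();

            // Both sides are completed, so the pipe can be returned to its initial state.
            pipe.Reset();
        }

        return totalRead;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunTwoSessionsAsync());
    }
}
```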


Consider the methods for working with pipes:
Writing with PipeWriter — WriteAsync, GetMemory/GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.

Reading with PipeReader — AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.

As stated in the post mentioned above, the class uses a singly linked list of buffers. But, obviously, they are not transferred between PipeReader and PipeWriter: all the logic is in one class. This list is used for both reading and writing. Moreover, the data returned to the reader is stored in this same list, so no copying is performed.

There are also fields indicating the beginning of the data to read (ReadHead and an index), the end of the data to read (ReadTail and an index) and the beginning of the space to write (WriteHead and the number of buffered bytes written). Here ReadHead, ReadTail and WriteHead are specific members (segments) of the internal list of segments, and the index indicates a specific position within the segment. Thus, written data can start in the middle of one segment, cover the whole next segment and end in the middle of a third. These pointers are moved by various methods.

Getting started with PipeWriter methods


#1 ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)


This is the method mentioned earlier that looks attractive at first glance. It has a very suitable and fashionable signature: it accepts ReadOnlyMemory and is asynchronous. Many may be tempted, especially remembering that Span and Memory are so fast and cool. But do not flatter yourself. All this method does is copy the ReadOnlyMemory passed to it into the internal list. And "copy" here means a call to the CopyTo() method, not copying just the struct itself. All the data that we want to write will be copied, thereby loading the memory. This method is worth mentioning only to make clear that it is better not to use it. Well, perhaps in some rare situations this behavior is appropriate.
The body of the method is a critical section, access to it is synchronized through a monitor.
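The copying can be made visible with a short sketch (the class and method names are illustrative): the source array is changed after WriteAsync returns, yet the reader still sees the original value, which means the pipe holds its own copy of the payload.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class WriteAsyncCopyDemo
{
    // Returns the first byte seen by the reader after the source array was mutated.
    public static async Task<byte> WriteThenMutateAsync()
    {
        var pipe = new Pipe();
        byte[] source = { 1, 2, 3 };

        // WriteAsync copies the payload into the pipe's own segments and flushes it.
        await pipe.Writer.WriteAsync(source);

        // Mutating the source now has no effect on what the reader sees.
        source[0] = 42;

        ReadResult result = await pipe.Reader.ReadAsync();
        byte first = result.Buffer.First.Span[0];
        pipe.Reader.AdvanceTo(result.Buffer.End);
        return first;
    }

    public static async Task Main()
    {
        Console.WriteLine(await WriteThenMutateAsync()); // 1
    }
}
```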

Then the question may arise: how do we write something, if not through the most obvious and seemingly only suitable method?

#2 Memory<byte> GetMemory(int sizeHint)


The method takes one integer parameter, in which we indicate how many bytes we want to write to the pipeline (what buffer size we want). The method checks whether there is enough space for writing in the current memory fragment stored in _writingHeadMemory. If there is, _writingHeadMemory is returned as the Memory. Otherwise, data that has been written to the buffer but not yet flushed with FlushAsync is committed, and another BufferSegment is allocated and linked to the previous one (here is our internal list). If _writingHeadMemory has not been initialized yet, it is initialized with a new BufferSegment. The allocation of the buffer is a critical section and is done under the lock.

I suggest looking at the following example. At first glance, it might seem that the compiler (or the runtime) has been possessed by a demon.

Devilry
    var pipeNoOptions = new Pipe();

    Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2);
    Console.WriteLine(memoryOne.Length); //2048 or 4096

    var pipeWithOptions = new Pipe(new PipeOptions(minimumSegmentSize: 5));

    Memory<byte> memoryTwo = pipeWithOptions.Writer.GetMemory(2);
    Console.WriteLine(memoryTwo.Length); //16


But everything in this example is simple and easy to explain.
When creating a Pipe instance, we can pass a PipeOptions object with creation options to its constructor.

PipeOptions has a default minimum segment size field. Not so long ago it was 2048, but a later commit raised this value to 4096. At the time of writing this article, the 4096 version was in the prerelease NuGet package, while the latest release version still had a value of 2048. This explains the behavior of the first example. If it is critical for you to use a smaller default buffer size, you can specify it in an instance of the PipeOptions type.

But in the second example, where the minimum size is specified, the length does not match it anyway. This happens because a new BufferSegment is created using pools. One of the options in PipeOptions is the memory pool, and the specified pool will be used to create a new segment. If you did not specify a memory pool, the default ArrayPool is used, which, as you know, has several buckets for different array sizes (each next one is 2 times larger than the previous one). When an array of a certain size is requested, it looks for the bucket with arrays of a suitable size (that is, the nearest larger or equal one). Accordingly, the new buffer will almost certainly be larger than you requested. The minimum array size in the default ArrayPool (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) is 16. But do not worry, this is a pool of arrays. Accordingly, in the vast majority of cases, the array does not put pressure on the garbage collector and will be reused later.
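The rounding can be checked directly against the shared array pool, without any pipe involved. This is only a sketch with illustrative names: with the bucket scheme described above one would expect lengths such as 16, 16, 32 and 1024 for the requests below, but the only thing the pool guarantees is that the array is at least as large as requested.

```csharp
using System;
using System.Buffers;

public static class PoolRoundingDemo
{
    // Rents an array of at least `requested` bytes and reports its real length.
    public static int RentedLength(int requested)
    {
        byte[] rented = ArrayPool<byte>.Shared.Rent(requested);
        try
        {
            return rented.Length;
        }
        finally
        {
            // Always give the array back so that it can be reused.
            ArrayPool<byte>.Shared.Return(rented);
        }
    }

    public static void Main()
    {
        foreach (int requested in new[] { 5, 16, 17, 1000 })
        {
            Console.WriteLine($"{requested} -> {RentedLength(requested)}");
        }
    }
}
```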

#2.5 Span<byte> GetSpan(int sizeHint)


It works similarly, returning the Span of that Memory.

Thus GetMemory() and GetSpan() are the main methods for writing. They give us an object that we can write to. To do this, we do not need to allocate memory for new arrays of values; we can write directly into the pipe. Which one to use mainly depends on the API you are using and on whether the method is asynchronous. However, in view of the above, a question arises: how will the reader know how much we wrote? If we always used a specific pool implementation that returns an array of exactly the requested size, the reader could read the entire buffer at once. However, as we have already said, the buffer we get is very likely to be larger than requested. This leads to the following method, which is required for correct operation.

#3 void Advance(int bytes)


A terribly simple method. It takes the number of bytes written as an argument and increments the internal counters _unflushedBytes and _writingHeadBytesBuffered, whose names speak for themselves. It also slices the written part off _writingHeadMemory (using the Slice method), so that it starts at the first unwritten byte. Therefore, after calling this method, you need to request a new memory block in the form of Memory or Span; you cannot write to the previous one. The whole body of the method is a critical section and runs under a lock.

It seems that after this the reader can receive data. But one more step is needed.
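A short sketch (illustrative names) shows why: after GetSpan and Advance alone, TryRead on the reader side finds nothing, and the data becomes visible only once the writer has flushed. The FlushAsync call used here is the method described next.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class FlushVisibilityDemo
{
    // Span cannot live across an await, so the write is done in a sync helper.
    private static void WriteWithoutFlush(PipeWriter writer)
    {
        Span<byte> span = writer.GetSpan(3);
        span[0] = 10;
        span[1] = 20;
        span[2] = 30;
        writer.Advance(3); // tell the pipe that exactly 3 bytes were written
    }

    public static async Task<(bool beforeFlush, bool afterFlush, long length)> RunAsync()
    {
        var pipe = new Pipe();
        WriteWithoutFlush(pipe.Writer);

        // Advance alone only moves the writer's counters; nothing is published yet.
        bool beforeFlush = pipe.Reader.TryRead(out _);

        await pipe.Writer.FlushAsync();

        bool afterFlush = pipe.Reader.TryRead(out ReadResult result);
        long length = result.Buffer.Length;
        pipe.Reader.AdvanceTo(result.Buffer.End);

        return (beforeFlush, afterFlush, length);
    }

    public static async Task Main()
    {
        var (before, after, length) = await RunAsync();
        Console.WriteLine($"before flush: {before}, after flush: {after}, bytes: {length}");
    }
}
```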

#4 ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)


The method is called after we have written the necessary data to the received Memory (GetMemory) and indicated how much we wrote there (Advance). The method returns a ValueTask, but it is not declared as async (unlike the implementation in StreamPipeWriter, a PipeWriter descendant). ValueTask is a special type (a readonly struct) used when most calls are not expected to be asynchronous, that is, all the necessary data is available at the time of the call and the method completes synchronously. It contains either the result or a Task (in case the call could not complete synchronously). Which of the two happens depends on the _writerAwaitable.IsCompleted property. If we look for what changes the state of _writerAwaitable, we will see that this happens when the amount of unconsumed data (which is not exactly the same as unexamined data; this is explained later) exceeds a certain threshold (_pauseWriterThreshold). The default value is 16 segment sizes. If desired, the value can be changed in PipeOptions. This method also starts the continuation of the ReadAsync method if one was blocked.
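The threshold behaviour can be sketched as follows. The thresholds of 16 and 8 bytes are deliberately tiny, hypothetical values chosen only to make the effect visible with a 32-byte write; the class and method names are illustrative as well.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Span cannot live across an await, so the write is done in a sync helper.
    private static void WriteBytes(PipeWriter writer, int count)
    {
        Span<byte> span = writer.GetSpan(count);
        span.Slice(0, count).Fill(0xFF);
        writer.Advance(count);
    }

    public static async Task<(bool writerPaused, bool readerCompleted)> RunAsync()
    {
        // Hypothetical thresholds: pause the writer at 16 unconsumed bytes, resume below 8.
        var options = new PipeOptions(pauseWriterThreshold: 16, resumeWriterThreshold: 8);
        var pipe = new Pipe(options);

        WriteBytes(pipe.Writer, 32);

        // More than 16 bytes are unconsumed, so the flush does not complete synchronously.
        ValueTask<FlushResult> flush = pipe.Writer.FlushAsync();
        bool writerPaused = !flush.IsCompleted;

        // The data itself is already visible to the reader; consuming it releases the writer.
        ReadResult read = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(read.Buffer.End);

        FlushResult flushResult = await flush;
        return (writerPaused, flushResult.IsCompleted);
    }

    public static async Task Main()
    {
        var (paused, readerCompleted) = await RunAsync();
        Console.WriteLine($"writer paused: {paused}, reader completed: {readerCompleted}");
    }
}
```

In real code the thresholds would rather be tuned to the message sizes and the relative speed of the producer and the consumer.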

Returns a FlushResult containing 2 properties — IsCanceled and IsCompleted. IsCanceled indicates whether Flush has been canceled (CancelPendingFlush() call). IsCompleted indicates whether the PipeReader has completed (by calling the Complete() or CompleteAsync() methods).
The main part of the method is performed under the lock.
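As a sketch of how IsCompleted might be used (illustrative names, and a reader that is completed up front purely for demonstration), a producer loop can stop as soon as the flush reports that nobody is going to read the data.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class ReaderCompletedDemo
{
    // Span cannot live across an await, so the write is done in a sync helper.
    private static void WriteChunk(PipeWriter writer)
    {
        Span<byte> span = writer.GetSpan(4);
        span.Slice(0, 4).Clear();
        writer.Advance(4);
    }

    // Returns how many chunks were flushed before the producer noticed
    // that the reader had completed.
    public static async Task<int> ProduceUntilReaderLeavesAsync(int maxChunks)
    {
        var pipe = new Pipe();

        // For the sake of the example, the consumer gives up immediately.
        pipe.Reader.Complete();

        int flushed = 0;
        while (flushed < maxChunks)
        {
            WriteChunk(pipe.Writer);
            FlushResult result = await pipe.Writer.FlushAsync();
            flushed++;

            if (result.IsCompleted)
            {
                break; // the reader is gone, further writing is pointless
            }
        }

        pipe.Writer.Complete();
        return flushed;
    }

    public static async Task Main()
    {
        Console.WriteLine(await ProduceUntilReaderLeavesAsync(10));
    }
}
```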

Other methods of PipeWriter are not interesting from the implementation point of view and are used much less often, therefore only a brief description will be given.

#5 void Complete(Exception exception = null) or ValueTask CompleteAsync(Exception exception = null)


Marks pipe closed for writing. An exception will be thrown when attempting to use the write methods after completion. If PipeReader has already been completed, the entire Pipe instance is also completed. Most of the work is done under the lock.

#6 void CancelPendingFlush()


As the name implies, it cancels the current FlushAsync() operation. There is a lock.

#7 void OnReaderCompleted(Action<Exception, object> callback, object state)


Executes the passed delegate when the reader completes. There is also a lock.
In the documentation it is currently written that this method may not be called on some PipeWriter implementations and will be removed in the future. Therefore, you should not tie logic to these methods.

It's time for PipeReader


#1 ValueTask<ReadResult> ReadAsync(CancellationToken token)


Here, as in FlushAsync(), a ValueTask is returned, which hints that the method is mostly synchronous, but not always. That depends on the state of _readerAwaitable. As with FlushAsync, we need to find out when _readerAwaitable is set to incomplete. This happens when the PipeReader has read everything from the internal list (or the list contains only data that was marked as examined and more data is needed to continue). Which, in fact, is obvious. Accordingly, it is desirable to fine-tune the Pipe to your workload and set all its options thoughtfully, based on empirically gathered statistics. Proper configuration reduces the chance of taking the asynchronous execution branch and allows more efficient data processing. Almost all the code in the method is surrounded by a lock.

Returns a somewhat mysterious ReadResult. In fact, it is just a buffer plus flags showing the status of the operation: IsCanceled indicates whether ReadAsync was canceled, and IsCompleted indicates whether the PipeWriter was closed, that is, whether its Complete() or CompleteAsync() method was called. If these methods were called with an exception passed, it will be thrown when trying to read.
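Both outcomes can be sketched side by side (illustrative names; InvalidDataException is just an arbitrary exception type picked for the demonstration): a cleanly completed writer surfaces as IsCompleted, while a writer completed with an exception makes the read throw.

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class WriterCompletionDemo
{
    public static async Task<(bool completed, string error)> RunAsync()
    {
        // Case 1: the writer completes normally, the reader sees IsCompleted.
        var cleanPipe = new Pipe();
        cleanPipe.Writer.Complete();
        ReadResult result = await cleanPipe.Reader.ReadAsync();
        bool completed = result.IsCompleted;
        cleanPipe.Reader.AdvanceTo(result.Buffer.End);
        cleanPipe.Reader.Complete();

        // Case 2: the writer completes with an exception, the read rethrows it.
        var faultedPipe = new Pipe();
        faultedPipe.Writer.Complete(new InvalidDataException("producer failed"));
        string error = null;
        try
        {
            await faultedPipe.Reader.ReadAsync();
        }
        catch (InvalidDataException ex)
        {
            error = ex.Message;
        }

        return (completed, error);
    }

    public static async Task Main()
    {
        var (completed, error) = await RunAsync();
        Console.WriteLine($"completed: {completed}, error: {error}");
    }
}
```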

And again, the buffer has a mysterious type: ReadOnlySequence. It, in turn, holds the first and the last segments (ReadOnlySequenceSegment) of the data plus the start and end indexes inside those segments, which actually resembles the structure of the Pipe class itself. By the way, BufferSegment inherits from ReadOnlySequenceSegment, which hints that BufferSegment is what this sequence is built from. Thanks to this, unnecessary memory allocations for transferring data from writer to reader can be avoided.
A ReadOnlySpan can be obtained from the buffer for further processing. To complete the picture, you can check whether the buffer consists of a single segment. If it does, there is no need to iterate over a collection of one element, and the data can be obtained through the First property. Otherwise, it is necessary to go over all segments in the buffer and process the ReadOnlySpan of each.

A topic for discussion: the ReadOnlySequence class makes active use of nullable reference types, and it contains a goto (not for leaving deeply nested loops and not in generated code).

After processing, you need to signal the Pipe instance that we read the data.

#2 bool TryRead(out ReadResult result)


Synchronous version. Allows you to get the result if it exists. Otherwise, unlike ReadAsync it does not block and returns false. Also the code of this method is in the lock.

#3 void AdvanceTo(SequencePosition consumed, SequencePosition examined)


In this method, you specify how much data has been examined and how much has been consumed. Data that has been examined but not consumed will be returned again on the next read. This feature may seem strange at first glance, but when processing a stream of bytes it is rarely necessary to process each byte individually. Usually data is exchanged in messages. It may happen that the reader receives one whole message and part of a second one. The whole message must be processed, and the part of the second should be left for later so that it arrives together with its remainder. The AdvanceTo method takes a SequencePosition, which is actually a segment plus an index. When everything that ReadAsync returned has been processed, you can specify buffer.End. Otherwise, you have to create a position explicitly, indicating the segment and index at which processing stopped. Lock is under the hood.
Also, if the amount of unconsumed data drops below the specified threshold (_resumeWriterThreshold), the method starts the continuation of PipeWriter if it was blocked. By default, this threshold is 8 segment sizes (half the blocking threshold).
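The "one whole message and part of the second" situation can be sketched with newline-delimited messages. The framing by '\n', the helper names and the use of ToArray (which allocates and is used here only to keep the demonstration short) are assumptions of this sketch, not something the library prescribes.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class LineFramingDemo
{
    // Span cannot live across an await, so the write is done in a sync helper.
    private static void Write(PipeWriter writer, string text)
    {
        Span<byte> span = writer.GetSpan(Encoding.UTF8.GetMaxByteCount(text.Length));
        int written = Encoding.UTF8.GetBytes(text.AsSpan(), span);
        writer.Advance(written);
    }

    // Extracts every complete '\n'-terminated message and returns
    // the position right after the last complete one.
    private static SequencePosition ExtractLines(ReadOnlySequence<byte> buffer, List<string> lines)
    {
        SequencePosition? newline;
        while ((newline = buffer.PositionOf((byte)'\n')) != null)
        {
            ReadOnlySequence<byte> line = buffer.Slice(0, newline.Value);
            lines.Add(Encoding.UTF8.GetString(line.ToArray()));

            // Skip the message together with its delimiter.
            buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
        }

        return buffer.Start;
    }

    public static async Task<List<string>> RunAsync()
    {
        var pipe = new Pipe();
        var lines = new List<string>();

        // One whole message and the beginning of the second one.
        Write(pipe.Writer, "first\nsec");
        await pipe.Writer.FlushAsync();

        ReadResult result = await pipe.Reader.ReadAsync();
        SequencePosition consumed = ExtractLines(result.Buffer, lines);
        // Consumed: up to the end of "first\n". Examined: everything we have seen.
        pipe.Reader.AdvanceTo(consumed, result.Buffer.End);

        // The rest of the second message arrives later.
        Write(pipe.Writer, "ond\n");
        await pipe.Writer.FlushAsync();

        // The unconsumed "sec" comes back together with the new data.
        result = await pipe.Reader.ReadAsync();
        consumed = ExtractLines(result.Buffer, lines);
        pipe.Reader.AdvanceTo(consumed, result.Buffer.End);

        return lines;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync())); // first, second
    }
}
```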

#4 void Complete(Exception exception = null)


Completes the PipeReader. If the PipeWriter is completed at this point, then the entire Pipe instance completes. Lock inside.

#5 void CancelPendingRead()


Allows you to cancel the reading that is currently in pending state. Lock.
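A minimal sketch of this behaviour (illustrative names): a read on an empty pipe stays pending, and after CancelPendingRead it completes without throwing, with the IsCanceled flag set.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class CancelPendingReadDemo
{
    public static async Task<bool> RunAsync()
    {
        var pipe = new Pipe();

        // Nothing has been written, so this read cannot complete yet.
        ValueTask<ReadResult> pending = pipe.Reader.ReadAsync();

        // Wake the reader up without completing the pipe and without an exception.
        pipe.Reader.CancelPendingRead();

        ReadResult result = await pending;
        return result.IsCanceled;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // True
    }
}
```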

#6 void OnWriterCompleted(Action <Exception, object> callback, object state)


Allows you to specify a delegate to execute upon completion of the PipeWriter.
As with the similar PipeWriter method, the documentation marks this method as one that will be removed. Lock is under the hood.

Example


The listing below shows an example of working with pipes.
Since Span and Memory were introduced in .NET Core, many classes for working with data have been supplemented with overloads that use these types, so the general interaction scheme will be approximately the same. In my example, I used pipelines to work with pipes (I like similar words), the OS objects for interprocess communication. The pipes API has been extended accordingly to read data into Span and Memory. The asynchronous version uses Memory: an asynchronous method is compiled into an auto-generated finite state machine that stores all local variables and method parameters, and since Span is a readonly ref struct, it cannot be placed on the heap, so a Span cannot live across an await in an asynchronous method. But there is also a synchronous version of the method that allows you to use Span. I tried both, and the synchronous version performed better in this situation: less garbage collection occurs and data processing is faster. But this was only because there was a lot of data in the pipe (the data was always available). If it is rather likely that there will be no data at the moment the next batch is requested, you should use the asynchronous version so as not to keep the processor busy while waiting.
The example has comments that explain some points. Note that although the fragments of the program responsible for reading from the pipe and for processing are separated, when writing to a file the data is read from exactly the place where it was written when it was read from the named pipe.

Years of evolution for the sake of a powerful feature - asynchronous main
    using System.IO.Pipelines;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        static async Task Main(string[] args)
        {
            var pipe = new Pipe();
            var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe");
            var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader);
            var cts = new CancellationTokenSource();
            await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token));
        }
    }


PipeDataWriter
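    using System;
    using System.IO.Pipelines;
    using System.IO.Pipes;
    using System.Threading;
    using System.Threading.Tasks;

    public class PipeDataWriter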
    {
        private readonly NamedPipeClientStream _namedPipe;
        private readonly PipeWriter _pipeWriter;
        private const string Servername = ".";
        
        public PipeDataWriter(PipeWriter pipeWriter, string pipeName)
        {
            _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter));
            _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In);
        }

        public async Task ReadFromPipeAsync(CancellationToken token)
        {
            await _namedPipe.ConnectAsync(token);

            while (true)
            {
                token.ThrowIfCancellationRequested();
                
                //// when working with the asynchronous method, use Memory<T>
                //Memory<byte> buffer = _pipeWriter.GetMemory();
                //// asynchronous reading from the named pipe into Memory<T>
                //// any operation that obtains data can go here, from reading a file to random generation
                //int readBytes = await _namedPipe.ReadAsync(buffer, token);

                // synchronous reading from the named pipe into the Span requested from PipeWriter
                // any operation that obtains data can go here, from reading a file to random generation
                int readBytes = _namedPipe.Read(_pipeWriter.GetSpan());

                // if there was nothing in the channel, release the thread for half a second and try again
                // in other cases we could break out of the loop; this is just an example
                if (readBytes == 0)
                {
                    await Task.Delay(500, token);
                    continue;
                }
                
                // specify the amount of bytes read from the pipe
                _pipeWriter.Advance(readBytes);
                
                // flush the data to make it available to PipeReader
                FlushResult result = await _pipeWriter.FlushAsync(token);
                
                // if PipeReader has been completed, it no longer needs to write data
                // PS this behavior was chosen by me as an example, it depends on business logic
                if (result.IsCompleted)
                {
                    break;
                }
            }
            
            // complete _pipeWriter to complete the entire instance of pipe
            _pipeWriter.Complete();
        }
    }


DataProcessor
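    using System;
    using System.Buffers;
    using System.IO.Pipelines;
    using System.Threading;
    using System.Threading.Tasks;

    public class DataProcessor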
    {
        private readonly IBytesProcessor _bytesProcessor;
        private readonly PipeReader _pipeReader;

        public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader)
        {
            _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor));
            _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader));
        }

        public async Task StartProcessingDataAsync(CancellationToken token)
        {
            while (true)
            {
                token.ThrowIfCancellationRequested();
                
                // reading data from a pipe instance
                ReadResult result = await _pipeReader.ReadAsync(token);
                ReadOnlySequence<byte> buffer = result.Buffer;
                
                // We perform calculations with the data obtained.
                await _bytesProcessor.ProcessBytesAsync(buffer, token);
                
                // indicate to which position the data was processed. In this case, everything is written to the file.
                // in situations where not all data has been processed, you need to create a position manually using the buffer and index
                // in this situation, IBytesProcessor.ProcessBytesAsync can be supplemented by returning this position
                _pipeReader.AdvanceTo(buffer.End);
                
                // if PipeWriter has been completed, reading is no longer necessary
                // this behavior was chosen by me as an example, it depends on business logic
                if (result.IsCompleted)
                {
                    break;
                }
            }
            
            // complete _pipeReader to complete the entire instance of pipe
            _pipeReader.Complete();
        }
    }


BytesProcessor
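    using System;
    using System.Buffers;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;

    public interface IBytesProcessor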
    {
        Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token);
    }

    public class ConsoleBytesProcessor : IBytesProcessor
    {
        //Let's imagine that in this class there is a normal constructor and IDisposable
        readonly FileStream _fileStream = new FileStream("buffer", FileMode.Create);
        
        public Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token)
        {
            if (bytesSequence.IsSingleSegment)
            {
                ProcessSingle(bytesSequence.First.Span);
            }
            else
            {
                foreach (var segment in bytesSequence)
                {
                    ProcessSingle(segment.Span);
                }
            }
			
            return Task.CompletedTask;
        }

        private void ProcessSingle(ReadOnlySpan<byte> span)
        {
            _fileStream.Write(span);
        }
    }
