.NET
Algorithms
C#
15 December 2014

Concurrency структуры в .net. ConcurrentQueue изнутри

ConcurrentQueue можно отнести к lock-free конкурентным структурам данных. В ее реализации нет блокировок (lock, Mutex…) и реализована она с использованием:
— классической функции CompareExchange;
— SpinWait
— volatile (используется как memory-barrier)
В основу ConcurrentQueue заложена структура ring-buffer (кольцевой буфер).


Ring-buffer (кольцевой буфер)


Кольцевой буфер идеально подходит для реализации структуры данных «очередь» (FIFO).

В его основе лежит массив данных и 2 указателя – начало (start) и конец (end).

Предусмотрено две основные операции:
  1. Push — добавление в конец. При добавлении нового элементов в буфер, счетчик end увеличивается на 1 и на его место записывается новый элемент. Если мы «уперлись» в верхнюю границу массива, то значение end обнуляется («переходит» на начало массива) и элементы начинают записываться в начало массива. Запись возможна пока индекс end не достиг индекса start.
  2. Pop — выборка элементов сначала. Выборка элементов происходит с элемента start, последовательно увеличивая его значение, до тех пока не достигнет end. Выборка возможна, пока индекс start не достиг индекса end.


Блочный кольцевой буфер


Устройство ConcurrentQueue немного сложнее, чем классический кольцевой буфер. В его реализации используется понятие сегмента (Segment). ConcurrentQueue состоит из связанного списка (однонаправленного) сегментов. Размер сегмента равен 32.
private class Segment {
    volatile VolatileBool[] m_state;
    volatile T[] m_array;
    volatile int m_low;
    volatile int m_high;
    volatile Segment m_next;
}

Первоначально в ConcurrentQueue создается 1 сегмент

По мере необходимости к нему справа добавляется новые сегменты


В результате получается однонаправленный связанный список. Начало связанного списка задает m_head, конец – m_tail. Ограничения:
  • m_head сегмент может иметь пустые ячейки только слева
  • m_tail сегмент может иметь пустые ячейки только справа
  • если m_head = m_tail то пустые ячейки могут быть как слева, так и справа.
  • В сегментах, между m_head и m_tail пустых ячеек быть не может.

Добавление элемента (Enqueue)


Ниже представлен примерный алгоритм добавление элементов в сегмент.
  • Увеличивается m_high на 1
  • В массив m_array с индексом m_high записывается новое значение.

index = Interlocked.Increment(ref this.m_high);
if (index <= 31)
{
     m_array[index] = value;
     m_state[index].m_value = true;
}

m_state – массив состояния ячеек, если значение true – элемент записан в ячейку, если false — еще нет. По сути, это некий «Commit» записи. Нужен он для того, чтобы между операциями увеличения индекса Interlocked.Increment и записью значения m_array[index] = value не произошло чтение элемента другим потоком. Тогда чтение данных будет осуществляться после выполнения:
while (!this.m_state[index].m_value)
   spinWait2.SpinOnce();


Добавление нового сегмента (Segment.Grow)


Как только m_high текущего сегмента становится равным 31, запись в текущий сегмент прекращается и создается новый сегмент (текущие сегменты продолжают жить своей жизнью).
m_next = new ConcurrentQueue<T>.Segment(this.m_index + 1L, this.m_source);
m_source.m_tail = this.m_next;

m_next – ссылка на следующий сегмент
m_source.m_tail – ссылка последний сегмент списка сегментов.

Выборка элемента (TryDequeue)


В основе выборки элементов из очереди лежат две базовые функциональности:
  • Interlocked.CompareExchange – атомарная операция, которая записывает значение переменной, в случае если ее значение равно сравниваемому значению.
    public static extern int CompareExchange(ref int location1, int value, int comparand);
    

  • SpinWait, из MSDN
    System.Threading.SpinWait is a lightweight synchronization type that you can use in low-level scenarios to avoid the expensive context switches and kernel transitions that are required for kernel events. On multicore computers, when a resource is not expected to be held for long periods of time, it can be more efficient for a waiting thread to spin in user mode for a few dozen or a few hundred cycles, and then retry to acquire the resource. If the resource is available after spinning, then you have saved several thousand cycles. If the resource is still not available, then you have spent only a few cycles and can still enter a kernel-based wait. This spinning-then-waiting combination is sometimes referred to as a two-phase wait operation.


Примерный алгоритм работы выборки:
  1. Получить m_low
  2. Увеличить m_low на 1, с использованием CompareExchange
  3. Если m_low больше 31 – перейти на следующий сегмент
  4. Дождаться коммита (m_state[low].m_value) элемента с индексом m_low.

SpinWait spinWait1 = new SpinWait();
int low = this.Low;
if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low)
{
   SpinWait spinWait2 = new SpinWait();
   while (!this.m_state[low].m_value)
      spinWait2.SpinOnce();
   result = this.m_array[low];


Count vs IsEmpty


Код IsEmpty:
ConcurrentQueue<T>.Segment segment = this.m_head;
if (!segment.IsEmpty)
   return false;
if (segment.Next == null)
   return true;
SpinWait spinWait = new SpinWait();
for (; segment.IsEmpty; segment = this.m_head)
{
   if (segment.Next == null)
      return true;
   spinWait.SpinOnce();
}
return false;

Т.е. по сути, это найти первый непустой сегмент. Если он найден – очередь не пуста.

Код Count:
ConcurrentQueue<T>.Segment head;
ConcurrentQueue<T>.Segment tail;
int headLow;
int tailHigh;
this.GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
if (head == tail)
   return tailHigh - headLow + 1;
return 32 - headLow + 32 * (int) (tail.m_index - head.m_index - 1L) + (tailHigh + 1);

По сути, он ищет первый и последний сегмент и на основе этих двух сегментов вычисляет кол-во элементов.
Вывод — операция Count будет занимать больше процессорного времени, чем IsEmpty.

Снепшот & GetEnumerator


Структура ConcurrentQueue поддерживает технологию снепшотов для получения целостного набора элементов.
Целостные данные возвращают:
  • ToArray
  • ICollection.CopyTo
  • GetEnumerator

Операторы выше так же работаю без блокировок, а целостность достигается за счет введения счетчика
volatile int m_numSnapshotTakers
в рамках всей очереди — число операций, работающих со снепшотами в текущий момент времени. Т.е. каждая операция, которая хочет получить целостную картину, должна реализовать следующую код:
Interlocked.Increment(ref this.m_numSnapshotTakers);
try
{
...//Итератор по всем сегментам
}
finally
{
Interlocked.Decrement(ref this.m_numSnapshotTakers);
}

В дополнении к этому, изменения у нас «пишет» только операция Dequeue, поэтому только в ней проверяется необходимость удалять ссылку на элемент очереди:
if (this.m_source.m_numSnapshotTakers <= 0)
   this.m_array[low] = default (T);

+12
22.4k 108
Comments 4