Как стать автором
Обновить

Неблокирующие очереди: обмен сообщениями между потоками

Время на прочтение 6 мин
Количество просмотров 13K
Идею к написанию подобного модуля породил PLM нашего корпоративного продукта. Инспектируя нашу дизайн-документацию, нам сказали, что наш код ни в коем случае не должен блокировать таску из которой он вызывается, и вообще отнимать как можно меньше ее времени, такова особенность построения системы, ватчдоги и т.п. Поставленная задача предполагала передачу неких сообщений от одной таски к другой, что то вроде логов, расширенная диагностика. Таска-получатель создана, чтобы записывать результат в файл, поскольку очевидно из исходной таски о записи в файл и речи быть не может. И хотя источник(producer) один, и потребитель(consumer) тоже один и даже наличие мютексов или семафоров не сказалось бы на исходной таске, было решено полностью от них отказаться. Опять же, в будущем возможно было расширение задачи на несколько других тасок, а посему ситуация, когда одна таска ожидает другую хоть и ограниченно допустима (а исходный код для обмена своими информационными сообщениями семафоры таки использует), но очень нежелательна.

Изначально предполагалось сделать статический кольцевой буфер, где каждый элемент содержит бит, определяющий принадлежность источнику или потребителю. Алгоритм предельно прост, источник записывает свои данные в ячейку, где бит равен нулю, и после этого «публикует» изменение, записывая в этот бит единицу и переходит к следующему элементу. Потребитель же из элемента, у которого этот бит равен единице, считывает сообщение, и затем бит обнуляет. Никаких race conditions, вроде бы все хорошо. Но первый же траффик-тест выявил, что за один слайс источник может теоретически выдать порядка 30-40 тысяч элементов. В реальности конечно будет меньше, поскольку он еще что то делает, кроме производства этих строчек, но определить размер буфера, которого было бы достаточно, не представляется возможным. Одна из причин этого еще и нестабильная скорость записи в файл — на некоторых системах стоят CF карты вместо жестких дисков. А терять сообщения очень не хотелось бы.

Порывшись в интернете, я наткнулся на следующее решение, которое и заимплементировал в свою задачу: drdobbs.com/architecture-and-design/210604448
Алгоритм описан достаточно подробно, повторять его здесь не буду.
Два изменения, которые я внес:
1) Не понял, почему освобождает элементы источник, а не потребитель. Освобождение элементов потребителем также не создает race condition(кстати как на русский перевести понятно это словосочетание?). Это снимает часть нагрузки с потребителя и уменьшает используемую память, поскольку потребленные элементы удаляются немедленно.
2) Также траффик тест, а точнее профайлер, выявил, что malloc это относительно дорогая операция. Поскольку максимальный размер исходных сообщений известен, было принято решение сгруппировать выделение памяти одной операцией сразу на 8 элементов. Это дало более чем двукратный прирост скорости, в частности вдвое уменьшило процессорную нагрузку, которую мы добавляем к исходной таске.

Тут сразу надо оговориться, что исходный код был на безплюсовом си, который к тому же в силу договора о неразглашении я не могу публиковать. Маллок к си-шарпу уже как то не относится, поэтому второй пункт уже неприменим. А си-шарп я изучаю уже в свое свободное время, и пишу лично для себя. Предлагали раз работу, не прошел как раз отсутсвием опыта работы с этим волшебным языком, с тех пор и занимаюсь. Ну да ладно, ближе к делу.

Имплементация неблокирующей очереди на C#

Первым делом нужно описать элемент очереди.
        class queItem
        {
          public object message;
          public queItem next;

          public queItem(object message = null)
          {
            this.message = message;
            next = null;
          }
        }

И собственно саму очередь:
      class locklessQueue //thread-to-thread lockless queue
      {
        queItem first;
        queItem last;
      }

Здесь first принадлежит потребителю, а next в элементе last принадлежит источнику. Ни first ни last не должны быть равны null, посему конструктор создает пустой элемент в состоянии «уже потреблен».
        public locklessQueue()
        {
          first = new queItem();
          last = first;
        }

Далее методы добавления в очередь и соответственно извлечения из нее.
        public void produce(object message)
        {
          last.next = new queItem(message);
          last = last.next;
        }

        public bool consume(out object message)
        {
          if (first == last || first.next == null)
          {
            message = null;
            return false;
          }
          message = first.next.message;
          first = first.next;
          return true;
        }
      }

Сам по себе полученный класс мало обоснован, поскольку в дотнет 4.0 уже входит класс ConcurrentQueue, который не только полностью потокобезопасен, но еще и в отличие от полученного класса позволяет добавлять в очередь и изымать из нее одновременно нескольким потокам. И позволяет работать с очередью в 1.5-3 раза быстрее, в сравнении с блокирующим вариантом. пруфлинк
Для сборщика логов класса ConcurrentQueue более чем достаточно. Однако задачу под свое дотнет приложение я расширил, и ConcurrentQueue меня не устроила тем, она безадресная.

Обмен сообщениями между потоками


image
Каждый поток должен иметь возможность отправить сообщение другому потоку, по его имени. В моем случае это обработчик tcp сокетов(клиент или сервер) и собственно потоки обработчики. Каким образом выяснять какому именно обработчику надо отправлять я оставляю за пределами этой заметки.

К сожалению одну из подзадач я так и не смог решить — без блокировки добавлять новый поток, как участника обмена. Хотелось бы посмотреть исходный код ConcurrentQueue, возможно ее решение помогло бы найти ответ. В шарпе правда можно ее и использовать для передачи инициирующих сообщений, или для сообщений от асинхронных методов, но пока что я приведу блокирующее решение, почти такое же, как и было бы в классическом си.
Забегая вперед, скажу, что у принятого решения есть очевидный недостаток, для обработки очередей нужно запускать отдельный поток, это является платой за отсутствие блокировки. Отдельный поток очевидно добавит процессорной нагрузки, но отсутствие блокировки ускорит обработку каждого сообщения. Трудно оценить насколько улучшится/ухудшится производительность и как это будет масштабироваться, возможно проведу такие исследования в будущем.

Итак для каждого потока участника нужно создать две очереди, в одну он будет сообщения отправлять а из другой читать. «Проксирующий» контейнер для этих двух очередей:
    class threadNode
    {
      public string tName;
      public int tid;
      locklessQueue outgoing = new locklessQueue(); //from Messenger to Node 
      locklessQueue incoming = new locklessQueue(); //from node to Messenger

      public threadNode(string tName, int tid)
      {
        this.tid = tid;
        this.tName = tName;
      }

      public void enqueue(messengerItem message) //called by Node
      {
        incoming.produce(message);
      }

      public bool dequeue(out messengerItem message) //called by Node
      {
        object msg;
        bool result = outgoing.consume(out msg);
        message = msg as messengerItem;
        return result;
      }

      public void transmit(messengerItem message) //called by Messenger
      {
        outgoing.produce(message);
      }

      public bool retrieve(out messengerItem message) //called by Messenger
      {
        object msg;
        bool result = incoming.consume(out msg);
        message = msg as messengerItem;
        return result;
      }
    }

Как видно, в очередь кладется объект типа messengerItem, представленный следующим классом:
    class messengerItem
    {
      public string from;
      public string to;
      public object message;

      public messengerItem(string from, string to, object message)
      {
        this.from = from;
        this.to = to;
        this.message = message;
      }
    }

Основной класс я сделал статическим, чтобы иметь возможность из любого места в коде отправить сообщение написав Messenger.send(...);
  public static class Messenger
  {
    static Dictionary<int, threadNode> byTID = new myDictionary<int, threadNode>();
    static Dictionary<string, threadNode> byRegName = new myDictionary<string, threadNode>();
    static Mutex regMutex = new Mutex();    //only one task is allowed to register at a time

Для поиска нужной ноды при передаче сообщения от потока я использую Dictionary, c ключом managedThreadId, а к потоку — ключом является имя представленное при регистрации. Для передачи сообщения от одной ноды в другую, стартую собственный поток, оболочку которого здесь не привожу, вкратце — она в бесконечном цикле дергает messengerFunction, описанный далее, и вызывает Thread.Sleep, если возвращаемое значение false, чтобы отдать слайс.
    static bool messengerFunction()
    {
      bool acted = false;
      messengerItem item;
      threadNode dst;
      Dictionary<string, threadNode> tmp = byRegName;
      foreach (threadNode node in tmp.Values)
      {
        if (tmp != byRegName)
          break;
        if (node.retrieve(out item))
          if (item != null)
          {
            acted = true;
            if(tmp.TryGetValue(item.to,out dst))
            {
              dst.transmit(item);
              sent = true;
            }
          //else discard
          }
      }
      return acted;
    }

Для регистрации потока в мессенджере используется следующая функция, которая на данный момент блокирующая:
     
    static public void register(string tName)
    {
      if (tName == null || tName == "")
        return;
      int tid = Thread.CurrentThread.ManagedThreadId;
      myDictionary<int, threadNode> newbyTID = new myDictionary<int, threadNode>();
      myDictionary<string, threadNode> newbyRegName = new myDictionary<string, threadNode>();
      threadNode newnode = new threadNode(tName, tid);
      newbyTID.Add(tid, newnode);
      newbyRegName.Add(tName, newnode);
      regMutex.WaitOne();
      foreach (threadNode node in byTID.Values)
      {
        newbyTID.Add(node.tid, node);
        newbyRegName.Add(node.tName, node);
      }
      byTID = newbyTID;
      byRegName = newbyRegName;
      regMutex.ReleaseMutex();
    }

Также похожая функция используется для разрегистрации при завершении потока, опущу ее код. Все что осталось — функции отправки и приема сообщений потоками.
    static public void send(string destination, object message)
    {
      int tid = Thread.CurrentThread.ManagedThreadId;
      threadNode node;
      if (byTID.TryGetValue(tid, out node))
        node.enqueue(new messengerItem(node.tName, destination, message));
    }

    static public bool receive(out object message, out string sender)
    {
      int tid = Thread.CurrentThread.ManagedThreadId;
      threadNode node;
      if (!byTID.TryGetValue(tid, out node))
      {
        sender = null;
        message = null;
        return false;
      }
      else
      {
      messengerItem item;
      bool result = node.dequeue(out item);
      if (!result || item == null)
      {
        sender = null;
        message = null;
      }
      else
      {
        message = item.message;
        sender = item.from;
      }
      return result;
      }
    }

Последняя функция используется потоками-обработчиками в цикле, обрабатывая сообщение, или, если оно не получено, выполняя Thread.Sleep().
Теги:
Хабы:
+3
Комментарии 40
Комментарии Комментарии 40

Публикации

Истории

Ближайшие события

PG Bootcamp 2024
Дата 16 апреля
Время 09:30 – 21:00
Место
Минск Онлайн
EvaConf 2024
Дата 16 апреля
Время 11:00 – 16:00
Место
Москва Онлайн
Weekend Offer в AliExpress
Дата 20 – 21 апреля
Время 10:00 – 20:00
Место
Онлайн