Pull to refresh

Практика использования пространства System.Threading при написании многопоточных приложений в .NET.

Reading time6 min
Views5K
Последнее время приходится писать «маленькие» серверы для многопоточной обработки относительно небольших объемов данных. Хочу поделится с хабрасообществом определенным приемом в написании таких приложений.

Все задачи можно формализовать в эти 3 пункта:
1. Есть набор данных.
2. Эти данные нужно обработать (нам не важно как и что обрабатывать, главное это делать параллельно).
3. Данные постоянно поступают новые.
Для иллюстрации отдельных моментов давайте решим задачу выборки данных из множества RSS каналов.
Решение этой задачи я привел в следующем коде, который можно просто скопировать и запустить, код с комментариями, которые, надеюсь доступно описывают отдельные моменты.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Net;
using System.IO;

namespace ConsoleApplication1
{
 class Program
 {
  internal class RSSServer
  {
   //Главный поток для выборки данных на обработку
   private Thread _server;

   //Переменная контролирующая главный цикл выборки данных
   private bool _start = false;

   //Результирующий список с данными RSS каналов
   public List<string> ResultData
   {
    get;
    set;
   }

   #region Счетчики для контроля установки событий

   private int _countAll = 0; //Количество выбранных данных для обработки
   private int _countEnd = 0; //Количество обработанных данных в текущей выборке

   #endregion

   //Событие с ручным сбросом - самый главный элемент для синхронизации потока выборки новых данных и обработки уже имеющихся
   private ManualResetEvent _mre = new ManualResetEvent(false);

   /// <summary>
   /// Этот метод будет работать в главном потоке
   /// </summary>
   private void GetTask()
   {
    string[] _rssURLs;

    /* Используем "бесконечный" цикл, этот прием наоболее оптимален в таких задачах */
    while (_start)
    {
     _rssURLs = GetURLs(); //Получаем данные из источника
     if (_rssURLs.Length > 0) //Проверка на то, что данные для обработки были получены
     {
      _mre.Reset(); //Сбрасываем событие, теперь пока оно не будет установлено, поток заснет при вызове метода WaitOne
      _countAll = _rssURLs.Length; //Здесь нам сихронизация не нужна, т.к. значение будет изменяться только в этом потоке
      _countEnd = 0; //Здесь тоже не синхронизируем переменную, т.к. она будет установлена в 0, уже после того как все остальные потоки ее меняющие уже закончат работу
      ResultData = new List<string>(_countAll);
      foreach (string s in _rssURLs) //Начинаем обработку данных каждый экземпляр отдельно.
      {
       ProccessingRSS(s);
      }
      _mre.WaitOne(); // Теперь ждем до тех пор, пока у нас не обработаются элементы данных
      foreach (string x in ResultData)
      {
       Console.WriteLine(x);
      }
      /* В этом примере принудительно прерывается выполнение цикла,
       * т.к. иначе данные в ResultData будут просто перезаписываться многократно
       *
       */
      break;
     }
     else
     {
      /* Здесь используется небольшая задержка в случае, если данных для обработки еще не поступило.
       * 200 милисекунд здесь выбрано произвольно, если у вас данных не будет продолжительное время, то конструкция выше
       * просто начнет нагружать процессоры практически на 50%, если не больше. Можете убедится убрав строчку ниже.
       * В реальности данная задержка должна вычисляться в зависимости от различных параметров, я обычно использую алгоритм
       * который после каждого пустого запроса ждет данных на 1 секунду дольше, это ограничено сверху определенным числом,
       * которое либо устанавливается через параметры (предпочтительный вариант), либо сразу зашивается в код.
      */
      Thread.Sleep(200);
     }

    }
   }

   private void ProccessingRSS(string _rssURL)
   {
    /*
     * Вот здесь начинается самое интересное. Для обработки каждой порции данных используется встроенный пул потоков.
     */
    ThreadPool.QueueUserWorkItem(new WaitCallback(ProcRSS), _rssURL);
   }

   private void ProcRSS(object rss)
   {
    //Весь метод помещаем в try
    try
    {
     string _rss = (string)rss;
     HttpWebRequest hwr = (HttpWebRequest)HttpWebRequest.Create(_rss);
     HttpWebResponse hwrr = (HttpWebResponse)hwr.GetResponse();
     string response = (new StreamReader(hwrr.GetResponseStream())).ReadToEnd();
     //Обязательно блокируем объект, в который будет помещаться результат
     lock (ResultData)
     {
      ResultData.Add(response);
     }
    }
    catch
    {
     //Игнорируем все исключения (в реальных задачах этого делать нельзя)
    }
    finally
    {
     Interlocked.Increment(ref _countEnd); //Увеличиваем переменную, специальным методом, т.к. она будет увеличиваться в различных потоках
     if (_countEnd >= _countAll) //Если это был последний обрабатываемый поток, то устанавливаем событие, и в главном потоке начинается новая выборка данных
      _mre.Set();
    }
   }

   private string[] GetURLs()
   {
    return new string[2] { "http://habrahabr.ru/rss/", "http://www.cbr.ru/scripts/RssPress.asp" }; //Источник данных неважен, для примера это просто массив адресов.
   }

   public void StartServer()
   {
    _server = new Thread(new ThreadStart(GetTask));

    //Если у потока будет установлено это свойство в true, то это поток завершится,
    //если завершится работа главного потока, иначе возможно, что поток продолжит свою работу,
    //даже если приложени будет "якобы" закрыто
    _server.IsBackground = true;

    //Это имя будет видно в отладчике,
    //поэтому рекомендую всегда давать имя потокам, которые создаются явно
    _server.Name = "GetTaskThread";

    //Если поток еще не стартовал, то стартуем его
    if ((_server.ThreadState & ThreadState.Unstarted) == ThreadState.Unstarted)
    {
     _start = true; //Устанавливаем свойство в true, что бы выборка данных была постоянной
     _server.Start();
    }
   }

   public void StopServer()
   {
    _start = false;
   }
  }

  static void Main(string[] args)
  {
   RSSServer _rServer = new RSSServer();
   _rServer.StartServer();
   Console.Read();
   _rServer.StopServer();
  }
 }
}

* This source code was highlighted with Source Code Highlighter.


Данная статья ориентирована на тех людей, кто еще не до конца понимает как работать с пространством System.Threading. Если есть какие либо вопросы, задавайте их в коментариях, постараюсь ответить.

Tags:
Hubs:
Total votes 10: ↑7 and ↓3+4
Comments18

Articles