Pull to refresh

Использование распараллеливания при обработке данных в C#

Reading time4 min
Views6.9K


Всем добрый день! Я технический специалист, работающий в системе внутреннего аудита, в мои обязанности входит создание инструментов ETL на языке программирования C#.

Периодически источниками данных становятся жестко структурированные файлы формата xml, csv, json или любого другого формата. Иногда их количество становится достаточно большим и постоянно увеличивающимся. Например, в одной из моих задач количество файлов увеличивалось со средней скоростью обновления примерно 150 000 файлов в сутки. Если при этом обработка одного файла (считывание массива байт с жесткого диска в память, трансформация загруженных данных и запись их в базу данных) занимает секунду, то становится понятно, что обработка всех файлов займет более 40 часов. В этом случае мы не сможем обработать эти файлы до конца, так как скорость увеличения количества файлов будет явно выше скорости их обработки.

Одно из решений данной проблемы – разработать приложение, в котором создать пул независимых друг от друга потоков. В потоках будут обрабатываться файлы, выбираемые из общей очереди. Однако в этом случае возникают сложности с синхронизацией работающих потоков и совместного использования ресурсов, так как очень вероятно появление взаимных блокировок.

Что бы избежать этих сложностей компания Microsoft добавила в фреймоворк .Net библиотеку TPL (начиная с версии 4.0). Я расскажу, как используя возможности этой библиотеки решить данную проблему.

Итак, изначально алгоритм работы выглядит следующим образом:

Сканируется каталог хранения файлов и возвращается список (например, List), содержащий данные о всех файлах;
Запускается цикл (for или foreach) в котором данные из очередного файла считываются в память, при необходимости трансформируются и записываются в БД.

Очевидно, что самые затратные по времени операции – это считывание данных с жесткого диска в память и запись данных из памяти в БД.

Попробуем оптимизировать наш алгоритм при помощи библиотеки TPL:

Пункт 1.

Изменим список, возвращаемый функцией сканирования каталога хранения файлов с List на ConcurrentQueue.
Для чего мы это делаем? Дело в том, что класс ConcurrentQueue является потокобезопасным, то есть если одновременно два потока попытаются извлечь данные из этого списка или записать в него данные, то у нас не возникнет исключений (Exception).
Пункт 1 нашего алгоритма будет выглядеть так: сканируется каталог хранения файлов и возвращается список ConcurrentQueue, содержащий данные о всех файлах.

Пункт 2:
Изменим конструкцию формирующую цикл обработки данных из файла. Заменим for на Parallel.For или Parallel.ForEach.

В чем отличие новой конструкции от for? Тут всё просто и в принципе понятно из названия языковой конструкции. Все итерации цикла выполняются в параллельных потоках. В качестве примера я покажу организацию цикла конструкцией Parallel.ForEach:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

где:

listFiles – это коллекция типа ConcurrentQueue содержащая спи-сок файлов в каталоге;
currentFile – элемент коллекции listFiles, который возвращается конструк-цией ForEach;
dataFile – условная некоторая структура данных в памяти, получаемая счи-тыванием содержимого файла в память;
getDataFile – условная функция возвращающая содержимое файла в виде некоторой структуры данных;
TransformData – условная процедура трансформации полученных данных;
WriteToDB – условная процедура записи данных в БД.

В данном примере, с помощью конструкции Parallel.ForEach, мы организуем цикл. В этом цикле, в параллельных потоках, производится считывание данных с жесткого диска, их трансформация и запись в БД. При этом, проблемы организации работы параллельных потоков отсутствуют. Количество параллельных потоков зависит от числа ядер процессора и их загруженности.

Используя предложенный алгоритм, мы ускорим обработку файлов, как минимум в 2 раза. Хотя, конечно, эта цифра будет меняться в зависимости от количества ядер и памяти машины, на которой будет запускаться программа.

Так же для ускорения работы программы нужно вынести запись в БД в отдельный поток, работающий независимо от основного. Сделать это можно при помощи коллекции ConcurrentQueue, чтобы избежать конфликтов при добавлении данных в оче-редь.

Перепишем вышеприведенный пример с учетом оптимизации записи в БД.
Предположим, что процедура чтения файлов возвращает нам данные в DataTable):

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

Как видно, вместо строки с вызовом процедуры записи в БД, мы просто добавляем в коллекцию ConcurrentQueue ListData описанную и инициализированную в отдельном потоке, экземпляр которого threadWriteToDB используется в нашем цикле.

Запись в БД происходит уже в отдельном потоке. Запись в БД можно организовать аналогично работе с файлами, с помощью конструкций Parallel.For и/или Paral-lel.Foreach.

В моей задаче, где потребовалась обработка сопоставимого количества файлов, сейчас может обрабатываться в среднем от 200 000 до 400 000 файлов в сутки, при чем скорость ограничивается загрузкой БД и шириной канала данных.
Tags:
Hubs:
-2
Comments9

Articles