Pull to refresh

Параллельная обработка IEnumerable в .NET

Reading time 5 min
Views 17K
В предложенной статье рассматривается решение задачи параллельной синхронной обработки IEnumerable, а также откуда такая задача вообще взялась.

Как и во многих других случаях, представленное решение началось со вполне конкретных потребностей.

Преамбула


В одном из внутренних проектов есть необходимость построения развесистого отчёта на 100+ срезов по массиву данных, чтение которого занимает более 12 часов. Сами объёмы данных тоже немаленькие. Совокупность продолжительного чтения и огромных объёмов данных (~1.5M кортежей, каждый из которых «весит» до 50 МБ) вносит в код два ограничения: крайняя нежелательность многократного чтения (50 дней на еженедельный отчёт никто тратить, увы, не готов) и техническая невозможность помещения всей выборки в оперативную память. Предшествовавший мне процесс разработки шёл, очевидно, итеративно: длина метода, агрегирующего данные для срезов составляла около 4000 строк. Передо мной была поставлена задача сделать так, чтобы данный код превратился в поддерживаемый.

Предпосылки


Нитью Ариадны в сложившейся ситуации для меня стало осознание, что «облизывать» одну и ту же сущность по очереди (равно как и чупа-чупс) как минимум некрасиво: даже если мне удастся отрефакторить код до состояния «срез-метод», то я получу обвешанную условиями простыню из сотни с лишним последовательных вызовов. От этого тоже хотелось уйти, так как текущая реализация не только упиралась в медленное чтение, но и добавляла сверху пробуксовку на обработку данных в одном потоке. Сервер, на котором выполняется процесс, обладает 8-ю ядрами и способен распараллелить обработку данных до такого состояния, что она будет занимать меньше времени, чем чтение очередного кортежа из базы.

Идея


Итак, взвесив сложившуюся ситуацию, я сформулировал незамысловатую идею: обрабатывать «поток объектов» в параллельных потоках (threads), где каждому срезу соответствует свой аккуратный метод (или даже лямбда), выполняющийся в отдельном потоке. Поскольку изначально количество объектов нам неизвестно, то для представления «потока объектов» как нельзя лучше подошёл интерфейс IEnumerable<T>. Остаётся лишь сделать так, чтобы каждому потоку достался свой IEnumerable<T>, и чтобы это был один и тот же IEnumerable<T> (прямо как в задачах про монеты и стаканы). Очевидно, что к таким объектам тут же прилипла кличка «клоны».

Нужно было сделать так, чтобы на каждом enumerable.Next() потокам раздавался на чтение единственный экземпляр объекта. Такое решение имеет особенность (это скорее минус, но не страшный): все потоки будут ждать самого медленного брата, то есть чтение будет синхронным. Если честно, я очень надеялся, что кто-то уже за меня это написал. Например, Microsoft сделал что-нибудь подобное в своём Parallel.Linq. Однако, на тот момент ничего подобного нагуглить не удалось — всё, что касалось параллельной обработки коллекций, относилось в параллельной обработке частей одно коллекции (например, Parallel.For()), но никак — к решаемой задаче. Ну что ж, велосипеды я обожаю, напильник в руки!

Ядро решения


В моей голове всегда жило понимание, что за любым интерфейсом скрывается объект. Поэтому для клонирования коллекции, вероятно, нужно или найти, или придумать объект с искомым интерфейсом. А вот и не нужно. Язык C# давно и успешно дарит своим адептам операторы yield return и yield break. Их применение и легло в основу решения.

Думаю, нет смысла объяснять, как была рождена каждая строчка кода. Объясню идею. Для того, чтобы ваших данных стало много, создаётся объект-фабрика с методом GetClone(), который возвращает вызов метода, использующего операторы yield. Чтобы чтение было синхронизировано, и никто ничего не потерял, фабрика запоминает своих читателей и делает так, чтобы чтение очередного объекта из исходного IEnumerable осуществлялось только после того, как все читатели получат свою ссылку на экземпляр предыдущего объекта. Достигается это тем, что каждому читателю присваивается два WaitHandle — «я готов читать» и «я прочитал»:

private IEnumerable<T> _input;
private IEnumerator<T> _inputEnumerator;
private Dictionary<string, AutoResetEvent> _imReadyToRead;
private Dictionary<string, AutoResetEvent> _iVeReadMyValue;
private WaitHandle[] _ivAlreadyReadArray;
private T _nextObject;
private bool _hasCurrent;
private bool _hasNext;
...
private void GetNext()
{
    if (!_hasCurrent) return;
    foreach (var ready in _imReadyToRead) ready.Value.Set();
    do
    {
        WaitHandle.WaitAll(_ivAlreadyReadArray);  // ждём всех, пока начитаются
        _hasNext = _inputEnumerator.MoveNext();  // и только потом берём следующее
        _nextObject = _inputEnumerator.Current;
        lock (_imReadyToRead)
        {
            if (!_hasNext) _hasCurrent = false;
            foreach (var ready in _imReadyToRead) ready.Value.Set();   // и снова взводим наши курки. Будем ждать читателей
        }
    } while (_hasNext);
}


Само чтение осуществляется из IEnumerable, который(ое, ая?) возвращается методом GetCloneInstance.

private T GetCurrent(string subscriber)
{
    T toReturn;
    _imReadyToRead[subscriber].WaitOne(); // готов читать - читай!
    toReturn = _nextObject;
    _iVeReadMyValue[subscriber].Set();
    return toReturn;
}

private IEnumerable<T> GetCloneInstance(string key)
{
    T res;
    do
    {
       res = GetCurrent(key);
       yield return res;
    } while (true);
}


Так, казалось был, проблема распараллеливания решена. Но при предполётной подготовке выяснилась одна особенность метода WaitAll(): он поддерживает работу не более чем 64 экземплярами за раз. Но мне нужно 100+ читателей! Как быть? Да, в общем-то, просто. И я стал клонировать клонов. Из каждых 64 «честных» клона, я выбираю жертву, которую буду клонировать в дальнейшем. Для большого числа читателей у меня могу появляться клоны во 2, 3, 4 и т.д. поколении! Как показало покрытие тестами — вполне жизнеспособные твари. Выглядит это примерно так:

private int _maxCloneOfOne = 64; // не более чем
private IEnumerable<T> _input;   // прародитель всех клонов
private Stack<ICloner<T>> cloners;  // объекты-клонеры, способные родить до 64 копий
private Dictionary<Guid, IEnumerable<T>> clones;  // клоны
private Stack<IEnumerable<T>> missMe; // клоны, которых нельзя отдавать читателям - те, которые тоже расклонированы
				
public IEnumerable<T> GetClone() 
{
	if (cloners.Count == 0)
		cloners.Push(new Cloner<T>(_input)); // создаём первого клонера
	
	var isLast = 
		clones.Count > 0 && 
		clones.Count % (_maxCloneOfOne - 1) == 0; // если ожидаемый клон - последний возможный в потомстве клонера

		ICloner<T> cloner;
		var g = Guid.NewGuid();
		IEnumerable<T> result;

		if (!isLast)
		{
			cloner = cloners.Peek(); // если клон не последний - возвратим его
		}
		else
		{
			// вот если последний - будем его клонировать
			var lastCloneForCloner = cloners.Peek().GetClone(); 
			missMe.Push(lastCloneForCloner);
			cloners.Push(cloner = new Cloner<T>(lastCloneForCloner));
			g = Guid.NewGuid();
		}
		result = cloner.GetClone();
		clones.Add(g, result);
		return result;
}


Вот теперь точно всё. Добавил инициализирующий код, проверки на всякие краевые ситуации, обернул старательно в thy-catch. Работает.


Если кому бы то ни было моё решение покажется нужным или хотя бы интересным, добро пожаловать на страничку на google.code. Желающие поучаствовать в улучшении и привинчивании новых фич получат ключи от всех дверей.
Tags:
Hubs:
+18
Comments 29
Comments Comments 29

Articles