Pull to refresh

Эволюция одного алгоритма

Reading time17 min
Views7.9K

Некоторое время назад мой коллега попросил помочь ему с одной проблемой. Проблему я ему решил, но кроме того, мне показалось, что на решении этой проблемы можно объяснить несколько алгоритмов и приёмов программирования. А также показать ускорение времени выполнения алгоритма с 25 сек до 40 мс.


Постановка задачи


Для личного проекта моему коллеге понадобился алгоритм нахождения для заданного видео пятидесяти максимально похожих видео. Похожесть предполагалось оценивать по количеству совпадающих выставленных тегов. Чем больше тегов у видео совпадает, тем более они похожи. Из этого можно сразу сделать несколько выводов:


  • все теги под видео можно объединить в одну группу;
  • таких групп точно будет не больше чем самих видео;
  • если видео похоже на другое видео из какой-то группы тегов, то оно в такой же степени похоже на другие видео из этой группы;

Выходит, что достаточно работать только с группами тегов. В первой версии коллега решил хранить теги в таблице тегов: у каждого видео есть ссылка на ID группы тегов, а сами группы представляют собой последовательность булевых значений, которые показывают выставлен ли соответствующий тег. На C# группа тегов выглядит вот так:


public class TagsGroup {
    bool[] InnerTags { get; }
}

Коллега предполагал, что на сайте у него будет не более одного миллиона видео, а различных тегов не более 4000, для круглого счёта можно взять 4096 = 2^12.
Тогда класс TagsGroup можно представить вот в таком виде:


public class TagsGroup {
    public const int TagsGroupLength = 4096;
    bool[] InnerTags { get; }

    public TagsGroup(bool[] innerTags) {
        if (innerTags == null)
            throw new ArgumentException(nameof(innerTags));
        if (innerTags.Length != TagsGroupLength) {
            throw new ArgumentException(nameof(innerTags));
        }
        InnerTags = innerTags;
    }
}

Теперь необходимо проверить две группы тегов на похожесть. В текущих условиях это превращается в простую проверку на совпадение true в соответствующих элементах массивов InnerTags у двух групп тегов:


public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength; i++) {
        if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i])
            result++;
    }
    return result;
}

Теперь осталось только подсчитать похожесть нужной группы тегов с каждой существующей группой и выбрать пятьдесят наиболее похожих. Я поставил сам себе ещё условие обеспечить устойчивость выборки, т.е. в итоговой выборке будут пятьдесят групп тегов, у которых MeasureSimilarity выдал наибольший результат и при этом у групп тегов с одинаковым MeasureSimilarity меньший индекс будет у тех, у кого был меньший индекс в исходной существующей группе. Подробнее можно почитать, например, здесь: https://ru.wikipedia.org/wiki/Устойчивая_сортировка.
Для решения этой задачи я решил сделать класс SimilarTagsCalculator, вот его код:


SimilarTagsCalculator
class SimilarTagsCalculator {
    TagsGroup[] Groups { get; }

    public SimilarTagsCalculator(TagsGroup[] groups) {
        if (groups == null)
            throw new ArgumentException(nameof(groups));
        Groups = groups;
    }

    public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) {
        const int resultLength = 50;
//список, где хранятся наиболее похожие на данный момент группы тегов
        var list = new List<TagsSimilarityInfo>(resultLength);
//бежим по всем доступным группам тегов
        for (int groupIndex = 0; groupIndex < Groups.Length; groupIndex++) {
            TagsGroup tagsGroup = Groups[groupIndex];
//измеряем похожесть текущей группы с нужной
            int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup);
//создаём инфо-объект сравнения
            TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue);
//если текущая группа менее похожая, чем наименее похожая из найденных,
            if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) { 
                continue; //то нет смысла её добавлять
            }
//бинарным поиском ищем место, куда следует вставить инфо-объект сравнения
            int index = ~list.BinarySearch(newInfo);
            list.Insert(index, newInfo); //вставляем
            if (list.Count > resultLength) { 
//если количество элементов превысило нужное,
//то просто выталкиваем последний, т.к. он гарантированно хуже
                list.RemoveAt(resultLength);
            }
        }
        //конвертируем инфо-объекты в результат
        TagsGroup[] result = new TagsGroup[resultLength];
        for (int i = 0; i < resultLength; i++) {
            result[i] = Groups[list[i].Index];
        }
        return result;
    }
}

и структуру TagsSimilarityInfo:


TagsSimilarityInfo
struct TagsSimilarityInfo : IComparable<TagsSimilarityInfo>, IComparable {
    public int Index { get; }

    public int Similarity { get; }

    public TagsSimilarityInfo(int index, int similarity) {
        Index = index;
        Similarity = similarity;
    }

    public bool Equals(TagsSimilarityInfo other) {
        return Index == other.Index && Similarity == other.Similarity;
    }

    public override bool Equals(object obj) {
        return obj is TagsSimilarityInfo other && Equals(other);
    }

    public override int GetHashCode() {
        unchecked {
            return (Index * 397) ^ Similarity;
        }
    }

    public int CompareTo(TagsSimilarityInfo other) {
        int similarityComparison = other.Similarity.CompareTo(Similarity);
        return similarityComparison != 0 ? similarityComparison : Index.CompareTo(other.Index);
    }

    public int CompareTo(object obj) {
        if (ReferenceEquals(null, obj))
            return 1;
        return obj is TagsSimilarityInfo other ? CompareTo(other) : throw new ArgumentException($"Object must be of type {nameof(TagsSimilarityInfo)}");
    }
}

Я подготовил три бенчмарка для этого алгоритма:


  • полностью рандомный бенчмарк, т.е. количество выставленных тегов в группах случайно и группа тегов, с которой будем сравнивать, тоже случайна;
  • количество выставленных тегов в группах идёт по возрастающей, сравнивать будем с группой, в которой все теги выставлены. Получается самыми подходящими должны быть какие-то из последних групп тегов;
  • аналогичное, что и выше, но количество выставленных тегов идёт по убывающей. Самыми подходящими будут первые 50 групп тегов;

Вот результаты бенчмарка для миллиона групп:


BenchmarkDotNet=v0.11.5, OS=Windows 10.0.17134.765 (1803/April2018Update/Redstone4)
Intel Core i7-6700 CPU 3.40GHz (Skylake), 1 CPU, 8 logical and 4 physical cores
Frequency=3328126 Hz, Resolution=300.4694 ns, Timer=TSC
.NET Core SDK=3.0.100-preview5-011568
[Host]: .NET Core 3.0.0-preview5-27626-15 (CoreCLR 4.6.27622.75, CoreFX 4.700.19.22408), 64bit RyuJIT


Method Mean Error StdDev Allocated
RandomTest 25.054 s 0.1786 s 0.1670 s 1.53 KB
AscendantTest 4.180 s 0.0174 s 0.0162 s 1.53 KB
DescendantTest 4.147 s 0.0118 s 0.0104 s 1.53 KB

Разброс времени выполнения очень большой, к тому же 25 секунд — это очень долго, мой коллега не согласен столько ждать. Значит займёмся оптимизациями. Сейчас можно выделить три основных направления для ускорения программы:


  • метод MeasureSimilarity;
  • алгоритм в теле цикла в GetFiftyMostSimilarGroups;
  • сам цикл в GetFiftyMostSimilarGroups;

Будем рассматривать каждое из трёх направлений последовательно.


Предсказание ветвлений


Рассмотрим вначале метод MeasureSimilarity.


public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength; i++) {
        if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i])
            result++;
    }
    return result;
}

В предыдущем бенчмарке очень большой разброс времени выполнения между рандомным тестом и любым из последовательных. Группы тегов для последовательных тестов создавались по такому принципу:


  • требуемое количество групп делилось на пакеты. Количество пакетов — максимальное количество тегов в группе;
  • для каждой группы в i-м пакете первые i тегов выставлялись;

Получается, что каждая группа тегов в этих тестах состоит из двух подряд идущих частей из выставленных и невыставленных тегов. В MeasureSimilarity есть все предпосылки для того, чтобы в текущих условиях предсказание ветвлений процессором оказывало значительный эффект. Чтобы это проверить достаточно написать бенчмарк, в котором сравнивается время работы MeasureSimilarity для случайных данных и для последовательных:


int GetSimilaritySum(TagsGroup[] tagsGroups) {
    int result = 0;
    foreach (TagsGroup tagsGroup in tagsGroups) {
        result += TagsGroup.MeasureSimilarity(tagsGroup, etalon);
    }
    return result;
}

[Benchmark]
public int Sorted() => GetSimilaritySum(sortedGroups);

[Benchmark]
public int Unsorted() => GetSimilaritySum(unsortedGroups);

Method Mean Error StdDev
Sorted 3.704 s 0.0411 s 0.0364 s
Unsorted 8.211 s 0.0381 s 0.0338 s

Тестировался миллион групп тегов, но в Sorted в каждой группе вначале шло несколько выставленных тегов, а потом невыставленные, а в Unsorted такое же количество выставленных тегов было случайно разбросано по всей группе.
Разница в 5 секунд впечатляет, и с этим надо что-то делать. Чтобы избавиться от влияния предсказания ветвлений и в целом ускорить метод, нужно избавиться от самих ветвлений. В MeasureSimilarity ветвление только одно — проверка на то, что в двух группах выставлены соответствующие теги. Давайте прикинем, в каких случаях условие будет истинным, для этого составим таблицу истинности условия:


a.InnerTags[i] b.InnerTags[i] Result
False False False
False True False
True False False
True True True

Таблица истинности полностью совпадает с логическим "И", т.е. результат истинный тогда и только тогда, когда оба тега истинны, значит условие можно сократить до: if (a.InnerTags[i] && b.InnerTags[i]). Но таким образом условие всё равно остаётся. На следующем этапе сделаем так, чтобы прибавление к result выполнялось всегда, для этого перепишем тело цикла вот так:


int t = a.InnerTags[i] && b.InnerTags[i] ? 1 : 0;
result += t;

Мы всё равно не избавились от условия и на самом деле даже сделали метод медленнее. Но зато теперь стало очевидным, что если тип у InnerTags изменить с bool на byte (1 для true, и 0 для false), то можно избавиться от условия в тернарном операторе. Тогда класс TagsGroup будет выглядеть вот так:


TagsGroup
public class TagsGroup {
    public const int TagsGroupLength = 4096;
    byte[] InnerTags { get; }

    public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
        int result = 0;
        for (int i = 0; i < TagsGroupLength; i++) {
            int t = a.InnerTags[i] & b.InnerTags[i];
            result += t;
        }
        return result;
    }

    public TagsGroup(bool[] innerTags) {
        if (innerTags == null)
            throw new ArgumentException(nameof(innerTags));
        if (innerTags.Length != TagsGroupLength) {
            throw new ArgumentException(nameof(innerTags));
        }
        InnerTags = new byte[TagsGroupLength];
        for (int i = 0; i < TagsGroupLength; i++) {
            InnerTags[i] = (byte) (innerTags[i] ? 1 : 0);
        }
    }
}

Вот результаты бенчмарка для обновленного MeasureSimilarity:


Method Mean Error StdDev
Sorted 3.180 s 0.0118 s 0.0111 s
Unsorted 3.236 s 0.0622 s 0.0764 s

было:
Method Mean Error StdDev
Sorted 3.704 s 0.0411 s 0.0364 s
Unsorted 8.211 s 0.0381 s 0.0338 s

а вот для обновленного главного бечмарка:


Method Mean Error StdDev Allocated
RandomTest 3.219 s 0.0492 s 0.0436 s 1.53 KB
AscendantTest 3.223 s 0.0117 s 0.0110 s 1.53 KB
DescendantTest 3.422 s 0.0697 s 0.0999 s 1.53 KB

было:
Method Mean Error StdDev Allocated
RandomTest 25.054 s 0.1786 s 0.1670 s 1.53 KB
AscendantTest 4.180 s 0.0174 s 0.0162 s 1.53 KB
DescendantTest 4.147 s 0.0118 s 0.0104 s 1.53 KB

По-моему, уже стало здорово. Для уверенных в том, что всё ускорение произошло только из-за того, что булевый тип был заменён на байт, я запустил бенчмарк для такого тела цикла:


int t = a.InnerTags[i] & b.InnerTags[i];
if (t == 1)
    result += t;

и вот какие получил результаты:


Method Mean Error StdDev
Sorted 3.760 s 0.0746 s 0.1541 s
Unsorted 8.628 s 0.1699 s 0.2382 s

Упаковывание данных


В каждой группе много тегов, и их количество никак нельзя уменьшить. Кроме того, необходимо сравнивать теги с одинаковыми индексами, и нельзя дать окончательный ответ, не проверив все теги. Значит, нам в любом случае придётся итерироваться по всей группе тегов. Было бы здорово суметь как-то распараллелить эту задачу, чтобы можно было за условно одну операцию обрабатывать несколько тегов. Можно сделать это через настоящее распараллеливание, а можно через специальную упаковку данных, чем и воспользуемся. Каждый тег сейчас представляет собой 1 или 0. В result просто накапливается результат операции "И". Но эту же логическую операцию можно применять и не только над однобитными числами. C# позволяет это делать без особых проблем вплоть до 64 битных чисел (можно и больше через BitArray, но это уже не то). Если представить две группы тегов как набор 64 битных чисел с выставленными соответствующими битами, то можно будет проводить операцию "И" над каждой такой группой 64 битных чисел. Непонятно только, что потом делать с результатом. Посмотрим ещё раз на тело цикла:


int t = a.InnerTags[i] & b.InnerTags[i];
result += t;

result увеличивается на 1 каждый раз, когда t == 1 и не изменяется когда t == 0. В итоге result станет равен тому, сколько раз результатом a.InnerTags[i] & b.InnerTags[i] была единица. Соответственно можно было бы сохранить все результаты a.InnerTags[i] & b.InnerTags[i] в какой-нибудь массив, и в result записать только количество единиц в этом массиве. При выполнении операции "И" над больше чем n-битными числами будет n-битный результат и достаточно будет только знать сколько выставленных бит среди n. Количество выставленных бит в числе неизменно, а значит можно предподсчитать эти числа. Предподсчитывать для 64 бит нет смысла, т.к. мы не найдём столько оперативной памяти. Для 32 бит уже можно найти пространство на современных компьютерах, но это по-прежнему очень много. Память под 16 бит найти несложно, но подсчёт будет относительно долгим. В качестве компромисса предподсчитаем для 8 битных чисел:


GenerateCountOfSettedBits
static readonly byte[] CountOfSettedBits = GenerateCountOfSettedBits();

static byte[] GenerateCountOfSettedBits() {
// в result для каждого i хранится сколько битов выставлено в i-м числе.
    byte[] result = new byte[256]; 
// вспомогательный массив, в нём будет представлено число i в виде битов, 
// будем к нему прибавлять единицу в столбик
    int[] b = new int[8];
// перебираем все восьмибитные числа
    for (int i = 1; i < 256; i++) {
//сбрасываем счётчик выставленных битов в текущем числе
        int settedBitsCount = 0;
//остаток, который надо прибавить к следующему биту
        int m = 1;
//бежим по битам
        for (int j = 0; j < 8; j++) {   
//прибавляем остаток к текущему биту
            b[j] += m;
//записываем в остаток, если число в текущем бите превысило 2.
            m = b[j] >> 1;
//записываем в текущий бит число без учёта остатка
            b[j] = b[j] & 1;        
//возможно, увеличиваем количество выставленных битов в текущем числе
            settedBitsCount += b[j];
        }
        result[i] = (byte) settedBitsCount; //записываем в ответ
    }
    return result;
}

теперь конструктор TagsGroup выглядит вот так:


const int BucketSize = 8;

public TagsGroup(bool[] innerTags) {
    if (innerTags == null)
        throw new ArgumentException(nameof(innerTags));
    if (innerTags.Length != TagsGroupLength) {
        throw new ArgumentException(nameof(innerTags));
    }
    int index = 0; //индекс среди исходных тегов
    InnerTags = new byte[TagsGroupLength / BucketSize];
//перебираем внутренние пакеты
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
//перебираем теги в пределах пакета
        for (int j = 0; j < BucketSize; j++, index++) {
//увеличиваем текущий проект на 2, чтобы освободить место для нового
            InnerTags[i] <<= 1;
//выставляем бит в пакете
            InnerTags[i] += (byte) (innerTags[index] ? 1 : 0);
        }
    }
}

А MeasureSimilarity стал выглядеть вот так:


public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
        int t = a.InnerTags[i] & b.InnerTags[i];
        result += CountOfSettedBits[t];
    }
    return result;
}

Можно запустить большой бенчмарк и убедиться, что всё стало лучше:


Method Mean Error StdDev Allocated
RandomTest 560.5 ms 8.285 ms 7.344 ms 1.53 KB
AscendantTest 570.1 ms 4.108 ms 3.431 ms 1.53 KB
DescendantTest 608.1 ms 5.691 ms 5.324 ms 1.53 KB

Можно ли сделать метод MeasureSimilarity ещё быстрее? Конечно! Для этого достаточно осознать тот факт, что регистры общего назначения сейчас в основном 64 битные, а мы гоняем в них восьмибитные данные. Для этого размер пакета, в которые пакуются исходные теги, увеличим до 64 бит и перепишем нужные методы:


const int BucketSize = 64;

ulong[] InnerTags { get; }

public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
        ulong t = a.InnerTags[i] & b.InnerTags[i];
        for (int j = 0; j < BucketSize / 8; j++) {
            result += CountOfSettedBits[t & 255];
            t >>= 8;
        }
    }
    return result;
}

и получится:


Method Mean Error StdDev Allocated
RandomTest 533.3 ms 4.802 ms 4.492 ms 1.53 KB
AscendantTest 550.9 ms 5.435 ms 5.084 ms 1.53 KB
DescendantTest 567.6 ms 3.879 ms 3.439 ms 1.53 KB

Потом можно развернуть внутренний цикл:


MeasureSimilarity
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
        ulong t = a.InnerTags[i] & b.InnerTags[i];
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
    }
    return result;
}

Method Mean Error StdDev Allocated
RandomTest 370.5 ms 2.802 ms 2.484 ms 1.53 KB
AscendantTest 395.8 ms 2.682 ms 2.509 ms 1.53 KB
DescendantTest 419.5 ms 3.352 ms 2.971 ms 1.53 KB

Можно ли ещё быстрее? Да! Если использовать нововведения из .NET Core 3.0. Хоть эта версия всё ещё в превью, но в ней с самого начала есть реализация некоторых интринсиков. В Intel Intrinsic Guide есть интринсик _mm_popcnt_u64. Который согласно описанию: "Count the number of bits set to 1 in unsigned 64-bit integer a, and return that count in dst.". Это же как раз то, чего мы пытаемся добиться! В .NET Core 3.0 Preview 5 этот интринсик реализован в System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount(Как верно заметил в комментариях a-tk перед использованием интринсиков необходимо проверять, что процессор их поддерживает. В данном случае проверять условие System.Runtime.Intrinsics.X86.Popcnt.X64.IsSupported). С его использованием код метода MeasureSimilarity станет таким:


public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
        ulong t = a.InnerTags[i] & b.InnerTags[i];
        result += (int) System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount(t);
    }
    return result;
}

а время выполнения:


Method Mean Error StdDev Allocated
RandomTest 59.33 ms 1.148 ms 0.9585 ms 1.53 KB
AscendantTest 74.87 ms 1.479 ms 1.9748 ms 1.53 KB
DescendantTest 119.46 ms 2.321 ms 2.8509 ms 1.53 KB

Впечатляет.
Мне неизвестны способы, которые могут значительно ускорить MeasureSimilarity и при этом не сильно испортить читаемость. Думаю, с этим методом можно закончить.


Структуры данных


Разберёмся теперь с телом цикла в методе GetFiftyMostSimilarGroups:


GetFiftyMostSimilarGroups
public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) {
    const int resultLength = 50;
    List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(50);
    for (int groupIndex = 0; groupIndex < Groups.Length; groupIndex++) {
        TagsGroup tagsGroup = Groups[groupIndex];
        int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup);
        TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue);
        if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) {
            continue;
        }
        int index = ~list.BinarySearch(newInfo);
        list.Insert(index, newInfo);
        if (list.Count > resultLength) {
            list.RemoveAt(resultLength);
        }
    }
    TagsGroup[] result = new TagsGroup[resultLength];
    for (int i = 0; i < resultLength; i++) {
        result[i] = Groups[list[i].Index];
    }
    return result;
}

Напомню коротко, что здесь происходит:


  • внутри list хранится отсортированный список пятидесяти наиболее подходящих групп тегов, фактически от меньшего к большему, если сравнивать TagsSimilarityInfo;
  • вставляем новую рассматриваемую группу в list с сохранением сортировки;
  • если элементов в list больше пятидесяти, то наименее похожую группу удаляем (её инфо-объект будет самым большим и будет находиться в самом конце list);

Т.е. получается, что нам надо очень быстро находить самый большой элемент коллекции, уметь быстро вставлять и удалять. Для решения таких задач существуют специальные структуры данных. Первое что приходит на ум — куча. У неё вставка выполняется за O(log N), получение максимума за O(1), удаление элемента за O(log N). Проблема только в том, что по куче нельзя проитерироваться по возрастанию элементов без её модификации. В BCL бинарной кучи нет, поэтому я написал её сам:


BinaryHeap
public class BinaryHeap<T>:IEnumerable<T> where T : IComparable<T> {
    readonly List<T> innerList;

    public BinaryHeap(int capacity) {
        innerList = new List<T>(capacity);
    }

    public int Count => innerList.Count;

    public T Max => innerList[0];

    public void Add(T value) {
        innerList.Add(value);
        int i = Count - 1;
        int parent = (i - 1) >> 1;

        while (i > 0 && innerList[parent].CompareTo(innerList[i]) == -1) {
            Swap(i, parent);

            i = parent;
            parent = (i - 1) >> 1;
        }
    }

    void Swap(int a, int b) {
        T temp = innerList[a];
        innerList[a] = innerList[b];
        innerList[b] = temp;
    }

    void Heapify(int i) {
        for (;;) {
            int leftChild = (i << 1) | 1;
            int rightChild = (i + 1) << 1;
            int largestChild = i;

            if (leftChild < Count && innerList[leftChild].CompareTo(innerList[largestChild]) == 1) {
                largestChild = leftChild;
            }

            if (rightChild < Count && innerList[rightChild].CompareTo(innerList[largestChild]) == 1) {
                largestChild = rightChild;
            }

            if (largestChild == i) {
                break;
            }
            Swap(i, largestChild);
            i = largestChild;
        }
    }

    public void RemoveMax() {
        innerList[0] = innerList[Count - 1];
        innerList.RemoveAt(Count - 1);
        Heapify(0);
    }

    public IEnumerator<T> GetEnumerator() {
        return innerList.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator() {
        return ((IEnumerable) innerList).GetEnumerator();
    }
}

соответствующую реализацию метода GetFiftyMostSimilarGroups можно будет найти в исходниках статьи (ссылка внизу).
Кроме кучи может подойти бинарное дерево поиска. Сбалансированное бинарное дерево поиска может обеспечить вставку за O(log N), получение максимума за O(log N), удаление элемента за O(log N). Плюс такой структуры в том, что по ней можно итерироваться в возрастающем порядке и, кроме того, красно-чёрное дерево поиска в BCL реализовано внутри SortedSet (в большом фреймворке получение максимума сильно медленнее, чем в .netcore 3.0, и аллоцирует память). Реализацию GetFiftyMostSimilarGroups для SortedSet можно будет найти в исходниках к статье.
Результаты бенчмарков всех трёх реализаций GetFiftyMostSimilarGroups:


Method Алгоритм сортировки Mean Allocated
RandomTest List 60.06 ms 1704 B
RandomTest SortedSet 65.46 ms 24384 B
RandomTest Heap 60.55 ms 2912 B
AscendantTest List 75.42 ms 1704 B
AscendantTest SortedSet 161.12 ms 9833424 B
AscendantTest Heap 86.87 ms 2912 B
DescendantTest List 119.23 ms 880 B
DescendantTest SortedSet 125.03 ms 3024 B
DescendantTest Heap 118.62 ms 2088 B

Исходная реализация с листом выигрывает почти везде по времени, и точно везде по памяти. Происходит это из-за того, что у алгоритма с листом вставка выполняется за O(log N) на поиск, и почти O(1) на вставку, т.к. копирование такого малого количества элементов происходит очень быстро, получение максимума за O(1), удаление элемента тоже за O(1), т.к. в .net удаление последнего элемента из листа заменяется на запись в последний элемент пустого значения (в .net core для структур даже ничего не записывается). Если бы требовалось выдать не 50, а допустим 1000 наиболее похожих групп, то, скорее всего, алгоритм с листом не подошёл бы. На самом деле всё это немножко спекулятивные рассуждения, т.к. можно каждый из алгоритмов ещё поднастроить.


Многопоточность


Теперь осталось попробовать улучшить сам цикл в GetFiftyMostSimilarGroups. На ум приходит только многопоточность. Идея в том, чтобы разбить весь список групп на несколько пакетов. В каждом пакете найти 50 наиболее похожих групп тегов, а потом среди них найти окончательные 50 самых похожих.
Многопоточная версия GetFiftyMostSimilarGroups выглядит вот так:


GetFiftyMostSimilarGroupsMultiThread
public TagsGroup[] GetFiftyMostSimilarGroupsMultiThread(TagsGroup value) {
    const int resultLength = 50;
//количество потоков, соответственно и количество пакетов
    const int threadsCount = 4;
//размер одного пакета
    int bucketSize = Groups.Length / threadsCount;
    var tasks = new Task<List<TagsSimilarityInfo>>[threadsCount];
    for (int i = 0; i < threadsCount; i++) {
        int leftIndex = i * bucketSize;             //начальный индекс в пакете
        int rightIndex = (i + 1) * bucketSize;      //конечный индекс в пакете
//Создаём и стартуем таску
        tasks[i] = Task<List<TagsSimilarityInfo>>.Factory.StartNew(() => GetFiftyMostSimilarGroupsMultiThreadCore(value, leftIndex, rightIndex));
    }
    Task.WaitAll(tasks);
//сюда сохраним результаты тасок
    var taskResults = new List<TagsSimilarityInfo>[threadsCount];
    for (int i = 0; i < threadsCount; i++) {
        taskResults[i] = tasks[i].Result;
    }
//собираем со всех тасок лучший результат
    return MergeTaskResults(resultLength, threadsCount, taskResults);
}

Метод GetFiftyMostSimilarGroupsMultiThreadCore выглядит один в один как обычный GetFiftyMostSimilarGroups:


GetFiftyMostSimilarGroupsMultiThreadCore
List<TagsSimilarityInfo> GetFiftyMostSimilarGroupsMultiThreadCore(TagsGroup value, int leftIndex, int rightIndex) {
    const int resultLength = 50;
    List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(resultLength);
    for (int groupIndex = leftIndex; groupIndex < rightIndex; groupIndex++) {
        TagsGroup tagsGroup = Groups[groupIndex];
        int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup);
        TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue);
        if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) {
            continue;
        }
        int index = ~list.BinarySearch(newInfo);
        list.Insert(index, newInfo);
        if (list.Count > resultLength) {
            list.RemoveAt(resultLength);
        }
    }
    return list;
}

Самое интересное происходит в MergeTaskResults. Необходимо из инфо-объектов групп тегов в taskResults выбрать пятьдесят наиболее подходящих. Код, написанный здесь, очень похож на сортировку слиянием. Только сливаются не два массива, а threadsCount массивов, но основная идея такая же: на каждом этапе просматриваются некоторые элементы из каждого массива, из них выбирается наиболее подходящий, он записывается в ответ, и это всё повторяется:


MergeTaskResults
TagsGroup[] MergeTaskResults(int resultLength, int threadsCount, List<TagsSimilarityInfo>[] taskResults) {
    TagsGroup[] result = new TagsGroup[resultLength];
    int[] indices = new int[threadsCount]; 
    for (int i = 0; i < resultLength; i++) {
        int minIndex = 0; 
        TagsSimilarityInfo currentBest = taskResults[minIndex][indices[minIndex]];
        for (int j = 0; j < threadsCount; j++) {
            var current = taskResults[j][indices[j]];
            if (current.CompareTo(currentBest) == -1) {
                minIndex = j;
                currentBest = taskResults[minIndex][indices[minIndex]];
            }
        }
        int groupIndex = currentBest.Index;
        result[i] = Groups[groupIndex];
        indices[minIndex]++;
    }
    return result;
}

  • В indices хранятся индексы рассматриваемых позиций из taskResults;
  • minIndex — индекс у taskResults, в котором в данный момент находится наиболее подходящий элемент;
  • currentBest — инфо-объект текущей наиболее подходящей сейчас группы тегов;
  • current — инфо-объект рассматриваемой сейчас группы тегов;

И результаты итогового бенчмарка:


Method Mean Error StdDev Allocated
RandomTest 28.76 ms 0.5677 ms 1.414 ms 1.4 KB
AscendantTest 32.36 ms 0.8930 ms 2.591 ms 1.4 KB
DescendantTest 41.36 ms 0.8908 ms 2.626 ms 1.4 KB

А вот что было в начале:
Method Mean Error StdDev Allocated
RandomTest 25054 ms 1786 ms 1670 ms 1.53 KB
AscendantTest 4180 ms 174 ms 162 ms 1.53 KB
DescendantTest 4147 ms 118 ms 104 ms 1.53 KB

И этого удалось добиться всего лишь рассуждениями и поэтапной эволюцией существующего кода. Естественно код неидеален. Совершенно нет обработки ошибок, а половина методов упадёт, если исходные данные не будут кратны 4 или 50. Но, надеюсь, основные идеи понятны.


Весь код можно найти здесь

Tags:
Hubs:
+22
Comments47

Articles