Pull to refresh

Comments 36

Спасибо за статью, было интересно почитать про настройки Akka и особенности "внутренней кухни".
А вы не сравнивали kotlin coroutines с Akka по производительности? По использованию кода с coroutines, кажется, будет даже проще чем с Akka.

Нет не сравнивали, хотя идея хорошая, спасибо!

Если будет чем поделиться по итогам, то было бы интересно =)
И к слову, в coroutines есть и свой aktor (правда без remote акторов и т.п.), с ним можно было бы наиболее близкие варианты сравнить в рамках запуска на одной jvm.

обязательно поделимся:)

корутины — это невытесняющая многозадачность, они локальны.
модель акторов совсем про другое — это про изоляцию и прозрачность размещения, они распределенны.
Сравнивать можно в какой-то мере Akka Streams с Koroutines Flow, но не ядро акки с корутинами — они совсе про разное.

модель акторов напоминает мне нечто среднее между распределенным детерминированным автоматом и направленного ациклического графа вычислений

Очень крутая статья, спасибо! Приятно знать, что Akka способна на такое. Не могу удержаться от того, чтобы не задать несколько технических вопросов:


  1. Используете ли вы in-memory кеши типа Caffeine?
  2. Используете ли вы off-heap memory, чтобы съэкономить на GC?
  3. Используете ли вы пуллы объектов, чтобы экономить на аллокациях?
  4. Достаточно часто Akka тянет за собой переход на Scala. Использование Java — это было какое-то технически обоснованное решение, или просто особенность скиллсета разработчиков?
Спасибо за отзыв
1. Конкретно Caffeine явно не используем. В описании к нему написано, что он используется в Akka и Spring. Если так, тогда используем неявно. А вообще, вместо разрозненных кешей мы используем спапшет данных. Снапшет — это уже преобразованные, предагрегированные данные и разложенные по обычным HashMap для быстрого поиска. Снапшет меняется только несколько раз в день, поэтому там достаточно обычных HashMap. Real-time данные раскладываем в ConcurrentHashMap
2. Нет, напрямую off-heap memory не используем. Для экономии на GC используем пулы объектов, про это напишу ниже. Статья как раз о том, что без сильных ухищрений с GC от Java можно получить хорошую производительность. Лично мое мнение, использование напрямую off-heap memory не через фреймворки и библиотеки очень опасно для проекта, начиная с того, что Unsafe рано или поздно задеприкейтят и заканчивая core дампами в продакшене. Если в проекте много мест где хочется использовать Unsafe, тогда, скорее всего, Java не лучший выбор для проекта, возможно стоит рассмотреть связку С/С++ + JNI или вынести высокопроизводительный код в отдельный процесс на том же С/С++.
3. Да используем. В расчетах с матрицами. Каждая строка матрицы это массив и эти массивы переиспользуются в расчетах. Для кеширования используем очередь из JCTools. Потоки берут преаллоцированные массивы из этой очереди и кладут их обратно после завершения вычислений.
4. Скорее особенность скиллсета разработчиков. На скалу переходить не планировали, связка Akka + Java вполне устраивает.

Немного обобщу свои ответы: в нашем проекте мы особо не экономим на аллокациях небольших объектов и позволяем гарбедж коллектору их собирать, кешируем только массивы. Работаем с данными либо через большой снапшет, либо через небольшие кеши поверх ConcurrentHashMap. Не прибегая к особым ухищрениям мы сохраняем для себя удобство разработки на Java, но жертвуем тем, что обслуживание части запросов все-таки выпадает на GC паузы (не больше 5%) и обслуживаются не за 3-4 ms а за 150-200 ms. Для нас это осознанный tradeoff.

Спасибо за подробные ответы! Пока читал, всплыла еще пара вопросов:


  1. Пробовали ли вы коллекторы ZGC и Shenandoah, оптимизированные под latency? Или все вполне хорошо на G1?
  2. Пробовали ли вы GraalVM?
  1. Shenandoah очень хотим попробовать, но у нас есть ограничения по версии jdk, которую можем использовать в продакшене, мы пока на 11, как только появится возможность перейти на 14 Java или выше, обязательно попробуем.
    Zgc нет не пробовали.
  2. С GraalVm, в бесплатной версии не оптимально сделано выполнение simd вычислений, что для нас важно, а платную версию не было возможности протестировать.
хорошие новости, спасибо)

Кстати, если вам интересна тема ухищрений на Java для высокопроизводительных вычислений, то возможно будет интересен доклад https://youtu.be/QV-ue1YMdds

Да, интересуюсь этой темой. Спасибо за видео!

Если я правильно понимаю (раздел "Что мы считаем"), то задача выглядит так:
выполнить вычисления ("расчётное ядро") длительностью 1.5мс 6 раз независимо друг от друга.
Для параллельного вычисления изначально и задействовали akka. Так?
Или есть какие-то другие веские причины, чтобы использовать akka?


Пробовали ForkJoinPool использовать?

>> 6 раз независимо друг от друга
Да, правильно, расчеты независимые. 1,5 ms на один расчет. И мы, конечно же, выполняем эти расчеты параллельно.
>> Для параллельного вычисления изначально и задействовали akka. Так?
Сами расчеты, два вложенных цикла, как раз то место, где akka не используется. Итерации по циклам разбиваются на батчи и обсчитываются параллельно на ForkJoinPool. На выходе получается 0,3 ms вместо 1,5. Почему не akka? Это как раз тот кейс, где акка не очень удобна. Нужно через сообщения выстраивать машину состояний и реагировать в зависимости от посчитанных значений, ForkJoinPool в этом месте удобнее.
>> Или есть какие-то другие веские причины, чтобы использовать akka?
Не очень понял, вопрос относится непосредственно к расчетам или к проекту в целом? Про расчеты написал выше, а про проект, почему акка написано в разделе «Преимущества акторов»

Akka на Java навевает уныние и скуку. Лучше уж сразу начать использовать Scala, даже если только для операций с акторами.

Да, синтаксис на Java будет не такой читаемый, как на Akka, но даже на Java Akka выглядит и работает лучше, чем большинство ad-hoc самопала. :-)

На мой взгляд акторы переусложненная концепция в Clojure есть Agent это по сути то же самое только просто, и эффективно.

Каким образом агента можно горизонтально масштабировать?

Если я вас правильно понял то вы о распределении на несколько узлов.
В статье такая задача не ставится, все работает в рамках одной виртуальной машины.
А если нужно распределить то это уже давно реализовано через систему очередей например.
p.s.
Не знаю что там сейчас но раньше у Akka распределенные вычисления на актерах были на словах а не на деле. Т.к. нужно было подключить руками ту саму очередь или что-то иное.

Как минимум с 2013-го заявляемая location transparency была, ничего переключать не надо.
Я не знаком с Clojure, но мне сложно представить конецепцию проще актора для параллельных вычислений — сообщение на входе — сообщение на выходе:)

Не хватает примера вызывающего актора чтоб понять как выглядит код обработки результата (синхронно/асинхронно?) и насколько это читаемее чем реактивность от которой отказались, как я понял, именно в угоду читаемости.

Я специально в статье привел пример CalculationActor, т.к. в нем есть код по обработке входящих сообщений в createReceive() и onCalculate, и отправки результата в том же onCalculate методом sender.tell.
Код клиента можно посмотреть здесь — в нем отличие в методе preStart, где планируется отправка сообщений каждые 100 ms (эмулирется нагрузка и непрерывная работа).

>> реактивность от которой отказались, как я понял, именно в угоду читаемости.
я бы сказал, что «не перешли», а не «отказались».
Спасибо за статью! После прочтения возникла пара вопросов:

1) Ваши процессы с акторами на проде работают в контейнерах / виртуалках / или непосредственно на железных серверах?
2) Какая в среднем (или медианная) утилизация хипа — то есть из 20gb сколько это живые объекты?
1) На виртуалках, планируем переезд на реальное железо
2) Где-то 16GB живых объектов, в основном это статические данные для расчетов.

Если уточните для каких целей спрашиваете, может быть смогу более развернуто ответить
1) В этом пункте интересует есть ли у вас выигрыш в латенси на железных серверах — если уже потестировали.
2) Вы упоминали что 5% запросов выходит из 4ms скорее всего из-за gc — может быть добавить хипа и сделать что нибудь типа 2x — в вашем случае 2*16 — дать коллектору «оперативный простор» — или вы пробовали и стало хуже или не повлияло?
1) Еще нет такой информации, но вообще гипервизор должен давать не существенный оверхед, проблема скорее в том, что железо шарится между несколькими виртуалками
2) В 2 раза не пробовали увеличить и возможности такой пока нет, но мы исходим из того, что при текущем allocation rate = 10 MB/sec, свободных 4-х GB должно хватать. Мы возлагаем надежды на Shenandoah, возможно поможет, если из коробки не поможет, будем тюнить его как сможем

Давно пользуюсь, правда на Scala (код в разы читабельнее) и Typed Actors (+ Streams).


За исключением графа из CompletableFuture на ForkJoinPool (как poor-man async computation graph), подход практически уникален тем, что даже начинающий программист скорее всего сделает все правильно (или не очень неправильно). C минимальными усилиями code review легко не допускать попадания concurrency в бизнес-логику.


Для многих прикладных задач можно пойти еще дальше и использовать сразу Akka Streams (которые на самом деле Graphs). Оно написано поверх Actors, но очень грамотно, весь Stream/Graph по-умолчанию "спаян" в один синхронный блок и имеет очень низкий runtime overhead. Поставляется с кучей полезного DSL, включая код для работы с сетью, базами данных и т.д.


пример Akka Streams (охлаждение высокочастотных котировок)
// фрагмент кода инициализации Typed Actor, который получает
// высокочастотный поток котировок, "охлаждает" его через
// conflating Akka Stream, и потом скармливает в свой Actor Inbox
// уже для "осмысленной" обработки

// специальный вид материализатора, который привяжет stream к lifetime Actor-а
// может быть полезно для гигиены
implicit val mat = ActorMaterializer.boundToActor(ctx) 

// Stream высокочастотных котировок извне (сокет, файл, 1000 недорогих тестировщиков из азии, и т.п.), точнее еще не сам Stream а только описание как его создавать
val rawPricesDef: Source[PriceTick] = ... 

// Центральная часть, "охладитель", попробуйте написать с нуля с такими же
// характеристиками модульности и тестопригодности
// опять же, инструкции по сборке, а не живой объект
val conflatingShaperDef = Flow[PriceTick]
  // conflate into something like map[symbol, map[tag, value]]
  .conflateWithSeed[PriceBufType](mkSeedBuf)(inflateFeedBuf)
  .throttle(msgPerSec, per = 1.second)
  // unpack map[symbol, map[tag, value]] in to a stream of (symbol, list[PriceTick])
  .mapConcat(deflateFeedBuf)

// весь stream, собранный вместе (опять инструкции)
val conflatedPricesDef = rawPrices
  .via(conflatingShaperDef)
  .to(ActorSink.actorRef(ctx.self))

// наконец, живой объект и побочный эффект создания в mv (описано в документации)
val mv = conflatedPricesDef.run()

Правильно написанный (и примененный) Actor — это как раз конечный автомат (а вы его обозвали недостатком!). Автоматы возможно сделать надежными и протестировать пошагово каждый переход в графе состояний. Это особенно важно если у вас path-dependent (не знаю, как перевести) логика.


В Typed Actors они наконец сдались — там требуется явно возвращать новое состояние после обработки каждого сообщения (и всякие удобные слова в DSL: same, stopped, etc.). Также исправили много чего еще (в основном убрав некоторые automagic-s).


пример конечного автомата на Akka Typed
final case class Response(id: RequestId)
sealed trait Protocol
final case class Request(id: RequestId, replyTo: ActorRef[Response]) extends Protocol

// шаблон для изготовления Actor-а
val actorDef: Behavior[Protocol] = setup { ctx => // создать с живым контекстом
  withTimers { sched => // добавить поддержку таймеров (with lifetime of this Actor)
    // основное "хорошее" состояние, оформлено в виде функции
    // которая возвращает шаблон поведения
    def ONLINE(data: InitialData): Behavior[Protocol] = receiveMessagePartial[Protocol] {
      case Request(requestId) =>
        replyTo ! Response(requestId)
        same // удобняшка из DSL - инструкция остаться в том же состоянии
    }

    // еще одно сообщение из протокола, "секретное"
    final case class InitialLoadTried(outcome: Try[InitialData])

    // загрузить что-то, необходимое для работы
    // асинхронно на blockingExecutor чтобы не блокировать основной Dispatcher
    ctx.pipeToSelf(loadInitialsFromDbAsync())(InitialLoadTried)(blockingExecutor)

    // начальное состояние, оформлено в виде переменной содержащей шаблон поведения
    val EMPTY = receiveMessagePartial[Protocol] {
      case InitialLoadTried(Success(data)) =>
        ONLINE(data) // все хорошо загрузилось, go ONLINE

      case InitialLoadTried(Failure(cause)) =>
        throw new RuntimeException("initial load failed", cause) // о нет! аварийное завершение
    }

    EMPTY // вернуть начальное состояние
  }
}

// экземпляр Actor-а, создаётся в контексте родителя
val actorRef = ctx.spawn(actorDef, "ActorName", ...)
ctx.watch(actorRef) // получить сообщение родителю когда child завершится

Из неудобств — все еще очень много поведения по-умолчанию и настроек сделанных в предположении одной JVM на сервер. Всё надо перепроверять и обвешивать телеметрией в runtime, что кстати не совсем тривиально сделать в бесплатной поставке как aspect-oriented срез. Но все равно на порядок проще, чем исправлять детские поделки из mutex/condition/synchronized и ConcurrentHashMap-s разбросанными ad-hoc.

Спасибо большое за примеры! Очень полезное дополнение.
>> а вы его обозвали недостатком!
я скорее про сравнение с синхронным кодом:
res1 = calc1()
res2 = calc2()
sendResult(res1 + res2)

В случае если переписать на сообщения, то код станет чуть размазаннее, хотя для вашего примера согласен, что акторы удобнее.

Я правильно понимаю, что в вашем коде акторы создаются динамически, на разные состояния (во втором примере)? Существенный ли оверхед на это? (мы стараемся акторы динамически не создавать и кешировать их)

Код типа


receive {
  case SomeMsg(a, b, c) =>
  ...
  AnotherBehavior
}

будет иметь такой же overhead как и context.become(...) в Classic Actor.


Это ни в коем случае не динамический актор а просто (в худшем случае) allocation of a lambda closure и push в стек состояний актора. Ни разу не попалось при profiling ни по CPU ни по памяти, ни по GC. Абсолютно точно тянет автомат биржевого приказа (правда не HFT) не вспотев ни чуточки, но правда для котировок и динамической подписки-отписки я даже не стал пытаться, и просто написал маленький автоматик на enums. Хотя с правильным выбором inbox + dispatcher могло бы и взлететь, наверное — надо мерить и смотреть.


Забыл сказать в первом комментарии — спасибо что поделились своим опытом использования в реальном проекте, было очень интересно узнать и сравнить.

Вам тоже спасибо за дополнения и свой опыт
Спасибо за статью! А как вы делаете fault-tolerence у системы? Из статьи кажется следует, что это забота клиента, почему бы тогда не поднять несколько экземпляров сервиса и не пусть трафик через балансировщик?
>> почему бы тогда не поднять несколько экземпляров сервиса и не пусть трафик через балансировщик
мы примерно так и делаем, есть несколько одинаковых расчетных сервисов, на уровне нашего клиента (джарника с grpc) принимается решение, к какому инстансу подключиться
Sign up to leave a comment.