Pull to refresh
93.34
Naumen
Мечтай. Создавай. Меняй мир к лучшему

Реактивный мессенджер, или CQRS и ES вместе с Akka и Scala

Reading time21 min
Views23K
В последнее время мы часто слышим о реактивном программировании и видим различные баззворды: message-driven архитектура, event-sourcing, CQRS. К сожалению, на Хабре об этом пишут довольно мало, поэтому я решил исправить ситуацию и поделиться своими знаниями со всеми желающими.

В этой статье мы узнаем об основных особенностях реактивных приложений, рассмотрим, как паттерны CQRS и EventSourcing помогут нам в их создании, а чтобы не было скучно, мы с вами шаг за шагом сделаем свой мессенджер с вебсокетом и акторами, соответствующий всем канонам реактивного программирования. Для реализации всего этого добра, мы будем использовать замечательный язык Scala вместе с не менее превосходной библиотекой Akkа, реализующей модель акторов. Еще, мы будем использовать Play Framework для написания веб-составляющей нашего приложения. Итак, приступим.

Статья предназначена для тех, кто уже знаком со Scala и слышал о модели акторов. Все остальные тоже приглашаются к прочтению, принципы реактивного программирования можно применять вне зависимости от языка и фреймворка.

Что такое реактивное программирование


Идея реактивного программирования описана в реактивном манифесте www.reactivemanifesto.org. Перевод его первой версии уже был на Хабре, а вторая версия незначительно отличается от первой. Давайте рассмотрим краткую вырезку из второй версии. Реактивный манифест гласит, что реактивные приложения имеют несколько важных свойств:

Отзывчивость


Приложение реагирует настолько быстро, насколько это возможно. Отзывчивость — это основа юзабилити и полезности, по той простой причине, что долгие задержки интерфейса не добавляют желания им пользоваться, а так же, отзывчивость означает что проблемы могут быть быстро обнаружены и эффективно решены. Отзывчивые системы фокусируются на предоставлении быстрых и постоянных по времени ожидания ответов, использовании подходящих верхних временных границ для обеспечения стабильного качества обслуживания. Данное постоянное и предсказуемое поведение, в свою очередь упрощает обработку ошибок, укрепляет доверие пользователей, и призывает их к дальнейшему взаимодействию.

Отказоустойчивость


Приложение остается отзывчивым при возникновении сбоя. Это применимо не только к высокодоступным, критически важным системам — любая отказо-неустойчивая система не будет отзывчивой в случае отказа. Устойчивость достигается за счет репликации, локализации, изоляции и делегации. Отказы не выходят за пределы модуля, и посредством изолирования модулей друг от друга можно быть уверенным в том что отдельные части приложения могут отказать и восстановится после сбоя, при этом не приводя к падению всего приложения. Восстановление каждого отказавшего модуля делегируется другому, внешнему модулю, а высокая доступность достигается посредством репликации. Клиенты модуля не имеют головной боли с обработкой отказа модуля.

Эластичность


Приложение остается отзывчивым под различной нагрузкой. Реактивные приложения могут реагировать на изменения нагрузки через увеличение или уменьшение ресурсов предназначенных для ее обработки. Это предполагает архитектуру, не имеющую точек блокировок или центральных узких мест, что выражается в способности к шардингу и репликации модулей, и дальнейшим распределением нагрузки между ними. Результатом этого является масштабируемость при помощи дешевого общеиспользуемого железа (привет, Гугл!).

Ориентированность на передачу сообщений


Реактивные приложения ориентируются на асинхронную передачу сообщений для установки границ между модулями которые обеспечивают слабую связанность, изоляцию, прозрачность местоположения, и предоставляет средства для делегирования ошибок как сообщений. Введение явной передачи сообщений дает возможности для балансировки нагрузки, эластичности и управления потоками данных посредством формирования и мониторинга очереди сообщений, и снижения пропускной способности, когда необходимо. Прозрачность местоположения позволяет использовать одну и ту же логику обработки ошибок как на кластере, так и на одном узле. Неблокирующие коммуникации позволяют получателям сообщений потреблять ресурсы только тогда, когда они активны, что ведет к меньшему оверхеду при работе приложения.



CQRS


CQRS расшифровывается как Command Query Responsibility Segregation (разделение ответственности на команды и запросы). Данный подход к построению архитектуры приложения, в отличие от широко используемого CRUD (Create Retrieve Update Delete) подразумевает то, что возможно использовать разные модели для обновления и чтения информации. Возникает закономерный вопрос, для чего же нам такие извращения? Дело в том, что исходя из того, что модель чтения, и модель записи разделены, мы можем оптимизировать их для этих задач. Например, если для задач чтения лучше подходит денормализация данных, то никто не мешает нам это сделать. Удобнее читать если данные в графовой базе данных — пожалуйста. Хочется хранить все в Key-Value хранилище — да ради бога. Более того, если вы хотите добавить новые фичи в read model, то все что вам нужно после их добавления — перегенерировать модель (стоит сделать оговорку, что если у нас событий на многие гигабайты, то этот процесс не будет таким быстрым, однако мы всегда можем сделать снапшот, который существенно увеличит скорость восстановления).

В принципе, над нормализацией Read-модели можно вообще не заморачиваться, по этой же самой причине. Используя CQRS для оптимизации операций чтения в нашем приложении, мы обеспечиваем отзывчивость нашего приложения. Что нам еще осталось для того чтобы наше приложение было по настоящему реактивным? Правильно, эластичность и отказоустойчивость. Эти черты мы реализуем с помощью паттерна Event Sourcing.



Event Sourcing


Смысл ES в том, что мы храним не текущее состояние нашей модели данных, а всю историю изменений, которые меняют состояние нашего приложения (на самом деле не все изменения, а только те что имеют для нас значение). Для получения текущего состояния мы просто суммируем изменений со всех существующих событий. Что мы понимаем под событием, и чем событие отличается от команды? Команда обозначает то, что кто-то хочет от нас, к тому же ее можно проигнорировать. Событие — это что-то произошедшее, неизменяемый факт.

Преимущество данного подхода в том, что мы никогда ничего не удаляем и не изменяем. Как вы уже догадались, этот дает нам широкие возможности для масштабирования нашего приложения, и в качестве базы данных мы можем использовать хорошо зарекомендовавшие себя NoSQL решения такие как Cassandra или HBase. EventSourcing нам дает отказоустойчивость и эластичность.

Хватит разговоров, покажи нам код


Итак, как было сказано ранее, реализовывать все это дело мы будем с использованием Typesafe stack.

Архитектура нашего приложения будет выглядеть следующим образом:



У пользователя есть возможность читать и отправлять сообщения. Отсылка и принятие сообщений происходит через вебсокет, доступ к которому есть у актора UserConnection. Данный актор отправляет сообщения актору RoomWriter, который помимо записи сообщений в журнал занимается пинанием актора RoomReader, считывающего сообщения из журнала и отправляющего их в обратно актору UserConnection. Кроме всего этого, у нас есть актор Receptionist, который занимается выдачей имен и следит за тем чтобы в приложении не было пользователей с двумя одинаковыми именами. C архитектурой более-менее разобрались, теперь начнем писать код.

RoomWriter


Самым первым мы реализуем тот актор, который занимается записью входящих сообщений в журнал.

Код класса RoomWriter
class RoomWriter(roomLogId: String) extends PersistentActor {

  import RoomWriter._

  override def persistenceId = roomLogId

  val listeners = mutable.Set.empty[ActorRef]

  def receiveRecover = Actor.emptyBehavior

  def receiveCommand = {
    case msg: Message =>
      persistAsync(msg) { _ =>
        listeners foreach (_ ! Update)
      }

    case Listen(ref) => listeners add context.watch(ref)

    case Terminated(ref) => listeners remove ref
  }
}

Что же тут такое написано? Как можно догадаться, мы объявили класс RoomWriter, который имеет три части:
  • идентификатор persistenceId, который необходим для однозначной идентификации событий, которые были произведены данным актором;
  • множество listeners, содержащее набор ссылок на акторы, которые должны получать уведомление о том что в журнале что-то изменилось;
  • два метода, receiveRecover, который вызывается при реплее сообщений из журнала которое происходит при создании актора, и receiveCommand, который используется для обработки сообщений во время нормального функционирования.

Рассмотрим метод receiveCommand чуть подробнее. Данный метод обрабатывает три разных сообщения:
  • при получении сообщения типа Message, происходит асинхронная запись его в журнал, и каждому listener-у отправляется сообщение о том что журнал был обновлен.
  • при получении Listen, мы начинаем следить за жизненным циклом актора, ссылка на который лежит в сообщении, ко всему прочему, ссылка на актор добавляется в множество listener-ов
  • сообщение Terminated содержащее ссылку на умерший актор мы получим если актор за жизненным циклом которого мы следим, вдруг умрет. Если такое происходит (пользователь закрыл браузер), то мы убираем данный актор из списка рассылки.

Правилом хорошего тона считается объявление всех обрабатываемых сообщений и фабричного метода для создания актора в объекте-компаньоне:

Код объекта-компаньона класса RoomWriter
object RoomWriter {

  case class Listen(ref: ActorRef)

  case class Message(author: String, content: String, time: Long)

  case object Update

  def props(roomId: String) = Props(new RoomWriter(roomId))
}

С RoomWriter-ом мы разобрались, теперь самое время взглянуть на актор RoomReader, который получает обновления из журнала, и отсылает их по иерархии выше.

RoomReader


Класс RoomReader
class RoomReader(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) extends PersistentView {
  import RoomWriter._

  roomWriter ! Listen(self)

  override def persistenceId = roomLogId

  override def viewId = roomLogId + "-view"

  def receive = {
    case msg @ Message(_, _,sendingTime) if currentTime - sendingTime < tenMinutes =>
      userConnection ! msg
    case msg: Message =>
    case Update => self ! akka.persistence.Update()
  }
}

RoomReader зависит от идентификатора журнала, в зависимости от которого он будет получать его обновления. В нашем случае, этот идентификатор будет совпадать, с идентификатором актора RoomWriter, что будет означать что все что RoomWriter пишет в журнал, будет приходить в RoomReader. Рассмотрим как происходит обработка сообщений:
  • при получении сообщения Message, проверяется время его отправки, и если сообщение старше десяти минут, то оно не будет показано пользователю. Это сделано для того, чтобы к пользователю не приходили тысячи ранее накопленных сообщений.
  • при получении Update, актор читает журнал, и отправляет считанные сообщения пользователю.

Как и в предыдущем случае, наш объект-компаньон:
Код объекта-компаньона класса RoomReader
object RoomReader {

  def currentTime = System.currentTimeMillis()

  val tenMinutes = Duration(10, MINUTES).toMillis

  def props(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) = Props(
    new RoomReader(roomLogId, roomWriter, userConnection)
  )
}


Переходим к самому интересному, актору UserConnection, который отвечает за обработку сообщений из вебсокета.

UserConnection


Код класса UserConnection
class UserConnection(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) extends Actor {
  import actors.UserConnection._

  def receive = waitingForUsername
  
  def waitingForUsername: Receive = {
    case WebSocketInMsg(RegisterMeWithName, username) => receptionist ! UsernameRequest(username)
    case Ack(username) =>
      context become readyToChat(username)
      context actorOf RoomReader.props(roomLogId, roomWriter, self)
      out ! WebSocketOutMsg(currentTime, "system", "welcome")
    case NAck => out ! WebSocketOutMsg(currentTime, "system", "taken")
  }

  def readyToChat(username: String): Receive = {
    case WebSocketInMsg(SendMessage, message) => roomWriter ! Message(username, message, currentMillis)
    case Message(author, content, time) => out ! WebSocketOutMsg(formatTime(time), author, content)
  }
}

У данного актора есть одна особенность, отличающего его от других: он может менять свое поведение и состояние. Изначально, он находится в состоянии ожидания получения имени пользователя. В этом состоянии он может принимать запросы клиента на получение имени, и пересылать их актору отвечающему за выдачу имен. При успешном получении имени, актор переходит в состояние готовности к чату, и начинает пересылать сообщения между частями системы.

Объект-компаньон на этот раз получился весьма большим:

Код объекта-компаньона класса UserConnection
object UserConnection {
  def props(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) = Props(
    new UserConnection(receptionist, roomWriter, out, roomLogId)
  )

  case class WebSocketInMsg(messageType: Int, messageText: String)
  case class WebSocketOutMsg(time: String, from: String, messageText: String)

  case class UsernameRequest(name: String)
  case class Ack(username: String)
  case object NAck

  val RegisterMeWithName = 0
  val SendMessage = 1

  val formatter = DateTimeFormat.forPattern("HH:mm:ss").withLocale(Locale.US)
  def currentTime = DateTime.now().toString(formatter)
  def currentMillis = System.currentTimeMillis()
  def formatTime(timeStamp: Long) = new DateTime(timeStamp).toString(formatter)
}


Последний актор, который будет удостоен нашего внимания, это Receptionist.

Receptionist


Код класса Receptionist
class Receptionist extends Actor {

  var takenNames = mutable.Map("system" -> self)

  def receive = {
    case UsernameRequest(username) =>
      if (takenNames contains username) {
        sender() ! NAck
      } else {
        takenNames += (username -> context.watch(sender()))
        sender() ! Ack(username)
      }
    case Terminated(ref) => takenNames collectFirst {
      case (name, actor) if actor == ref => name
    } foreach takenNames.remove
  }
}

В его задачи входит выдача имен пользователям: он содержит ассоциативный массив, отображающий имена на actorRef-ы. Так же как и в RoomWriter, мы следим за жизненным циклом акторов которым мы выдали имена, и в случае их смерти удаляем их имена из списка зарегистрированных имен.

Не забываем про объект-компаньон, выносим туда фабричный метод для создания актора:

Код объекта-компаньона класса Receptionist
object Receptionist {
  def props() = Props[Receptionist]
}

Контроллер


На данный момент мы закончили со всеми акторами, которые были у нас планах реализации. Теперь рассмотрим то, каким образом нам связать вебсокет и актор. Для этого, мы воспользуемся теми средствами, которые нам может предложить play framework. Реализуем контроллер нашего приложения следующим образом:

Код контроллера
object Application extends Controller {

  val logId = "akka-is-awesome"
  val roomWriter = Akka.system.actorOf(RoomWriter.props(logId), "writer")
  val receptionist = Akka.system.actorOf(Receptionist.props(), "receptionist")

  def index = Action { implicit request =>
    Ok(views.html.chat())
  }

  implicit val InMsgFormat = Json.format[WebSocketInMsg]
  implicit val InMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketInMsg]
  
  implicit val OutMsgFormat = Json.format[WebSocketOutMsg]
  implicit val OutMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketOutMsg]

  def socket = WebSocket.acceptWithActor[WebSocketInMsg, WebSocketOutMsg] { request => out =>
    UserConnection.props(receptionist, roomWriter, out, logId)
  }
}

Сначала, мы создаем два актора: roomWriter и receptionist. Они являются зависимостями для актора UserConnection. Далее, мы описываем то, как форматировать сообщения для передачи их через вебоскет. Наконец, мы описываем, каким образом мы обрабатываем входящие подключения к вебсокету. Встроенный в Play Framework хелпер позволяет делать это невероятно просто.

Настало время создания веб-интерфейса. Для верстки будем использовать фреймворк twitter bootstrap, а angular.js — для реализации бизнес-логики на клиенте.

Клиентская часть кода
angular.module('chatApp', [])
    .controller('ChatCtrl', ['$scope', function($scope) {
 
        var wsUri = "ws://"+window.location.host+"/ws";
        var websocket = new WebSocket(wsUri);
 
 
        $scope.name = "";
        $scope.messages = [];
        $scope.registered = false;
        $scope.taken = false;
        $scope.sendMessage = function () {
            websocket.send(angular.toJson({
                "messageType": 1,
                "messageText":$scope.messageText
            }));
            $scope.messageText = "";
        };
        $scope.sendName = function () {
            if (!$scope.registered) {
                websocket.send(angular.toJson({
                    "messageType": 0,
                    "messageText": $scope.name
                }));
            }
        };
 
        websocket.onmessage = function (e) {
            var msg = angular.fromJson(e.data);
            console.log(e.data);
            if (!$scope.registered) {
                switch (msg.from) {
                    case "system":
                        handleSystemMsg(msg.messageText);
                        break;
                }
            } else {
                $scope.messages.push(msg);
                $scope.$apply();
                var chatWindow = $("#chat-window");
                chatWindow.scrollTop(chatWindow[0].scrollHeight);
            }
        };
 
        function handleSystemMsg(msg) {
            switch (msg) {
                case "welcome":
                    $scope.registered = true;
                    break;
                case "taken":
                    $scope.taken = true;
                    break;
            }
        }
    }]);

Как будет выглядеть наша html-страница:

Html-код приложения
<!DOCTYPE html>
<html ng-app="chatApp">
    <head>
        <meta charset="utf-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge">
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <meta name="description" content="">
        <meta name="author" content="">
 
        <title>Akka WebSocket Chat</title>
            <!-- Bootstrap core CSS -->
        <link href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css" rel="stylesheet">
        <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.3.5/angular.min.js"></script>
            <!-- Custom styles for this template -->
        <link href="@routes.Assets.at("stylesheets/main.css")" rel="stylesheet">
        <script src="@routes.Assets.at("javascripts/chatApp.js")"></script>
    </head>
 
    <body>
        <div ng-controller="ChatCtrl">
 
            <nav class="navbar navbar-inverse navbar-fixed-top" role="navigation">
                <div class="container">
                    <div class="navbar-header">
                        <a class="navbar-brand" href="#">Reactive Messenger</a>
                    </div>
                    <form class="navbar-form navbar-left" ng-submit="sendName()" ng-show="!registered">
                        <div class="form-group">
                            <input type="text" class="form-control" ng-model="name" placeholder="Username" required>
                        </div>
                        <button type="submit" class="btn btn-default">Set name</button>
                    </form>
                </div>
            </nav>
 
            <div class="container" >
                <div class="chat col-lg-6">
                    <div id="chat-window">
                        <ul class="list-group">
                            <li class="list-group-item" ng-repeat="message in messages">
                                <span class="label label-info">{{message.time}}</span>
                                <span class="label label-default">{{message.from}}</span> {{message.messageText}}
                            </li>
                        </ul>
                    </div>
                    <form ng-submit="sendMessage()">
                        <div>
                            <div class="input-group">
                                <input type="text" ng-model="messageText" class="form-control" required>
                                <span class="input-group-btn">
                                    <button class="btn btn-default" type="submit">
                                         Send<span class="glyphicon glyphicon-send" aria-hidden="true"></span>
                                    </button>
                                </span>
                            </div> <!-- /input-group -->
                        </div> <!-- /.col-lg-6 -->
                    </form>
                </div>
 
            </div> <!-- /.container -->
        </div>
 
 
        <!-- Bootstrap core JavaScript
    ================================================== -->
        <!-- Placed at the end of the document so the pages load faster -->
        <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js"></script>
        <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/js/bootstrap.min.js"></script>
    </body>
</html>



Scaling out


У нас есть прототип приложения, однако прежде чем выкатывать его в продакшн, нам стоит немного его прокачать. Прокачивать мы его будем следующим образом:
  • Заменим наш эрзац-журнал на что-то по-настоящему хорошее. В данном случае мы возьмем Cassandra, и будем использовать ее для хранения ивентов.
  • Дефолтная Java-сериализация не отличается как стабильностью при изменении формата сообщений так и скоростью при их сериализации. Стоит заменить ее на Google Protobuf или Kryo. В данном случае мы воспользуемся Protobuf-ом.
  • Пользователи нашего мессенджера хотят оставаться в курсе последних новостей, и не хотят читать сообщения старше получаса. Для этого мы изменим логику работы наших акторов, и будем создавать snapshot каждые полчаса, благодаря чему нам не придется восстанавливать всю историю сообщений каждый раз при подключении пользователя.
  • Для того, чтобы приложение могло обработать большое количество пользователей, стоит сделать его распределенным.

При запуске приложения на нескольких серверах его архитектура изменится весьма незначительно. Благодаря тому, что акторы в Akka обладают свойством location transparency, мы можем безболезненно растащить наше приложение на несколько серверов. Более того, наши акторы даже не будут догадываться что теперь они разделены и работают на разных серверах общаясь по сети. Все что нам нужно, это дописать немного кода, и Akka сделает за нас всю остальную работу.
Забегу вперед и предоставлю картинку того, как будет выглядеть наше приложение после всех доработок. В целом, архитектура претерпит незначительные изменения, однако идея останется прежней.



Для использовании cassandra в качестве журнала, нам необходимо
  1. установить cassandra на ноды,
  2. воспользоваться плагином для того чтобы журнал хранился в cassandra.

Первый пункт хорошо описан в официальном мануале, так что особого смысла приводить его тут нет. Стоит заметить только то, что не нужно делать все ноды кассанды seed-нодами, для кластера из трех машин будет достаточно одного сида.

По поводу второго, нам нужно указать в конфиге тип журнала, и прописать адреса нод кассандры. Это можно сделать следующим образом:
Конфигурация akka-persistence
akka.persistence.journal.plugin = "cassandra-journal"

cassandra-journal.contact-points = ["ip1,ip2,ip3"]

akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"

cassandra-snapshot-store.contact-points = ["ip1,ip2,ip3"]

После подключения кассандры, напишем свой класс для сериализации и десериализации сообщений: сначала мы воспользуемся кодогенератором протобуфа для генерации необходимых классов, а потом с их помощью сделаем сериализатор.

Вот так будет выглядеть протобуф-файл:

Содержимое protobuf-файла
option java_package = "actors.messages";
option optimize_for = SPEED;
message ChatMessage {
  optional string author = 1;
  optional string content = 2;
  optional int64 timestamp = 3;
}

После генерации протобуфом необходимого класса напишем свой сериализатор:

Код сериализатора сообщений
class ChatMessageSerializer extends Serializer {

  def identifier: Int = 193823
 def includeManifest: Boolean = false

 def toBinary(obj: AnyRef): Array[Byte] = obj match {
    case ChatMessage(author, content, timestamp) =>
      ProtoChatMessage.newBuilder()
      .setAuthor(author)
      .setContent(content)
      .setTimestamp(timestamp)
      .build()
      .toByteArray
    case _          => throw new IllegalArgumentException("unknown type " + obj.getClass)
  }

  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val proto = ProtoChatMessage.parseFrom(bytes)
    ChatMessage(proto.getAuthor, proto.getContent, proto.getTimestamp)
  }

}

Итак, у нас теперь есть нормальный журнал, и способ записи в него. Теперь нам нужно придумать способ сохранения сообщений не старше 10 минут. Для этого напишем собственный буфер, который будет сохранять сообщения за последние 10 минут.

Код буфера
class FixedTimeMessageBuffer(duration: Long) extends Traversable[ChatMessage] {
  val list = ListBuffer[ChatMessage]()

  def now = System.currentTimeMillis()

  def old = now - duration

  def append(elem: ChatMessage) = {
    if (elem.timestamp > old) {
      while (list.nonEmpty && list.head.timestamp < old) {
        list.remove(0)
      }
      list.append(elem)
    }
  }

  override def toList = list.toList

  def replace(newList: List[ChatMessage]) = {
    list.clear()
    list ++= newList
  }

  def foreach[U](f: ChatMessage => U) = list.foreach(f)

}

В качестве структуры данных для хранения сообщений мы выбрали ListBuffer — по той причине, что мы только добавляем элементы в конец и удаляем их из начала. ListBuffer позволяет делать эти операции за постоянное время. В дальнейшем мы применим данный буфер в нашем акторе Reader — для того, чтобы ограничить количество сообщений, отправляемых вновь подключившимся клиентам.

Рассмотрим то, как нам разделить акторы по сети. Для того, чтобы наше приложение не падало при отключении одной ноды, а дожидалось ее включения, нам следует прописать соответствующую логику в акторе. Актор RoomWriter должен оповещать RoomReader о новых сообщениях, поэтому ему будет полезно знать состояние RoomReader-a. Данная логика хорошо описывается введением двух состояний в актор.

Новые методы для класса RoomReader
...
sendIdentifyRequest()
 
def sendIdentifyRequest(): Unit = {
  log.info(s"Trying connecting to $roomReaderPath")
  context.actorSelection(roomReaderPath) ! Identify(roomReaderPath)
  context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
}
 
def receiveRecover = Actor.emptyBehavior
 
def receiveCommand = identifying
 
def identifying: Receive = {
  case msg: ChatMessage => persistAsync(msg) { m =>
    log.info(s"Message $m persisted, but the reader isn't available")
  }
  case ActorIdentity(`roomReaderPath`, Some(actor)) =>
    log.info(s"Successfully connected to $roomReaderPath")
    context.watch(actor)
    context.become(active(actor))
  case ActorIdentity(`roomReaderPath`, None) => log.info(s"Remote actor is not available: $roomReaderPath")
  case ReceiveTimeout              => sendIdentifyRequest()
  case _                           => log.info("Not ready yet")
}
 
def active(reader: ActorRef): Receive = {
  case msg: ChatMessage => persistAsync(msg) { _ =>
    reader ! Update
    }
 
  case "snap" => saveSnapshot("foo")
 
  case Terminated(`reader`) =>
    log.info("reader terminated")
    sendIdentifyRequest()
    context.become(identifying)
  case ReceiveTimeout =>
  // ignore
}
...

В методе sendIdentifyRequest мы пытаемся получить ActorRef удаленного актора через отправку ему сообщения Identify. Данное сообщение понимают все акторы, и в ответ присылают нужный нам ActorRef. После получения ActorRef мы переходим в обычное состояние, и начинаем работу. Следует заметить, что мы также начинаем следить за жизненным циклом удаленного актора и в случае его недоступности будем пытаться снова до него достучаться.

Для того, чтобы реализовать подобную логику работы для актора UserConnection, мы создадим отдельный актор, который будет выступать в качестве посредника при общении с бэкендом.

Код класса BackendTalker
class BackendTalker(roomWriterPath: String, roomReaderPath: String) extends Actor with ActorLogging {
  import BackendTalker._
 
  val listeners = collection.mutable.Set[ActorRef]()
 
  sendReaderIdentifyRequest()
  sendWriterIdentifyRequest()
 
  def sendReaderIdentifyRequest(): Unit = {
    log.info("sending identify request to reader")
    context.actorSelection(roomReaderPath) ! Identify(roomReaderPath)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReaderReceiveTimeout)
  }
 
  def sendWriterIdentifyRequest(): Unit = {
    log.info("sending identify request to writer")
    context.actorSelection(roomWriterPath) ! Identify(roomWriterPath)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, WriterReceiveTimeout)
  }
 
  def receive = identifying
 
  def identifying: Receive = {
    case ActorIdentity(`roomWriterPath`, Some(actor)) =>
      log.info(s"Successfully identified writer at $roomWriterPath")
      context.watch(actor)
      context.become(waitingForReader(actor))
 
    case ActorIdentity(`roomReaderPath`, Some(actor)) =>
      log.info(s"Successfully identified reader at $roomReaderPath")
      listeners.foreach(actor ! Listen(_))
      context.watch(actor)
      context.become(waitingForWriter(actor))
 
    case ActorIdentity(path, None) => log.info(s"Remote actor not available: $path")
 
    case ReaderReceiveTimeout =>
      sendReaderIdentifyRequest()
    case WriterReceiveTimeout =>
      sendWriterIdentifyRequest()
 
    case msg: ChatMessage =>
      listeners += context.watch(sender())
      sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
    case Terminated(userCon) => listeners -= userCon
    case _                           => log.info("Not ready yet")
  }
 
  def waitingForReader(writer: ActorRef): Receive = {
    case ActorIdentity(`roomReaderPath`, Some(reader)) =>
      log.info(s"Successfully identified reader at $roomReaderPath")
      listeners.foreach(reader ! Listen(_))
      context.watch(reader)
      context.become(active(reader, writer))
 
    case ActorIdentity(`roomReaderPath`, None) => log.info(s"Reader actor not available: $roomReaderPath")
    case ReaderReceiveTimeout =>
      sendReaderIdentifyRequest()
    case WriterReceiveTimeout =>
      sendWriterIdentifyRequest()
    case Terminated(`writer`) =>
      log.info("writer terminated")
      sendWriterIdentifyRequest()
      context.become(identifying)
 
    case msg: ChatMessage =>
      listeners += context.watch(sender())
      sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
    case Terminated(userCon) => listeners -= userCon
    case _ => log.info("Not ready yet")
  }
 
  def waitingForWriter(reader: ActorRef): Receive = {
    case ActorIdentity(`roomWriterPath`, Some(writer)) =>
      log.info(s"Successfully identified writer at $roomWriterPath")
      context.watch(writer)
      context.become(active(reader, writer))
 
    case ActorIdentity(`roomWriterPath`, None) => log.info(s"Reader actor not available: $roomWriterPath")
    case ReaderReceiveTimeout =>
      sendReaderIdentifyRequest()
    case WriterReceiveTimeout =>
      sendWriterIdentifyRequest()
    case Terminated(`reader`) =>
      log.info("reader terminated")
      sendReaderIdentifyRequest()
      context.become(identifying)
 
    case msg: ChatMessage =>
      listeners += context.watch(sender())
      sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
    case Terminated(userCon) => listeners -= userCon
    case _ => log.info("Not ready yet")
  }
 
  def active(reader: ActorRef, writer: ActorRef): Receive = {
    case l: Listen => reader ! l
    case msg: ChatMessage => writer ! msg
 
    case Terminated(`reader`) =>
      log.info("reader terminated")
      sendReaderIdentifyRequest()
      context.become(waitingForReader(writer))
    case Terminated(`writer`) =>
      log.info("writer terminated")
      sendWriterIdentifyRequest()
      context.become(waitingForWriter(reader))
    case ReaderReceiveTimeout =>
    case WriterReceiveTimeout =>
    // ignore
  }
 
}

В нем, мы реализуем логику ожидания удаленных акторов по аналогии с тем, что мы делали в акторе RoomWriter. В данном случае, нам нужно ожидать подключение к двум акторам сразу, поэтому логика работы немного усложняется.

Остался последний штрих: мы немного перепишем RoomReader для того, чтобы ограничить количество сообщений, которое получают пользователи.

Для этого мы допишем в нем пару строчек.

В конструкторе мы определим наш буфер для хранения сообщений, и напишем вспомогательный метод для работы с ним. Помимо этого, мы запустим scheduler, который раз в 10 минут будет отдавать команду на создание снапшота. Стоит заметить, что команда отдается через отправку сообщения актору, и мы не вызываем метод saveSnapshot напрямую. Это сделано специально, для того чтобы не нарушать принцип того, что работа с mutable данными актора должна производиться только актором. Нарушив этот принцип, мы можем получить трудноуловимые баги.

Дополнение к RoomReader
context.system.scheduler.schedule(tenMinutes, tenMinutes, self, Snap)
val state = FixedTimeMessageBuffer(tenMinutes)
def updateState(msg: ChatMessage) = state.append(msg)

В методе receive мы реализуем возможность сохранения снапшотов по приходу специального сообщения. Также мы реализуем корректное восставновление состояния из снапшота.

Дополнение к RoomReader
case msg:ChatMessage =>
      updateState(msg)
      sendAll(msg)
case Listen(ref) =>
      listeners add context.watch(ref)
      state.foreach(ref ! _)
case Snap => saveSnapshot(state.toList)
case SnapshotOffer(_, snapshot: List[ChatMessage]) => state.replace(snapshot)

Подводя итог, можно сказать о том, что мы реализовали современное веб-приложение, сделанное в духе реактивного программирования. Оно позволяет быстро отвечать на запросы пользователей и к тому же обладает некоторый степенью устойчивости. Однако, его есть куда улучшать. Для того, чтобы наше приложение работало даже в случае падения отдельных узлов, нам стоит воспользоваться модулем akka-cluster, который позволяет быстро реализовывать децентрализованные приложение, не имеющие единой точки отказа. Помимо этого, нам нужно как-то обрабатывать ситуацию, когда поток сообщений слишком большой и акторы не успевают его обрабатывать. Для работы с этим существует экспериментальный модуль akka-streams. Об этом и многом другом мы узнаем в следующей статье.
Tags:
Hubs:
+20
Comments33

Articles

Information

Website
www.naumen.ru
Registered
Founded
Employees
1,001–5,000 employees
Location
Россия