Pull to refresh

Comments 7

Спасибо за статью, но код довольно запутанный и следить за ним не просто (Port вызывает методы AbstractActor и наоборот, каждый из них по отдельности нельзя понять), было бы неплохо добавить разбор того что происходит в простой паре producer-consumer и что означает block, поскольку он не блокирующий.


Пока не ясно чем этот подход лучше чего-то вроде:


class ProducerSubscription<T> extends Subscription<T> with Runnable {
  private Subscriber<T> subscriber; // from constructor
  private AtomicLong demand = new AtomicLong();
  private AtomicLong spawnedId = new AtomicLong();

  public void request(long size) {
    if (demand.getAndAdd(size) == 0) {
      executor.execute(this::send);
    }
  }

  private void send() {
    long id = spawnedId.incrementAndGet();
    // ensure only one send spawned most of the time
    while (spawnedId.get() == id) {
      long batch = demand.getAndSet(0);
      if (batch == 0) break;
      while (batch-- > 0) {
        subscriber.onNext(....);
      }
    }
  }
}
«что означает block, поскольку он не блокирующий»
block() блокирует алгоритм актора. О блокировке используемого потока исполнения мы вообще не говорим, поскольку нам это делать запрещено.

«чем этот подход лучше чего-то вроде»
в вашем примере нет разделения на системную и пользовательскую часть. Если я захочу сделать Publisher с другой функциональностью, мне нечего будет позаимствовать из вашего примера. А хотелось бы при написании функциональности не думать о системных вещах типа предотвращения нежелательного параллельного запуска одной и той же задачи.

Далее, вы не расписали subscriber.onNext(....). Где вы собрались вычислять очередное значение для onNext? там же, где и вызов onNext? Это снижает параллелизм в случае недобросовестного клиента, который нагружает свой onNext тяжелыми вычислениями. По хорошему, для вычисления генерируемых значений нужен отдельный актор.
block() блокирует алгоритм актора

вот об этом я и говорю — эта фраза не объясняет ничего, описанию этого алгоритма стоило бы уделить больше внимания


в вашем примере нет разделения на системную и пользовательскую часть
вы не расписали subscriber.onNext(....)

Как раз эта часть может быть например абстрактным методом, реализуемым клиентской частью или лямбдой, передаваемой снаружи.


Это снижает параллелизм в случае недобросовестного клиента, который нагружает свой onNext тяжелыми вычислениями

Тут согласен, циркулярный буфер и отдельная таска отправки решит эту проблему. Этот код призван только показать, что решить задачу можно просто, вопрос в том — что дает усложнение?

"эта часть может быть например абстрактным методом"
и в какой момент он должен исполнятся? У вас для него не предусмотрено места. Он должен работать вне ProducerSubscription.

"описанию этого алгоритма стоило бы уделить больше внимания"
Да, мне как-то говорил один профессор, что ключевые моменты в статье надо повторять по три раза. Я же блокировку актора описал один раз:

Асинхронные программы не могут блокировать потоки… Блокировать свое исполнение они должны другим способом. Этот другой способ заключается в том, что они просто покидают рабочий поток, на котором исполнялись, но перед этим организуют свое возвращение к работе, как только семафор наполнится.

«было бы неплохо добавить разбор того что происходит в простой паре producer-consumer»
Итак, какой-то актор имеет ссылку на входной порт следующего актора, как нарисовано на картинках. Он вызывает операцию на акторе, что может привести к изменению его состояния. Так, если порт типа InPort был пуст, и туда поместили токен, то порт переходит из заблокированного состояния в разблокированное, и извещает об этом AbstractActor с помощью вызова decBlocked(). Если в результате число заблокированных портов стало равно нулю, то актор отправляется на исполнение, а контрольный порт блокируется, чтобы число заблокированных портов стало не равно нулю, и не произошло повторной отправки на исполнение.
Начав работать, актор вызывает пользовательскую процедуру whenNext, которая, скорее всего, извлечет токен из входного порта и тем самым его заблокирует. Когда пользовательская процедура закончит свой вызов, контрольный порт автоматически разблокируется. Если к этому моменту входной порт будет снова заполнен, это приведет к повторному запуску актора. Если нет, то актор будет ждать поступления токена на входной порт.

Разобрался немного, но как например контролируется запись в InPort? Ведь если старое значение еще не прочитано, новое перезапишет старое? Не требует ли такой подход слишком тщательно следить за взаимодействием, писать в каждый порт только раз за круг, не забыть записать в какой-то порт?

любой нормальный буфер имеет ограниченный размер, и неважно, 1 или 1000, все равно надо следить за переполнением. И да, за один раунд можно безопасно прочитать не более одного значения из входного порта и записать не более одного значения в выходной порт. Чтобы прочитать или записать больше, надо опрашивать состояние портов и быть готовым к тому, что это не удастся. Так что лучше и не опрашивать вовсе, а сразу выходить на следующий раунд.
Sign up to leave a comment.

Articles