Обновить

Событийно-ориентированный HTTP-сервер на C# с помощью Rx и HttpListener

Программирование.NETC#
Автор оригинала: José F. Romaniello
Достаточно большое название? Да? В этом посте я покажу Вам альтернативный подход в создании простого событийно-ориентированного HTTP-сервера на C#, используя мощь Reactive Extensions.

Введение

Я не очень хорош в объяснениях, поэтому процитирую очень интересную статью от Dan York о событийной модели node.js:
“Традиционный” режим веб-серверов всегда был основан на модели потоков. Когда Вы запускаете Apache или любой другой веб-сервер, он начинает принимать подключения. Когда он принимает подключение, то держит данное подключение открытым до тех пор, пока не закончит обработку страницы, либо другой транзакции. Если чтение страницы с диска или запись результатов в базу данных занимает несколько микросекунд, то веб-сервер блокируется для операций ввода/вывода. (Это именуется как “блокирующее I/O”). Для масштабирования такого типа серверов, Вам потребуется запустить дополнительные копии самого сервера (именуется как “на основе потоков”, т.к. каждая копия обычно требует дополнительный поток операционной системы).
В противоположность этому, Node.JS, использует событийно-ориентированную модель, при которой веб-сервер принимает запросы, быстро ставит их на обработку, затем принимается за следующий запрос. Когда изначальный запрос завершен, то он возвращается в очередь обработки и когда достигает конца очереди, результаты возвращаются обратно (или выполняется все, что потребует следующее действие). Данная модель весьма эффективна и масштабируема, потому что веб-сервер обычно всегда принимает запросы, т.к. не ждет завершения ни одной операции чтения или записи. (Данный метод называется как “неблокирующее I/O” или “событийно-ориентированное I/O”).

Что происходит в мире .NET?

Много вещей происходит вокруг этого в экосистеме .NET:
  • Manos de mono (не для .NET, но близок к нему) был создан не так давно, следуя данной концепции
  • Node.Net — имплементация Node.JS для среды выполнения .Net, используя JScript.Net
  • Kayak – асинхронный HTTP-сервер, написанный на C#
  • Frank является клоном Sinatra, написанный на F#
  • Nancy будет поддерживать асинхронные обработчики скоро

Альтернативный подход

Используя класс HttpListener и Reactive Extensions, мы можем создать нечто наподобие этого:
public class HttpServer : IObservable<RequestContext>, IDisposable
{
  private readonly HttpListener listener;
  private readonly IObservable<RequestContext> stream;

  public HttpServer(string url)
  {
    listener = new HttpListener();
    listener.Prefixes.Add(url);
    listener.Start();
    stream = ObservableHttpContext();
  }

  private IObservable<RequestContext> ObservableHttpContext()
  {
    return Observable.Create<RequestContext>(obs =>
              Observable.FromAsyncPattern<HttpListenerContext>(listener.BeginGetContext,
                                       listener.EndGetContext)()
                   .Select(c => new RequestContext(c.Request, c.Response))
                   .Subscribe(obs))
             .Repeat()
             .Retry()
             .Publish()
             .RefCount();
  }
  public void Dispose()
  {
    listener.Stop();
  }

  public IDisposable Subscribe(IObserver<RequestContext> observer)
  {
    return stream.Subscribe(observer);
  }
}

Некоторые замечания к данному коду:
  • FromAsyncPattern – удобный метод, которые поставляются с Rx. Данный метод конвертирует сигнатуры Begin/End в IObservable
  • RequestContext является легкой оберткой для работы с HttpListener. Я не собираюсь приводить здесь его код, однако Вы сможете посмотреть весь исходный код чуть позже.
  • Повторюсь: если Вы когда-либо видели использование HttpListener, то уверен, вы видели код внутри while цикла. Это то же самое.
  • Пробуйте еще раз: если мы получаем ошибку – тогда пробуем еще раз.
  • Publish/Refcount: это поможет нам создать “теплых” наблюдателей из “холодных”. Они ведут себя наподобие “горячих”. Вы можете прочитать больше тут и тут.

Пример использования

Вы можете создавать веб-приложения любого типа, основанные на данной концепции. Приложение уровня “hello world” будет выглядеть так:
static void Main()
{
    //a stream os messages
    var subject = new Subject<string>();

    using(var server = new HttpServer("http://*:5555/"))
    {
      var handler = server.Where(ctx => ctx.Request.Url.EndsWith("/hello"))
         .Subscribe(ctx => ctx.Respond(new StringResponse("world")));

      Console.ReadLine();
      handler.Dispose();
    }
}

Рекомендую, чтобы все, что Вы будете делать – было асинхронным. Например, если вы подключаетесь к базе данных, то это должно быть асинхронной операцией, и Вы должны будете удерживать вместе callbacks/observables/Tasks и т.п.

Существует еще более интересное применение, которым я бы хотел поделиться, и которое называется long polling:
Long polling является вариацией традиционной техники polling и позволяет эмулировать отправку информации от сервера клиенту. При long polling, клиент запрашивает информацию от сервера в той же манере, что и при нормальном запросе. Однако если сервер не имеет никакой доступной информации для клиента, то вместо отправки пустого ответа, сервер удерживает запрос и ждет доступности информации.

Итак, перед нами наиболее простой пример long polling, работающего через вышеприведенный код:
class Program
{
  static void Main()
  {
    //a stream os messages
    var subject = new Subject<string>();

    using(var server = new HttpServer("http://*:5555/"))
    {
      //the listeners stream and subscription
      var listeners = server
          .Where(ctx => ctx.Request.HttpMethod == "GET")
          .Subscribe(ctx => subject.Take(1) //wait the next message to end the request
                       .Subscribe(m => ctx.Respond(new StringResponse(m))));

      //the publishing stream and subscrition
      var publisher = server
        .Where(ctx => ctx.Request.HttpMethod == "POST")
        .Subscribe(ctx => ctx.Request.InputStream.ReadBytes(ctx.Request.ContentLength)
                   .Subscribe(bts =>
                     {
                      ctx.Respond(new EmptyResponse(201));
                      subject.OnNext(Encoding.UTF8.GetString(bts));
                     }));

      Console.ReadLine();

      listeners.Dispose();
      publisher.Dispose();

    }
  }
}

Как Вы можете видеть, мы заставляем наблюдателей работать… При этом отсутствует какая-либо блокирующая операция. Даже чтение из потока – асинхронная операция.

Хотите увидеть работающий код?

Ниже приведено видео, демонстрирующее работу кода:
image
И, под конец, исходный код опубликован здесь под opensource, если Вы захотите углубиться в него шаг за шагом или просто изучить.
Отдельные благодарности Gustavo Machado, Silvio Massari и ребятам из Nancy framework за советы и часть кода, который я украл у них.
Теги:asyncrxreactive extensionsnode.jshttpвеб-серверevent-driventhread-driven
Хабы: Программирование .NET C#
Рейтинг +44
Количество просмотров 22,3k Добавить в закладки 96
Комментарии
Комментарии 4

Похожие публикации

C++ Developer. Professional
12 марта 202160 000 ₽OTUS
Веб-дизайнер
16 марта 202183 000 ₽GeekBrains
Веб-дизайн
22 марта 202115 000 ₽Loftschool

Лучшие публикации за сутки