Pull to refresh

Хитрости с логированием в однопоточных неблокирующих серверах.

Reading time 6 min
Views 2.7K
Хочу рассказать об очередном результате моих изысканий в области оптимизации производительности Web-серверов.
На этот раз речь пойдет об оптимизации сложного логирования в однопоточном неблокирующем вэб-сервере.

Введение


Все не раз слышали о том, как lighttpd (далее «лайти») производителен и нетребователен к ресурсам. В последнем пререлизе версии 1.5 появилось много нововведений: в частности увеличена была производительность отдачи увесистых файлов на очень загруженных серверах (более 2-х тысяч потоков с суммарной скоростью в 50-200 МБ/с). Я, как управляющий весьма большим вэб-файлохранилищем, не мог пройти мимо таких возможностей. На тестовую машину с быстрым рейдом и двумя GigabitEthernet адаптерами, завязанными в bond интерфейс с распределением нагрузки, был поставлен дистрибутив Debian линукс. После пары часов проб и ошибок была собрана и настроена на оптимальную производительность связка лайти+php_fcgi. Файлы действительно отдавались очень быстро и 2000 потоков для лайти не оказались трудностями. Но тут я наткнулся на серьезную проблему.
Дело в том, что одно из требований к веб-серверу для работы на файлохранилище — чтоб он имел возможность отправлять строки лога через pipe в специализированную программу, которая бы их особым образом обрабатывала. В этой программе обрабатываться данные о соединении (что за пользователь качал, сколько действительно байт выкачанно за это соединение и т.д.) и отправляются для хранения в БД. Лайти умеет перенаправлять данные через pipe в программу для логирования, но проблема кроется в его однопоточной неблокирующей архитектуре. Т.к. лайти работает в один поток (процесс) и в неблокирующем режиме только отправляет данные в сокеты — то его основной поток уходит в ожидание снятия блока записи пока вспомогательная программа не обработает предыдущую запись лога. А программа эта может подвисать — поскольку соединение с БД — вещь не постоянная: то запрос подвиснет из-за табличной или строчной блокировки, то само соединение по неизвестным причинам потеряется и придется переподключаться. И получится, что при каждой такой задержке лайти перестанет отсылать данные в сокеты и будет выглядеть, как будто сервер работает «рывками» — то работает на полной скорости, то вдруг подвиснет.

Задумка


Решить эту проблему сможет буферизация записей лога и распараллеливание задачи обработки этого лога. Реализовал я это так: всю подсистему логирования я разбил на две части — агрегатор и обработчик. Агрегатор будет отвечать за моментальное вычитывание из канала (pipe) записей лога (чтоб сервер не простаивал), запись этих данных в очередь (FIFO), порождение дочерних обработчиков (описаны далее) и собственно раздачу строк лога из очереди в эти обработчики. Обработчик же — это просто конечный процессор строк лога, который их вычитывает из канала stdin и пишет в базу, после этого сигнализирует агрегатору о том, что он освободился и готов к новой порции данных.

Реализация


Начну с реализации самой простой части — обработчика. Все что он должен — это открыть stdin в блокирующем режиме и в цикле вычитывать из него строки, посылая символ '1' в конце каждой итерации. Завершать работу он должен по обнаружению eof в канале stdin (канал закрыт и данных больше нет). Вот код этой программы:

processor.php:
Copy Source | Copy HTML
  1. #!/usr/bin/php
  2. <?php
  3. $in = fopen('php://stdin', 'r'); //открыли стандартный ввод
  4. $db = NULL;
  5. while ($in_str = fgets($in)) { // вычитали строку со входа
  6.   if (@mysql_ping($db) !== true) { // если не соединены с БД - соединяемся
  7.     @mysql_close($db);
  8.     $db = mysql_connect();
  9.   }
  10.   mysql_query('тут происходит передача $in_str в БД'); // обрабатываем строку в БД
  11.   echo '1'; // говорим агрегатору, что готовы к новым данным
  12. }
  13. mysql_close($db);
  14. fclose($in);
  15. ?>

Агрегатор же реализован несколько сложнее. Сначала опишу несколько стандартных возможностей PHP, которые были использованы.
При чтении из потоков использовались неблокирующие вызовы. Для перевода поток в неблокирующий режим достаточно вызвать функцию stream_set_blocking с самим потоком в качестве первого элемента и с нулем в качестве второго. После этого вызвоа любое чтение или запись в поток будут завершаться моментально, не дожидаясь фактического чтения или записи. Результат выполнения этих операций будет зависить от фактического кол-ва байт, которые смогли быть записаны или прочтены.
Наличие же данных в потоках отслеживалось с помощью функции stream_select, полное описание которой можно найти в руководстве по PHP. Вкратце ее суть вот в чем — в нее передаются три массива, каждый из которых содержит дескрипторы потоков, первый для чтения, второй для записи, третий для особых случаев. Вызов этой функции завершается, когда в одном из переданных потоков произойдет что-либо интересное (в общем случае — исчезнет блокирование операции соответствующей параметру фунции, т.е. в одном из дескрипторов чтения можно будет прочитать данные без блокировки и т. д.). Так же эта функция может завершиться по таймауту, который передается в виде четвертого и пятого параметров, где четвертый — секунды, а пятый — микросекунды.
Теперь об алгоритме работы:
  1. Первым делом агрегатор создает дочерние процессы обработчиков и организует каналы для общения с ними.
  2. Затем обработчик в неблокирующем режиме читает данные из стандартного входа и записывает их в конец очереди.
  3. Проверяя наличие единицы на каналах чтения из обработчиков (опять же в неблокирующем режиме) взводятся нужные флаги в массиве готовности обработчиков.
  4. Если в очереди есть необработанные данные и есть свободный обработчик — отправляем данные в него и опускаем флаг его готовности в соответствующем массиве.
  5. Записываем все интересующие нас дескрипторы в массив и выполняем ожидание по ним с помощью stream_select с таймаутом в одну секунду.
  6. Проверяем очередь на наличие записей и стандартный ввод на конец данных. Если очередь не пуста или есть новые данные — Переходим к пункту 2.
Вот исходный текст агрегатора:

aggregator.php:
Copy Source | Copy HTML
  1. #!/usr/bin/php
  2. <?php
  3. // кол-во обработчиков
  4. define('PROCESSORS_TO_SPAWN', 5);
  5. // полный путь к обработчику
  6. define('PROCESSOR_PATH', '/path/to/processor.php');
  7.  
  8. $in=fopen("php://stdin",'r');
  9. // переводим стандартный ввод в неблокирующий режим
  10. stream_set_blocking($in, 0);
  11.  
  12. // список флагов готовности обработчиков
  13. $processors_states = array_fill(0, PROCESSORS_TO_SPAWN, true);
  14. $processors = array();
  15. $proc_signal_pipes = array();
  16. $proc_data_pipes = array();
  17. $descriptorspec = array(
  18.   0 => array("pipe", "r"),
  19.   1 => array("pipe", "w"),
  20.   2 => array("file", "/dev/null", "a")
  21. );
  22. $buffer = array();
  23.  
  24. // запускаем обработчики и переводим канал чтения в неблокирующий режим
  25. while (count($processors) < PROCESSORS_TO_SPAWN) {
  26.   $processors[] = proc_open(PROCESSOR_PATH, $descriptorspec, $pipes, NULL, NULL);
  27.   stream_set_blocking($pipes[1], 0);
  28.   $proc_data_pipes[] = $pipes[0];
  29.   $proc_signal_pipes[] = $pipes[1];
  30. }
  31.  
  32. while(true) {
  33.   $in_str = fgets($in);
  34.   if($in_str !== false) {
  35.     // тут можно проверять валидность строки лога
  36.     if (true) {
  37.     $buffer[] = $in_str;
  38.     }
  39.   }
  40.   foreach ($processors_states as $proc_num => $processor_state) {
  41.     // в неблокирующем режиме проверяем готовность обработчика
  42.     if (fgets($proc_signal_pipes[$proc_num]) == '1') {
  43.       $processors_states[$proc_num] = true;
  44.     }
  45.   }
  46.   // пока есть свободные обработчики и очередь не пуста - скармливаем им данные
  47.   while (count($buffer) > 0 and
  48.     ($selected_proc = array_search(true, $processors_states)) !== false) {
  49.     $item = array_shift($buffer);
  50.     fwrite($proc_data_pipes[$selected_proc], $item);
  51.     $processors_states[$selected_proc] = false;
  52.   }
  53.   // если стандартный ввод закрыт и очередь пуста - завершаем работу
  54.   if (feof($in) and count($buffer) == 0) {
  55.     break;
  56.   }
  57.   $check_list = $proc_signal_pipes;
  58.   $check_list[] = $in;
  59.   // ожидаем данных для чтения на стандартном вводе или из одного из обработчиков
  60.   stream_select($check_list, $w = NULL, $e = NULL, 1);
  61. }
  62.  
  63. // закрываем обработчики и прибираемся
  64. foreach($processors as $proc_num => $proc) {
  65.   fclose($proc_data_pipes[$proc_num]);
  66.   fclose($proc_signal_pipes[$proc_num]);
  67.   proc_close($proc);
  68. }
  69. fclose($in);
  70.  
  71. ?>


Послесловие


В итоге мы получили отличный инструмент для организации сколь угодно сложного логирования для однопоточных серверов, который дает нам возможность использовать все преимущества сервера, не задерживаясь на сохранении записей лога.
Tags:
Hubs:
+53
Comments 105
Comments Comments 105

Articles