Pull to refresh

Comments 32

Спасибо, как раз интересовался пдобным, и было дело уже начал писать свое. Но теперь приторможу, и поглубже ознакомлюсь с предоставленным материалом :)
спасибо за отзыв! подождите следующего материала, там будет больше про нативные РНР решения
спасибо! Упомяну в следующей части наверное
Еще было интересно, если бы вы в сравнении отметили какие сервера умеют делать persitent queue
Во-первых, совсем непонятно, что имелось в виду под persistent queue.
Если это очередь, которая хранит сообщения до момента их получения потребителем, тогда spread этого не умеет. Во всяком случае, в его оригинальной инкарнации. Есть коммерческий продукт на основе spread, который это умеет. По-моему вот это они: www.spreadconcepts.com/secure_spread_info.html

В spread сделать это вообще проблематично в связи с его основной идеей гарантированности операций с очередями в многосерверной конфигурации. Это чистые in-memory queues.
да помоему они обое имеют поддержку этого. Вот же в описании то одинаково: «Enables message reliability in the presence of machine failures, process crashes and recoveries, and network partitions and merges. ».

Вместе с тем, часть MQ хранит сообщения в базе (или файлы), встроенной или внешней, поэтому это можно считать как раз persistent queue, остальные работают с памятью, однако могут также обеспечить устойчивость к сбоям отдельных узлов (видимо это как раз случай spread), или же заявляют сразу, что не рассчитаны на такие случаи.
Да, в spread именно за счет нескольких серверов обеспечивается reliability.
Если бы они сделали встроенный persistence, цены бы не было системе. Но для упрощения и еще по ряду причин, как я понял, они этого не хотели делать.
С другой стороны, гораздо проще, используя spread как network backend, сделать полноценные очереди, чем начинать лепить свои целиком через базу данных, как ниже предлагают, или чем строить сетевую систему с нуля, которая скорее всего будет работать хуже.
Под persistent queue подразумевалась очередь, которая сохраниться при перезапуске сервера очереди.

Т.е. beanstalk не обладает такой очередью, потому что хранить ее в памяти, а starling обладает.
Memcacheq — штука хорошая, благо основана на memcachedb, который, в свою очередь, есть смесь memcached и bdb — с производительностью точно проблем не будет. Но есть один момент. Когда разгребаешь на скриптовом сервере очередь php-скриптами, ну, предположим, это обычный цикл while ($item = $queue->getNext()) $this->handle($item), внутри этого самого handle может иногда случиться что-то нехорошее. В корку php упадет из-за порушившейся вдруг шаред мемори акселератора или редкого бага в экстеншене, или просто в датацентре рубильник не тот заденут. А терять данные в очереди совсем не хочется. То есть, если это сбор статистики средней температуры по больнице, то и фиг с ним, а если это биллинг, то как-то не это самое. :)

Я использую очень простое решение. В mysql (или иной РСУБД) заводится табличка (или, если очереди большие, несколько табличек, обрабатываемых параллельно запущенными скриптами) вида

CREATE TABLE `ProcessQueue` (
  `id` int(20) NOT NULL AUTO_INCREMENT,
  `subscriber_id` int(10) NOT NULL DEFAULT '',
  `error_count` smallint(6) NOT NULL DEFAULT '0',
  `parameters` blob NOT NULL,
  `status` enum('QUEUED','LOCKED','FAILED','DELETED') NOT NULL DEFAULT 'QUEUED',
  `created` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `updated` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `updated` (`updated`,`subscriber_id`,`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8


Subscriber_id — это идентификатор «подписчика», то есть тот, кому элемент очереди предназначается. Parameters — произвольные данные, нужные для обработки, например serialize()-ованные.

Обработка очереди выглядит примерно так:
$PQ = new ProcessQueue($subscriber_id);

$element = false;
$error = false;

while ($element = $PQ->getNextElement($element, $error)) {
    // ... process $element ...
    if (error occurred) {
        $error = true;
    } else {
        $error = false;
    }
}


Сам класс выглядит (после небольших упрощений) вот так. Несмотря на кучу завязок на фреймворк, думаю, его логика должна быть понятной.

А что делать с «залипшими» элементами? Очень просто, по крону с разумной регулярностью разгребаем элементы, где status=«LOCKED» и updated достаточно стар, чтобы было ясно, что оно именно залипло — например, как-то так:

UPDATE ProcessQueue 
SET error_count = error_count + 1, STATUS = IF(error_count>5, 'FAILED', 'QUEUED')
WHERE updated < $время_минут_15_назад AND STATUS='LOCKED'


А ставить и изучать монстроподобные системы ради такой простой вещи, как очереди, как то мне стремно, хотя надо взглянуть, может, найдутся хорошие идеи, чтобы позаимствовать :)
спасибо за подробный комментарий, воспользуюсь как буду практически пробовать свою архитектуру
UFO just landed and posted this here
ApacheMQ — очень мощьное средство! использовал в одном из проектов. Очень понравилась их реализация AJAX доступа к Queue и подобным.
UFO just landed and posted this here
Для хранения очередей в памяти PHP, конечно, подходит плохо, и демон писать смысла особого нет. Но для хранения очереди можно использовать любую СУБД, я выше приводил пример.

А для скриптов, обрабатывающих очереди, нет ничего страшного. Конечно, держать «вечный» цикл невыгодно — даже сам PHP, работая в fastcgi-режиме, имеет ограничение на количество обрабатываемых одним процессом запросов, чтобы не накапливать утечки памяти. Но вполне можно запускать несколько скриптов из крона параллельно, что-то вроде
$ crontab -e
* * * * * /path/to/script1.php
* * * * * /path/to/script2.php


Стратегии завершения можно использовать различные. Можно просто выгребать очередь, пока она не кончится, и на этом завершаться; можно поставить дополнительное условие по времени или количеству итераций. А чтобы не запустились одинаковые обработчики одновременно (если это, конечно, специально не задумано), прекрасно подходит стандартная юниксовая техника pid-файлов. Как-то так:

class Script extends Application {
    // ...........
    protected function checkPid() {
        $this->pid = posix_getpid();
        $pid = (int) @file_get_contents($this->pidfile);

        if ($pid) {
            if (is_dir('/proc') && is_dir('/proc/curproc')) { // use procfs
                $have_pid = is_dir('/proc/'.$pid);
            } else { // if no procfs, use /bin/ps. warning: BSD-specific implementation
                $output = array();
                exec('/bin/ps -auxwwwp '.$pid, $output); 
                $have_pid = !empty($output[1]);
            }
            if ($have_pid) {
                $this->logScriptEvent(self::EVENT_WARNING, 'already running with pid '.$pid. ', exiting');
                exit(1);
            }
        }

        try {
            file_put_contents($this->pidfile, $this->pid . "\n");
        } catch (PhpException $e) {
            $this->logScriptEvent(self::EVENT_WARNING, 'could not write pidfile: '.$e->getMessage());
        }
        return true;
    }
    // ...........
    function run() {
        if (!$this->checkPid()) {
            $this->logScriptEvent(self::EVENT_NOTICE, 'the script ' . get_class($this) . ' is already running');
            exit(0);
        }
        // ....
    }
}


Если есть неободимость распараллелить операции уже после инициализации, тоже стандартный юниксовый подход — fork.
Ах, да. На винде, конечно, это все работать не будет (разве что под cygwin). Хотя я склонен считать это скорее преимуществом =)
UFO just landed and posted this here
«работало неплохо…но на нагрузках ложилась…»

:)

Собственно, с работой с процессорами в php проблем то и нет. fork есть, exec есть, popen есть, что еще надо? :) дело-то не в этом, а в том, как обрабатывать параллельные соединения.

А тут, чтобы понимать, как эти все винтики крутятся, обязательно к прочтению RU.UNIX.PROG FAQ — Как писать сервера.
С процессами, пардон.
Меня тут в привате пинают по делу:

«Для проверки существования процесса не нужно делать то, что вы нагородили. Достаточно сделать kill(pid, 0);»

И действительно.

srv01 ~$ php -r 'var_dump(posix_kill(posix_getpid(),0));'
bool(true)
srv01 ~$ php -r 'var_dump(posix_kill(33333,0));'
bool(false)


Век живи, век учись :)
Ну из опенсорсных JBoss Messaging & Sun OpenMQ ещё — с каждого вендора по продукту, как это в Java-мире принято) Правда про интеграцию со сторонними языками не подскажу.
JBoss Messaging искренне не рекомендую использовать. Низкая производительность, галлюционоз и кривизна.

ActiveMQ и OpenMQ очень круты.
Еще небольшой список по теме:
— WebSphere MQ, кстати, имеет SOAP интерфейс и может быть использована в РНР.
— ObjectWeb JORAM joram.objectweb.org
— OpenJMS openjms.sourceforge.net
— MOM4J mom4j.sourceforge.net
— SwiftMQ www.swiftmq.com

Думаю решение с использованием базы данных от symbix очень интересное, т.к. в большинстве случаев этого с головой хватает.

Еще есть совет из опыта работы с MQ: посмотрите на процесс в целом и подумайте, а нужно ли вам использовать MQ или нет? Как раз учитывая тему высокой производительности, то MQ, особенно с сохранением сообщений, дает большую задержку, которую можно компенсировать только большим количеством параллельных процессов обработки.
спасибо за ссылки, надо будет все пересмотреть. WebSphere yfмеренно не рассматривал, мне показался уж очень корпоративным и громоздким и тянет за собой много IBM-компонентов от инфраструктуры WebSphere
Как раз WebSphere MQ есть самодостаточный сервер и небольшой к тому же.
Sign up to leave a comment.

Articles