Pull to refresh

распараллеливаем выполнение задач с помощью stream_select()

PHP
Не так уж много кто знает о том, что некоторые задачи в PHP можно заставить выполняться параллельно — и для не этого не нужно прибегать к форкам. В PHP5 есть stream-functions, и среди них — stream_select().

Прочитав статью Cameron Laird (http://www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU), каждый, кто еще этого не сделал, сможет научиться этой технике, я же в этом топике предлагаю вашему вниманию простой небольшой класс Parastreams, который собсно займется распараллеливанием — а уж что делать с полученными из потоков данными — вы решите сами, указав обработчики данных.



Область применения технологии:
Нужно получить некие данные по сети с нескольких сокетов. Используя stream_select(), вы получите данные от всех сокетов за время, равное времени получения данных с самого медленного из них (при традиционном подходе общее время будет равно сумме времен получения данных от каждого сокета).
Допустим, вы используете поиск с пом. Sphinx. С помощью stream_select() можно заставить несколько запросов к поисковому демону выполняться параллельно (конечно, придется поднапрячься и расковырять sphinxapi, но ничего сверхсложного там нет). Это может пригодиться, когда поиск приводит к двум запросам к поисковому демону (допустим, ищем в постах и в комментариях): два эти запроса к двум, соответственно, индексам, будут выполняться параллельно — то есть получаем оптимизацию и ускорение поиска.

А вот и код класса:

<?php

/**
* Parastreams PHP class:
* a simple tool for performing multiple tasks with PHP - simultaneously (in parallel).
*
* example of usage:
* $ps = new Parastreams();
*
* function parastreams_callback($data) {
* echo $data."\n";
* }
*
* $s = stream_socket_client("localhost:80", $errno,
* $errstr, 10,
* STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
* fwrite($s, "GET /sleep.php?delay=1 HTTP/1.0\r\nHost: localhost\r\n\r\n");
* $ps->add($s, 'parastreams_callback');
* ... // repeat the above 5 lines as many times as you wish to, adding new streams to $ps.
* $ps->run();// process the streams
*
* Author: Victor Bolshov ( crocodile2u ( the at symbol here ) yandex.ru )
*
* License: use this script without any retrictions.
*
* Based on code by Cameron Laird, you may find his code here:
* www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU
*
* PHP version used: PHP 5.3.0alpha1 (should be compatible with older versions of PHP5)
*/

class Parastreams {
/**
* streams served by this instance
* @var resource[]
*/
private $streams = array();
/**
* stream events listeners
* @var array
*/
private $listeners = array();
/**
* @var int
*/
private $timeout = 10;
/**
* Constructor
* @param array $arg when specified, add() is called and $arg is passed to add()
* @see add()
*/
function __construct($arg = null)
{
if ($arg)
{
$this->add($arg);
}
}
/**
* add new stream(s)
* @param array | resource $arg either a stream resource or an array like this:
* array(
* array(stream1, listener1),
* array(stream2, listener2),..
*)
* where streamN is a stream resource created with stream_socket_client(),
* and listenerN is a Closure object which is called once the stream becomes readable,
* with the only argument: string $data (the data read from the stream)
* @param callable $arg2 the listener to stream; matters only in case when the first arg is not an array
* @return void
* @throws ParastreamsException
*/
function add($arg1, $arg2 = null)
{
if (is_array($arg1))
{
foreach ($arg1 as $offset => $s)
{
if (! is_array($s))
{
throw new ParastreamsException("Illegal input at offset " . $offset . " (not an array)");
} elseif (count($s = array_values($s)) < 2) {
throw new ParastreamsException("Illegal input at offset " . $offset . " (length is less then 2)");
} elseif (! is_resource($s[0])) {
throw new ParastreamsException("Illegal input at offset " . $offset . " (not a stream resource)");
} elseif (! is_callable($s[1])) {
throw new ParastreamsException("Illegal input at offset " . $offset . " (not a callable)");
}

$this->addOne($s[0], $s[1]);
}
} elseif (is_resource($arg1)) {
if (! is_callable($arg2))
{
throw new ParastreamsException("Argument 2 is expected to be a callable, " . gettype($arg2) . " given");
}
$this->addOne($arg1, $arg2);
} else {
throw new ParastreamsException("Argument 1 is expected to be a resource or an array, " . gettype($arg1) . " given");
}
}
/**
* Start listening to stream events
* @return void
* @throws ParastreamsException
*/
function run()
{
while (count($this->streams))
{
$events = $this->streams;
if (false === stream_select($events, $w = null, $e = null, $this->timeout))
{
throw new ParastreamsException("stream_select() failed!");
} elseif (count($events)) {
$this->processStreamEvents($events);
} else {
throw new ParastreamsException("Time out!");
}
}
}

/* Starting private methods */

private function processStreamEvents($events)
{
foreach ($events as $fp) {
$id = array_search($fp, $this->streams);

$this->invokeListener($fp);

fclose($fp);
unset($this->streams[$id]);
}
}
private function invokeListener($fp)
{
foreach ($this->listeners as $index => $spec) {
if ($spec[0] == $fp)
{
$data = "";
while (! feof($fp))
{
$data .= fread($fp, 1024);
}
call_user_func($spec[1], $data);
unset($this->listeners[$index]);
return ;
}
}
}
private function addOne($stream, $listener)
{
$this->streams[] = $stream;
$this->listeners[] = array($stream, $listener);
}
}

class ParastreamsException extends RuntimeException {}


* This source code was highlighted with Source Code Highlighter.


Пример использования (есть в комментариях, но тем не менее):

test.php:
<?php

require_once 'Parastreams.php';

function parastreams_callback($data) {
echo $data."\n";
};

$streams = array();
for ($i = 1; $i <= 3; ++$i) {
$s = stream_socket_client("localhost:80", $errno,
$errstr, 10,
STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
fwrite($s, "GET /sleep.php?delay=" . $i . " HTTP/1.0\r\nHost: localhost\r\n\r\n");
$streams[$i] = array($s, 'parastreams_callback');
}

$ps = new Parastreams($streams);
$ps->run();


* This source code was highlighted with Source Code Highlighter.


В примере используется sleep.php, для полноты картины вот он:

<?php

$delay = filter_input(INPUT_GET, 'delay', FILTER_VALIDATE_INT);
if ($delay <= 0) {
$delay = 1;
}

sleep($delay);

echo "was sleeping for $delay seconds\n";


* This source code was highlighted with Source Code Highlighter.
Tags:stream_selectmulti-taskingпараллельно
Hubs: PHP
Total votes 47: ↑44 and ↓3 +41
Views4.2K

Popular right now

Middle PHP-Developer
from 120,000 to 150,000 ₽DKLINEСанкт-Петербург
PHP-разработчик (Middle)
from 130,000 to 170,000 ₽Laptop.ruМоскваRemote job
PHP-разработчик
from 150,000 to 190,000 ₽Laptop.ruМоскваRemote job
PHP Developer
from 120,000 to 150,000 ₽Группа проектов М1-shopRemote job
PHP разработчик
from 150,000 ₽Bash TodayRemote job