PHP cURL - потокобезопасный?


Я написал PHP-скрипт, который извлекал данные через libcurl и обрабатывал их. Он работал нормально, но по соображениям производительности я изменил его, чтобы использовать десятки рабочих (потоков). Производительность улучшилась более чем в 50 раз, однако теперь php.exe происходит сбой каждые несколько минут, и указанный модуль неисправности php_curl.dll . У меня есть предыдущий опыт работы с многопоточностью в C, но я вообще не использовал его раньше в php.

Я погуглил и, предположительно, cURL является потокобезопасным (по состоянию на 2001): http://curl.haxx.se/mail/lib-2001-01/0001.html Но я не могу найти никаких упоминаний о том, является ли php_curl потокобезопасным.

В случае, если это имеет значение, я запускаю php из командной строки. Моя настройка - Win7 x64, PHP 5.5.11 Потокобезопасный VC11 x86, PHP pthreads 2.0.4 для PHP 5.5 Потокобезопасный VC11 x86.

Вот некоторый псевдокод, чтобы показать, что я делаю

class MyWorker extends Worker
{
    ...
    public function run()
    {
        ...
        while(1)
        {
            ...
            runCURL();
            ...
            sleep(1);
        }
    }
}

function runCURL()
{
    static $curlHandle = null;
    ...
    if(is_null($curlHandle))
    {
        $curlHandle = curl_init();
        curl_setopt($curlHandle, CURLOPT_RETURNTRANSFER, TRUE);
        curl_setopt($curlHandle, CURLOPT_USERAGENT, "My User Agent String");
    }
    curl_setopt($curlHandle, CURLOPT_URL, "The URL");
    curl_setopt($curlHandle, CURLOPT_POSTFIELDS, $data);
    curl_setopt($curlHandle, CURLOPT_HTTPHEADER, $header);
    curl_setopt($curlHandle, CURLOPT_SSL_VERIFYPEER, false);

    $result = curl_exec($curlHandle);
    ...
}
Author: Matt, 2014-04-27

1 answers

Во-первых, типы resource официально не поддерживаются pthreads; дескриптор curl - это resource, поэтому не следует хранить дескрипторы curl в области объектов объектов pthreads, так как они могут быть повреждены.

Упрощение

pthreads обеспечивает простой способ использования рабочих...

Самый простой способ выполнения среди множества потоков - использовать встроенный класс Pool, предоставляемый pthreads:

Http://php.net/pool

В следующий код демонстрирует, как объединить кучу запросов в несколько фоновых потоков:

<?php
define("LOG", Mutex::create());

function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf("{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {

    public function __construct($url, $post = []) {
        $this->url = $url;
        $this->post = $post;
    }

    public function run() {
        $curl = curl_init();

        curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($curl, CURLOPT_URL, $this->url);

        if ($this->post) {
            curl_setopt($curl, CURLOPT_POSTFIELDS, $this->post);
        }

        $response = curl_exec($curl);

        slog("%s returned %d bytes", $this->url, strlen($response));
    }

    public function getURL()      { return $this->url;      }
    public function getPost()     { return $this->post;     }

    protected $url;
    protected $post;
}

$max = 100;
$urls   = [];
while (count($urls) < $max) {
    $urls[] = sprintf(
        "http://www.google.co.uk/?q=%s", 
        md5(mt_rand()*count($urls)));
}

$pool = new Pool(4);

foreach ($urls as $url) {
    $pool->submit(new Request($url));
}

$pool->shutdown();

Mutex::destroy(LOG);
?>

Ваша конкретная задача требует, чтобы вы сейчас обработали данные, вы можете либо записать эту функциональность в дизайн, подобный описанному выше... или

Делая это необычным

обещания - это супер причудливая форма параллелизма ...

Обещания соответствуют характеру задачи здесь:

  • Сначала: Сделайте запрос
  • Затем: Процесс ответ

Следующий код показывает, как использовать pthreads/promises для выполнения одного и того же запроса и обработки ответов:

<?php
namespace {
    require_once("vendor/autoload.php");

    use pthreads\PromiseManager;
    use pthreads\Promise;
    use pthreads\Promisable;
    use pthreads\Thenable;

    define("LOG", Mutex::create());

    function slog($message, $args = []) {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            Mutex::lock(LOG);
            echo vsprintf("{$message}\n", $args);
            Mutex::unlock(LOG);
        }
    }

    /* will be used by everything to report errors when they occur */
    trait ErrorManager {
        public function onError(Promisable $promised) {
            slog("Oh noes: %s\n", (string) $promised->getError());
        }
    }

    class Request extends Promisable {
        use ErrorManager;

        public function __construct($url, $post = []) {
            $this->url = $url;
            $this->post = $post;
            $this->done = false;
        }

        public function onFulfill() {
            $curl = curl_init();

            curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
            curl_setopt($curl, CURLOPT_URL, $this->url);

            if ($this->post) {
                curl_setopt($curl, CURLOPT_POSTFIELDS, $this->post);
            }

            $this->response = curl_exec($curl);
        }

        public function getURL()      { return $this->url;      }
        public function getPost()     { return $this->post;     }
        public function getResponse() { return $this->response; }
        public function setGarbage()  { $this->garbage = true; }
        public function isGarbage()   { return $this->garbage; }

        protected $url;
        protected $post;
        protected $response;
        protected $garbage;
    }

    class Process extends Thenable {
        use ErrorManager;

        public function onFulfilled(Promisable $request) {
            slog("%s returned %d bytes\n",  
                $request->getURL(), strlen($request->getResponse()));
        }
    }

    /* some dummy urls */
    $max = 100;
    $urls   = [];
    while (count($urls) < $max) {
        $urls[] = sprintf(
            "http://www.google.co.uk/?q=%s", 
            md5(mt_rand()*count($urls)));
    }

    /* initialize manager for promises */
    $manager = new PromiseManager(4);

    /* create promises to make and process requests */
    while (@++$id < $max) {
        $promise = new Promise($manager, new Request($urls[$id], []));
        $promise->then(
            new Process($promise));
    }

    /* force the manager to shutdown (fulfilling all promises first) */
    $manager->shutdown();

    /* destroy mutex */
    Mutex::destroy(LOG);
}
?>

Композитор:

{
    "require": {
        "krakjoe/promises": ">=1.0.2"
    }
}

Обратите внимание, что Request практически не изменился, все, что было добавлено, - это место для хранения ответа и средство определения того, являются ли объекты мусором.

Для получения подробной информации о сборе мусора из пулов, которая применима к обоим примерам:

Http://php.net/pool.collect

Функция slog существует только для того, чтобы сделать записанный вывод читаемым

Разъясняя это

pthreads не является новым драйвером PDO ...

Многие люди подходят к использованию pthreads так же, как они подошли бы к использованию нового драйвера PDO - предположим, что он работает так же, как и остальной PHP, и что все будет хорошо.

Все может быть не так хорошо и требует исследований: мы выходим за рамки, при этом некоторые "ограничения" должны быть наложены на архитектуру pthreads, чтобы поддерживайте стабильность, это может иметь некоторые странные побочные эффекты.

В то время как pthreads поставляется с исчерпывающей документацией, которая в основном включает примеры в руководстве по PHP, я пока не могу прикрепить следующий документ в руководстве.

Следующий документ дает вам представление о внутреннем устройстве pthreads, каждый должен прочитать его, он написан для вас.

Https://gist.github.com/krakjoe/6437782

 8
Author: Joe Watkins, 2014-04-27 07:14:29