Эмуляция многопоточности в PHP, применяем stream-функции

Опубликовано: 06.09.2018

Это третья статья об эмуляции многопоточности в языке PHP. Во второй статье мы применили СURL библиотеку , а теперь будем использовать stream-функции для нашей задачи.

Перевод документации:

Streams (потоки) введены в PHP, начиная с версии 4.3.0 для обобщения работы с сетью, файлами, сжатием данных, а также иными процессами, которые пользуются одним набором функций. Простое определение потока – объект ресурса, обладающий "потокообразным" поведением, из которого можно читать, в который можно записывать и внутри которого возможно перемещаться.

Функции stream_socket являются частью streams и используя их, мы можем устанавливать соединения и писать/читать данные, не ждя выполнения предыдущей операции.

У нас есть массив с URL-адресами и наша задача - непоследовательно получить их содержимое. Изучим код, решающий поставленную задачу:

// URL-адреса, содержимое которых нужно получить $pages = array('www.google.ru', 'www.yahoo.com', 'www.yandex.ru', 'www.rambler.ru'); $read_tasks = array(); // задачи чтения $write_tasks = array(); // задачи записи $results = array(); // результаты foreach ($pages as $page) { // открываем сокет для URL $sh = stream_socket_client($page.':80', $errno, $errstr, 10, STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT); if (!$sh) continue; // добавляем в массив задач для записи $write_tasks[$page] = $sh; } while ($write_tasks || $read_tasks) { // массив для сокетов, имеющих возможность для чтения $read_tasks_ = $read_tasks; // массив для сокетов, имеющих возможность для записи $write_tasks_ = $write_tasks; // ждем ответ из сокетов $n = stream_select($read_tasks_, $write_tasks_, $e=null, 10); if ($n > 0) { // доступные для записи сокеты foreach ($write_tasks_ as $sh) { // ищем URL страницы по дескриптору сокета в массиве задач записи $page = array_search($sh, $write_tasks); // удаляем из массива задач записи unset($write_tasks[$page]); // добавляем в массив задач чтения $read_tasks[$page] = $sh; // прописываем заголовки http $headers = "GET / HTTP/1.0\r\n"; $headers .= "Host: ".$page."\r\n"; $headers .= "User-Agent: Mozilla/5.0 (Windows NT 5.1; rv:12.0) Gecko/20100101 Firefox/12.0\r\n"; $headers .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n"; $headers .= "Accept-Language: ru,en-us;q=0.7,en;q=0.3\r\n"; $headers .= "Accept-Encoding: gzip,deflate\r\n"; $headers .= "Accept-Charset: windows-1251,utf-8;q=0.7,*;q=0.7\r\n"; $headers .= "\r\n"; // записываем в сокет if (fwrite($sh, $headers) === false) fclose($sh); } // сокеты, доступные для чтения foreach ($read_tasks_ as $sh) { // ищем URL страницы по дескриптору сокета в массиве задач чтения $page = array_search($sh, $read_tasks); if (!$page) continue; // читаем результат из сокета $result = ''; while ($r = fread($sh, 8192)) $result .= $r; // закрываем сокет fclose($sh); // удаляем из массива задач чтения unset($read_tasks[$page]); // заносим полученное содержимое в массив результатов $results[$page] = $result; } } else { break; } }

В начале инициализируем массивы: $read_tasks – сюда заноситься сокеты, из которых можно читать данные, $write_tasks – массив сокетов, которые готовы к записи, и массив $results – для сохранения полученного контента с нужных нам URL-адресов.

Далее в цикле применяем функцию stream_socket_client открывая для каждого URL сокет. Флаг STREAM_CLIENT_ASYNC_CONNECT указывает, что следующее соединение надо открывать асинхронно (не ждя завершения открытия предыдущего), а флаг STREAM_CLIENT_CONNECT нужен для создания клиентского соединения. В массиве $write_tasks сохраняются дескрипторы сокетов.

Затем, в главном цикле массивам $read_tasks_ и $write_tasks_, которые передаются функции stream_select, присваиваются массивы, которые хранят сокеты для чтения и записи. stream_select функция следит за существованием данных в потоке, а также принимает следующие параметры:

Массив дескрипторов, ожидающих завершение операции чтения; Массив дескрипторов, ожидающих завершения операции записи; Для ситуации поступления внеполосных данных - "out-of-band data"; Таймаут. Массивы по указателям отправляются в функцию, и после возвращения функцией числа > 0, в них будут сохранены сокеты, готовые к дальнейшим манипуляциям.

Далее ищем, какие сокеты доступны для записи данных. Если нужно ещё что-то отправить перед окончательным ответом - удаляем текущий дескриптор из массива для записи и записываем его в массив для чтения. Затем, отправляем нужные http-заголовки для получения содержимого.

Далее, в foreach-цикле смотрим сокеты доступные для чтения, сохраняем содержимое в строку $result, удаляем дескриптор из массива для чтения и закрываем сокет. Содержимое строки $result сохраняем в массив $results. Выполняем эти операции, пока у нас есть открытые сокеты.

После исполнения скрипта полученный контент располагается в массиве $results.

Итак, мы рассмотрели решение нашей задачи с приминением функций stream_socket, в следующей статье рассмотрим асинхронные сокеты для эмуляции многопоточности в PHP.

IRC (Internet Relay Chat)
rss