Running Concurrent Tasks in PHP

Published by Azat Akmyradov

Recently, I worked on a small project where I needed to send concurrent requests to a web server. Several libraries can do this, such as AmpPHP, GuzzleHttp, and Fork, and all of them turned out to be good solutions for the problem at hand. Fork caught my attention because it is lightweight and easy to use, so I looked at how it works. It turns out to be a pretty simple package. In this article, I want to explain how to accomplish what Fork does using vanilla PHP. We'll use the same techniques that Fork uses to understand what's happening under the hood.

Forking a process

To achieve concurrency, we're going to use PHP's pcntl_fork() function. If you want to learn more about it, I highly suggest reading about it on php.net.

The pcntl_fork() function creates a child process that differs from the parent process only in its PID and PPID. Please refer to your system's fork(2) man page for specific details on how fork works on your system.

Here's a simple example of using pcntl_fork() from php.net:

    <?php

    $pid = pcntl_fork();
    if ($pid == -1) {
        die('could not fork');
    } else if ($pid) {
        // we are the parent
        pcntl_wait($status); // Protect against zombie children
    } else {
        // we are the child
    }

This code forks the current process and then checks the return value of pcntl_fork(): -1 means the fork failed, a positive value is the child's PID and means we're in the parent, and 0 means we're in the child. In our little project, we're going to create two simple classes, Task and Tasks (in Task.php and Tasks.php), to handle batches of tasks that will run concurrently.
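Before moving on to those classes, it may help to see the return values of pcntl_fork() in runnable form. The following is only a minimal sketch: the helper name runInChild() is made up for this illustration and isn't part of Fork or of the classes in this article. It assumes PHP on the command line with the pcntl extension, which is not available on Windows.

```php
<?php

// Hypothetical helper: run $work in a forked child and return the
// child's exit code to the parent.
function runInChild(callable $work): int
{
    $pid = pcntl_fork();

    if ($pid === -1) {
        // Fork failed; no child was created.
        throw new RuntimeException('could not fork');
    }

    if ($pid === 0) {
        // We are the child: do the work and exit with its result.
        exit($work());
    }

    // We are the parent: $pid is the child's PID. Wait for that child.
    pcntl_waitpid($pid, $status);

    // Only read the exit code if the child exited normally.
    return pcntl_wifexited($status) ? pcntl_wexitstatus($status) : -1;
}

$code = runInChild(function (): int {
    return 7; // arbitrary exit code chosen for the demo
});

echo "child exited with {$code}\n";
```

The child and the parent don't share variables after the fork, so in this sketch the exit code is the only thing the child passes back.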

Here's the code for Task.php:

    <?php

    class Task
    {
        protected string $task; // human-readable task name
        protected $callback;    // callable to run when the task starts

        public function __construct(string $task, callable $callback)
        {
            $this->task = $task;
            $this->callback = $callback;
        }

        public static function create(string $task, callable $callback): Task
        {
            return new static($task, $callback);
        }

        // Run the task's callback in the current process.
        public function start(): void
        {
            ($this->callback)();
        }

        // Return an array holding this same Task object $times times (no cloning).
        public function copy(int $times): array
        {
            $tasks = [];
            for ($i = 0; $i < $times; $i++) {
                $tasks[] = $this;
            }
            return $tasks;
        }
    }

In this code, we let the user create a Task with a name and a callback. Once the object is created, you can start the task using $task->start(). The $task->copy() method doesn't clone anything: it returns an array that contains the same task object the requested number of times, so that we can run the same task repeatedly.
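To make the behaviour of copy() concrete, here is a small sketch. DemoTask is a cut-down stand-in written just for this illustration (it isn't the Task class itself), and it uses array_fill() in place of the loop. Every element of the returned array is the same object, and starting each one runs the same callback again.

```php
<?php

// Illustrative stand-in for Task, reduced to what copy() needs.
class DemoTask
{
    private $callback;

    public function __construct(callable $callback)
    {
        $this->callback = $callback;
    }

    public function start(): void
    {
        ($this->callback)();
    }

    // Same idea as Task::copy(): repeat $this, don't clone it.
    public function copy(int $times): array
    {
        return array_fill(0, $times, $this);
    }
}

$runs = 0;
$tasks = (new DemoTask(function () use (&$runs) {
    $runs++; // count how often the callback runs
}))->copy(3);

$sameObject = $tasks[0] === $tasks[2]; // identity check, not equality
foreach ($tasks as $task) {
    $task->start();
}

var_dump($sameObject);
echo "callback ran {$runs} times\n";
```

Sharing one object is harmless here because each forked child works on its own copy of the process memory. It would only matter if the tasks ran in the same process and kept state on the object.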

Now, let's handle the collection of tasks to run them concurrently in Tasks.php:

    <?php

    class Tasks
    {
        protected array $tasks;
        protected int $concurrently = 0; // 0 means "run sequentially"

        public function __construct(array $tasks)
        {
            $this->tasks = $tasks;
        }

        public static function batch(array $tasks): Tasks
        {
            return new static($tasks);
        }

        public function concurrently(int $value = 5): Tasks
        {
            if (!function_exists('pcntl_fork')) {
                die('PCNTL functions not available on this PHP installation');
            }
            $this->concurrently = $value;
            return $this;
        }

        public function start(): void
        {
            // No concurrency requested: run the tasks one after another.
            if ($this->concurrently < 1) {
                foreach ($this->tasks as $task) {
                    $task->start();
                }
                return;
            }

            // Process the tasks in batches of at most $this->concurrently.
            for ($start = 0; $start < count($this->tasks); $start += $this->concurrently) {
                $current = array_slice($this->tasks, $start, $this->concurrently);

                foreach ($current as $task) {
                    $pid = pcntl_fork();
                    if ($pid === -1) {
                        die('could not fork');
                    }
                    if ($pid === 0) {
                        // Child: run the task, then exit so it doesn't continue the loop.
                        $task->start();
                        exit(0);
                    }
                }

                // Parent: wait until every child of this batch has exited.
                while (pcntl_waitpid(0, $status) != -1) {
                    $exitCode = pcntl_wexitstatus($status); // exit code of the child that just finished
                }
            }
        }
    }
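The batching inside start() can be looked at in isolation. The sketch below uses plain integers in place of Task objects (an assumption made purely to keep the example short) and applies the same for/array_slice() loop to show which tasks end up in which batch.

```php
<?php

// Seven placeholder "tasks"; real code would hold Task objects here.
$tasks = range(1, 7);
$concurrently = 3;

$batches = [];
for ($start = 0; $start < count($tasks); $start += $concurrently) {
    // Take at most $concurrently tasks, starting at offset $start.
    $batches[] = array_slice($tasks, $start, $concurrently);
}

foreach ($batches as $i => $batch) {
    echo "batch {$i}: " . implode(', ', $batch) . "\n";
}
```

Seven tasks with a limit of three give batches of three, three, and one. Since the parent waits for a whole batch before forking the next one, a single slow task delays the start of the following batch.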

This is also a basic class: it lets us run a batch of tasks, at most $concurrently at a time, each in its own child process. Now let's see the result. To try the example, we'll use webhook.site to check whether the requests really arrive concurrently. Here's the code for index.php:

    <?php

    require 'Task.php';
    require 'Tasks.php';

    function makeRequest()
    {
        $url = "https://webhook.site/address-here";
        $curl = curl_init($url); // curl_init() already sets CURLOPT_URL
        curl_setopt($curl, CURLOPT_RETURNTRANSFER, true); // return the body instead of printing it
        $resp = curl_exec($curl);
        curl_close($curl);
    }

    // One task (request, then sleep for a second), repeated five times.
    $tasks = Task::create('request', function () {
        makeRequest();
        sleep(1);
    })->copy(5);

    Tasks::batch($tasks)
        ->concurrently(5)
        ->start();

Each task makes one request to webhook.site and then sleeps for a second. With concurrently(5), all five tasks are forked in a single batch, so the five requests should show up on webhook.site at roughly the same time. To see the difference, omit the concurrently() call from the Tasks::batch chain: the tasks then run one after another, and the requests arrive about a second apart instead.
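If you'd rather not depend on webhook.site, the same effect can be measured locally. The following is a rough sketch: slowTask() is a hypothetical stand-in for the request (it just sleeps for 200 ms), and runSequentially() and runForked() are names invented for this example. It compares the wall-clock time of running five tasks one after another with running them in five forked children.

```php
<?php

// Hypothetical stand-in for a slow HTTP request.
function slowTask(): void
{
    usleep(200000); // 200 ms
}

// Run $n tasks one after another and return the elapsed seconds.
function runSequentially(int $n): float
{
    $begin = microtime(true);
    for ($i = 0; $i < $n; $i++) {
        slowTask();
    }
    return microtime(true) - $begin;
}

// Run $n tasks in $n forked children and return the elapsed seconds.
function runForked(int $n): float
{
    $begin = microtime(true);
    $pids = [];
    for ($i = 0; $i < $n; $i++) {
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('could not fork');
        }
        if ($pid === 0) {
            slowTask(); // child: do the work, then leave
            exit(0);
        }
        $pids[] = $pid; // parent: remember the child
    }
    foreach ($pids as $pid) {
        pcntl_waitpid($pid, $status); // wait for each child in turn
    }
    return microtime(true) - $begin;
}

$sequential = runSequentially(5);
$forked = runForked(5);

printf("sequential: %.2fs, forked: %.2fs\n", $sequential, $forked);
```

The sequential run takes at least one second (five times 200 ms), while the forked run should finish in little more than the duration of a single task, plus the overhead of creating the processes. Exact numbers will vary by machine.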