Do not speak Portuguese? Translate this site with Google or Bing Translator
Consume RabbitMQ queues with Laravel jobs

Posted on: October 24, 2024 10:56 AM

Posted by: Renato

Categories: RabbitMQ

Views: 209

title layout
Consume RabbitMQ queues with Laravel jobs
post

![RabbitMQ + Laravel]({{ 'assets/images/posts/consume-rabbitmq-queues-with-laravel-jobs/header.png' | relative_url }})

RabbitMQ, a powerful message broker, is widely used for building scalable and distributed applications. In this guide, we’ll explore the steps to consume RabbitMQ queues effectively within Laravel jobs.

Setting Up RabbitMQ in Laravel

Installation of Required Packages

Before diving into consuming RabbitMQ queues, ensure you have the necessary packages installed in your Laravel application. The php-amqplib/php-amqplib package allows Laravel to communicate with RabbitMQ. Install it using Composer:

composer require php-amqplib/php-amqplib

Configuration

Next, let's create a new configuration file called rabbitmq.php in the config folder to hold all the configs for the connection with the broker:

<?php

return [
    'host' => env('RABBITMQ_HOST'),
    'vhost' => env('RABBITMQ_VHOST'),
    'port' => env('RABBITMQ_PORT'),
    'user' => env('RABBITMQ_USER'),
    'password' => env('RABBITMQ_PASSWORD'),
    'options' => [
        'heartbeat' => 60,
        'connection_timeout' => 10, // Set connection timeout in seconds
        'read_write_timeout' => 60 * 2, // Set read/write timeout in seconds
        'channel_rpc_timeout' => 60 * 2, // Set RPC timeout in seconds
    ],
];

Ensure you have the corresponding environment variables set in your .env file:

RABBITMQ_HOST=your_host
RABBITMQ_VHOST=your_vhost
RABBITMQ_PORT=your_port
RABBITMQ_USER=your_user
RABBITMQ_PASSWORD=your_password

Creating the consumer

Generate a new job using the Laravel Artisan CLI:

php artisan make:job ProcessRabbitMQMessage

Open the generated ProcessRabbitMQMessage job file (app/Jobs/ProcessRabbitMQMessage.php). This job will handle the consumption of messages from the RabbitMQ queue:

<?php

namespace App\Jobs;

use Carbon\Carbon;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ProcessRabbitMQMessage extends Job
{
    public int $timeout = 60 * 60; // 1 hour
    private Carbon $startedAt;

    public function handle(): void
    {
        $connection = new AMQPStreamConnection(
            host: config("rabbitmq.host"),
            port: config("rabbitmq.port"),
            user: config("rabbitmq.user"),
            password: config("rabbitmq.password"),
            vhost: config("rabbitmq.vhost"),
            connection_timeout: config("rabbitmq.options.connection_timeout"),
            read_write_timeout: config("rabbitmq.options.read_write_timeout"),
            heartbeat: config("rabbitmq.options.heartbeat"),
            channel_rpc_timeout: config("rabbitmq.options.channel_rpc_timeout")
        );

        $channel = $connection->channel();

        $channel->basic_consume('your_queue_name', '', false, false, false, false, function (AMQPMessage $message) {
            $this->processMessage($message);
            $message->ack();
        });

        $this->startedAt = now();

        while ($channel->is_consuming()) {
            if ($this->isTimeoutReached()) {
                // These two steps are optional
                $this->cleanup();
                $this->notify();
                break;
            }

            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    private function processMessage(AMQPMessage $message): void
    {
        // YOUR CODE HERE
    }

    private function cleanup(): void
    {
        // YOUR CODE HERE
    }

    private function notify(): void
    {
        // YOUR CODE HERE
    }

    private function isTimeoutReached(): bool
    {
        $elapsedTime = $this->startedAt->diffInSeconds(now());

        // Adds 1 minute from the elapse time, so you have time to perform cleanup and notify if necessary.
        // This value is arbitrary and can be changed according to your needs.
        $elapsedTime += 60;

        return $elapsedTime >= $this->timeout;
    }
}

Since we want to continuously consume the queue, let's update the scheduler to dispatch the job hourly. Add the following line to the method schedule on the file App\Console\Kernel:

$schedule->job(new ProcessRabbitMQMessage)->hourly();

Creating a basic producer

Generate a new job using the Laravel Artisan CLI:

php artisan make:job RabbitMQMessageProducer

Open the generated RabbitMQMessageProducer job file (app/Jobs/RabbitMQMessageProducer.php). This job will handle the logic to send messages to RabbitMQ queue:

<?php

namespace App\Jobs;

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQMessageProducer extends Job
{
    public function handle(): void
    {
        // Setup connection
        $connection = new AMQPStreamConnection(
            host: config("rabbitmq.host"),
            port: config("rabbitmq.port"),
            user: config("rabbitmq.user"),
            password: config("rabbitmq.password"),
            vhost: config("rabbitmq.vhost"),
            connection_timeout: config("rabbitmq.options.connection_timeout"),
            read_write_timeout: config("rabbitmq.options.read_write_timeout"),
            heartbeat: config("rabbitmq.options.heartbeat"),
            channel_rpc_timeout: config("rabbitmq.options.channel_rpc_timeout")
        );

        $channel = $connection->channel();

        // Declare a queue
        $channel->queue_declare('your_queue_name', false, false, false, false);

        $messageBody = 'YOUR MESSAGE';
        $message = new AMQPMessage($messageBody);

        // Publish the message to the queue
        $channel->basic_publish($message, '', 'your_queue_name');

        // Close the channel and connection
        $channel->close();
        $connection->close();
    }
}

Running the jobs

Now that we are both consumer and producer, we can run the jobs to see if everything is working as expected. Let's first dispatch the job RabbitMQMessageProducer a couple of times so we have some messages to process; we are going to use Laravel Tinker to dispatch the job:

php artisan tinker
RabbitMQMessageProducer::dispatchSync();
RabbitMQMessageProducer::dispatchSync();
RabbitMQMessageProducer::dispatchSync();
RabbitMQMessageProducer::dispatchSync();

Now that we dispatched the job four times, let's run the consumer and check if the messages were successfully processed:

php artisan tinker
ProcessRabbitMQMessage::dispatchSync()

If everything goes well, you should see the success exit code on your console.

Retries and Error Handling

Implement retry mechanisms and error handling within your job logic. If a job fails due to a timeout, have mechanisms to retry or log the failure for further investigation.

Handling timeouts while consuming RabbitMQ queues involves a combination of RabbitMQ connection configurations, graceful job handling, and monitoring strategies. It's essential to balance these approaches to ensure efficient queue consumption without causing interruptions or resource constraints in your application.

Conclusion

In summary, leveraging RabbitMQ queues in Laravel for distributed applications involves:

  • Setting up RabbitMQ in Laravel by installing necessary packages and configuring connections.
  • Creating dedicated jobs for message consumption and transmission.
  • Utilizing Laravel's scheduler to orchestrate queue consumption.
  • Implementing retry mechanisms and error handling within job logic for robust message processing.
  • Balancing connection configurations and monitoring strategies for efficient queue consumption without straining application resources.
  • This approach enables seamless communication, efficient message handling, and enhanced system reliability in handling timeouts and failures while consuming RabbitMQ queues in Laravel.

That's all for today 🎉🎉🎉


1

Share

Donate to Site


About Author

Renato

Developer

Add a Comment
Comments 0 Comments

No comments yet! Be the first to comment

Blog Search


Categories

OUTROS (16) Variados (109) PHP (133) Laravel (173) Black Hat (3) front-end (29) linux (114) postgresql (40) Docker (28) rest (5) soap (1) webservice (6) October (1) CMS (2) node (7) backend (13) ubuntu (56) devops (25) nodejs (5) npm (3) nvm (1) git (9) firefox (1) react (7) reactnative (5) collections (1) javascript (7) reactjs (8) yarn (0) adb (1) Solid (2) blade (3) models (1) controllers (0) log (1) html (2) hardware (3) aws (14) Transcribe (2) transcription (1) google (4) ibm (1) nuance (1) PHP Swoole (5) mysql (31) macox (4) flutter (1) symfony (1) cor (1) colors (2) homeOffice (2) jobs (3) imagick (2) ec2 (1) sw (1) websocket (2) markdown (1) ckeditor (1) tecnologia (14) faceapp (1) eloquent (14) query (4) sql (40) ddd (3) nginx (9) apache (4) certbot (1) lets-encrypt (3) debian (12) liquid (1) magento (2) ruby (1) LETSENCRYPT (1) Fibonacci (1) wine (1) transaction (1) pendrive (1) boot (1) usb (1) prf (1) policia (2) federal (1) lucena (1) mongodb (4) paypal (1) payment (1) zend (1) vim (4) ciencia (6) js (1) nosql (1) java (1) JasperReports (1) phpjasper (1) covid19 (1) saude (1) athena (1) cinnamon (1) phpunit (2) binaural (1) mysqli (3) database (42) windows (6) vala (1) json (2) oracle (1) mariadb (4) dev (12) webdev (24) s3 (4) storage (1) kitematic (1) gnome (2) web (2) intel (3) piada (1) cron (2) dba (18) lumen (1) ffmpeg (2) android (2) aplicativo (1) fedora (2) shell (4) bash (3) script (3) lider (1) htm (1) csv (1) dropbox (1) db (3) combustivel (2) haru (1) presenter (1) gasolina (1) MeioAmbiente (1) Grunt (1) biologia (1) programming (22) performance (3) brain (1) smartphones (1) telefonia (1) privacidade (1) opensource (3) microg (1) iode (1) ssh (3) zsh (2) terminal (3) dracula (1) spaceship (1) mac (2) idiomas (1) laptop (2) developer (37) api (5) data (1) matematica (1) seguranca (2) 100DaysOfCode (9) hotfix (1) documentation (1) laravelphp (10) RabbitMQ (3) Elasticsearch (1) redis (2) Raspberry (4) Padrao de design (4) JQuery (1) angularjs (4) Dicas (44) Kubernetes (3) vscode (3) backup (1) angular (3) servers (2) pipelines (1) AppSec (1) DevSecOps (4) rust (1) RustLang (1) Mozilla (1) algoritimo (1) sqlite (1) Passport (2) jwt (5) security (2) translate (1) kube (2) iot (1) politica (2) bolsonaro (1) flow (1) podcast (1) Brasil (1) containers (3) traefik (1) networking (1) host (1) POO (2) microservices (2) bug (1) cqrs (1) arquitetura (3) Architecture (4) sail (3) militar (1) artigo (1) economia (1) forcas armadas (1) ffaa (1) autenticacao (2) autorizacao (2) authentication (4) authorization (3) NoCookies (1) wsl (4) memcached (1) macos (2) unix (2) kali-linux (1) linux-tools (5) apple (1) noticias (2) composer (1) rancher (1) k8s (1) escopos (1) orm (1) jenkins (4) github (5) gitlab (3) queue (1) Passwordless (1) sonarqube (1) phpswoole (1) laraveloctane (1) Swoole (1) Swoole (1) octane (1) Structurizr (1) Diagramas (1) c4 (1) c4-models (1) compactar (1) compression (1) messaging (1) restfull (1) eventdrive (1) services (1) http (1) Monolith (1) microservice (1) historia (1) educacao (1) cavalotroia (1) OOD (0) odd (1) chatgpt (1) openai (3) vicuna (1) llama (1) gpt (1) transformers (1) pytorch (1) tensorflow (1) akitando (1) ia (1) nvidia (1) agi (1) guard (1) multiple_authen (2) rpi (1) auth (1) auth (1) livros (2) ElonMusk (2) Oh My Zsh (1) Manjaro (1) BigLinux (2) ArchLinux (1) Migration (1) Error (1) Monitor (1) Filament (1) LaravelFilament (1) replication (1) phpfpm (1) cache (1) vpn (1) l2tp (1) zorin-os (1) optimization (1) scheduling (1) monitoring (2) linkedin (1) community (1) inteligencia-artificial (2) wsl2 (1) maps (1) API_KEY_GOOGLE_MAPS (1) repmgr (1) altadisponibilidade (1) banco (1) modelagemdedados (1) inteligenciadedados (4) governancadedados (1) bancodedados (2) Observability (1) picpay (1) ecommerce (1) Curisidades (1) Samurai (1) KubeCon (1) GitOps (1) Axios (1) Fetch (1) Deepin (1) vue (4) nuxt (1) PKCE (1) Oauth2 (2) webhook (1) TypeScript (1) tailwind (1) gource (2)

New Articles



Get Latest Updates by Email