Asynchronous communication between Node.js applications using RabbitMQ

Photo by Modestas Urbonas / Unsplash

In the previous post, we saw how to connect Install RabbitMQ on the server and enable the Web administration interface. From this UI, we can browse available queues, messages published in a queue, exchanges, etc...

AMQP is the protocol used to allow the connection between many services. To support most programming languages, connectors will enable you to interact with the server by exchanging messages with others applications.

We will use the Node.js package of the AMQP to communicate between two Node.js applications.

The use case

RabbitMQ is great for event-driven applications, which will act as the bridge to exchange information between applications.

Let's say we are building a website application where the core back-end communicates with a notification service responsible for emailing users. When a user register, we must send a confirmation email with a confirmation code. The email sending should not block the response to the client.

The picture below shows the system architecture.

The architecture of two Node.js applications communicating through RabbitMQ

In the event-driven architecture, we use the term producer and consumer to designate the application that sends the message into the queue and the one that reads the message from the queue.

Prerequisites

To follow this post, you will need to install this tool on your computer:

  • Node.js 16 or higher - download link
  • A Node.js package manager such as NPM or Yarn

Set up the project

The first project will be the producer responsible for sending messages in the queue.

We will use the Node.js starter project we built in this post; the project contains a branch called express that comes with the Express framework. We will use it further in this tutorial.

Run the commands below:


git clone https://github.com/tericcabrel/node-ts-starter.git -b express  app-producer

cd app-producer

cp .env.example .env

# Update the environment variable if needed
nano .env

yarn install

yarn start

Navigate to the URL http://localhost:4500 to ensure everything works.

Connect to the RabbitMQ server

From a Node.js application, to interact with the RabbitMQ server, we will use the Node.js package that abstracts the commands to send. We will use the package named amqplib.

Run the command below to install it:


yarn add amqplib
yarn add -D @types/amqplib

We connect to the server using the connection URI that has the following format:


amqp://<username>:<password>@<host>:<port>

If the port number is not provided, the default value is 5672. Check out this link for more advanced URI connection options.

You can also use a JSON object with the connection options:


{
  protocol: 'amqp',
  hostname: 'localhost',
  port: 5672,
  username: 'guest',
  password: 'guest',
  locale: 'en_US',
  frameMax: 0,
  heartbeat: 0,
  vhost: '/',
}

Based on the credentials created in the previous tutorial, our connection URI is amqp://admin:MyStrong-P4ssw0rd$@rabbitmq.tericcabrel.com

Install RabbitMQ on a Ubuntu Server 22.04
In this post, we will see how to install and manage RabbitMQ on Ubuntu 22.04; enable the Web admin UI that is accessible from a subdomain

Create a file named rabbitmq-connection.ts and add the code below:


import amqplib from 'amqplib';

export const connectToRabbitMQ = async () => {
  return amqplib.connect('amqp://admin:MyStrong-P4ssw0rd$@rabbitmq.tericcabrel.com');
};

Update the file src/index.ts to make the connection to the RabbitMQ server when the server starts.


import { Connection } from 'amqplib';
import { connectToRabbitMQ } from './rabbitmq-connection';

let rabbitConnection: Connection;

// Express server initialization code here...

app.listen(PORT, async () => {
  rabbitConnection = await connectToRabbitMQ();

  console.log(`Application started on URL ${HOST}:${PORT} 🎉`);
});

Run yarn start to launch the application, check out the credentials if there is a connection error.

Send a message in the queue

We need two things to send a message in the queue:

  • The queue name
  • The message to send in the queue represented as a string

We want to send in the queue the message we will be using to send an email to the user. Below is the shape of the JSON object to send,


{
    "fullName": string,
    "emailAddress": string,
    "confirmationCode": number
}

The queue name will be user-registration.

Update the file src/index.ts to add the code below:


app.post('/register', async (req, res) => {
  const { email, name } = req.body;

  // TODO save register in the database

  const QUEUE_NAME = 'user-registration';
  const messageData = {
    fullName: name,
    emailAddress: email,
    confirmationCode: Math.floor(Math.random() * 900000) + 100000,
  };
  const messageDataAsString = JSON.stringify(messageData);

  const channel = await rabbitConnection.createChannel();

  channel.sendToQueue(QUEUE_NAME, Buffer.from(messageDataAsString));

  return res.json({ message: 'User registered successfully' });
});

Before running this code, we will create the queue using the Web administration interface.

Create a queue named "user-registration"

Start the application and open an HTTP client to send a POST request to http://localhost:4500/register

Send a POST request to register a user.

From the Web administration interface, we can browse messages available in the queue user-registration.

Browse messages published in the queue.

These messages are waiting to be consumed by another application, usually called the consumer.

Read a message in the queue

Let's set up a new Node.js project from our boilerplate starter, but we don't Express in this case. We will name the project app-consumer.


git clone https://github.com/tericcabrel/node-ts-starter.git app-consumer

cd app-consumer

yarn install

yarn start

Let's install the AMQP Node.js package.


yarn add amqplib
yarn add -D @types/amqplib

Create a file src/rabbitmq-connection.ts and add the code below:

import amqplib from 'amqplib';

export const connectToRabbitMQ = async () => {
  return amqplib.connect({
    username: 'admin',
    password: 'MyStrong-P4ssw0rd$',
    port: 5672,
    hostname: 'rabbitmq.tericcabrel.com',
    vhost: '/',
  });
};

Instead of connection URI, we used the JSON properties to define the connection options just to see in action. Using the connection string will still work, such as the producer.

Replace the content of the file src/index.ts with the code below:


import { connectToRabbitMQ } from './rabbitmq-connection';

(async () => {
  const rabbitConnection = await connectToRabbitMQ();

  console.log('Successfully connected to RabbitMQ server!');
})();

Run the command yarn start to launch the application and ensure there is no connection error printed in the console.

In the Web administration interface, you can view the list of the applications connected to the RabbitMQ server by clicking on the tab "Connections"

View applications connected to the RabbitMQ server

We see two connections, one from the app-producer and one from the app-consumer we use the username to distinguish the application, but I used the same credentials for both applications. Keep in mind to create a new user for each application.

Let's update the code of the file src/index.ts to read messages from the queue.


import { connectToRabbitMQ } from './rabbitmq-connection';

(async () => {
  const QUEUE_NAME = 'user-registration';

  const rabbitConnection = await connectToRabbitMQ();

  console.log('Successfully connected to RabbitMQ server!');

  const channel = await rabbitConnection.createChannel();

  await channel.assertQueue(QUEUE_NAME);

  await channel.consume(QUEUE_NAME, async (message) => {
    if (!message) {
      console.error('Consumer cancelled by server!');
      return;
    }

    const data = JSON.parse(message.content.toString());

    console.table(data);

    // TODO send an email using the data
  });
})();


We create a channel, assert that the queue exists, and finally consume messages from the queue and perform some action with the message received.

We will not cover the email sending here, but I wrote a post on how to do that in Node.js.

Send email in Node.js using Handlebars and Amazon SES
In this post, we will see how to set up Amazon SES and send emails through SMTP in Node.js using Handlebars and Nodemailer.

Let's test it by running the app-consumer with the command yarn start.

Send a request from Postman to the app-producer, and you will see the output in the console of the app-consumer.

0:00
/
The producer and Consumer applications exchange messages through the RabbitMQ queue.

Message Queue Acknowledgement

In the demo, the producer sent two messages in the queue, and the consumer application received them as expected. Now, restart the consumer application; we will see the same messages printed in the console as if they were just published.

Previously received messages are being received again in the consumer application.

This happens because when the message is received by the app consumer, the Rabbit MQ server doesn't know if the message has been processed successfully, so it doesn't delete the message in the queue but changes the state from ready to unacked so other applications listening to this queue cannot read this message.

To fix this, the consumer application must send a confirmation to the RabbitMQ server. We can do this with a single line of code:


channel.ack(message);

We usually place this instruction in last, but it can vary depending on your use case (You explicitly republish the message in the queue when there is an error).

This is the complete code of the file src/index.ts


import { connectToRabbitMQ } from './rabbitmq-connection';

(async () => {
  const QUEUE_NAME = 'user-registration';

  const rabbitConnection = await connectToRabbitMQ();

  console.log('Successfully connected to RabbitMQ server!');

  const channel = await rabbitConnection.createChannel();

  await channel.assertQueue(QUEUE_NAME);

  await channel.consume(QUEUE_NAME, async (message) => {
    if (!message) {
      console.error('Consumer cancelled by server!');
      return;
    }

    const data = JSON.parse(message.content.toString());

    console.table(data);

    // TODO send an email using the data

    channel.ack(message);
  });
})();

In the same way, if you want to unacknowledge a message so that it can be consumed by other applications listening to the queue, you can use the code below:


channel.reject(message, true);

The second argument indicates whether we want to re-queue the message or not.

Wrap up

RabbitMQ made it easy for two applications to communicate asynchronously. We used the Node.js package for the AMQP to make two Node.js applications communicate, and here are the takeaways:

  • The connection options can be provided as a URI or a JSON object.
  • You should manually create the queue in the Web Administration interface before using it in your applications.
  • The producer must send the message as a string.
  • The consumer should always send an acknowledgment message to the server so the message can be deleted from the queue.
  • You can manually reject a message to re-queue it.

You can find the code source on the GitHub repository.

Follow me on Twitter or subscribe to my newsletter to avoid missing the upcoming posts and the tips and tricks I occasionally share.