Comment enregistrer les messages RabbitMQ dans PostgreSQL avec Python ?

Database
Dec. 24, 2023
photo_elie
Auteur
Elie Terrien
Une manière de rendre vos messages persistants est de les sauvegarder dans PostgreSQL

RabbitMQ, un puissant moyen de communication entre applications et objets connectés, facilite le transfert efficace de messages, mais il ne propose pas de mécanisme intégré pour leur stockage à long terme. Pour surmonter cette limitation, l'intégration avec une base de données robuste comme PostgreSQL s'avère essentielle.

Par exemple, lorsque vous exploitez les protocoles AMQP ou MQTT pour la transmission de données depuis vos objets connectés, tels que la surveillance de la température d'une pièce ou la collecte de la consommation d'énergie d'un appareil, vous êtes souvent confronté au besoin impérieux de conserver ces informations pour une exploitation future. RabbitMQ ne fournit pas nativement cette fonctionnalité de persistance des données, mais en synchronisant judicieusement votre flux de messages avec PostgreSQL, vous pouvez créer une solution complète qui permet non seulement la communication en temps réel, mais aussi la sauvegarde durable des données.

Dans ce tutoriel, nous allons vous montrer comment enregistrer les messages RabbitMQ dans PostgreSQL avec Python.

Prérequis pour enregistrer les messages RabbitMQ dans PostgreSQL

Pour suivre ce tutoriel, vous devez disposer des éléments suivants :

un serveur RabbitMQ

un serveur PostgreSQL

Lancer le serveur RabbitMQ et le serveur PostgreSQL

Dans notre exemple, nous allons utiliser Docker pour lancer RabbitMQ et PostgreSQL. Ci-dessous, vous trouverez le fichier docker-compose.yml qui définit les services RabbitMQ et PostgreSQL.

version: "3"

services:
  rabbitmq:
    image: rabbitmq:3.12-management-alpine
    container_name: rabbit_mq_to_db
    environment:
      - RABBITMQ_DEFAULT_USER=user
      - RABBITMQ_DEFAULT_PASS=password
      - RABBITMQ_DEFAULT_VHOST=default_vhost
    ports:
      - 5672:5672
      - 15672:15672

  postgres:
    image: postgres:16-alpine
    container_name: postgres_db
    hostname: postgres
    restart: always
    environment:
      - POSTGRES_DB=postgres_db
      - POSTGRES_USER=postgres_user
      - POSTGRES_PASSWORD=password
    volumes:
      - ./docker-entrypoint-initdb.d/:/docker-entrypoint-initdb.d
    ports:
      - "5432:5432"

Description du fichier docker-compose.yml :
Ce fichier Docker Compose définit deux services, RabbitMQ et PostgreSQL, ainsi que leurs configurations respectives pour créer et orchestrer des conteneurs Docker interagissant ensemble. Voici une explication détaillée du contenu :


Service RabbitMQ:

  • rabbitmq: Nom du service.
  • image: Utilise l'image RabbitMQ version 3.12 avec le plugin management activé qui permet d'accéder à l'interface d'administration de RabbitMQ à l'adresse http://localhost:15672 et de s'y connecter avec les identifiants user et password.
  • container_name: Nom du conteneur créé pour ce service.
  • environment: Définit les variables d'environnement pour RabbitMQ, y compris le nom d'utilisateur, le mot de passe, et le vhost (espace virtuel).
  • ports: Mappe les ports 5672 et 15672 du conteneur RabbitMQ sur les mêmes ports de l'hôte.

Service PostgreSQL:

  • postgres: Nom du service.
  • image: Utilise l'image PostgreSQL version 16 avec Alpine Linux.
  • container_name: Nom du conteneur créé pour ce service.
  • hostname: Nom d'hôte du conteneur PostgreSQL.
  • restart: Politique de redémarrage automatique du conteneur (définie sur "always").
  • environment: Définit les variables d'environnement pour PostgreSQL, y compris le nom de la base de données, le nom d'utilisateur et le mot de passe.
  • volumes: Montre le répertoire local ./docker-entrypoint-initdb.d/ dans le répertoire d'initialisation de la base de données du conteneur PostgreSQL, permettant d'exécuter des scripts SQL d'initialisation.
  • ports: Mappe le port 5432 du conteneur PostgreSQL sur le même port de l'hôte.

Pour lancer RabbitMQ et PostgreSQL, exécutez la commande suivante :

docker-compose up

Créer les scripts Python pour enregistrer les messages RabbitMQ dans PostgreSQL

Maintenant que RabbitMQ et PostgreSQL sont lancés, nous allons créer les scripts Python. Nous aurons besoin de deux scripts :

un publisher RabbitMQ pour envoyer des messages avec Python

un consumer RabbitMQ pour recevoir les messages avec Python et les enregistrer dans PostgreSQL

Nous allons également avoir besoin de la librairie pika qui est une implémentation Python du protocole AMQP 0-9-1 pour se connecter à RabbitMQ.

Enfin, nous aurons besoin de la librairie psycopg2 pour se connecter à PostgreSQL et interagir avec la base de données.

Un publisher RabbitMQ pour envoyer des messages avec Python

Le publisher est un script Python qui va se connecter à RabbitMQ et envoyer des messages dans une queue.

Pour se connecter à RabbitMQ, nous allons utiliser la librairie pika. Pour l'installer, exécutez la commande suivante :

pip install pika

Voici le code du publisher :

import random

import pika
from time import sleep

# Connexion à RabbitMQ
url = 'amqp://user:password@localhost:5672/default_vhost'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Création de la queue 'temperature'
channel.queue_declare('temperature')

# Création de la route 'temperature_routing_key' qui permet de lier la queue 'temperature' à l'échange 'amq.direct'
channel.queue_bind('temperature', 'amq.direct', 'temperature_routing_key')
while True:
    sleep(3)
    # Envoi d'un message dans la queue 'temperature'
    channel.basic_publish('amq.direct', 'temperature_routing_key', body=str(random.uniform(0, 100)))

L'url de connexion à RabbitMQ est amqp://user:password@localhost:5672/default_vhost où :

user est le nom d'utilisateur

password est le mot de passe

localhost est l'adresse IP du serveur RabbitMQ

5672 est le port de RabbitMQ

default_vhost est le vhost (espace virtuel) de RabbitMQ

Un consumer RabbitMQ pour recevoir les messages avec Python et les enregistrer dans PostgreSQL

Le consumer est un script Python qui va se connecter à RabbitMQ, lire les messages d'une queue et les enregistrer dans PostgreSQL.

Voici un exemple simple de consumer qui va lire les messages de la queue temperature et les enregistrer dans la table temperature de PostgreSQL :

import pika
import psycopg2

# Connexion à la base de données PostgreSQL
connection_sql = psycopg2.connect(database="postgres_db", user="postgres_user", password="password", host="localhost", port="5432")
cursor = connection_sql.cursor()

# Définition de la fonction callback qui sera appelée lorsqu'un message sera reçu dans la queue 'temperature'
def callback(ch, method, properties, body):
    # Conversion du message en string
    body = body.decode()
    # Insertion du message dans la table 'temperature'
    cursor.execute("INSERT INTO temperature (value) VALUES (%s)", (body,))
    connection_sql.commit()

# Connexion à RabbitMQ
url = 'amqp://user:password@localhost:5672/default_vhost'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.basic_consume('temperature', callback, auto_ack=True)
channel.start_consuming()
channel.close()
connection.close()

Code source complet en Python

Vous pouvez retrouver le code source complet de ce tutoriel sur Github.

Il est un peu différent de ce qui est présenté dans ce tutoriel, car il contient le fichier de création de la table temperature dans PostgreSQL.

Et utilise des classes pour se connecter à PostgreSQL et pour le consumer RabbitMQ.

Return to blog

Recevez notre newsletter