Comment enregistrer les messages RabbitMQ dans PostgreSQL avec Python ?
Auteur
Elie TerrienRabbitMQ, un puissant moyen de communication entre applications et objets connectés, facilite le transfert efficace de messages, mais il ne propose pas de mécanisme intégré pour leur stockage à long terme. Pour surmonter cette limitation, l'intégration avec une base de données robuste comme PostgreSQL s'avère essentielle.
Par exemple, lorsque vous exploitez les protocoles AMQP ou MQTT pour la transmission de données depuis vos objets connectés, tels que la surveillance de la température d'une pièce ou la collecte de la consommation d'énergie d'un appareil, vous êtes souvent confronté au besoin impérieux de conserver ces informations pour une exploitation future. RabbitMQ ne fournit pas nativement cette fonctionnalité de persistance des données, mais en synchronisant judicieusement votre flux de messages avec PostgreSQL, vous pouvez créer une solution complète qui permet non seulement la communication en temps réel, mais aussi la sauvegarde durable des données.
Dans ce tutoriel, nous allons vous montrer comment enregistrer les messages RabbitMQ dans PostgreSQL avec Python.
Prérequis pour enregistrer les messages RabbitMQ dans PostgreSQL
Pour suivre ce tutoriel, vous devez disposer des éléments suivants :
un serveur RabbitMQ
un serveur PostgreSQL
Lancer le serveur RabbitMQ et le serveur PostgreSQL
Dans notre exemple, nous allons utiliser Docker pour lancer RabbitMQ et PostgreSQL. Ci-dessous, vous trouverez le fichier docker-compose.yml
qui définit les services RabbitMQ et PostgreSQL.
version: "3"
services:
rabbitmq:
image: rabbitmq:3.12-management-alpine
container_name: rabbit_mq_to_db
environment:
- RABBITMQ_DEFAULT_USER=user
- RABBITMQ_DEFAULT_PASS=password
- RABBITMQ_DEFAULT_VHOST=default_vhost
ports:
- 5672:5672
- 15672:15672
postgres:
image: postgres:16-alpine
container_name: postgres_db
hostname: postgres
restart: always
environment:
- POSTGRES_DB=postgres_db
- POSTGRES_USER=postgres_user
- POSTGRES_PASSWORD=password
volumes:
- ./docker-entrypoint-initdb.d/:/docker-entrypoint-initdb.d
ports:
- "5432:5432"
Description du fichier docker-compose.yml :
Ce fichier Docker Compose définit deux services, RabbitMQ et PostgreSQL, ainsi que leurs configurations respectives pour créer et orchestrer des conteneurs Docker interagissant ensemble. Voici une explication détaillée du contenu :
Service RabbitMQ:
- rabbitmq: Nom du service.
- image: Utilise l'image RabbitMQ version 3.12 avec le plugin management activé qui permet d'accéder à l'interface d'administration de RabbitMQ à l'adresse http://localhost:15672 et de s'y connecter avec les identifiants user et password.
- container_name: Nom du conteneur créé pour ce service.
- environment: Définit les variables d'environnement pour RabbitMQ, y compris le nom d'utilisateur, le mot de passe, et le vhost (espace virtuel).
- ports: Mappe les ports 5672 et 15672 du conteneur RabbitMQ sur les mêmes ports de l'hôte.
Service PostgreSQL:
- postgres: Nom du service.
- image: Utilise l'image PostgreSQL version 16 avec Alpine Linux.
- container_name: Nom du conteneur créé pour ce service.
- hostname: Nom d'hôte du conteneur PostgreSQL.
- restart: Politique de redémarrage automatique du conteneur (définie sur "always").
- environment: Définit les variables d'environnement pour PostgreSQL, y compris le nom de la base de données, le nom d'utilisateur et le mot de passe.
- volumes: Montre le répertoire local ./docker-entrypoint-initdb.d/ dans le répertoire d'initialisation de la base de données du conteneur PostgreSQL, permettant d'exécuter des scripts SQL d'initialisation.
- ports: Mappe le port 5432 du conteneur PostgreSQL sur le même port de l'hôte.
Pour lancer RabbitMQ et PostgreSQL, exécutez la commande suivante :
docker-compose up
Créer les scripts Python pour enregistrer les messages RabbitMQ dans PostgreSQL
Maintenant que RabbitMQ et PostgreSQL sont lancés, nous allons créer les scripts Python. Nous aurons besoin de deux scripts :
un publisher RabbitMQ pour envoyer des messages avec Python
un consumer RabbitMQ pour recevoir les messages avec Python et les enregistrer dans PostgreSQL
Nous allons également avoir besoin de la librairie pika qui est une implémentation Python du protocole AMQP 0-9-1 pour se connecter à RabbitMQ.
Enfin, nous aurons besoin de la librairie psycopg2 pour se connecter à PostgreSQL et interagir avec la base de données.
Un publisher RabbitMQ pour envoyer des messages avec Python
Le publisher est un script Python qui va se connecter à RabbitMQ et envoyer des messages dans une queue.
Pour se connecter à RabbitMQ, nous allons utiliser la librairie pika
. Pour l'installer, exécutez la commande suivante :
pip install pika
Voici le code du publisher :
import random
import pika
from time import sleep
# Connexion à RabbitMQ
url = 'amqp://user:password@localhost:5672/default_vhost'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
# Création de la queue 'temperature'
channel.queue_declare('temperature')
# Création de la route 'temperature_routing_key' qui permet de lier la queue 'temperature' à l'échange 'amq.direct'
channel.queue_bind('temperature', 'amq.direct', 'temperature_routing_key')
while True:
sleep(3)
# Envoi d'un message dans la queue 'temperature'
channel.basic_publish('amq.direct', 'temperature_routing_key', body=str(random.uniform(0, 100)))
L'url de connexion à RabbitMQ est amqp://user:password@localhost:5672/default_vhost
où :
user
est le nom d'utilisateur
password
est le mot de passe
localhost
est l'adresse IP du serveur RabbitMQ
5672
est le port de RabbitMQ
default_vhost
est le vhost (espace virtuel) de RabbitMQ
Un consumer RabbitMQ pour recevoir les messages avec Python et les enregistrer dans PostgreSQL
Le consumer est un script Python qui va se connecter à RabbitMQ, lire les messages d'une queue et les enregistrer dans PostgreSQL.
Voici un exemple simple de consumer qui va lire les messages de la queue temperature
et les enregistrer dans la table temperature
de PostgreSQL :
import pika
import psycopg2
# Connexion à la base de données PostgreSQL
connection_sql = psycopg2.connect(database="postgres_db", user="postgres_user", password="password", host="localhost", port="5432")
cursor = connection_sql.cursor()
# Définition de la fonction callback qui sera appelée lorsqu'un message sera reçu dans la queue 'temperature'
def callback(ch, method, properties, body):
# Conversion du message en string
body = body.decode()
# Insertion du message dans la table 'temperature'
cursor.execute("INSERT INTO temperature (value) VALUES (%s)", (body,))
connection_sql.commit()
# Connexion à RabbitMQ
url = 'amqp://user:password@localhost:5672/default_vhost'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.basic_consume('temperature', callback, auto_ack=True)
channel.start_consuming()
channel.close()
connection.close()
Code source complet en Python
Vous pouvez retrouver le code source complet de ce tutoriel sur Github.
Il est un peu différent de ce qui est présenté dans ce tutoriel, car il contient le fichier de création de la table temperature
dans PostgreSQL.
Et utilise des classes pour se connecter à PostgreSQL et pour le consumer RabbitMQ.