Création de microservices pilotés par les événements avec AWS Kinesis
Architecture orientée événements avec AWS Kinesis pour l'évolutivité
AWS Kinesis est devenu un pilier pour la construction d’architectures microservices modernes pilotées par événements, permettant le traitement de données en temps réel à grande échelle avec un surcoût opérationnel minimal.

Compréhension de l’architecture microservices pilotée par événements
L’architecture pilotée par événements (EDA) est un modèle de conception où les services communiquent via des événements plutôt que par des appels synchrones directs. Cette approche offre plusieurs avantages :
- Couplage lâche : Les services n’ont pas besoin de connaître l’existence des autres
- Extensibilité : Chaque service évolue de manière indépendante en fonction de sa charge de travail
- Résilience : Les défaillances d’un service ne se propagent pas aux autres
- Flexibilité : De nouveaux services peuvent être ajoutés sans modifier les existants
AWS Kinesis fournit la base pour mettre en œuvre l’EDA en agissant comme un flux d’événements distribué et durable qui découple les producteurs des consommateurs.
Pour une perspective plus large sur les plateformes de streaming, consultez notre guide de démarrage rapide Apache Kafka pour comparer avec les alternatives auto-hébergées.
Aperçu d’AWS Kinesis
AWS propose plusieurs services Kinesis, chacun conçu pour des cas d’utilisation spécifiques. Lors de l’évaluation des solutions de streaming, vous pourriez également souhaiter comparer RabbitMQ sur EKS et SQS pour différents modèles de messagerie et implications de coûts.
Kinesis Data Streams
Le service de streaming principal qui capture, stocke et traite les enregistrements de données en temps réel. Data Streams est idéal pour :
- Des applications de traitement en temps réel personnalisées
- La construction de pipelines de données avec une latence inférieure à la seconde
- Le traitement de millions d’événements par seconde
- La mise en œuvre de modèles de sourcing d’événements
Kinesis Data Firehose
Un service entièrement géré qui livre des données en streaming vers des destinations comme S3, Redshift, Elasticsearch ou des points de terminaison HTTP. Idéal pour :
- Des pipelines ETL simples
- L’agrégation et l’archivage des journaux
- L’analyse quasi temps réel (latence minimale de 60 secondes)
- Les scénarios où aucune logique de traitement personnalisée n’est requise
Kinesis Data Analytics
Traite et analyse les données en streaming en utilisant SQL ou Apache Flink. Les cas d’utilisation incluent :
- Tableaux de bord en temps réel
- ETL en streaming
- Détection d’anomalies en temps réel
- Génération continue de métriques
Pour approfondir les opérations Flink, consultez notre guide Apache Flink sur K8s et Kafka.
Modèles architecturaux avec Kinesis
1. Modèle de Sourcing d’Événements
Le sourcing d’événements stocke tous les changements d’état de l’application comme une séquence d’événements. Kinesis est parfait pour cela. Si vous avez besoin d’un rappel sur les fondamentaux de Python, consultez notre Fiche de référence Python :
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Publier un événement vers le flux Kinesis"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Exemple : événement d'inscription utilisateur
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Séparez les opérations de lecture et d’écriture en utilisant Kinesis comme bus d’événements :
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Modèle Fan-Out avec Lambda
Traitez les événements d’un seul flux avec plusieurs fonctions Lambda. Pour les implémentations TypeScript avec une sécurité de type renforcée, référez-vous à notre Fiche de référence TypeScript :
// Consommateur Lambda pour les notifications par e-mail
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Un autre Lambda pour les mises à jour de stock
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Meilleures pratiques pour la production
1. Choisir le bon nombre de shards
Calculez vos besoins en shards en fonction de :
- Ingress : 1 Mo/sec ou 1 000 enregistrements/sec par shard
- Egress : 2 Mo/sec par shard (consommateurs standards) ou 2 Mo/sec par consommateur avec fan-out amélioré
def calculate_shards(records_per_second, avg_record_size_kb):
"""Calculer le nombre requis de shards"""
# Capacité d'ingress
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Ajouter une marge
2. Mettre en œuvre une gestion d’erreurs appropriée
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Mettre un enregistrement avec une stratégie de nouvelle tentative à backoff exponentiel"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Backoff exponentiel
continue
raise
3. Utiliser le Fan-Out Amélioré pour plusieurs consommateurs
Le fan-out amélioré fournit un débit dédié pour chaque consommateur :
# Enregistrer un consommateur avec le fan-out amélioré
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Surveiller les métriques clés
Métriques CloudWatch essentielles à suivre :
IncomingRecords: Nombre d’enregistrements mis avec succèsIncomingBytes: Volume d’octets des données entrantesGetRecords.IteratorAgeMilliseconds: Retard des consommateursWriteProvisionedThroughputExceeded: Événements de limitation (throttling)ReadProvisionedThroughputExceeded: Limitation des consommateurs
5. Mettre en œuvre une stratégie de clé de partition appropriée
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Générer une clé de partition avec une distribution uniforme"""
# Utiliser un hachage cohérent pour une distribution uniforme
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Exemple d’implémentation dans le monde réel
Voici un exemple complet d’architecture microservices de traitement de commandes :
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Créer une commande et publier des événements"""
order_id = self.generate_order_id()
# Publier l'événement de commande créée
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Publier un événement vers le flux Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Consomme les événements de commande et met à jour le stock"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Mettre à jour la base de données de stock
for item in order_data['items']:
# Implémentation ici
pass
Stratégie de migration du monolithe aux microservices
Phase 1 : Motif du Ficus Strangler (Strangler Fig Pattern)
Commencez par acheminer des événements spécifiques via Kinesis tout en conservant le monolithe :
- Identifiez les contextes délimités dans votre monolithe
- Créez des flux Kinesis pour les événements inter-contextuels
- Extrait progressivement les services qui consomment ces flux
- Maintenez la compatibilité ascendante avec le monolithe
Phase 2 : Traitement parallèle
Faites fonctionner les anciens et nouveaux systèmes en parallèle :
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Écrire à la fois vers le système hérité et le flux d'événements"""
try:
# Écrire d'abord vers le nouveau système
publish_to_kinesis(kinesis_stream, data)
# Puis mettre à jour le système hérité
legacy_db.update(data)
except Exception as e:
# Mettre en œuvre la logique de compensation
rollback_kinesis_event(kinesis_stream, data)
raise
Phase 3 : Migration complète
Une fois la confiance établie, acheminez tout le trafic vers l’architecture pilotée par événements.
Stratégies d’optimisation des coûts
Pour des conseils complets sur les modèles d’infrastructure de données, y compris le stockage d’objets et les architectures de base de données, référez-vous à Infrastructure de données pour les systèmes IA : Stockage d’objets, Bases de données, Recherche & Architecture de données IA.
1. Utiliser le mode On-Demand pour les charges de travail variables
Le mode On-Demand (introduit en 2023) s’adapte automatiquement en fonction du trafic :
# Créer un flux en mode on-demand
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Mettre en œuvre l’agrégation de données
Réduisez les unités de charge PUT en regroupant les enregistrements :
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Agrégation d'enregistrements pour réduire les coûts"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Envoyer l'enregistrement agrégé
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimiser la rétention des données
La rétention par défaut est de 24 heures. Ne l’étendez que si nécessaire :
# Définir la rétention à 7 jours
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Meilleures pratiques de sécurité
1. Chiffrement au repos et en transit
# Créer un flux chiffré
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Activer le chiffrement
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. Politiques IAM pour le principe du moindre privilège
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. Points de terminaison VPC
Gardez le trafic au sein du réseau AWS. Pour gérer l’infrastructure AWS en tant que code, envisagez d’utiliser Terraform - consultez notre fiche de référence Terraform :
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observabilité et débogage
Traçage distribué avec X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
Requêtes CloudWatch Logs Insights
-- Trouver les temps de traitement lents
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Suivre les taux d'erreur
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Modèles avancés
Modèle Saga pour les transactions distribuées
Implémentez des transactions longues durée traversant les microservices :
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Exécuter la saga avec une logique de compensation"""
try:
# Étape 1 : Réserver le stock
self.publish_command('RESERVE_INVENTORY', order_data)
# Étape 2 : Traiter le paiement
self.publish_command('PROCESS_PAYMENT', order_data)
# Étape 3 : Expédier la commande
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Compenser dans l'ordre inverse
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Exécuter les transactions de compensation"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Stratégies de test
Développement local avec LocalStack
# Démarrer LocalStack avec Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Créer un flux de test
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Tests d’intégration
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Tester la publication d'événements avec Kinesis moqué"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Ajustement des performances
Optimiser la taille des lots
def optimize_batch_processing(records, batch_size=500):
"""Traiter les enregistrements par lots optimisés"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Utiliser la mise en pool de connexions
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Liens utiles
Ressources AWS Kinesis :
- Documentation AWS Kinesis
- Guide du développeur AWS Kinesis Data Streams
- Bibliothèque client Kinesis (KCL)
- Calculateur de tarification AWS Kinesis
- Quotas et limites des flux Kinesis Data Streams
- Blog d’architecture AWS - Architectures pilotées par événements
- Échantillons AWS - Exemples Kinesis
Articles connexes :
- Comparaison des coûts d’hébergement RabbitMQ sur EKS vs SQS
- Fiche de référence TypeScript : Concepts de base et meilleures pratiques
- Fiche de référence Python
Conclusion
AWS Kinesis fournit une base robuste pour la construction d’architectures microservices extensibles et pilotées par événements. En suivant ces modèles et meilleures pratiques, vous pouvez créer des systèmes résilients, évolutifs et maintenables. Commencez petit avec un seul flux d’événements, validez votre architecture et expandez progressivement vers des modèles plus complexes au fur et à mesure que votre système grandit.
La clé du succès réside dans la compréhension de vos besoins en flux de données, le choix du bon service Kinesis pour votre cas d’utilisation et la mise en œuvre d’une surveillance et d’une gestion d’erreurs appropriées dès le départ.