Aufbau ereignisgesteuerter Microservices mit AWS Kinesis
Eventgesteuerte Architektur mit AWS Kinesis für Skalierung
AWS Kinesis ist zu einem Eckpfeiler für den Aufbau moderner ereignisgesteuerter Microservices-Architekturen geworden und ermöglicht die Echtzeit-Datenverarbeitung im großen Maßstab mit minimalem operativen Aufwand.

Verständnis ereignisgesteuerter Microservices-Architektur
Die ereignisgesteuerte Architektur (Event-Driven Architecture, EDA) ist ein Entwurfsmuster, bei dem Dienste über Ereignisse kommunizieren und nicht über direkte synchrone Aufrufe. Dieser Ansatz bietet mehrere Vorteile:
- Schwache Kopplung: Dienste müssen nicht voneinander wissen
- Skalierbarkeit: Jeder Dienst skaliert unabhängig basierend auf seiner Arbeitslast
- Resilienz: Fehler in einem Dienst breiten sich nicht auf andere aus
- Flexibilität: Neue Dienste können hinzugefügt werden, ohne bestehende zu verändern
AWS Kinesis bildet das Rückgrat für die Implementierung von EDA, indem es als verteilter, dauerhafter Ereignisstrom fungiert, der Produzenten von Konsumenten entkoppelt.
Für eine breitere Perspektive auf Streaming-Plattformen finden Sie unseren Apache Kafka Quickstart-Leitfaden zum Vergleich mit selbst gehosteten Alternativen.
Übersicht über AWS Kinesis
AWS bietet mehrere Kinesis-Dienste, die jeweils für spezifische Anwendungsfälle konzipiert sind. Bei der Bewertung von Streaming-Lösungen sollten Sie möglicherweise auch den Vergleich von RabbitMQ auf EKS gegenüber SQS für verschiedene Messaging-Muster und Kostenauswirkungen berücksichtigen.
Kinesis Data Streams
Der Kern-Streaming-Dienst, der Datenaufzeichnungen in Echtzeit erfasst, speichert und verarbeitet. Data Streams ist ideal für:
- Benutzerdefinierte Echtzeit-Verarbeitungsanwendungen
- Aufbau von Datenpipelines mit Latenzzeiten unter einer Sekunde
- Verarbeitung von Millionen von Ereignissen pro Sekunde
- Implementierung von Event-Sourcing-Mustern
Kinesis Data Firehose
Ein vollständig verwalteter Dienst, der Streaming-Daten an Ziele wie S3, Redshift, Elasticsearch oder HTTP-Endpunkte liefert. Am besten geeignet für:
- Einfache ETL-Pipelines
- Aggregation und Archivierung von Logs
- Nahezu Echtzeit-Analysen (mindestens 60 Sekunden Latenz)
- Szenarien, in denen keine benutzerdefinierte Verarbeitungslogik erforderlich ist
Kinesis Data Analytics
Verarbeitet und analysiert Streaming-Daten mit SQL oder Apache Flink. Anwendungsfälle umfassen:
- Echtzeit-Dashboards
- Streaming-ETL
- Echtzeit-Anomalieerkennung
- Kontinuierliche Metrikerstellung
Für eine tiefere Einarbeitung in Flink-Operationen finden Sie unseren Apache Flink auf K8s und Kafka-Leitfaden.
Architekturmuster mit Kinesis
1. Event-Sourcing-Muster
Event Sourcing speichert alle Änderungen am Anwendungsstatus als Folge von Ereignissen. Kinesis ist dafür perfekt geeignet. Wenn Sie eine Auffrischung der Python-Grundlagen benötigen, werfen Sie einen Blick auf unser Python-Cheatsheet:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Veröffentlichen eines Ereignisses im Kinesis-Stream"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Beispiel: Benutzerregistrierungsereignis
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Trennung von Lese- und Schreiboperationen unter Verwendung von Kinesis als Ereignisbus:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Fan-Out-Muster mit Lambda
Verarbeitung von Ereignissen aus einem einzelnen Stream mit mehreren Lambda-Funktionen. Für TypeScript-Implementierungen mit stärkerer Typsicherheit verweisen wir auf unser TypeScript-Cheatsheet:
// Lambda-Consumer für E-Mail-Benachrichtigungen
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Eine weitere Lambda für Lagerbestandsaktualisierungen
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Best Practices für die Produktion
1. Wahl der richtigen Shard-Anzahl
Berechnen Sie Ihre Shard-Anforderungen basierend auf:
- Eingangsrate: 1 MB/Sek. oder 1.000 Datensätze/Sek. pro Shard
- Ausgangsrate: 2 MB/Sek. pro Shard (Standardkonsumenten) oder 2 MB/Sek. pro Konsument mit verbessertem Fan-Out
def calculate_shards(records_per_second, avg_record_size_kb):
"""Berechnung der erforderlichen Anzahl von Shards"""
# Eingangs-Kapazität
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Puffer hinzufügen
2. Implementierung einer korrekten Fehlerbehandlung
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Datensatz mit exponentieller Rückoff-Wiederholungsfunktion setzen"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponentieller Rückzug
continue
raise
3. Verwendung von verbessertem Fan-Out für mehrere Konsumenten
Verbesserter Fan-Out bietet dedizierte Durchsatzkapazität für jeden Konsumenten:
# Konsumenten mit verbessertem Fan-Out registrieren
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Überwachung wichtiger Metriken
Wesentliche CloudWatch-Metriken zur Verfolgung:
IncomingRecords: Anzahl erfolgreich gesetzter DatensätzeIncomingBytes: Datenvolumen der eingehenden DatenGetRecords.IteratorAgeMilliseconds: Wie weit Konsumenten zurückliegenWriteProvisionedThroughputExceeded: Drosselungsvorgänge beim SchreibenReadProvisionedThroughputExceeded: Drosselungsvorgänge beim Lesen
5. Implementierung einer korrekten Strategie für Partitionsschlüssel
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generiert einen Partitionsschlüssel mit gleichmäßiger Verteilung"""
# Konsistentes Hashing für gleichmäßige Verteilung verwenden
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Beispiel für eine Implementierung in der Praxis
Hier ist ein vollständiges Beispiel für eine Microservices-Architektur zur Bestellverarbeitung:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Bestellung erstellen und Ereignisse veröffentlichen"""
order_id = self.generate_order_id()
# Bestell-Erstellungs-Ereignis veröffentlichen
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Ereignis im Kinesis-Stream veröffentlichen"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Verarbeitet Bestellereignisse und aktualisiert den Lagerbestand"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Lagerbestandsdatenbank aktualisieren
for item in order_data['items']:
# Implementierung hier
pass
Migrationsstrategie vom Monolith zu Microservices
Phase 1: Strangler-Fig-Muster
Beginnen Sie damit, bestimmte Ereignisse durch Kinesis zu leiten, während der Monolith beibehalten wird:
- Identifizieren Sie begrenzte Kontexte in Ihrem Monolithen
- Erstellen Sie Kinesis-Streams für ereignisübergreifende Kontexte
- Extrahieren Sie schrittweise Dienste, die aus diesen Streams konsumieren
- Wahren Sie die Abwärtskompatibilität mit dem Monolithen
Phase 2: Parallele Verarbeitung
Führen Sie alte und neue Systeme parallel aus:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Schreiben Sie sowohl in das Legacy-System als auch in den Ereignisstrom"""
try:
# Zuerst in das neue System schreiben
publish_to_kinesis(kinesis_stream, data)
# Dann das Legacy-System aktualisieren
legacy_db.update(data)
except Exception as e:
# Kompensationslogik implementieren
rollback_kinesis_event(kinesis_stream, data)
raise
Phase 3: Vollständige Migration
Sobald das Vertrauen besteht, leiten Sie den gesamten Verkehr über die ereignisgesteuerte Architektur.
Strategien zur Kostenoptimierung
Für umfassende Leitlinien zu Dateninfrastrukturmustern, einschließlich Objektspeicher und Datenbankarchitekturen, verweisen wir auf Dateninfrastruktur für KI-Systeme: Objektspeicher, Datenbanken, Suche & KI-Datenarchitektur.
1. Verwendung des On-Demand-Modus für variable Arbeitslasten
Der On-Demand-Modus (eingeführt 2023) skaliert automatisch basierend auf dem Traffic:
# Stream mit On-Demand-Modus erstellen
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implementierung der Datenaggregation
Reduzieren Sie PUT-Last-Einheiten durch Bündelung von Datensätzen:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Datensätze aggregieren, um Kosten zu senken"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Aggregierten Datensatz senden
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimierung der Datenhaltung
Die Standard-Haltedauer beträgt 24 Stunden. Verlängern Sie diese nur, wenn notwendig:
# Haltedauer auf 7 Tage setzen
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Sicherheitsbestpraktiken
1. Verschlüsselung ruhender und übertragener Daten
# Verschlüsselten Stream erstellen
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Verschlüsselung aktivieren
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. IAM-Richtlinien für das Prinzip der geringsten Rechte
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC-Endpunkte
Halten Sie den Verkehr innerhalb des AWS-Netzwerks. Für die Verwaltung der AWS-Infrastruktur als Code sollten Sie Terraform in Betracht ziehen – siehe unser Terraform-Cheatsheet:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observability und Debugging
Verteiltes Tracing mit X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights-Abfragen
-- Langsame Verarbeitungszeiten finden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Fehlerraten verfolgen
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Fortgeschrittene Muster
Saga-Muster für verteilte Transaktionen
Implementieren Sie lange laufende Transaktionen über Microservices hinweg:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Saga mit Kompensationslogik ausführen"""
try:
# Schritt 1: Lagerbestand reservieren
self.publish_command('RESERVE_INVENTORY', order_data)
# Schritt 2: Zahlung verarbeiten
self.publish_command('PROCESS_PAYMENT', order_data)
# Schritt 3: Bestellung versenden
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Kompensation in umgekehrter Reihenfolge
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Kompensationstransaktionen ausführen"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Teststrategien
Lokale Entwicklung mit LocalStack
# LocalStack mit Kinesis starten
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Test-Stream erstellen
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Integrationstests
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Testen der Ereignisveröffentlichung mit gemocktem Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Leistungsoptimierung
Optimierung der Stapelgröße
def optimize_batch_processing(records, batch_size=500):
"""Verarbeitung von Datensätzen in optimierten Stapeln"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Verwendung von Verbindungspooling
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Nützliche Links
AWS Kinesis-Ressourcen:
- AWS Kinesis-Dokumentation
- Entwicklerleitfaden für AWS Kinesis Data Streams
- Kinesis Client Library (KCL)
- AWS Kinesis Preisrechner
- Kinesis Data Streams Quoten und Limits
- AWS Architecture Blog - Ereignisgesteuerte Architekturen
- AWS Samples - Kinesis-Beispiele
Verwandte Artikel:
- Vergleich der Hosting-Kosten von RabbitMQ auf EKS und SQS
- TypeScript-Cheatsheet: Kernkonzepte und bewährte Praktiken
- Python-Cheatsheet
Fazit
AWS Kinesis bietet eine robuste Grundlage für den Aufbau skalierbarer, ereignisgesteuerter Microservices-Architekturen. Durch die Einhaltung dieser Muster und bewährten Praktiken können Sie Systeme erstellen, die resilient, skalierbar und wartbar sind. Beginnen Sie klein mit einem einzelnen Ereignisstrom, validieren Sie Ihre Architektur und erweitern Sie schrittweise zu komplexeren Mustern, während Ihr System wächst.
Der Schlüssel zum Erfolg liegt im Verständnis Ihrer Datenflussanforderungen, der Wahl des richtigen Kinesis-Dienstes für Ihren Anwendungszweck und der Implementierung einer angemessenen Überwachung und Fehlerbehandlung von Anfang an.