Aufbau ereignisgesteuerter Microservices mit AWS Kinesis

Eventgesteuerte Architektur mit AWS Kinesis für Skalierung

Inhaltsverzeichnis

AWS Kinesis ist zu einem Eckpfeiler für den Aufbau moderner ereignisgesteuerter Microservices-Architekturen geworden und ermöglicht die Echtzeit-Datenverarbeitung im großen Maßstab mit minimalem operativen Aufwand.

amazon-kinesis

Verständnis ereignisgesteuerter Microservices-Architektur

Die ereignisgesteuerte Architektur (Event-Driven Architecture, EDA) ist ein Entwurfsmuster, bei dem Dienste über Ereignisse kommunizieren und nicht über direkte synchrone Aufrufe. Dieser Ansatz bietet mehrere Vorteile:

  • Schwache Kopplung: Dienste müssen nicht voneinander wissen
  • Skalierbarkeit: Jeder Dienst skaliert unabhängig basierend auf seiner Arbeitslast
  • Resilienz: Fehler in einem Dienst breiten sich nicht auf andere aus
  • Flexibilität: Neue Dienste können hinzugefügt werden, ohne bestehende zu verändern

AWS Kinesis bildet das Rückgrat für die Implementierung von EDA, indem es als verteilter, dauerhafter Ereignisstrom fungiert, der Produzenten von Konsumenten entkoppelt.

Für eine breitere Perspektive auf Streaming-Plattformen finden Sie unseren Apache Kafka Quickstart-Leitfaden zum Vergleich mit selbst gehosteten Alternativen.

Übersicht über AWS Kinesis

AWS bietet mehrere Kinesis-Dienste, die jeweils für spezifische Anwendungsfälle konzipiert sind. Bei der Bewertung von Streaming-Lösungen sollten Sie möglicherweise auch den Vergleich von RabbitMQ auf EKS gegenüber SQS für verschiedene Messaging-Muster und Kostenauswirkungen berücksichtigen.

Kinesis Data Streams

Der Kern-Streaming-Dienst, der Datenaufzeichnungen in Echtzeit erfasst, speichert und verarbeitet. Data Streams ist ideal für:

  • Benutzerdefinierte Echtzeit-Verarbeitungsanwendungen
  • Aufbau von Datenpipelines mit Latenzzeiten unter einer Sekunde
  • Verarbeitung von Millionen von Ereignissen pro Sekunde
  • Implementierung von Event-Sourcing-Mustern

Kinesis Data Firehose

Ein vollständig verwalteter Dienst, der Streaming-Daten an Ziele wie S3, Redshift, Elasticsearch oder HTTP-Endpunkte liefert. Am besten geeignet für:

  • Einfache ETL-Pipelines
  • Aggregation und Archivierung von Logs
  • Nahezu Echtzeit-Analysen (mindestens 60 Sekunden Latenz)
  • Szenarien, in denen keine benutzerdefinierte Verarbeitungslogik erforderlich ist

Kinesis Data Analytics

Verarbeitet und analysiert Streaming-Daten mit SQL oder Apache Flink. Anwendungsfälle umfassen:

  • Echtzeit-Dashboards
  • Streaming-ETL
  • Echtzeit-Anomalieerkennung
  • Kontinuierliche Metrikerstellung

Für eine tiefere Einarbeitung in Flink-Operationen finden Sie unseren Apache Flink auf K8s und Kafka-Leitfaden.

Architekturmuster mit Kinesis

1. Event-Sourcing-Muster

Event Sourcing speichert alle Änderungen am Anwendungsstatus als Folge von Ereignissen. Kinesis ist dafür perfekt geeignet. Wenn Sie eine Auffrischung der Python-Grundlagen benötigen, werfen Sie einen Blick auf unser Python-Cheatsheet:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Veröffentlichen eines Ereignisses im Kinesis-Stream"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Beispiel: Benutzerregistrierungsereignis
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Trennung von Lese- und Schreiboperationen unter Verwendung von Kinesis als Ereignisbus:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Fan-Out-Muster mit Lambda

Verarbeitung von Ereignissen aus einem einzelnen Stream mit mehreren Lambda-Funktionen. Für TypeScript-Implementierungen mit stärkerer Typsicherheit verweisen wir auf unser TypeScript-Cheatsheet:

// Lambda-Consumer für E-Mail-Benachrichtigungen
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Eine weitere Lambda für Lagerbestandsaktualisierungen
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Best Practices für die Produktion

1. Wahl der richtigen Shard-Anzahl

Berechnen Sie Ihre Shard-Anforderungen basierend auf:

  • Eingangsrate: 1 MB/Sek. oder 1.000 Datensätze/Sek. pro Shard
  • Ausgangsrate: 2 MB/Sek. pro Shard (Standardkonsumenten) oder 2 MB/Sek. pro Konsument mit verbessertem Fan-Out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Berechnung der erforderlichen Anzahl von Shards"""
    # Eingangs-Kapazität
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Puffer hinzufügen

2. Implementierung einer korrekten Fehlerbehandlung

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Datensatz mit exponentieller Rückoff-Wiederholungsfunktion setzen"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponentieller Rückzug
                    continue
            raise

3. Verwendung von verbessertem Fan-Out für mehrere Konsumenten

Verbesserter Fan-Out bietet dedizierte Durchsatzkapazität für jeden Konsumenten:

# Konsumenten mit verbessertem Fan-Out registrieren
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Überwachung wichtiger Metriken

Wesentliche CloudWatch-Metriken zur Verfolgung:

  • IncomingRecords: Anzahl erfolgreich gesetzter Datensätze
  • IncomingBytes: Datenvolumen der eingehenden Daten
  • GetRecords.IteratorAgeMilliseconds: Wie weit Konsumenten zurückliegen
  • WriteProvisionedThroughputExceeded: Drosselungsvorgänge beim Schreiben
  • ReadProvisionedThroughputExceeded: Drosselungsvorgänge beim Lesen

5. Implementierung einer korrekten Strategie für Partitionsschlüssel

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generiert einen Partitionsschlüssel mit gleichmäßiger Verteilung"""
    # Konsistentes Hashing für gleichmäßige Verteilung verwenden
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Beispiel für eine Implementierung in der Praxis

Hier ist ein vollständiges Beispiel für eine Microservices-Architektur zur Bestellverarbeitung:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Bestellung erstellen und Ereignisse veröffentlichen"""
        order_id = self.generate_order_id()
        
        # Bestell-Erstellungs-Ereignis veröffentlichen
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Ereignis im Kinesis-Stream veröffentlichen"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Verarbeitet Bestellereignisse und aktualisiert den Lagerbestand"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Lagerbestandsdatenbank aktualisieren
        for item in order_data['items']:
            # Implementierung hier
            pass

Migrationsstrategie vom Monolith zu Microservices

Phase 1: Strangler-Fig-Muster

Beginnen Sie damit, bestimmte Ereignisse durch Kinesis zu leiten, während der Monolith beibehalten wird:

  1. Identifizieren Sie begrenzte Kontexte in Ihrem Monolithen
  2. Erstellen Sie Kinesis-Streams für ereignisübergreifende Kontexte
  3. Extrahieren Sie schrittweise Dienste, die aus diesen Streams konsumieren
  4. Wahren Sie die Abwärtskompatibilität mit dem Monolithen

Phase 2: Parallele Verarbeitung

Führen Sie alte und neue Systeme parallel aus:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Schreiben Sie sowohl in das Legacy-System als auch in den Ereignisstrom"""
    try:
        # Zuerst in das neue System schreiben
        publish_to_kinesis(kinesis_stream, data)
        
        # Dann das Legacy-System aktualisieren
        legacy_db.update(data)
    except Exception as e:
        # Kompensationslogik implementieren
        rollback_kinesis_event(kinesis_stream, data)
        raise

Phase 3: Vollständige Migration

Sobald das Vertrauen besteht, leiten Sie den gesamten Verkehr über die ereignisgesteuerte Architektur.

Strategien zur Kostenoptimierung

Für umfassende Leitlinien zu Dateninfrastrukturmustern, einschließlich Objektspeicher und Datenbankarchitekturen, verweisen wir auf Dateninfrastruktur für KI-Systeme: Objektspeicher, Datenbanken, Suche & KI-Datenarchitektur.

1. Verwendung des On-Demand-Modus für variable Arbeitslasten

Der On-Demand-Modus (eingeführt 2023) skaliert automatisch basierend auf dem Traffic:

# Stream mit On-Demand-Modus erstellen
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementierung der Datenaggregation

Reduzieren Sie PUT-Last-Einheiten durch Bündelung von Datensätzen:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Datensätze aggregieren, um Kosten zu senken"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Aggregierten Datensatz senden
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimierung der Datenhaltung

Die Standard-Haltedauer beträgt 24 Stunden. Verlängern Sie diese nur, wenn notwendig:

# Haltedauer auf 7 Tage setzen
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Sicherheitsbestpraktiken

1. Verschlüsselung ruhender und übertragener Daten

# Verschlüsselten Stream erstellen
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Verschlüsselung aktivieren
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. IAM-Richtlinien für das Prinzip der geringsten Rechte

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC-Endpunkte

Halten Sie den Verkehr innerhalb des AWS-Netzwerks. Für die Verwaltung der AWS-Infrastruktur als Code sollten Sie Terraform in Betracht ziehen – siehe unser Terraform-Cheatsheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observability und Debugging

Verteiltes Tracing mit X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights-Abfragen

-- Langsame Verarbeitungszeiten finden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Fehlerraten verfolgen
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Fortgeschrittene Muster

Saga-Muster für verteilte Transaktionen

Implementieren Sie lange laufende Transaktionen über Microservices hinweg:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Saga mit Kompensationslogik ausführen"""
        try:
            # Schritt 1: Lagerbestand reservieren
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Schritt 2: Zahlung verarbeiten
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Schritt 3: Bestellung versenden
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Kompensation in umgekehrter Reihenfolge
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Kompensationstransaktionen ausführen"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Teststrategien

Lokale Entwicklung mit LocalStack

# LocalStack mit Kinesis starten
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Test-Stream erstellen
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integrationstests

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Testen der Ereignisveröffentlichung mit gemocktem Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Leistungsoptimierung

Optimierung der Stapelgröße

def optimize_batch_processing(records, batch_size=500):
    """Verarbeitung von Datensätzen in optimierten Stapeln"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Verwendung von Verbindungspooling

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

AWS Kinesis-Ressourcen:

Verwandte Artikel:

Fazit

AWS Kinesis bietet eine robuste Grundlage für den Aufbau skalierbarer, ereignisgesteuerter Microservices-Architekturen. Durch die Einhaltung dieser Muster und bewährten Praktiken können Sie Systeme erstellen, die resilient, skalierbar und wartbar sind. Beginnen Sie klein mit einem einzelnen Ereignisstrom, validieren Sie Ihre Architektur und erweitern Sie schrittweise zu komplexeren Mustern, während Ihr System wächst.

Der Schlüssel zum Erfolg liegt im Verständnis Ihrer Datenflussanforderungen, der Wahl des richtigen Kinesis-Dienstes für Ihren Anwendungszweck und der Implementierung einer angemessenen Überwachung und Fehlerbehandlung von Anfang an.