Construindo Microsserviços Orientados a Eventos com AWS Kinesis

Arquitetura orientada a eventos com AWS Kinesis para escalabilidade

Conteúdo da página

AWS Kinesis tornou-se uma pedra angular para a construção de arquiteturas modernas de microsserviços orientados a eventos, permitindo o processamento de dados em tempo real em escala com mínimo sobrecarga operacional.

amazon-kinesis

Compreendendo a Arquitetura de Microsserviços Orientados a Eventos

A arquitetura orientada a eventos (EDA) é um padrão de design onde os serviços se comunicam através de eventos em vez de chamadas síncronas diretas. Esta abordagem oferece várias vantagens:

  • Acoplamento frouxo: Os serviços não precisam conhecer a existência uns dos outros
  • Escalabilidade: Cada serviço escala de forma independente com base na sua carga de trabalho
  • Resiliência: Falhas em um serviço não se propagam para outros
  • Flexibilidade: Novos serviços podem ser adicionados sem modificar os existentes

O AWS Kinesis fornece a base para implementar EDA, atuando como um fluxo de eventos distribuído e durável que desacopla produtores de consumidores.

Para uma perspectiva mais ampla sobre plataformas de streaming, consulte nosso guia de início rápido do Apache Kafka para comparação com alternativas auto-hospedadas.

Visão Geral do AWS Kinesis

A AWS oferece vários serviços Kinesis, cada um projetado para casos de uso específicos. Ao avaliar soluções de streaming, você também pode considerar comparar RabbitMQ no EKS vs SQS para diferentes padrões de mensagens e implicações de custos.

Kinesis Data Streams

O serviço de streaming principal que captura, armazena e processa registros de dados em tempo real. Data Streams é ideal para:

  • Aplicações personalizadas de processamento em tempo real
  • Construção de pipelines de dados com latência subsegundo
  • Processamento de milhões de eventos por segundo
  • Implementação de padrões de Event Sourcing

Kinesis Data Firehose

Um serviço totalmente gerenciado que entrega dados de streaming para destinos como S3, Redshift, Elasticsearch ou endpoints HTTP. Melhor para:

  • Pipelines ETL simples
  • Agregação e arquivamento de logs
  • Análise em tempo quase real (latência mínima de 60 segundos)
  • Cenários onde você não precisa de lógica de processamento personalizada

Kinesis Data Analytics

Processa e analisa dados de streaming usando SQL ou Apache Flink. Casos de uso incluem:

  • Dashboards em tempo real
  • ETL de streaming
  • Detecção de anomalias em tempo real
  • Geração contínua de métricas

Para uma análise mais aprofundada das operações do Flink, consulte nosso guia do Apache Flink no K8s e Kafka.

Padrões Arquiteturais com Kinesis

1. Padrão Event Sourcing

O Event Sourcing armazena todas as mudanças no estado da aplicação como uma sequência de eventos. O Kinesis é perfeito para isso. Se precisar de uma revisão sobre os fundamentos do Python, confira nossa Folha de Truques do Python:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publica um evento no fluxo Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Exemplo: Evento de registro de usuário
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Segregação de Responsabilidade de Comando e Consulta)

Separe operações de leitura e escrita usando o Kinesis como barramento de eventos:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Padrão Fan-Out com Lambda

Processa eventos de um único fluxo com múltiplas funções Lambda. Para implementações em TypeScript com maior segurança de tipos, consulte nossa Folha de Truques do TypeScript:

// Consumidor Lambda para notificações por e-mail
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Outro Lambda para atualizações de inventário
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Melhores Práticas para Produção

1. Escolhendo a Contagem de Shards Correta

Calcule seus requisitos de shards com base em:

  • Ingresso: 1 MB/seg ou 1.000 registros/seg por shard
  • Egresso: 2 MB/seg por shard (consumidores padrão) ou 2 MB/seg por consumidor com fan-out aprimorado
def calculate_shards(records_per_second, avg_record_size_kb):
    """Calcula o número necessário de shards"""
    # Capacidade de ingresso
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Adiciona buffer

2. Implemente Tratamento de Erros Adequado

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Coloca registro com retry de backoff exponencial"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Backoff exponencial
                    continue
            raise

3. Use Fan-Out Aprimorado para Múltiplos Consumidores

O fan-out aprimorado fornece throughput dedicado para cada consumidor:

# Registra um consumidor com fan-out aprimorado
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitore Métricas-Chave

Métricas essenciais do CloudWatch para acompanhar:

  • IncomingRecords: Número de registros colocados com sucesso
  • IncomingBytes: Volume em bytes de dados de entrada
  • GetRecords.IteratorAgeMilliseconds: O quão atrás os consumidores estão
  • WriteProvisionedThroughputExceeded: Eventos de throttling de escrita
  • ReadProvisionedThroughputExceeded: Throttling de consumidores

5. Implemente uma Estratégia Adequada de Chave de Partição

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Gera chave de partição com distribuição uniforme"""
    # Usa hash consistente para distribuição uniforme
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Exemplo de Implementação no Mundo Real

Aqui está um exemplo completo de uma arquitetura de microsserviços de processamento de pedidos:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Cria pedido e publica eventos"""
        order_id = self.generate_order_id()
        
        # Publica evento de criação de pedido
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publica evento no fluxo Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consome eventos de pedidos e atualiza o inventário"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Atualiza banco de dados de inventário
        for item in order_data['items']:
            # Implementação aqui
            pass

Estratégia de Migração de Monolito para Microsserviços

Fase 1: Padrão Strangler Fig

Comece roteando eventos específicos através do Kinesis enquanto mantém o monolito:

  1. Identifique contextos delimitados no seu monolito
  2. Crie fluxos Kinesis para eventos entre contextos
  3. Extraia gradualmente serviços que consomem desses fluxos
  4. Mantenha compatibilidade com o monolito

Fase 2: Processamento Paralelo

Execute ambos os sistemas antigo e novo em paralelo:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Escreve tanto no sistema legado quanto no fluxo de eventos"""
    try:
        # Escreve primeiro no novo sistema
        publish_to_kinesis(kinesis_stream, data)
        
        # Depois atualiza o sistema legado
        legacy_db.update(data)
    except Exception as e:
        # Implementa lógica de compensação
        rollback_kinesis_event(kinesis_stream, data)
        raise

Fase 3: Migração Completa

Uma vez estabelecida a confiança, roteie todo o tráfego através da arquitetura orientada a eventos.

Estratégias de Otimização de Custos

Para orientações abrangentes sobre padrões de infraestrutura de dados, incluindo armazenamento de objetos e arquiteturas de banco de dados, consulte Infraestrutura de Dados para Sistemas de IA: Armazenamento de Objetos, Bancos de Dados, Busca e Arquitetura de Dados de IA.

1. Use o Modo Sob Demanda para Cargas de Trabalho Variáveis

O modo sob demanda (introduzido em 2023) escala automaticamente com base no tráfego:

# Cria fluxo com modo sob demanda
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implemente Agregação de Dados

Reduza as unidades de payload PUT agrupando registros:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Agrega registros para reduzir custos"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Envia registro agregado
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Otimize a Retenção de Dados

A retenção padrão é de 24 horas. Estenda apenas se necessário:

# Define retenção para 7 dias
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Melhores Práticas de Segurança

1. Criptografia em Repouso e em Trânsito

# Cria fluxo criptografado
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Habilita criptografia
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Políticas IAM para Privilégio Mínimo

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Endpoints VPC

Mantenha o tráfego dentro da rede da AWS. Para gerenciar a infraestrutura AWS como código, considere usar Terraform - veja nossa folha de truques do Terraform:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabilidade e Depuração

Rastreamento Distribuído com X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Consultas de Insights do CloudWatch Logs

-- Encontra tempos de processamento lentos
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Acompanha taxas de erro
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Padrões Avançados

Padrão Saga para Transações Distribuídas

Implementa transações de longa duração entre microsserviços:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Executa saga com lógica de compensação"""
        try:
            # Passo 1: Reserva inventário
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Passo 2: Processa pagamento
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Passo 3: Envia pedido
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compensa em ordem inversa
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Executa transações de compensação"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Estratégias de Teste

Desenvolvimento Local com LocalStack

# Inicia LocalStack com Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Cria fluxo de teste
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Testes de Integração

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Testa publicação de eventos com Kinesis mockado"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Ajuste de Desempenho

Otimize o Tamanho do Lote

def optimize_batch_processing(records, batch_size=500):
    """Processa registros em lotes otimizados"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Use Pool de Conexões

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Recursos do AWS Kinesis:

Artigos Relacionados:

Conclusão

O AWS Kinesis fornece uma base robusta para a construção de arquiteturas de microsserviços escaláveis e orientados a eventos. Seguindo esses padrões e melhores práticas, você pode criar sistemas que são resilientes, escaláveis e mantíveis. Comece pequeno com um único fluxo de eventos, valide sua arquitetura e expanda gradualmente para padrões mais complexos conforme seu sistema cresce.

A chave para o sucesso é entender seus requisitos de fluxo de dados, escolher o serviço Kinesis adequado para seu caso de uso e implementar monitoramento e tratamento de erros adequados desde o início.

Assinar

Receba novos artigos sobre sistemas, infraestrutura e engenharia de IA.