RabbitMQ w mikroserwisach — eventy, routing i dead-letter queues

RabbitMQ pozwala serwisem komunikować się asynchronicznie bez bezpośrednich zależności. Pokazuję jak zaprojektowałem topologię exchanges i kolejek w tym projekcie.

Dlaczego kolejki?

Bez RabbitMQ synchroniczna komunikacja między serwisami wygląda tak:

Frontend → Blog API (HTTP) → OK
Frontend → Analytics API (HTTP) → jeśli Analytics padł, wyświetlenie posta też się nie uda?

To niedopuszczalne. Wyświetlenie posta nie powinno zależeć od tego, czy serwis analityki działa. Z kolejką:

Frontend → RabbitMQ (publish event) → wraca natychmiast
RabbitMQ → Analytics consumer (async) → przetwarza kiedy może

Topologia exchanges i kolejek

Używam topic exchange co daje elastyczny routing oparty na wzorcach:

Exchange: portfolio.events (topic)
  ├── Routing key: post.viewed     → kolejka: analytics.post_views
  ├── Routing key: user.created    → kolejka: blog.user_events
  ├── Routing key: user.updated    → kolejka: blog.user_events
  └── Routing key: user.deleted    → kolejka: blog.user_events

Jeden exchange, wiele konsumerów — każdy słucha na swoich routing keys.

Publisher w PHP (Frontend)

class AnalyticsEventPublisher
{
    public function publishPostViewed(string $postUuid, ?int $userId): void
    {
        $message = new AMQPMessage(
            json_encode([
                'event' => 'post.viewed',
                'post_uuid' => $postUuid,
                'user_id' => $userId,
                'ip' => request()->ip(),
                'timestamp' => now()->toIso8601String(),
            ]),
            ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );

        $this->channel->basic_publish($message, 'portfolio.events', 'post.viewed');
    }
}

Consumer w Laravel (Artisan Command)

class ConsumeUserEvents extends Command
{
    public function handle(): void
    {
        $this->channel->basic_consume(
            'blog.user_events',
            callback: function (AMQPMessage $msg) {
                $data = json_decode($msg->body, true);
                $this->handler->handle($data);
                $msg->ack();
            }
        );

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

Consumer działa jako długożyjący proces — w Dockerze jest osobnym kontenerem blog-consumer, w K8s osobnym Deploymentem.

Dead-letter queues

Jeśli consumer rzuci wyjątek i nie wyśle ack(), wiadomość trafia z powrotem do kolejki i jest ponawiana. Żeby uniknąć nieskończonej pętli, konfiguruję DLQ:

$this->channel->queue_declare('blog.user_events', arguments: new AMQPTable([
    'x-dead-letter-exchange' => 'portfolio.dlx',
    'x-message-ttl' => 30000,
    'x-max-retries' => 3,
]));

Po 3 nieudanych próbach wiadomość trafia do kolejki martwych listów — można ją później przejrzeć i przetworzyć ręcznie.

Comments

No comments yet