RabbitMQ w mikroserwisach — eventy, routing i dead-letter queues
RabbitMQ pozwala serwisem komunikować się asynchronicznie bez bezpośrednich zależności. Pokazuję jak zaprojektowałem topologię exchanges i kolejek w tym projekcie.
Dlaczego kolejki?
Bez RabbitMQ synchroniczna komunikacja między serwisami wygląda tak:
Frontend → Blog API (HTTP) → OK
Frontend → Analytics API (HTTP) → jeśli Analytics padł, wyświetlenie posta też się nie uda?
To niedopuszczalne. Wyświetlenie posta nie powinno zależeć od tego, czy serwis analityki działa. Z kolejką:
Frontend → RabbitMQ (publish event) → wraca natychmiast
RabbitMQ → Analytics consumer (async) → przetwarza kiedy może
Topologia exchanges i kolejek
Używam topic exchange co daje elastyczny routing oparty na wzorcach:
Exchange: portfolio.events (topic)
├── Routing key: post.viewed → kolejka: analytics.post_views
├── Routing key: user.created → kolejka: blog.user_events
├── Routing key: user.updated → kolejka: blog.user_events
└── Routing key: user.deleted → kolejka: blog.user_events
Jeden exchange, wiele konsumerów — każdy słucha na swoich routing keys.
Publisher w PHP (Frontend)
class AnalyticsEventPublisher
{
public function publishPostViewed(string $postUuid, ?int $userId): void
{
$message = new AMQPMessage(
json_encode([
'event' => 'post.viewed',
'post_uuid' => $postUuid,
'user_id' => $userId,
'ip' => request()->ip(),
'timestamp' => now()->toIso8601String(),
]),
['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->channel->basic_publish($message, 'portfolio.events', 'post.viewed');
}
}
Consumer w Laravel (Artisan Command)
class ConsumeUserEvents extends Command
{
public function handle(): void
{
$this->channel->basic_consume(
'blog.user_events',
callback: function (AMQPMessage $msg) {
$data = json_decode($msg->body, true);
$this->handler->handle($data);
$msg->ack();
}
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}
Consumer działa jako długożyjący proces — w Dockerze jest osobnym kontenerem blog-consumer, w K8s osobnym Deploymentem.
Dead-letter queues
Jeśli consumer rzuci wyjątek i nie wyśle ack(), wiadomość trafia z powrotem do kolejki i jest ponawiana. Żeby uniknąć nieskończonej pętli, konfiguruję DLQ:
$this->channel->queue_declare('blog.user_events', arguments: new AMQPTable([
'x-dead-letter-exchange' => 'portfolio.dlx',
'x-message-ttl' => 30000,
'x-max-retries' => 3,
]));
Po 3 nieudanych próbach wiadomość trafia do kolejki martwych listów — można ją później przejrzeć i przetworzyć ręcznie.