3. Manage Connection

Use the streams_client() function to declare a connection to a RabbitMQ Streams broker. The function creates a RabbitMQ Streams client object, which is used to create streams, publish messages to a stream, and subscribe to a stream.

Whenever an application action requires interaction with the RabbitMQ Streams broker, a new connection is created or an existing connection is reused.
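The create-or-reuse behaviour can be pictured with a minimal sketch. It is not RbFly's implementation: LazyClient, its publish() method, and the connect_count counter are hypothetical names used only for illustration, and the network handshake is simulated.

```python
import asyncio


class LazyClient:
    """Hypothetical client: connects on first use, then reuses the connection."""

    def __init__(self) -> None:
        self._connection = None
        self.connect_count = 0

    async def _get_connection(self) -> object:
        # Create a connection only if none exists yet.
        if self._connection is None:
            await asyncio.sleep(0)  # stands in for a network handshake
            self._connection = object()
            self.connect_count += 1
        return self._connection

    async def publish(self, message: str) -> object:
        # Every action obtains the connection through the same path.
        return await self._get_connection()


async def main() -> tuple:
    client = LazyClient()
    first = await client.publish('a')
    second = await client.publish('b')
    return client.connect_count, first is second


connect_count, reused = asyncio.run(main())
```

In this sketch the first action triggers the connection and the second action reuses it, so no explicit connect call appears in application code.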

When a broker is restarted (e.g. after an upgrade or a crash), RbFly reconnects the client and retries the interrupted action.

Figure 3.1 RbFly reconnecting an application to RabbitMQ Streams broker in action

Screenshot of a Grafana plot of stream messages received by a RabbitMQ Streams broker from an application. The RabbitMQ service is restarted twice to update the broker. Each time, RbFly receives a disconnection request from the broker, caches the application's messages, and flushes them to the broker when a new connection is established. No data is lost in the process.
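The cache-and-flush behaviour described for the figure can be illustrated with a small, self-contained sketch. It only models the idea and makes no claim about how RbFly implements it: BufferingPublisher and its on_disconnect() and on_reconnect() methods are hypothetical, and a Python list stands in for the broker.

```python
from collections import deque


class BufferingPublisher:
    """Hypothetical publisher: buffers while disconnected, flushes on reconnect."""

    def __init__(self) -> None:
        self.connected = True
        self.pending = deque()  # messages waiting to be sent
        self.delivered = []     # stands in for the broker's stream

    def publish(self, message: str) -> None:
        # Always enqueue first, so message order is preserved across restarts.
        self.pending.append(message)
        if self.connected:
            self.flush()

    def flush(self) -> None:
        while self.pending:
            self.delivered.append(self.pending.popleft())

    def on_disconnect(self) -> None:
        self.connected = False

    def on_reconnect(self) -> None:
        self.connected = True
        self.flush()


publisher = BufferingPublisher()
publisher.publish('a')
publisher.on_disconnect()            # broker restart begins
publisher.publish('b')
publisher.publish('c')
during_restart = list(publisher.delivered)
publisher.on_reconnect()             # new connection established
after_restart = list(publisher.delivered)
```

Messages published during the simulated restart stay in the pending queue and reach the stand-in broker, in order, once the connection is back.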

Properly closing a connection to a RabbitMQ Streams broker requires the connection() decorator. Use it to decorate a Python asynchronous coroutine function. The decorator closes the connection to the broker when the decorated coroutine exits.

The example below is a skeleton of a Python program that creates a RabbitMQ Streams broker client, implements an asynchronous coroutine to interact with the broker, and closes the connection when the coroutine exits:

import asyncio
import rbfly.streams as rbs

@rbs.connection
async def streams_app(client: rbs.StreamsClient) -> None:
    ...
    # interaction with RabbitMQ Streams broker
    ...

client = rbs.streams_client('rabbitmq-stream://guest:guest@localhost')
asyncio.run(streams_app(client))
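To show what "closes a connection when the decorated coroutine exits" means in practice, here is a minimal sketch of a decorator with that behaviour. It is an assumption about the general pattern, not the source of the connection() decorator: closing_client, DemoClient, and its disconnect() method are hypothetical names, and no broker is involved.

```python
import asyncio
import functools


class DemoClient:
    """Hypothetical client that only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def disconnect(self) -> None:
        self.closed = True


def closing_client(coro_func):
    """Close the client passed as first argument when the coroutine exits."""
    @functools.wraps(coro_func)
    async def wrapper(client, *args, **kwargs):
        try:
            return await coro_func(client, *args, **kwargs)
        finally:
            # Runs on normal return and on error alike.
            await client.disconnect()
    return wrapper


@closing_client
async def good_app(client: DemoClient) -> str:
    return 'done'


@closing_client
async def failing_app(client: DemoClient) -> None:
    raise RuntimeError('application error')


ok_client = DemoClient()
result = asyncio.run(good_app(ok_client))

bad_client = DemoClient()
try:
    asyncio.run(failing_app(bad_client))
except RuntimeError:
    pass
```

The try/finally in the wrapper is the point of the sketch: the client is closed whether the coroutine returns normally or raises.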