3. Manage Connection
Use the streams_client() function to declare a connection to a RabbitMQ Streams broker. The function creates a RabbitMQ Streams client object, which is used to create streams, publish messages to a stream, and subscribe to a stream.
Whenever an application action requires interaction with the RabbitMQ Streams broker, a connection is created or an existing connection is reused.
When a broker is restarted (e.g. after an upgrade or a crash), RbFly reconnects the client and retries the interrupted action.
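The reconnect-and-retry behavior described above can be illustrated with a minimal sketch. It does not show RbFly's internal implementation; `FakeBroker`, `with_retry`, and the `delay` and `max_attempts` parameters are hypothetical names introduced only for this illustration. The sketch assumes that a lost connection surfaces as `ConnectionError` and that the interrupted action is safe to run again.

```python
import asyncio


class FakeBroker:
    """Hypothetical stand-in for a broker that is unavailable for a while."""

    def __init__(self, failures: int) -> None:
        self.failures = failures  # number of calls that fail before recovery
        self.attempts = 0

    async def send(self, message: str) -> str:
        self.attempts += 1
        if self.attempts <= self.failures:
            # simulate a broker restart: the connection is lost
            raise ConnectionError('broker unavailable')
        return f'sent: {message}'


async def with_retry(action, *args, delay: float = 0.01, max_attempts: int = 5):
    """Run `action`, retrying after a short pause when the connection fails."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await action(*args)
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the last allowed attempt
            await asyncio.sleep(delay)  # wait before trying again


broker = FakeBroker(failures=2)
result = asyncio.run(with_retry(broker.send, 'hello'))
```

With RbFly, application code does not need such a loop, because the library performs the reconnection and the retry itself.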
To close a connection to a RabbitMQ Streams broker properly, use the connection() decorator. Apply it to a Python asynchronous coroutine function. The decorator closes the connection to the broker when the decorated coroutine exits.
The example below is a skeleton of a Python program that creates a RabbitMQ Streams broker client, implements an asynchronous coroutine to interact with the broker, and closes the connection when the coroutine exits:
import asyncio
import rbfly.streams as rbs

@rbs.connection
async def streams_app(client: rbs.StreamsClient) -> None:
    ...
    # interaction with RabbitMQ Streams broker
    ...

client = rbs.streams_client('rabbitmq-stream://guest:guest@localhost')
asyncio.run(streams_app(client))
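To show the idea behind a decorator that closes a connection when a coroutine exits, here is a minimal, self-contained sketch. It is not RbFly's implementation of connection(); `DummyClient`, its `connect()` and `disconnect()` methods, and `closing_connection` are hypothetical names used only for this illustration. The sketch assumes that the client is the first argument of the decorated coroutine.

```python
import asyncio
import functools


class DummyClient:
    """Hypothetical client that only tracks whether it is connected."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> None:
        self.connected = True

    async def disconnect(self) -> None:
        self.connected = False


def closing_connection(coro_func):
    """Disconnect the client when the decorated coroutine exits."""

    @functools.wraps(coro_func)
    async def wrapper(client, *args, **kwargs):
        try:
            return await coro_func(client, *args, **kwargs)
        finally:
            # runs on normal return and on exception alike
            await client.disconnect()

    return wrapper


@closing_connection
async def app(client: DummyClient) -> bool:
    await client.connect()
    return client.connected  # connected while the coroutine is running


client = DummyClient()
was_connected = asyncio.run(app(client))
```

Placing the cleanup in a `finally` clause means the connection is closed even if the coroutine raises an exception, which is the main reason to prefer a decorator over a manual call at the end of the function.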