3. Quick Start
The example below demonstrates various RbFly library features, which can be utilized in an application using RabbitMQ Streams:
creating RabbitMQ Streams client using connection URI
creating a stream (demo coroutine)
creating a stream publisher and publishing messages to a stream (send_data coroutine)
subscribing to a stream and receiving messages from a stream (receive_data coroutine)
the script continues to run if RabbitMQ Streams broker stops and starts again
import asyncio
from datetime import datetime
import rbfly.streams as rbs
STREAM = 'rbfly-demo-stream' # stream to send message to
async def send_data(client: rbs.StreamsClient) -> None:
# create publisher and send messages to the stream in a loop
async with client.publisher(STREAM) as publisher:
while True:
await publisher.send('hello')
print('{} message sent'.format(datetime.now()))
await asyncio.sleep(5)
async def receive_data(client: rbs.StreamsClient) -> None:
# subscribe to the stream and receive the messages
async for msg in client.subscribe(STREAM):
print('{} got: {!r}'.format(datetime.now(), msg))
print()
@rbs.connection # close connection to RabbitMQ Streams broker at exit
async def demo(client: rbs.StreamsClient) -> None:
# create stream; operation does nothing if a stream exists; stream can
# be created by an external tool;
# this is first operation to RabbitMQ Streams broker in this demo,
# so a connection is created by RbFly
await client.create_stream(STREAM)
await asyncio.gather(send_data(client), receive_data(client))
# create RabbitMQ Streams client
client = rbs.streams_client('rabbitmq-stream://guest:guest@localhost')
asyncio.run(demo(client))
The source code of the demo can be downloaded from RbFly code repository.
The following sections of the documentation discuss the features of RbFly library in more detail.