3. Quick Start

The example below demonstrates several RbFly library features that an application using RabbitMQ Streams can rely on:

  • creating a RabbitMQ Streams client using a connection URI

  • creating a stream (demo coroutine)

  • creating a stream publisher and publishing messages to a stream (send_data coroutine)

  • subscribing to a stream and receiving messages from a stream (receive_data coroutine)

  • continuing to run if the RabbitMQ Streams broker stops and starts again

import asyncio
from datetime import datetime

import rbfly.streams as rbs

STREAM = 'rbfly-demo-stream'  # stream to send messages to

async def send_data(client: rbs.StreamsClient) -> None:
    # create publisher and send messages to the stream in a loop
    async with client.publisher(STREAM) as publisher:
        while True:
            await publisher.send('hello')

            print('{} message sent'.format(datetime.now()))
            await asyncio.sleep(5)

async def receive_data(client: rbs.StreamsClient) -> None:
    # subscribe to the stream and receive the messages
    async for msg in client.subscribe(STREAM):
        print('{} got: {!r}'.format(datetime.now(), msg))
        print()

@rbs.connection  # close connection to RabbitMQ Streams broker at exit
async def demo(client: rbs.StreamsClient) -> None:
    # create the stream; the operation does nothing if the stream already
    # exists; the stream can also be created by an external tool;
    # this is the first operation sent to the RabbitMQ Streams broker in
    # this demo, so RbFly creates the connection at this point
    await client.create_stream(STREAM)

    await asyncio.gather(send_data(client), receive_data(client))

# create RabbitMQ Streams client
client = rbs.streams_client('rabbitmq-stream://guest:guest@localhost')

asyncio.run(demo(client))
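
The connection URI passed to streams_client in the demo carries the user name, the password and the host of the broker. As a minimal sketch, the parts of such a URI can be inspected with the Python standard library alone. The describe_uri function and the DEFAULT_PORT constant are hypothetical names used only for this illustration and are not part of RbFly; the fallback port 5552 and the default virtual host "/" are assumptions based on common RabbitMQ defaults, and the sketch does not claim to show how RbFly itself parses the URI.

```python
from urllib.parse import urlsplit

# Assumption for illustration: port commonly used by the RabbitMQ
# stream protocol when the URI does not name one.
DEFAULT_PORT = 5552

def describe_uri(uri: str) -> dict[str, object]:
    """Split a connection URI into its parts (hypothetical helper)."""
    parts = urlsplit(uri)
    return {
        'scheme': parts.scheme,
        'username': parts.username,
        'password': parts.password,
        'host': parts.hostname,
        # fall back to the assumed default when no port is given
        'port': parts.port or DEFAULT_PORT,
        # assumption: an empty path means the default virtual host "/"
        'vhost': parts.path.lstrip('/') or '/',
    }

info = describe_uri('rabbitmq-stream://guest:guest@localhost')
for key, value in info.items():
    print('{}: {}'.format(key, value))
```

The URI used in the demo names neither a port nor a virtual host, so the sketch reports the assumed defaults for both.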

The source code of the demo can be downloaded from the RbFly code repository.

The following sections of the documentation discuss the features of the RbFly library in more detail.