1. Publish Messages

To send messages to a RabbitMQ stream create a RabbitMQ Streams publisher with publisher() method of StreamsClient() class. There are multiple types of publishers implemented by the following classes

rbfly.streams.Publisher

Send a message and wait for RabbitMQ Streams broker to confirm that the message is received. Multiple asynchronous coroutines can send messages with the same publisher concurrently. This is the slowest way of sending messages. It is the default RbFly publisher.

rbfly.streams.PublisherBatchLimit

Batch multiple messages, then send them to a stream with the flush method. Batch method blocks when a limit of messages is reached, so an application does not run of out memory. Batching and flushing can be performed concurrently from different asynchronous coroutines. This is faster method of sending messages, than the previous one.

rbfly.streams.PublisherBatchFast

Batch multiple messages, then send them to a stream with the flush method. The number of messages in a batch is limited by a maximum length of Python list. An application needs to flush messages on a regular basis to sustain its performance, and to avoid running out of memory. Use it only when messages can be batched and flushed in a sequence, i.e. from the same asynchronous coroutine. This is the fastest method of sending messages to a stream, but provides no coordination between batch and flush methods.

Note

RabbitMQ Streams publisher uses publisher reference name for message deduplication. By default, a publisher name is set using <hostname>/<pid> format.

If the naming scheme is not sufficient, then publisher reference name should be overriden when creating a publisher.

1.1. Send Single Message

To send messages one by one, create a RabbitMQ Streams publisher and send messages with send() asynchronous coroutine:

async with client.publisher('stream-name') as publisher:
    message = 'hello'
    await publisher.send(message)

The coroutine sends a message and waits for RabbitMQ Streams broker for the published message confirmation.

1.2. Send Batch of Messages

If an application has to send a batch of messages from multiple asynchronous coroutines, then use publisher implemented by PublisherBatchLimit class.

Enqueue a message with batch() asynchronous coroutine method. The method blocks if an applications reaches limit of number of messages. To unblock, use flush() asynchronous coroutine method:

async def run_app(client: StreamsClient) -> None:
    # note: publisher is flushed on exit of the context manager as well
    async with client.publisher('stream-name', cls=PublisherBatchLimit) as publisher:
        await asyncio.gather(batch(publisher), flush(publisher))

async def batch(publisher: PublisherBatchLimit) -> None:
    await publisher.batch('hello 1')
    await publisher.batch('hello 2')

async def flush(publisher: PublisherBatchLimit) -> None:
    while True:
        await asyncio.sleep(0.2)  # flush messages 5 times per second
        await publisher.flush()

Use flush() asynchronous coroutine to send messages to RabbitMQ Streams broker and wait for confirmation of receiving the messages.

1.3. Fast Batch Publishing

To send messages in batch mode create publisher using PublisherBatchFast class. Enqueue each message with batch() method:

async with client.publisher('stream-name', cls=PublisherBatchFast) as publisher:
    publisher.batch('hello 1')
    publisher.batch('hello 2')
    await publisher.flush()

Use flush() asynchronous coroutine to send the messages to RabbitMQ Streams broker and wait for confirmation of receiving the messages.

1.4. Publishing Exceptions

RbFly might raise the following exceptions when publishing messages

TypeError

A message or part of a message is of a type not recognized by RbFly’s AMQP format encoder.

ValueError

A message has invalid value. For example, a message does not fit within the frame of RabbitMQ Streams protocol, or a string exceeds maximum allowed length (2 ** 32 - 1).