1. Publish Messages
To send messages to a RabbitMQ stream, create a RabbitMQ Streams publisher
with the publisher() method of the StreamsClient class. There are multiple
types of publishers, implemented by the following classes:
rbfly.streams.Publisher
Send a message and wait for the RabbitMQ Streams broker to confirm that the message has been received. Multiple asynchronous coroutines can send messages with the same publisher concurrently. This is the slowest way of sending messages. It is the default RbFly publisher.
rbfly.streams.PublisherBatchLimit
Batch multiple messages, then send them to a stream with the flush method. The batch method blocks when the limit of messages is reached, so an application does not run out of memory. Batching and flushing can be performed concurrently from different asynchronous coroutines. This method of sending messages is faster than the previous one.
rbfly.streams.PublisherBatchFast
Batch multiple messages, then send them to a stream with the flush method. The number of messages in a batch is limited only by the maximum length of a Python list. An application needs to flush messages on a regular basis to sustain its performance and to avoid running out of memory. Use it only when messages can be batched and flushed in sequence, i.e. from the same asynchronous coroutine. This is the fastest method of sending messages to a stream, but it provides no coordination between the batch and flush methods.
Note
A RabbitMQ Streams publisher uses the publisher reference name for message
deduplication. By default, a publisher name is set using the
<hostname>/<pid> format. If this naming scheme is not sufficient, then the
publisher reference name should be overridden when creating a publisher.
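To illustrate the default naming scheme, the sketch below builds a name in the <hostname>/<pid> format using only the Python standard library. The helper default_publisher_name() is hypothetical and is not part of RbFly; how RbFly builds the name internally, and the parameter used to override it, are not shown in this section.

```python
import os
import socket


def default_publisher_name() -> str:
    """Build a name in <hostname>/<pid> format.

    Illustrative helper only; it is not part of the RbFly API.
    """
    return f'{socket.gethostname()}/{os.getpid()}'


name = default_publisher_name()
print(name)
```

A name built this way differs between processes on the same host, but it usually changes when a process is restarted, because the restarted process gets a new pid. That is one situation in which an explicit, stable publisher reference name may be preferable.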
1.1. Send Single Message
To send messages one by one, create a RabbitMQ Streams publisher and send
messages with the send() asynchronous coroutine:
    async with client.publisher('stream-name') as publisher:
        message = 'hello'
        await publisher.send(message)
The coroutine sends a message and waits for the RabbitMQ Streams broker to confirm the published message.
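Because the default publisher can be shared between coroutines, several tasks can call send() on the same publisher object. The sketch below shows that pattern with a stand-in class, StubPublisher, which records messages instead of contacting a broker. StubPublisher and worker() are hypothetical names introduced for illustration; with RbFly, the publisher would come from client.publisher() as shown above.

```python
import asyncio


class StubPublisher:
    """Stand-in for a publisher; records messages instead of using a broker."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    async def send(self, message: str) -> None:
        # yield to the event loop, as a real send would while awaiting
        # the broker confirmation
        await asyncio.sleep(0)
        self.sent.append(message)


async def worker(publisher: StubPublisher, name: str, count: int) -> None:
    """Send `count` messages, one by one, with the shared publisher."""
    for i in range(count):
        await publisher.send(f'{name}: message {i}')


async def main() -> list[str]:
    publisher = StubPublisher()
    # two coroutines use the same publisher concurrently
    await asyncio.gather(
        worker(publisher, 'a', 3),
        worker(publisher, 'b', 3),
    )
    return publisher.sent


sent = asyncio.run(main())
```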
1.2. Send Batch of Messages
If an application has to send a batch of messages from multiple
asynchronous coroutines, then use the publisher implemented by the
PublisherBatchLimit class.
Enqueue a message with the batch() asynchronous coroutine method. The method
blocks if the application reaches the limit on the number of messages. To
unblock, use the flush() asynchronous coroutine method:
    import asyncio
    from rbfly.streams import StreamsClient, PublisherBatchLimit

    async def run_app(client: StreamsClient) -> None:
        # note: publisher is flushed on exit of the context manager as well
        async with client.publisher('stream-name', cls=PublisherBatchLimit) as publisher:
            await asyncio.gather(batch(publisher), flush(publisher))

    async def batch(publisher: PublisherBatchLimit) -> None:
        await publisher.batch('hello 1')
        await publisher.batch('hello 2')

    async def flush(publisher: PublisherBatchLimit) -> None:
        # runs until the task is cancelled
        while True:
            await asyncio.sleep(0.2)  # flush messages 5 times per second
            await publisher.flush()
Use the flush() asynchronous coroutine to send messages to the RabbitMQ
Streams broker and wait for confirmation that the messages have been received.
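The blocking behaviour of batch() can be pictured with a small toy model. The sketch below is not RbFly's implementation; BoundedBatch is a hypothetical class that uses asyncio.Condition to make batch() wait while the batch is full, and to wake the waiting coroutines when flush() empties it. The limit of 2 messages is chosen only to keep the demonstration short.

```python
import asyncio


class BoundedBatch:
    """Toy model of a size-limited batch; not RbFly's implementation."""

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._messages: list[str] = []
        self._cond = asyncio.Condition()
        self.flushed: list[list[str]] = []  # stands in for data sent to a broker

    async def batch(self, message: str) -> None:
        async with self._cond:
            # wait while the batch is full; flush() wakes the waiters
            await self._cond.wait_for(lambda: len(self._messages) < self._limit)
            self._messages.append(message)

    async def flush(self) -> None:
        async with self._cond:
            if self._messages:
                self.flushed.append(self._messages)
                self._messages = []
            self._cond.notify_all()


async def main() -> list[list[str]]:
    publisher = BoundedBatch(limit=2)

    async def producer() -> None:
        for i in range(5):
            await publisher.batch(f'hello {i}')

    task = asyncio.create_task(producer())
    # flush periodically until the producer has enqueued everything
    while not task.done():
        await asyncio.sleep(0.01)
        await publisher.flush()
    await publisher.flush()  # send whatever is left
    return publisher.flushed


flushed = asyncio.run(main())
```

In this model the producer never holds more than the configured number of messages in memory, which is the property the batch limit is meant to provide.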
1.3. Fast Batch Publishing
To send messages in batch mode, create a publisher using the
PublisherBatchFast class. Enqueue each message with the batch() method:
    async with client.publisher('stream-name', cls=PublisherBatchFast) as publisher:
        publisher.batch('hello 1')
        publisher.batch('hello 2')
        await publisher.flush()
Use the flush() asynchronous coroutine to send the messages to the RabbitMQ
Streams broker and wait for confirmation that the messages have been received.
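Since the fast batch publisher does not limit the size of a batch, an application has to flush on a regular basis itself. One possible pattern, sketched below, is to flush after every N messages from the same coroutine that enqueues them. StubFastPublisher, publish_all() and the flush_every parameter are hypothetical names used for illustration; the stub only mimics the shape of the interface described above (synchronous batch(), asynchronous flush()).

```python
import asyncio
from collections.abc import Iterable


class StubFastPublisher:
    """Stand-in with a synchronous batch() and an asynchronous flush()."""

    def __init__(self) -> None:
        self._pending: list[str] = []
        self.flushed: list[list[str]] = []  # stands in for data sent to a broker

    def batch(self, message: str) -> None:
        self._pending.append(message)

    async def flush(self) -> None:
        await asyncio.sleep(0)  # a real flush would wait for the broker here
        if self._pending:
            self.flushed.append(self._pending)
            self._pending = []


async def publish_all(
    publisher: StubFastPublisher,
    messages: Iterable[str],
    flush_every: int,
) -> None:
    """Enqueue messages and flush after every `flush_every` of them."""
    for count, message in enumerate(messages, start=1):
        publisher.batch(message)
        if count % flush_every == 0:
            await publisher.flush()
    await publisher.flush()  # flush the remainder, if any


async def main() -> list[list[str]]:
    publisher = StubFastPublisher()
    messages = (f'hello {i}' for i in range(7))
    await publish_all(publisher, messages, flush_every=3)
    return publisher.flushed


flushed = asyncio.run(main())
```

Batching and flushing happen in a single coroutine here, which matches the constraint stated for this publisher type.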
1.4. Publishing Exceptions
RbFly might raise the following exceptions when publishing messages:
TypeError
A message or part of a message is of a type not recognized by RbFly’s AMQP format encoder.
ValueError
A message has an invalid value. For example, a message does not fit within the frame of the RabbitMQ Streams protocol, or a string exceeds the maximum allowed length (2 ** 32 - 1).
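An application may want to handle these two exceptions separately from connection problems, as they indicate a problem with the message itself and retrying the same message is unlikely to help. The sketch below shows one way to do that. StubPublisher and send_or_report() are hypothetical; the stub accepts only str and bytes and uses an artificially small length limit, which are assumptions made for the demonstration and do not reflect the types or limits supported by RbFly.

```python
import asyncio


class StubPublisher:
    """Stand-in whose send() raises the documented exception types."""

    MAX_LEN = 16  # artificially small limit, for demonstration only

    async def send(self, message: object) -> None:
        if not isinstance(message, (str, bytes)):
            name = type(message).__name__
            raise TypeError(f'cannot encode message of type {name}')
        if len(message) > self.MAX_LEN:
            raise ValueError('message too long')


async def send_or_report(publisher: StubPublisher, message: object) -> str:
    """Send a message; describe the problem if the message is rejected."""
    try:
        await publisher.send(message)
    except TypeError as ex:
        return f'rejected (type): {ex}'
    except ValueError as ex:
        return f'rejected (value): {ex}'
    return 'sent'


async def main() -> list[str]:
    publisher = StubPublisher()
    messages = ['hello', object(), 'x' * 100]
    return [await send_or_report(publisher, m) for m in messages]


results = asyncio.run(main())
```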