4. Deduplicate Messages

RabbitMQ Streams allows applications to publish messages to a stream with message deduplication.

RbFly generates message publishing ids by default. It also supports message deduplication, but then it is the responsibility of an application to

  • assign a reference name to a publisher of messages

  • assign a unique publishing id to each message

  • ensure publishing ids of messages are strictly increasing

  • keep track of message publishing ids between application restarts
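
The last responsibility can be sketched with a small helper that stores the last used publishing id on disk. This is only an illustration under stated assumptions: the PublishIdStore class, its methods, and the JSON file layout are hypothetical and are not part of RbFly.

```python
import json
import tempfile
from pathlib import Path


class PublishIdStore:
    """Hypothetical helper: keep the last used publishing id in a JSON file."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._last = self._load()

    def _load(self) -> int:
        # -1 means that no message has been published yet, so the
        # first publishing id is 0
        if not self._path.exists():
            return -1
        return int(json.loads(self._path.read_text())['last_id'])

    def next_id(self) -> int:
        """Return a publishing id greater than the last recorded one."""
        return self._last + 1

    def commit(self, publish_id: int) -> None:
        """Record a publishing id once its message has been sent."""
        if publish_id <= self._last:
            raise ValueError('publishing id must be strictly increasing')
        self._last = publish_id
        self._path.write_text(json.dumps({'last_id': publish_id}))
```

An application could call next_id() before sending a message and commit() after the send succeeds. If the application stops between the two calls, the same message is sent again with the same publishing id after restart, and deduplication prevents a second copy in the stream.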

Note

An RbFly publisher object remembers the publishing id of the last message. Therefore, it is possible to switch between an application providing message publishing ids and the RbFly library generating them.

However, it is not recommended for an application to change its approach to message publishing ids during its lifetime.

4.1. Publish Messages with Id

To enable deduplication of messages, create a publisher with a reference name that is unique for the given application. Create a context for each message with the stream_message_ctx() function, and send the message context with the publisher.

The following example publishes 10 messages every second. Every time, the messages are assigned the same publishing ids (0 to 9). The number of messages in the stream is always 10, even when the script is restarted.

import asyncio
from datetime import datetime

import rbfly.streams as rbs

STREAM = 'rbfly-demo-dedup-stream'

# an application uses a unique publisher name; it allows message
# deduplication between application restarts
PUBLISHER = 'demo-publisher'

async def send_data(client: rbs.StreamsClient) -> None:
    async with client.publisher(STREAM, name=PUBLISHER) as publisher:
        while True:
            for i in range(10):
                # use `stream_message_ctx` function to create message
                # context and assign publishing id
                ctx = rbs.stream_message_ctx('hello', publish_id=i)
                await publisher.send(ctx)

            print('{} messages sent'.format(datetime.now()))
            await asyncio.sleep(1)

@rbs.connection
async def demo(client: rbs.StreamsClient) -> None:
    await client.create_stream(STREAM)
    await send_data(client)

client = rbs.streams_client('rabbitmq-stream://guest:guest@localhost')
asyncio.run(demo(client))

Note

Observe the number of messages in a RabbitMQ stream with the rabbitmqctl list_queues command, or in the management web console.

When an application is restarted and a publisher is created with its reference name, the RbFly library queries the RabbitMQ Streams broker for the last message publishing id. An RbFly publisher object provides its last message publishing id via the message_id attribute.
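
Why the stream in the example above never grows beyond 10 messages can be illustrated with a simplified, in-memory model of deduplication. The DedupStreamModel class below is hypothetical; it assumes that the broker remembers the last publishing id for each publisher reference name and ignores messages whose id is not greater than it. It is not RabbitMQ or RbFly code.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DedupStreamModel:
    """Simplified, hypothetical model of stream deduplication."""

    # stored messages as (publisher name, publishing id, body)
    messages: list = field(default_factory=list)
    # last accepted publishing id for each publisher reference name
    last_ids: dict = field(default_factory=dict)

    def publish(self, publisher: str, publish_id: int, body: str) -> bool:
        """Store a message; return False if it is ignored as a duplicate."""
        last = self.last_ids.get(publisher)
        if last is not None and publish_id <= last:
            return False
        self.last_ids[publisher] = publish_id
        self.messages.append((publisher, publish_id, body))
        return True

    def last_publish_id(self, publisher: str) -> Optional[int]:
        """Return what a restarted publisher would learn from the broker."""
        return self.last_ids.get(publisher)
```

In this model, publishing ids 0 to 9 repeatedly under the same publisher name stores only the first 10 messages, and last_publish_id() plays the role of the query performed when a publisher is created after a restart.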

4.2. Order of Messages

It is an application’s responsibility to publish messages with strictly increasing publishing ids. The result of providing out-of-order message publishing ids is undefined.

When publishing a single message at a time (see Section 1.1), the publishing id of each new message has to be greater than that of the previous one. When existing messages are sent to a broker again, the order does not matter: the messages are ignored due to the deduplication feature of RabbitMQ Streams.

For new messages, the same rule applies to a batch publisher. However, when republishing messages, the messages within a batch have to be sorted by their publishing id.

The batch publisher with a batch limit (see Section 1.2) enables an application to batch messages from multiple asynchronous coroutines. Therefore, messages and their publishing ids can be enqueued out of order. The publisher sorts messages by their publishing id when sending them to the RabbitMQ Streams broker with the flush() method.
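
The sort-on-flush behaviour described above can be sketched in plain Python. The PendingMessage and SortingBatch names are hypothetical; this is an illustration of the idea, not the RbFly implementation.

```python
from typing import List, NamedTuple


class PendingMessage(NamedTuple):
    publish_id: int
    body: str


class SortingBatch:
    """Hypothetical batch buffer, which sorts messages when flushed."""

    def __init__(self) -> None:
        self._pending: List[PendingMessage] = []

    def enqueue(self, publish_id: int, body: str) -> None:
        # coroutines may enqueue messages in any order
        self._pending.append(PendingMessage(publish_id, body))

    def flush(self) -> List[PendingMessage]:
        """Return the batch sorted by publishing id and empty the buffer."""
        batch = sorted(self._pending, key=lambda m: m.publish_id)
        self._pending.clear()
        return batch
```

Sorting at flush time keeps enqueue() cheap and lets the order be fixed once, just before the batch is handed over for sending.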

The fast batch publisher (see Section 1.3) does not control the order of message publishing ids. An application takes the responsibility to provide them in strictly increasing order.
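
Since ordering is left to the application in this case, a check before messages are handed to the publisher may be useful. The is_strictly_increasing() function below is a hypothetical helper written for this sketch; it is not provided by RbFly.

```python
from typing import Iterable, Optional


def is_strictly_increasing(publish_ids: Iterable[int]) -> bool:
    """Check that each publishing id is greater than the previous one."""
    previous: Optional[int] = None
    for current in publish_ids:
        # equal ids are rejected as well, not only decreasing ones
        if previous is not None and current <= previous:
            return False
        previous = current
    return True
```

An application could, for example, assert is_strictly_increasing(ids) on the publishing ids of a batch during testing, or sort the batch when the check fails.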