5. Filter Messages
Applications can use RbFly with the RabbitMQ Streams filtering feature to receive a specific set of messages and reduce network bandwidth usage.
The RabbitMQ Streams filtering feature must be used both when publishing messages to the broker and when consuming messages from it.
The feature was introduced in RabbitMQ 3.13, and RbFly has supported it since version 0.9.0.
5.1. Publish and Read with Filtering
The RabbitMQ Streams broker uses a Bloom filter, a probabilistic data structure, to filter chunks of messages sent to a client. This has several implications:
- an application sending messages to a stream has to provide filter values to the broker, so the broker can build the Bloom filter structure server-side
- an application receiving messages from a stream has to check for invalid messages, that is, messages with non-requested filter values
- a Bloom filter can produce false-positive matches, that is, the broker can send chunks with messages having non-requested filter values
- chunks can contain a mix of messages with both requested and non-requested filter values
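For illustration, below is a minimal pure-Python Bloom filter showing the property behind the false positives listed above: a value that was added is always matched, but a value that was never added can match too. This is a sketch of the data structure only, not the broker's implementation; the class name `BloomFilter`, the bit-array size, the number of hash functions, and the SHA-256 based hashing are arbitrary assumptions.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter over strings: an m-bit array and k hash functions."""

    def __init__(self, size: int = 64, num_hashes: int = 3) -> None:
        self.size = size
        self.num_hashes = num_hashes
        self.bits = 0  # the bit array, stored in a Python int

    def _positions(self, value: str) -> list[int]:
        # derive k bit positions by salting a SHA-256 hash with the hash index
        return [
            int.from_bytes(hashlib.sha256(f'{i}:{value}'.encode()).digest()[:8], 'big') % self.size
            for i in range(self.num_hashes)
        ]

    def add(self, value: str) -> None:
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value: str) -> bool:
        # always True for added values; can also be True for values never added
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


chunk_filter = BloomFilter()
for country in ('jp', 'ie'):
    chunk_filter.add(country)
assert chunk_filter.might_contain('ie')  # no false negatives

# an undersized filter: every value maps to the single bit, so any query matches
tiny = BloomFilter(size=1)
tiny.add('jp')
print(tiny.might_contain('de'))  # True, a false positive
```

An undersized filter makes false positives certain; a larger bit array makes them rarer, but never impossible, which is why the receiving side still has to check messages.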
An application needs to define a function that extracts a filter value from a message. A filter value is always a string. For example, if a message body is a dictionary like:
{'amount': 10.0, 'country': 'country-code'}
then the function can be defined as:
def fv_extract(msg: MessageCtx) -> str:
    return msg.body['country']
Finally, use the function when creating a stream publisher:
async with client.publisher('red-carrot', filter_extract=fv_extract) as publisher:
    await publisher.send({'amount': 100.0, 'country': 'jp'})
    await publisher.send({'amount': 200.0, 'country': 'ie'})
To receive messages for a country, subscribe to a stream with a message filter. A message filter consists of a filter extraction function and a set of filter values, for example:
msg_filter = MessageFilter(fv_extract, {'ie'})
async for msg in client.subscribe('red-carrot', filter=msg_filter):
    print(msg)  # application receives messages with country `ie` only
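Because a chunk sent by the broker can mix requested and non-requested filter values, each received message has to be tested again with the filter extraction function. The sketch below shows that per-message check in plain Python, without a broker. `FakeMessageCtx` and `keep_requested` are hypothetical names used only for illustration; the sketch does not describe RbFly's internal implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator


@dataclass
class FakeMessageCtx:
    """Hypothetical stand-in for RbFly's MessageCtx; only `body` is modelled."""
    body: Any


def fv_extract(msg: FakeMessageCtx) -> str:
    return msg.body['country']


def keep_requested(
        chunk: Iterable[FakeMessageCtx],
        extract: Callable[[FakeMessageCtx], str],
        values: set[str],
) -> Iterator[FakeMessageCtx]:
    # a chunk can mix requested and non-requested filter values,
    # so every message is checked again on the client side
    for msg in chunk:
        if extract(msg) in values:
            yield msg


chunk = [
    FakeMessageCtx({'amount': 100.0, 'country': 'jp'}),
    FakeMessageCtx({'amount': 200.0, 'country': 'ie'}),
    FakeMessageCtx({'amount': 300.0, 'country': 'ie'}),
]
received = list(keep_requested(chunk, fv_extract, {'ie'}))
print([m.body['amount'] for m in received])  # [200.0, 300.0]
```

The same extraction function serves both sides: the publisher uses it to supply filter values, and the subscriber uses it to discard messages it did not ask for.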
5.2. Filtering Considerations
An application using the RbFly library controls how batches of messages are published to a RabbitMQ stream. This might give the illusion that the application can also control how the RabbitMQ Streams broker stores chunks of messages in segment files, and how the chunks are sent to a client. However, only the RabbitMQ Streams broker decides when and how to create the chunks.
Note
There is some correlation between published batches of messages and received chunks of messages, but a 1-to-1 match is not guaranteed.
Additionally, a Bloom filter can produce false-positive matches and point the broker to chunks that contain no requested messages, so stream filtering cannot be fully efficient. Despite filtering, an application might receive non-requested data.
An example of the RabbitMQ Streams broker splitting a batch of messages into two chunks is illustrated in Figure 5.1.
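The effect of chunking on filtering can be shown with a toy model: a chunk is sent whenever any of its filter values was requested, so every other message in that chunk travels to the client as well. In this sketch the chunk model, the `chunks_to_send` function, and the particular split of the batch are all hypothetical, and an exact set stands in for the broker's Bloom filter, which could additionally match chunks with no requested values at all.

```python
from typing import Iterator

# hypothetical model: a chunk is a list of (filter value, body) pairs;
# how a batch is split into chunks is decided by the broker, not the client
Chunk = list[tuple[str, dict]]


def chunks_to_send(chunks: list[Chunk], requested: set[str]) -> Iterator[Chunk]:
    for chunk in chunks:
        # exact set instead of a Bloom filter for simplicity; a real
        # Bloom filter can also match chunks with no requested values
        chunk_values = {value for value, _ in chunk}
        if chunk_values & requested:
            yield chunk


batch = [
    ('jp', {'amount': 100.0, 'country': 'jp'}),
    ('ie', {'amount': 200.0, 'country': 'ie'}),
    ('jp', {'amount': 300.0, 'country': 'jp'}),
    ('jp', {'amount': 400.0, 'country': 'jp'}),
]
# suppose the broker stores the published batch as two chunks
chunks = [batch[:2], batch[2:]]
sent = [msg for chunk in chunks_to_send(chunks, {'ie'}) for msg in chunk]
print([value for value, _ in sent])  # ['jp', 'ie']
```

Even with an exact filter, requesting `ie` delivers a `jp` message, because both share the first chunk; the client-side check is what removes it.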