Filter Messages
===============

Applications can use RbFly with the RabbitMQ Streams filtering feature to
receive a specific set of messages and to reduce the use of network
bandwidth. The filtering feature needs to be used both when producing and
when consuming messages with the broker.

The feature was introduced in RabbitMQ 3.13, and RbFly has supported it
since version 0.9.0.

See also

- https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
- https://www.rabbitmq.com/blog/2023/10/24/stream-filtering-internals

Publish and Read with Filtering
-------------------------------

RabbitMQ Streams broker uses a Bloom filter, a probabilistic data
structure, to filter chunks of messages sent to a client. This has multiple
implications

- an application sending messages to a stream has to provide filter values
  to the broker, so it can build the Bloom filter structure server-side
- an application receiving messages from a stream has to check for invalid
  messages
- a Bloom filter can return false-positive results, that is, the broker can
  send chunks with messages having non-requested filter values
- chunks can contain a mix of messages with both requested and
  non-requested filter values

An application needs to define a function to extract a filter value from a
message. A filter value is always a string. For example, if a message is a
dictionary like::

    {'amount': 10.0, 'country': 'country-code'}

then the function can be defined as::

    def fv_extract(msg: MessageCtx) -> str:
        return msg.body['country']

Finally, use the function when creating a stream publisher::

    async with client.publisher('red-carrot', filter_extract=fv_extract) as publisher:
        await publisher.send({'amount': 100.0, 'country': 'jp'})
        await publisher.send({'amount': 200.0, 'country': 'ie'})

To receive messages for a country, subscribe to a stream with a message
filter.
A message filter consists of a filter extract function and a set of filter
values, for example::

    msg_filter = MessageFilter(fv_extract, {'ie'})
    async for msg in client.subscribe('red-carrot', filter=msg_filter):
        print(msg)  # application receives messages with country `ie` only

Filtering Considerations
------------------------

An application using the RbFly library orchestrates publishing of batches
of messages to a RabbitMQ stream. This might give the illusion that the
application can control how the RabbitMQ Streams broker stores chunks of
messages in segment files, and how the chunks are sent to a client.
However, only the RabbitMQ Streams broker decides when and how to create
the chunks.

.. note::

    There is some correlation between published batches of messages and
    received chunks of messages, but a 1-to-1 match is *not* guaranteed.

Additionally, a Bloom filter can point the broker to invalid chunks, which
means that stream filtering cannot be fully efficient. Despite filtering,
an application might receive non-requested data.

An example of the RabbitMQ Streams broker splitting a batch of messages
into two chunks is illustrated in :numref:`fig-filtering-chunks`.

.. figure:: filtering-chunks.*
    :name: fig-filtering-chunks
    :align: center
    :target: filtering-chunks.pdf

    An example of RabbitMQ Streams broker splitting a batch of messages
    into two chunks

An application publishes a batch of messages with a Bloom filter value
assigned (two messages with `f1` value, and two messages with `f2` value).
RabbitMQ receives the batch, splits the messages into two chunks, and
writes them to a segment file on disk. This is not always observed, but it
is quite possible.

.. vim: sw=4:et:ai
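
The false-positive behaviour of a Bloom filter can be reproduced with a toy
implementation. The code below is a minimal sketch for illustration only: the
``TinyBloomFilter`` class, its bit size, the number of hash functions, and
the use of SHA-256 are assumptions made for this example, and do not
describe how the RabbitMQ Streams broker builds its filters::

    import hashlib


    class TinyBloomFilter:
        """Toy Bloom filter (hypothetical, not the broker's implementation)."""

        def __init__(self, size_bits: int = 16, num_hashes: int = 2) -> None:
            self.size_bits = size_bits
            self.num_hashes = num_hashes
            self.bits = 0  # bit array stored in a single integer

        def _positions(self, value: str) -> list[int]:
            # derive `num_hashes` bit positions from salted SHA-256 digests
            return [
                int.from_bytes(
                    hashlib.sha256(f'{i}:{value}'.encode()).digest()[:8], 'big'
                ) % self.size_bits
                for i in range(self.num_hashes)
            ]

        def add(self, value: str) -> None:
            for pos in self._positions(value):
                self.bits |= 1 << pos

        def might_contain(self, value: str) -> bool:
            # True means "possibly present", False means "definitely absent"
            return all((self.bits >> pos) & 1 for pos in self._positions(value))


    # filter values of messages stored in a hypothetical chunk
    stored = {'jp', 'ie', 'pl', 'de', 'fr', 'es', 'it', 'nl'}
    chunk_filter = TinyBloomFilter()
    for value in stored:
        chunk_filter.add(value)

    # values added to the filter always match: there are no false negatives
    assert all(chunk_filter.might_contain(v) for v in stored)

    # values never added to the filter can match as well: false positives
    candidates = [f'country-{i}' for i in range(1000)]
    false_positives = [v for v in candidates if chunk_filter.might_contain(v)]
    print(len(false_positives), 'of', len(candidates), 'candidates match by accident')

A match only means that a value is *possibly* present. This is why, in the
scenario described in this section, a chunk can be delivered even though
none of its messages has a requested filter value, and why received
messages still need to be checked on the client side.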