5. Filter Messages

Applications can use RbFly with the RabbitMQ Streams filtering feature to receive a specific set of messages and reduce network bandwidth usage.

The RabbitMQ Streams filtering feature has to be used both when publishing messages to the broker and when consuming messages from it.

The feature was introduced in RabbitMQ 3.13, and RbFly has supported it since version 0.9.0.


5.1. Publish and Read with Filtering

The RabbitMQ Streams broker uses a Bloom filter, a probabilistic data structure, to filter chunks of messages sent to a client. This has several implications:

  • an application sending messages to a stream has to provide filter values to the broker, so that the broker can build the Bloom filter server-side

  • an application receiving messages from a stream has to check for messages with non-requested filter values

    • a Bloom filter can produce false positives, that is, the broker can send chunks containing messages with non-requested filter values

    • a chunk can contain a mix of messages with requested and non-requested filter values
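To see why false positives occur, it may help to look at a minimal Bloom filter. The sketch below is a generic textbook Bloom filter in plain Python, not the structure RabbitMQ builds; the `BloomFilter` class and its `size` and `num_hashes` parameters are illustrative assumptions. Adding a value sets a few bits chosen by hashing, and a lookup reports a possible match when all of those bits are set. A value that was added is therefore always found, while a value that was never added can still hit bits set by other values.

```python
import hashlib


class BloomFilter:
    """Illustrative Bloom filter: a bit array plus k hash functions (not RabbitMQ's)."""

    def __init__(self, size: int = 64, num_hashes: int = 3) -> None:
        self.size = size
        self.num_hashes = num_hashes
        self.bits = 0  # bit array stored as a Python int

    def _positions(self, value: str) -> list[int]:
        # derive k bit positions by hashing the value with a different salt each time
        return [
            int.from_bytes(hashlib.sha256(f'{i}:{value}'.encode()).digest()[:8], 'big')
            % self.size
            for i in range(self.num_hashes)
        ]

    def add(self, value: str) -> None:
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value: str) -> bool:
        # always True for added values; can also be True for values never added
        return all(self.bits & (1 << pos) for pos in self._positions(value))


# one filter per chunk, built from the filter values of the chunk's messages
bf = BloomFilter()
for value in ['jp', 'ie', 'jp']:
    bf.add(value)
print(bf.might_contain('ie'))  # True: added values are never missed

# an extremely small filter shows a false positive deterministically
tiny = BloomFilter(size=1)
tiny.add('jp')
print(tiny.might_contain('ie'))  # True, although 'ie' was never added
```

This is why the broker can only tell that a chunk *might* contain requested values, and why the receiving side still has to check each message.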

An application needs to define a function that extracts a filter value from a message. A filter value is always a string. For example, if a message body is a dictionary like:

{'amount': 10.0, 'country': 'country-code'}

then the function can be defined as:

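from rbfly.streams import MessageCtx

def fv_extract(msg: MessageCtx) -> str:
    # use the country code from the message body as the filter value
    return msg.body['country']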

Finally, use the function when creating a stream publisher:

async with client.publisher('red-carrot', filter_extract=fv_extract) as publisher:
    await publisher.send({'amount': 100.0, 'country': 'jp'})
    await publisher.send({'amount': 200.0, 'country': 'ie'})

To receive messages for a specific country, subscribe to the stream with a message filter. A message filter consists of a filter extraction function and a set of filter values, for example:

msg_filter = MessageFilter(fv_extract, {'ie'})
async for msg in client.subscribe('red-carrot', filter=msg_filter):
    print(msg)  # application receives messages with country `ie` only
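Conceptually, the client-side part of a message filter amounts to re-applying the extraction function to each received message and keeping only the messages whose filter value is among the requested ones. The sketch below illustrates that idea in plain Python; it is not RbFly's implementation, and `Msg` (a hypothetical stand-in for `MessageCtx` that models only `body`) and `post_filter` are names made up for this example.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable


@dataclass
class Msg:
    """Hypothetical stand-in for rbfly.streams.MessageCtx; only `body` is modelled."""
    body: Any


def fv_extract(msg: Msg) -> str:
    return msg.body['country']


def post_filter(messages: Iterable[Msg], extract: Callable[[Msg], str], values: set[str]) -> list[Msg]:
    """Drop messages whose filter value was not requested (false positives, mixed chunks)."""
    return [m for m in messages if extract(m) in values]


# a chunk sent by the broker may mix requested and non-requested filter values
chunk = [
    Msg({'amount': 100.0, 'country': 'jp'}),
    Msg({'amount': 200.0, 'country': 'ie'}),
    Msg({'amount': 300.0, 'country': 'ie'}),
]
received = post_filter(chunk, fv_extract, {'ie'})
print([m.body['amount'] for m in received])  # [200.0, 300.0]
```

Reusing the same extraction function on both sides keeps the publisher's filter values and the subscriber's check consistent.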

5.2. Filtering Considerations

When using the RbFly library, an application orchestrates the publishing of batches of messages to a RabbitMQ stream. This might give the illusion that the application controls how the RabbitMQ Streams broker stores chunks of messages in segment files, and how the chunks are sent to a client. However, only the RabbitMQ Streams broker decides when and how to create the chunks.

Note

There is some correlation between published batches of messages and received chunks of messages, but a 1-to-1 match is not guaranteed.

Additionally, due to false positives, a Bloom filter can point the broker to chunks that contain no requested messages, which means that stream filtering cannot be fully efficient. Despite filtering, an application might receive non-requested data.

An example of the RabbitMQ Streams broker splitting a batch of messages into two chunks is illustrated in Figure 5.1.


Figure 5.1 An example of the RabbitMQ Streams broker splitting a batch of messages into two chunks

An application publishes a batch of messages with filter values assigned (two messages with the f1 value, and two messages with the f2 value). RabbitMQ receives the batch, splits the messages into two chunks, and writes them to a segment file on disk. This does not always happen, but it is quite possible.
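The scenario can be simulated in a few lines of Python. This is a sketch under stated assumptions: the split point, the helper names `split_into_chunks` and `chunks_to_send`, and the use of exact sets of filter values instead of Bloom filters are hypothetical simplifications, not the broker's logic. Even without any false positives, requesting f1 delivers an f2 message, because that message shares a chunk with the f1 messages.

```python
def split_into_chunks(batch, chunk_size):
    """Hypothetical broker behaviour: cut a published batch into chunks of chunk_size."""
    return [batch[i:i + chunk_size] for i in range(0, len(batch), chunk_size)]


def chunks_to_send(chunks, requested):
    """Select chunks whose filter values overlap the requested values.

    Exact sets stand in for Bloom filters here, so there are no false
    positives; non-requested messages still arrive because chunks mix values.
    """
    return [c for c in chunks if {fv for fv, _ in c} & requested]


# batch of (filter value, payload) pairs: two with f1, two with f2
batch = [('f1', 'm1'), ('f1', 'm2'), ('f2', 'm3'), ('f2', 'm4')]
chunks = split_into_chunks(batch, 3)  # assumed split: [f1, f1, f2] and [f2]

requested = {'f1'}
sent = chunks_to_send(chunks, requested)
delivered = [payload for chunk in sent for _, payload in chunk]
print(delivered)  # ['m1', 'm2', 'm3'], where m3 carries f2

# client side: drop messages with non-requested filter values
accepted = [payload for chunk in sent for fv, payload in chunk if fv in requested]
print(accepted)  # ['m1', 'm2']
```

With a different split, for example one chunk per filter value, the same request would deliver no f2 messages at all, which is why an application should not rely on how its batches are chunked.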