2. Read Messages

2.1. Subscribe to a Stream

To read messages from a RabbitMQ Streams broker, subscribe to a stream with the subscribe() method of the StreamsClient class. The method is an asynchronous iterator that yields stream messages:

async for msg in client.subscribe('stream-name'):
    print(msg)

The method accepts an offset parameter, which defaults to rbfly.streams.Offset.NEXT. Stream offsets are described in more detail in Section 2.3.
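The asynchronous iteration pattern can be tried without a broker. The sketch below is only an illustration: FakeStreamsClient and read_some() are hypothetical names and not part of RbFly. It shows how an `async for` loop consumes an asynchronous iterator and how a consumer can stop after a fixed number of messages.

```python
import asyncio
from collections.abc import AsyncIterator


class FakeStreamsClient:
    """Hypothetical stand-in for a streams client; not part of RbFly."""

    def __init__(self, messages: list[bytes]) -> None:
        self._messages = messages

    async def subscribe(self, stream: str) -> AsyncIterator[bytes]:
        # An async generator: each `yield` hands one message to `async for`.
        for msg in self._messages:
            await asyncio.sleep(0)  # give control back to the event loop
            yield msg


async def read_some(client: FakeStreamsClient, stream: str, limit: int) -> list[bytes]:
    """Collect at most `limit` messages, then leave the loop."""
    received = []
    async for msg in client.subscribe(stream):
        received.append(msg)
        if len(received) >= limit:
            break
    return received


client = FakeStreamsClient([b'a', b'b', b'c'])
result = asyncio.run(read_some(client, 'stream-name', 2))
```

With a real client the loop body would be the same; only the construction of the client and the connection handling differ.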

2.2. Message Context

A RabbitMQ Streams message has a set of properties, such as offset or timestamp. In addition, an AMQP 1.0 message can carry data besides the message body, for example a message header or application properties.

This additional information is available via a MessageCtx object, which can be retrieved with the get_message_ctx() function, for example:

async for msg in client.subscribe('stream-name'):
    print(get_message_ctx().stream_offset)

2.3. Offset Specification

Use a RabbitMQ Streams offset specification to declare which messages an application should receive from a stream.

The subscribe() method accepts an optional offset parameter, which can be one of the following:

rbfly.streams.Offset.NEXT

Receive only new messages from a stream. This is the default offset specification.

rbfly.streams.Offset.FIRST

Receive all messages, starting with the very first message in a stream. Equivalent to Offset.offset(0).

rbfly.streams.Offset.LAST

Receive messages from a stream, starting with the first message stored in the current stream chunk (see the diagram below).

rbfly.streams.Offset.offset()

Receive messages from a stream, starting with a specific offset value.

rbfly.streams.Offset.reference()

Use the reference to get the offset stored in a stream. Receive messages starting from the next offset (that is, offset + 1).

rbfly.streams.Offset.timestamp()

Receive messages from a stream, starting at the specified message timestamp.

The following diagram shows where each offset specification points in a stream when each chunk has 100 messages:

                         +- Offset.reference('ref-a') + 1
                         |
chunk 1: [0] [1, ref-a] [2] ... [99]
          |
          +- Offset.FIRST

chunk 2: [100, 1633006475.571] [101, 1633006475.999] ... [199, 1633006477.999]
                                 |                             |
                                 +- Offset.offset(101)         +- Offset.timestamp(1633006476.0)

...    : ...
                                 +- end of stream
                                 |
chunk 10: [900] [901] ... [999]  +
            |                    |
            +- Offset.LAST       +- Offset.NEXT

Note

The timestamp is the Erlang runtime system time, which is a view of POSIX time.
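The offset specifications shown in the diagram can be modelled on a small in-memory stream. This is a toy sketch and not RbFly code: the chunk size, the message list, the stored_refs dictionary, and all function names are assumptions made for illustration. In particular, the timestamp lookup picks the first message at or after the given time, while a real broker may resolve timestamps at a different granularity.

```python
from bisect import bisect_left

# Toy stream: 3 chunks of 4 messages; each message is (offset, timestamp).
CHUNK_SIZE = 4
messages = [(i, 1000.0 + i * 0.5) for i in range(12)]

# Offsets saved under a reference name (hypothetical storage).
stored_refs = {'ref-a': 1}


def first() -> int:
    """Like Offset.FIRST: the very first message of the stream."""
    return messages[0][0]


def last() -> int:
    """Like Offset.LAST: the first message of the current (final) chunk."""
    final = messages[-1][0]
    return final - final % CHUNK_SIZE


def next_() -> int:
    """Like Offset.NEXT: one past the end of the stream."""
    return messages[-1][0] + 1


def at_timestamp(ts: float) -> int:
    """Like Offset.timestamp(ts): first message with timestamp >= ts."""
    timestamps = [t for _, t in messages]
    idx = bisect_left(timestamps, ts)
    return messages[idx][0] if idx < len(messages) else next_()


def after_reference(ref: str) -> int:
    """Like Offset.reference(ref): the stored offset plus one."""
    return stored_refs[ref] + 1
```

For this toy stream, first() is 0, last() is 8 (start of the third chunk), and next_() is 12, which no existing message has yet.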

2.4. Offset Reference

RabbitMQ Streams supports storing an offset value in a stream under a string reference, and retrieving the offset value using that reference.

Use an offset reference specification to read messages from a stream, starting after the stored offset value:

messages = client.subscribe('stream-name', offset=Offset.reference('stream-ref'))
try:
    async for msg in messages:
        print(msg)
finally:
    await client.write_offset('stream-name', 'stream-ref')

In the example above, the RbFly library performs the following actions:

  • read the offset value stored for the offset reference string stream-ref from the RabbitMQ Streams broker

  • start reading messages of stream stream-name with the offset value increased by one (start reading after the stored value)

  • keep the offset value of the last received message in memory

The example saves the offset value with the write_offset() method. By default, the method saves the offset value of the last stream message. A custom offset value can also be specified; see the method documentation for details.
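The effect of saving an offset and resuming from its reference can be illustrated with an in-memory model. The ToyStream class below is hypothetical and not part of RbFly; it only mimics the "stored offset plus one" rule described above. Starting at offset 0 when nothing is stored is an assumption of this sketch, not a statement about the library or the broker.

```python
class ToyStream:
    """In-memory stand-in for a stream with stored offset references."""

    def __init__(self, size: int) -> None:
        self.offsets = list(range(size))
        self.saved: dict[str, int] = {}

    def read_from_reference(self, ref: str, count: int) -> list[int]:
        # Assumption of this toy: with nothing stored, start at offset 0.
        start = self.saved[ref] + 1 if ref in self.saved else 0
        return self.offsets[start:start + count]

    def write_offset(self, ref: str, value: int) -> None:
        # Remember the offset of the last processed message.
        self.saved[ref] = value


stream = ToyStream(10)

# First run: read four messages, then save the last received offset.
first_run = stream.read_from_reference('stream-ref', 4)
stream.write_offset('stream-ref', first_run[-1])

# Second run: reading resumes after the saved offset, with no overlap.
second_run = stream.read_from_reference('stream-ref', 4)
```

Because the saved value is the offset of the last processed message, the second run starts at the message that follows it.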

Note

How and when an offset value is saved is application dependent.