2. Read Messages
2.1. Subscribe to a Stream
To read messages from RabbitMQ Streams broker, subscribe to a RabbitMQ
stream with subscribe()
method of
StreamsClient
class. The method is an
asynchronous iterator, which yields stream messages:
async for msg in client.subscribe('stream-name'):
print(msg)
The method accepts the offset parameter, which is set to
rbfly.streams.Offset.NEXT
by default. Stream offsets are
described in Section 2.3 in more detail.
2.2. Message Context
A RabbitMQ Streams message has a set of properties like offset or timestamp. Also, AMQP 1.0 message, beside message body, can have additional data attached to it, for example message header or application properties.
The additional information is available via
MessageCtx
object, which can be retrieved with
get_message_ctx()
function, for example:
async for msg in client.subscribe('stream-name'):
print(get_message_ctx().stream_offset)
2.3. Offset Specification
Use RabbitMQ Streams offset specification to declare which messages an application should receive from a stream.
The subscribe()
method accepts optional
offset parameter, which can be one of the following:
rbfly.streams.Offset.NEXT
Receive new messages from a stream only. Default offset specification.
rbfly.streams.Offset.FIRST
Receive all messages, starting with the very first message in a stream. Equivalent to Offset.offset(0).
rbfly.streams.Offset.LAST
Receive messages from a streams starting with first message stored in the current stream chunk (see also below).
rbfly.streams.Offset.offset()
Receive messages from a stream starting with specific offset value.
rbfly.streams.Offset.reference()
Use the reference to get the offset stored in a stream. Receive messages starting from the next offset (this is offset + 1).
rbfly.streams.Offset.timestamp()
Receive messages from a stream starting with the specified timestamp of a message.
The following diagram visualizes offset location in a stream when each chunk has 100 messages:
+- Offset.reference('ref-a') + 1
|
chunk 1: [0] [1, ref-a] [2] ... [99]
|
+- Offset.FIRST
chunk 2: [100, 1633006475.571] [101, 1633006475.999] ... [199, 1633006477.999]
| |
+- Offset.offset(101) +- Offset.timestamp(1633006476.0)
... : ...
+- end of stream
|
chunk 10: [900] [901] ... [999] +
| |
+- Offset.LAST +- Offset.NEXT
Note
Timestamp is Erlang runtime system time. It is a view of POSIX time.
2.4. Offset Reference
RabbitMQ Streams supports storing an offset value in a stream using a string reference, and receiving an offset value using the reference.
Use offset reference specification
to read messages from a stream starting after stored offset value:
messages = client.subscribe('stream-name', offset=Offset.reference('stream-ref'))
try:
async for msg in messages:
print msg
finally:
await client.write_offset('stream-name', 'stream-ref')
In the example above, RbFly library performs the following actions
read offset value, stored for offset reference string stream-ref, from RabbitMQ Streams broker
start reading messages of stream stream-name with the offset value increased by one (start reading after stored value)
keep offset value of last received message in memory
The example saves offset value with write_offset()
method. By default, the method saves offset value of last stream message.
Custom offset value, can also be specified, see
the method documentation
for details.
Note
How and when offset value shall be saved is application dependant.