.. _sec-stream-subscribe:

Read Messages
=============

Subscribe to a Stream
---------------------

To read messages from a RabbitMQ Streams broker, subscribe to a RabbitMQ
stream with the :py:meth:`~rbfly.streams.StreamsClient.subscribe` method of
the :py:class:`~rbfly.streams.StreamsClient` class. The method is an
asynchronous iterator, which yields stream messages::

    async for msg in client.subscribe('stream-name'):
        print(msg)

The method accepts the offset parameter, which is set to
:py:const:`rbfly.streams.Offset.NEXT` by default. Stream offsets are
described in :numref:`stream-offset` in more detail.

Message Context
---------------

A RabbitMQ Streams message has a set of properties, like offset or
timestamp. Also, an AMQP 1.0 message can carry, besides the message body,
additional data, for example a message header or application properties.

The additional information is available via a
:py:class:`~rbfly.streams.MessageCtx` object, which can be retrieved with
the :py:func:`~rbfly.streams.get_message_ctx` function, for example::

    async for msg in client.subscribe('stream-name'):
        print(get_message_ctx().stream_offset)

.. _stream-offset:

Offset Specification
--------------------

.. automodule:: rbfly.streams.offset

Offset Reference
----------------

RabbitMQ Streams supports storing an offset value in a stream using
a string reference, and receiving an offset value using the reference.

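
The idea of an offset reference can be illustrated with a toy, in-memory
model. The sketch below uses neither RbFly nor a broker: ``ToyStream`` and
``consume`` are hypothetical names introduced only for illustration, and
starting from the first message when no offset is stored is an assumption of
this model, not a statement about broker behaviour.

```python
import asyncio


class ToyStream:
    """In-memory stand-in for a stream (hypothetical, not part of RbFly)."""

    def __init__(self, messages):
        # The offset of a message is simply its index in the list.
        self.messages = list(messages)
        # Maps a reference string to the offset stored under that reference.
        self.references = {}

    def write_offset(self, reference, offset):
        """Store an offset value under a string reference."""
        self.references[reference] = offset

    async def subscribe(self, reference):
        """Yield (offset, message) pairs, starting *after* the stored offset."""
        # Assumption of this toy model: with no stored offset, start at 0.
        start = self.references.get(reference, -1) + 1
        for offset in range(start, len(self.messages)):
            yield offset, self.messages[offset]


async def consume(stream, reference, limit):
    """Read up to `limit` messages, then save the last received offset."""
    received = []
    last_offset = None
    try:
        async for offset, msg in stream.subscribe(reference):
            received.append(msg)
            last_offset = offset  # kept in memory until it is saved
            if len(received) == limit:
                break
    finally:
        if last_offset is not None:
            stream.write_offset(reference, last_offset)
    return received


stream = ToyStream(['a', 'b', 'c', 'd', 'e'])
first = asyncio.run(consume(stream, 'stream-ref', 2))
second = asyncio.run(consume(stream, 'stream-ref', 10))
print(first, second)
```

In this model, the second call to ``consume`` continues with the message
that follows the last one received by the first call, because the offset
saved under ``stream-ref`` is read back and increased by one.
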
Use :py:meth:`offset reference specification <rbfly.streams.Offset.reference>`
to read messages from a stream, starting after the stored offset value::

    messages = client.subscribe('stream-name', offset=Offset.reference('stream-ref'))
    try:
        async for msg in messages:
            print(msg)
    finally:
        await client.write_offset('stream-name', 'stream-ref')

In the example above, the RbFly library performs the following actions:

- read the offset value, stored for the offset reference string
  `stream-ref`, from the RabbitMQ Streams broker
- start reading messages of stream `stream-name` with the offset value
  increased by one (start reading *after* the stored value)
- keep the offset value of the last received message in memory

The example saves the offset value with the
:py:meth:`~rbfly.streams.StreamsClient.write_offset` method. By default, the
method saves the offset value of the last stream message. A custom offset
value can also be specified, see
:py:meth:`the method documentation <rbfly.streams.StreamsClient.write_offset>`
for details.

.. note::
   How and when the offset value is saved is application dependent.

.. vim: sw=4:et:ai