Source code for rbfly.streams.offset

#
# rbfly - a library for RabbitMQ Streams using Python asyncio
#
# Copyright (C) 2021-2024 by Artur Wroblewski <wrobell@riseup.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
Use RabbitMQ Streams offset specification to declare which messages an
application should receive from a stream.

The :py:meth:`~rbfly.streams.StreamsClient.subscribe` method accepts optional
`offset` parameter, which can be one of the following:

:py:const:`rbfly.streams.Offset.NEXT`
    Receive new messages from a stream only. Default offset specification.
:py:const:`rbfly.streams.Offset.FIRST`
    Receive all messages, starting with the very first message in a
    stream. Equivalent to `Offset.offset(0)`.
:py:const:`rbfly.streams.Offset.LAST`
    Receive messages from a streams starting with first message stored in
    the current stream chunk (see also below).
:py:meth:`rbfly.streams.Offset.offset`
    Receive messages from a stream starting with specific offset value.
:py:meth:`rbfly.streams.Offset.reference`
    Use the reference to get the offset stored in a stream. Receive
    messages starting from the next offset (this is `offset + 1`).
:py:meth:`rbfly.streams.Offset.timestamp`
    Receive messages from a stream starting with the specified timestamp of
    a message.

The following diagram visualizes offset location in a stream when each
chunk has 100 messages::

                               +- Offset.reference('ref-a') + 1
                               |
      chunk 1: [0] [1, ref-a] [2] ... [99]
                |
                +- Offset.FIRST

      chunk 2: [100, 1633006475.571] [101, 1633006475.999] ... [199, 1633006477.999]
                                       |                             |
                                       +- Offset.offset(101)         +- Offset.timestamp(1633006476.0)

      ...    : ...
                                       +- end of stream
                                       |
      chunk 10: [900] [901] ... [999]  +
                  |                    |
                  +- Offset.LAST       +- Offset.NEXT

.. note::
   Timestamp is `Erlang runtime system time
   <https://www.erlang.org/doc/apps/erts/time_correction.html#Erlang_System_Time>`_.
   It is a view of POSIX time.
"""

from __future__ import annotations

import dataclasses as dtc
import enum

class OffsetType(enum.IntEnum):
    """
    Offset type for RabbitMQ stream subscription.
    """
    FIRST = 0x0001
    LAST = 0x0002
    NEXT = 0x0003
    OFFSET = 0x0004
    TIMESTAMP = 0x0005
    REFERENCE = 0xffff  # NOTE: used for API design, not part of RabbitMQ
                        #       Streams protocol

[docs] @dtc.dataclass(frozen=True) class Offset: """ Offset specification for RabbitMQ stream subscription. """ type: OffsetType value: int | float | str | None=None #: Receive all messages, starting with the very first message #: in a stream. Equivalent to `Offset.offset(0)`. FIRST: Offset=dtc.field(init=False) #: Receive messages from a streams starting with first message stored #: in the current stream chunk. LAST: Offset=dtc.field(init=False) #: Receive new messages from a stream only. Default offset #: specification. NEXT: Offset=dtc.field(init=False)
[docs] @staticmethod def offset(offset: int) -> Offset: """ Create offset specification with offset value. :param offset: Offset value. """ return Offset(OffsetType.OFFSET, offset)
[docs] @staticmethod def timestamp(timestamp: float) -> Offset: """ Create offset specification with timestamp value. :param timestamp: Unix timestamp in seconds since epoch. """ return Offset(OffsetType.TIMESTAMP, timestamp)
[docs] @staticmethod def reference(reference: str) -> Offset: """ Create offset specification, which queries and stores stream offset with offset reference. :param reference: Offset reference string. """ return Offset(OffsetType.REFERENCE, reference)
def __str__(self) -> str: result = 'Offset.{}'.format(self.type.name) if self not in SIMPLE_OFFSET: result += '({})'.format(self.value) return result __repr__ = __str__
# define constants for API use Offset.FIRST = Offset(OffsetType.FIRST) Offset.LAST = Offset(OffsetType.LAST) Offset.NEXT = Offset(OffsetType.NEXT) # indicate offset sepecifications, which require no parameter SIMPLE_OFFSET = (Offset.FIRST, Offset.LAST, Offset.NEXT) # vim: sw=4:et:ai