1. RabbitMQ Streams API

1.1. Broker Connection API

rbfly.streams.streams_client

Create RabbitMQ Streams client using connection URI.

rbfly.streams.connection

Decorator to manage RabbitMQ Streams client connection.

rbfly.streams.StreamsClient

RabbitMQ Streams client.

rbfly.streams.streams_client(uri: str, /, ssl: SSLContext | None = None) → StreamsClient

Create RabbitMQ Streams client using connection URI.

Parameters:
  • uri – Connection URI.

  • ssl – TLS/SSL context object.

rbfly.streams.connection(coro: Callable[[Concatenate[Trc, P]], Coroutine[Any, Any, T]]) → Callable[[Concatenate[Trc, P]], Coroutine[Any, Any, T]]
rbfly.streams.connection(coro: Callable[[Concatenate[Trc, P]], AsyncIterator[T]]) → Callable[[Concatenate[Trc, P]], AsyncIterator[T]]

Decorator to manage RabbitMQ Streams client connection.

The Streams client implements the connection manager abstract class.

The Streams client has to be the first parameter of the coroutine coro.

The Streams client is disconnected on exit of the coroutine, using the connection manager API.

Parameters:

coro – Coroutine using RabbitMQ Streams client.
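
A minimal usage sketch of streams_client and connection, based only on the signatures documented above. The URI value, the stream name and the send_greeting coroutine are illustrative assumptions, not part of rbfly. The rbfly import is kept inside main() so that the coroutine itself can be exercised with any object that offers the same methods.

```python
import asyncio


async def send_greeting(client, stream: str) -> None:
    """Create a stream and publish one message to it.

    The client is the first parameter, as the connection decorator requires.
    """
    await client.create_stream(stream)
    async with client.publisher(stream) as publisher:
        await publisher.send('hello')


def main() -> None:
    # Imported here so that the sketch above does not depend on rbfly.
    import rbfly.streams as rbs

    # Assumed URI; adjust the credentials and host to the actual broker.
    client = rbs.streams_client('rabbitmq-stream://guest:guest@localhost')

    # Applying the decorator explicitly is the same as using @rbs.connection
    # on the coroutine definition.
    managed = rbs.connection(send_greeting)
    asyncio.run(managed(client, 'demo-stream'))
```

Call main() to run the sketch against a broker. The decorator, rather than the coroutine, is responsible for disconnecting the client on exit.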

class rbfly.streams.StreamsClient(connection_info: ConnectionInfo)

RabbitMQ Streams client.

async create_stream(stream: str) → None

Create RabbitMQ stream.

The method ignores the error received from the RabbitMQ Streams broker if the stream already exists.

Parameters:

stream – RabbitMQ stream name.

async delete_stream(stream: str) → None

Delete RabbitMQ stream.

The method ignores the error received from the RabbitMQ Streams broker if the stream does not exist.

Parameters:

stream – RabbitMQ stream name.

publisher(stream: str, *, name: str | None = None) → tp.AsyncContextManager[Publisher]
publisher(stream: str, *, name: str | None = None, cls: type[T]) → tp.AsyncContextManager[T]

Create publisher for RabbitMQ stream.

The single-message AMQP publisher (class Publisher) is used by default.

The stream must exist.

Publisher reference name is used for deduplication of messages. By default, the publisher name is <hostname>/<pid>. Override it if this scheme does not work for a specific application, e.g. when using threads.

Parameters:
  • stream – RabbitMQ stream name.

  • name – RabbitMQ stream publisher reference name.

  • cls – Publisher class.
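
The naming note above can be illustrated with a small sketch. It builds a name in the documented <hostname>/<pid> form and extends it with a thread id, which is one possible way to keep names distinct when several threads publish from the same process. Both helper functions and the thread-id suffix are hypothetical conventions, not something rbfly provides.

```python
import os
import socket
import threading


def default_style_name() -> str:
    """Build a name following the documented <hostname>/<pid> scheme."""
    return f'{socket.gethostname()}/{os.getpid()}'


def thread_publisher_name() -> str:
    """Extend the scheme with the id of the current thread.

    Hypothetical convention: thread ids are unique only among threads
    that are alive at the same time.
    """
    return f'{default_style_name()}/{threading.get_ident()}'
```

The result would be passed as the name keyword argument, for example client.publisher('demo-stream', name=thread_publisher_name()).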

async subscribe(stream: str, *, offset: Offset = Offset.NEXT, timeout: float = 0, amqp: bool = True) → AsyncIterator[rbfly.streams.types.AMQPBody]

Subscribe to the stream and iterate over messages.

Parameters:
  • stream – Name of RabbitMQ stream to subscribe to.

  • offset – RabbitMQ Streams offset specification.

  • timeout – Raise timeout error if no message received within specified time (in seconds).

  • amqp – If true, messages are decoded as AMQP 1.0 messages. Otherwise, no AMQP decoding is performed.
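
A sketch of iterating over a subscription, using only the parameters listed above. The collect function, its limit parameter and the 5 second timeout are assumptions made for illustration. The offset is passed in by the caller, for example rbfly.streams.Offset.FIRST.

```python
async def collect(client, stream: str, offset, limit: int) -> list:
    """Read at most `limit` message bodies from a stream.

    The timeout error documented for subscribe() is not handled here
    and propagates to the caller.
    """
    items = []
    async for body in client.subscribe(stream, offset=offset, timeout=5):
        items.append(body)
        if len(items) >= limit:
            break
    return items
```

Without the limit the loop would keep waiting for new messages, because a stream subscription has no natural end.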

async write_offset(stream: str, reference: str, value: int | None = None) → None

Write RabbitMQ stream offset value using the reference string.

When the offset value is not specified, the last message context is retrieved and its stream offset value is stored. If there is no last message context to retrieve, the method does nothing.

Parameters:
  • stream – Name of RabbitMQ stream.

  • reference – Offset reference string.

  • value – Offset value to be stored.
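
One way to use write_offset is to store the offset periodically while consuming, so that a later subscription can resume from the stored position. The sketch below assumes that the last message context is available while iterating over subscribe(); the function name, the handle callback and the every parameter are hypothetical.

```python
async def consume_with_checkpoints(
        client, stream: str, reference: str, offset, handle, every: int = 100
) -> int:
    """Process messages and store the stream offset every `every` messages."""
    count = 0
    async for body in client.subscribe(stream, offset=offset):
        handle(body)
        count += 1
        if count % every == 0:
            # No value is passed, so the offset of the last message
            # context is stored under the reference.
            await client.write_offset(stream, reference)
    return count
```

A caller would typically pass rbfly.streams.Offset.reference(reference) as the offset, so that reading and storing use the same reference string.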

1.2. Publisher and Subscriber API

rbfly.streams.stream_message_ctx

Create message context for RabbitMQ Streams publisher.

rbfly.streams.get_message_ctx

Get current context of AMQP message.

rbfly.streams.StreamsClient

RabbitMQ Streams client.

rbfly.streams.Publisher

RabbitMQ Streams publisher for sending a single message.

rbfly.streams.PublisherBatchLimit

RabbitMQ Streams publisher for sending limited batch of messages.

rbfly.streams.PublisherBatchFast

RabbitMQ Streams publisher for sending a batch of messages.

rbfly.streams.MessageCtx

AMQP message context.

rbfly.streams.Offset

Offset specification for RabbitMQ stream subscription.

rbfly.streams.stream_message_ctx(body: AMQPBody, *, publish_id: int | None = None, app_properties: AMQPAppProperties = {}) → MessageCtx

Create message context for RabbitMQ Streams publisher.

Message publish id is optional - a publisher assigns one if not specified. Message publish id can be used for message deduplication. If an application provides message publish ids, then it is its responsibility to track them and keep the ids strictly increasing.

Application properties are part of AMQP message. The properties can be used for filtering or routing.

Parameters:
  • body – Message data to be sent to a stream.

  • publish_id – Message publish id.

  • app_properties – Application properties, part of AMQP message.
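
The requirement to keep application-provided publish ids strictly increasing can be sketched as follows. The send_all function is hypothetical. Its make_ctx argument is expected to be rbfly.streams.stream_message_ctx, and it is passed in so that the sketch does not depend on rbfly being installed.

```python
import itertools


async def send_all(publisher, make_ctx, bodies, first_id: int) -> int:
    """Send bodies with consecutive publish ids starting at `first_id`.

    Returns the last publish id used, or `first_id - 1` if nothing was
    sent, so that the application can persist it for the next run.
    """
    last_id = first_id - 1
    for publish_id, body in zip(itertools.count(first_id), bodies):
        ctx = make_ctx(body, publish_id=publish_id)
        await publisher.send(ctx)
        last_id = publish_id
    return last_id
```

Starting the next run at the returned value plus one keeps the ids strictly increasing across restarts, provided the value is stored reliably.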

rbfly.streams.get_message_ctx() → MessageCtx

Get current context of AMQP message.
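
A sketch of reading the message context while consuming a stream. It assumes that the current context refers to the message most recently yielded by subscribe(). The function name and its limit parameter are hypothetical, and get_ctx is expected to be rbfly.streams.get_message_ctx.

```python
async def collect_with_offsets(client, get_ctx, stream: str, limit: int) -> list:
    """Collect (stream offset, body) pairs for at most `limit` messages."""
    seen = []
    async for body in client.subscribe(stream):
        ctx = get_ctx()  # context of the message just received (assumed)
        seen.append((ctx.stream_offset, body))
        if len(seen) >= limit:
            break
    return seen
```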

class rbfly.streams.Publisher

RabbitMQ Streams publisher for sending a single message.

name: unicode

Publisher reference name.

stream: unicode

RabbitMQ stream name.

message_id: int

Last value of published message id.

async send(self, message: AMQPBody | MessageCtx) → None

Send AMQP message to a RabbitMQ stream.

The asynchronous coroutine waits for message delivery confirmation from RabbitMQ Streams broker.

A message is simply application data of type AMQPBody, or message context (class MessageCtx).

Parameters:

message – AMQP message to publish.

class rbfly.streams.PublisherBatchLimit

RabbitMQ Streams publisher for sending limited batch of messages.

The publisher coordinates the batch and flush asynchronous coroutines, so that only a limited number of messages is enqueued between flushes.

name: unicode

Publisher reference name.

stream: unicode

RabbitMQ stream name.

message_id: int

Last value of published message id.

async batch(self, message: AMQPBody | MessageCtx, *, max_len: int) → None

Enqueue AMQP message for batch processing with RabbitMQ Streams broker.

The asynchronous coroutine blocks when max_len messages are enqueued. To unblock, call the PublisherBatchLimit.flush() method.

A message is simply application data of type AMQPBody, or message context (class MessageCtx).

Parameters:
  • message – AMQP message to publish.

  • max_len – Maximum number of messages in a batch.

async flush(self) → None

Flush all enqueued messages and unblock PublisherBatchLimit.batch() asynchronous coroutines.
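
Because batch() blocks once max_len messages are enqueued, flush() has to be called from a different task. The sketch below shows one possible arrangement with a background task that flushes at a fixed interval. The function names and the interval parameter are assumptions, not part of rbfly.

```python
import asyncio


async def flush_periodically(publisher, interval: float, done: asyncio.Event) -> None:
    """Flush every `interval` seconds, and once more after `done` is set."""
    while not done.is_set():
        try:
            await asyncio.wait_for(done.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed, time for a regular flush
        await publisher.flush()


async def send_limited(publisher, bodies, *, max_len: int, interval: float) -> None:
    """Enqueue all bodies; the background task unblocks batch() as needed."""
    done = asyncio.Event()
    flusher = asyncio.create_task(flush_periodically(publisher, interval, done))
    try:
        for body in bodies:
            # May block here until the flusher task calls flush().
            await publisher.batch(body, max_len=max_len)
    finally:
        done.set()
        await flusher  # the final flush sends whatever is still enqueued
```

An event is used instead of task cancellation so that a flush in progress is never interrupted.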

class rbfly.streams.PublisherBatchFast

RabbitMQ Streams publisher for sending a batch of messages.

The number of messages in a single batch is limited by the maximum length of the Python list type on a given platform.

name: unicode

Publisher reference name.

stream: unicode

RabbitMQ stream name.

message_id: int

Last value of published message id.

batch(self, message: AMQPBody | MessageCtx) → None

Enqueue AMQP message for batch processing with RabbitMQ Streams broker.

A message is simply application data of type AMQPBody, or message context (class MessageCtx).

Parameters:

message – AMQP message to publish.

async flush(self) → None

Flush all enqueued messages.
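
Unlike PublisherBatchLimit.batch(), the batch() method of this class is a plain method and is not awaited; only flush() is asynchronous. Since the publisher does not limit the batch size, an application may want to flush at regular points itself. A sketch under that assumption follows; send_in_chunks and chunk_size are hypothetical names.

```python
async def send_in_chunks(publisher, bodies, chunk_size: int) -> int:
    """Enqueue bodies and flush after every `chunk_size` messages.

    Returns the number of messages enqueued.
    """
    count = 0
    for body in bodies:
        publisher.batch(body)  # synchronous call, no await
        count += 1
        if count % chunk_size == 0:
            await publisher.flush()
    if count % chunk_size:
        # Flush the incomplete final chunk.
        await publisher.flush()
    return count
```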

class rbfly.streams.MessageCtx

AMQP message context.

Variables:
  • body – Message body.

  • annotations – Message annotations.

  • app_properties – Application properties.

  • stream_offset – RabbitMQ stream offset value.

  • stream_timestamp – RabbitMQ stream offset timestamp value.

  • stream_publish_id – RabbitMQ stream message publishing id.

class rbfly.streams.Offset(type: OffsetType, value: int | float | str | None = None)

Offset specification for RabbitMQ stream subscription.

FIRST: Offset = Offset.FIRST

Receive all messages, starting with the very first message in a stream. Equivalent to Offset.offset(0).

LAST: Offset = Offset.LAST

Receive messages from a stream, starting with the first message stored in the current stream chunk.

NEXT: Offset = Offset.NEXT

Receive new messages from a stream only. Default offset specification.

static offset(offset: int) → Offset

Create offset specification with offset value.

Parameters:

offset – Offset value.

static reference(reference: str) → Offset

Create offset specification, which queries and stores stream offset with offset reference.

Parameters:

reference – Offset reference string.

static timestamp(timestamp: float) → Offset

Create offset specification with timestamp value.

Parameters:

timestamp – Unix timestamp in seconds since epoch.

1.3. Data Types

rbfly.types.Symbol

Symbolic value from a constrained domain as defined by AMQP 1.0.

rbfly.types.AMQPScalar

AMQP simple type.

rbfly.types.AMQPBody

Application data sent as AMQP message.

rbfly.types.AMQPAppProperties

Application properties sent with AMQP message.

class rbfly.types.Symbol(name: str)

Symbolic value from a constrained domain as defined by AMQP 1.0.

The class is also exported via rbfly.streams module.

rbfly.types.AMQPScalar: TypeAlias = None | str | bool | int | float | datetime.datetime | uuid.UUID | rbfly.types.Symbol | bytes

AMQP simple type.

The type is also exported via rbfly.streams module.

rbfly.types.AMQPBody: TypeAlias = collections.abc.Sequence['AMQPBody'] | dict['AMQPBody', 'AMQPBody'] | None | str | bool | int | float | datetime.datetime | uuid.UUID | rbfly.types.Symbol | bytes

Application data sent as AMQP message.

It is sent by a publisher and received by a subscriber.

The type is also exported via rbfly.streams module.
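
The recursive shape of the AMQPBody alias can be illustrated with a runtime check written against the standard library only. This is a sketch and not how rbfly validates or encodes messages. rbfly.types.Symbol is left out so that the example runs without rbfly, which means a Symbol value would be rejected by this check.

```python
import datetime
import uuid
from collections.abc import Sequence

# Scalar members of the alias, except rbfly.types.Symbol (omitted on purpose).
SCALARS = (
    type(None), str, bool, int, float, datetime.datetime, uuid.UUID, bytes,
)


def is_amqp_body(value: object) -> bool:
    """Check whether a value fits the documented AMQPBody shape."""
    # Scalars first: str and bytes are sequences too and must not recurse.
    if isinstance(value, SCALARS):
        return True
    if isinstance(value, dict):
        return all(
            is_amqp_body(k) and is_amqp_body(v) for k, v in value.items()
        )
    if isinstance(value, Sequence):
        return all(is_amqp_body(item) for item in value)
    return False
```

For example, a dictionary of lists of scalars passes the check, while a set does not, because a set is not a sequence.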

rbfly.types.AMQPAppProperties

Application properties sent with AMQP message.

The type is also exported via rbfly.streams module.

alias of dict[str, None | str | bool | int | float | datetime | UUID | Symbol | bytes]

1.4. Deprecated API

Deprecated since version 0.7.0:

class rbfly.streams.PublisherBatch

Use rbfly.streams.PublisherBatchFast class instead.

class rbfly.streams.PublisherBatchMem

Use rbfly.streams.PublisherBatchLimit class instead.