1. RabbitMQ Streams API
1.1. Broker Connection API
Create RabbitMQ Streams client using connection URI. |
|
Decorator to manage RabbitMQ Streams client connection. |
|
RabbitMQ Streams client. |
- rbfly.streams.streams_client(uri: str, /, ssl: SSLContext | None = None) StreamsClient [source]
Create RabbitMQ Streams client using connection URI.
- Parameters:
uri – Connection URI.
ssl – TLS/SSL context object.
- rbfly.streams.connection(coro: Callable[[Concatenate[Trc, P]], Coroutine[Any, Any, T]]) Callable[[Concatenate[Trc, P]], Coroutine[Any, Any, T]] [source]
- rbfly.streams.connection(coro: Callable[[Concatenate[Trc, P]], AsyncIterator[T]]) Callable[[Concatenate[Trc, P]], AsyncIterator[T]]
Decorator to manage RabbitMQ Streams client connection.
Streams client implements connection manager abstract class.
Streams client has to be the first parameter of coroutine coro.
Streams client is disconnected on exit of the coroutine using connection manager API.
- Parameters:
coro – Coroutine using RabbitMQ Streams client.
- class rbfly.streams.StreamsClient(connection_info: ConnectionInfo)[source]
RabbitMQ Streams client.
- async create_stream(stream: str) None [source]
Create RabbitMQ stream.
Method ignores error received from RabbitMQ Streams broker if the stream exists.
- Parameters:
stream – RabbitMQ stream name.
- async delete_stream(stream: str) None [source]
Delete RabbitMQ stream.
Method ignores error received from RabbitMQ Streams broker if the stream does not exist.
- Parameters:
stream – RabbitMQ stream name.
- publisher(stream: str, *, name: str | None = None, filter_extract: BloomFilterExtract | None = None) tp.AsyncContextManager[Publisher] [source]
- publisher(stream: str, *, name: str | None = None, filter_extract: BloomFilterExtract | None = None, cls: type[T]) tp.AsyncContextManager[T]
Create publisher for RabbitMQ stream.
The single message, AMQP publisher is used by default.
The stream must exist.
Publisher reference name is used for deduplication of messages. By default, the publisher name is
<hostname>/<pid>
. Override it, if this scheme does not work for a specific application, i.e. when using threads.- Parameters:
stream – RabbitMQ stream name.
name – RabbitMQ stream publisher reference name.
filter_extract – Function to extract values for stream Bloom filter.
cls – Publisher class.
- async subscribe(stream: str, *, offset: Offset = Offset.NEXT, filter: MessageFilter | None = None, timeout: float = 0, amqp: bool = True) AsyncIterator[rbfly.streams.types.AMQPBody] [source]
Subscribe to the stream and iterate over messages.
- Parameters:
stream – Name of RabbitMQ stream to subscribe to.
offset – RabbitMQ Streams offset specification.
filter – RabbitMQ stream message filter.
timeout – Raise timeout error if no message received within specified time (in seconds).
amqp – Messages are in AMQP 1.0 format if true. Otherwise no AMQP decoding.
- async write_offset(stream: str, reference: str, value: int | None = None) None [source]
Write RabbitMQ stream offset value using the reference string.
When offset value is not specified, then the last message context is retrieved and its stream offset value is stored. If there is no last message context to retrieve, then method does nothing.
- Parameters:
stream – Name of RabbitMQ stream.
reference – Offset reference string.
value – Offset value to be stored.
1.2. Publisher and Subscriber API
Create message context for RabbitMQ Streams publisher. |
|
Get current context of AMQP message. |
|
RabbitMQ Streams client. |
|
RabbitMQ Streams publisher for sending a single message. |
|
RabbitMQ Streams publisher for sending limited batch of messages. |
|
RabbitMQ Streams publisher for sending a batch of messages. |
|
AMQP message context. |
|
Offset specification for RabbitMQ stream subscription. |
|
Message filter for RabbitMQ Streams subscription. |
|
Function to extract values from messages for stream Bloom filter. |
- rbfly.streams.stream_message_ctx(body: AMQPBody, *, publish_id: int | None = None, app_properties: AMQPAppProperties = {}) MessageCtx
Create message context for RabbitMQ Streams publisher.
Message publish id is optional - a publisher assigns one if not specified. Message publish id can be used for message deduplication. If an application provides message publish ids, then it is its responsibility to track them and keep the ids strictly increasing.
Application properties are part of AMQP message. The properties can be used for filtering or routing.
- Parameters:
body – Message data to be sent to a stream.
publish_id – Message publish id.
app_properties – Application properties, part of AMQP message.
- rbfly.streams.get_message_ctx() MessageCtx
Get current context of AMQP message.
- class rbfly.streams.Publisher
RabbitMQ Streams publisher for sending a single message.
- name
name: unicode Publisher reference name.
- stream
stream: unicode RabbitMQ stream name.
- message_id
message_id: int Last value of published message id.
- async send(self, message: AMQPBody | MessageCtx) None
Send AMQP message to a RabbitMQ stream.
The asynchronous coroutine waits for message delivery confirmation from RabbitMQ Streams broker.
A message is simply application data of type
AMQPBody
, or message context (classMessageCtx
).- Parameters:
message – AMQP message to publish.
See also
AMQPBody
- class rbfly.streams.PublisherBatchLimit
RabbitMQ Streams publisher for sending limited batch of messages.
The publisher performs coordination between the batch and flush asynchronous coroutines to allow sending only limited number of messages.
- name
name: unicode Publisher reference name.
- stream
stream: unicode RabbitMQ stream name.
- message_id
message_id: int Last value of published message id.
- async batch(self, message: AMQPBody | MessageCtx, *, max_len: int) None
Enqueue AMQP message for batch processing with RabbitMQ Streams broker.
The asynchronous coroutine blocks when max_len messages are enqueued. To unblock, call
PublisherBatchLimit.flush()
method.A message is simply application data of type
AMQPBody
, or message context (classMessageCtx
).- Parameters:
message – AMQP message to publish.
max_len – Maximum number of messages in a batch.
See also
- async flush(self) None
Flush all enqueued messages and unblock
PublisherBatchLimit.batch()
asynchronous coroutines.See also
- class rbfly.streams.PublisherBatchFast
RabbitMQ Streams publisher for sending a batch of messages.
The number of messages in a single batch is limited by the maximum length of the Python list type on a given platform.
- name
name: unicode Publisher reference name.
- stream
stream: unicode RabbitMQ stream name.
- message_id
message_id: int Last value of published message id.
- batch(self, message: AMQPBody | MessageCtx) None
Enqueue AMQP message for batch processing with RabbitMQ Streams broker.
A message is simply application data of type
AMQPBody
, or message context (classMessageCtx
).- Parameters:
message – AMQP message to publish.
See also
- class rbfly.streams.MessageCtx
AMQP message context.
- Variables:
body – Message body.
header – Message header.
delivery_annotations – Message delivery annotations.
annotations – Message annotations.
properties – Message properties.
app_properties – Application properties.
footer – Message footer.
stream_offset – RabbitMQ stream offset value.
stream_timestamp – RabbitMQ stream offset timestamp value.
stream_publish_id – RabbitMQ stream message publishing id.
- class rbfly.streams.Offset(type: OffsetType, value: int | float | str | None = None)[source]
Offset specification for RabbitMQ stream subscription.
- FIRST: Offset = Offset.FIRST
Receive all messages, starting with the very first message in a stream. Equivalent to Offset.offset(0).
- LAST: Offset = Offset.LAST
Receive messages from a streams starting with first message stored in the current stream chunk.
- static offset(offset: int) Offset [source]
Create offset specification with offset value.
- Parameters:
offset – Offset value.
- class rbfly.streams.MessageFilter(extract: Callable[[MessageCtx], str], values: set[str])[source]
Message filter for RabbitMQ Streams subscription.
RabbitMQ Streams broker uses filter values to filter chunks of messages with Bloom filter.
Extract function is used to remove messages with non-requested filter values, i.e. false positives of a Bloom filter.
- Variables:
extract – Function to extract values for Bloom Filter.
values – Set of values to filter stream messages with.
- rbfly.streams.types.BloomFilterExtract
Function to extract values from messages for stream Bloom filter.
alias of
Callable
[[MessageCtx
],str
]
1.3. Data Types
Symbolic value from a constrained domain as defined by AMQP 1.0. |
|
AMQP simple type. |
|
Application data sent as AMQP message. |
|
AMQP message annotations. |
|
Application properties sent with AMQP message. |
- class rbfly.types.Symbol(name: str)[source]
Symbolic value from a constrained domain as defined by AMQP 1.0.
The class is also exported via
rbfly.streams
module.
- rbfly.types.AMQPScalar: TypeAlias = None | str | bool | int | float | datetime.datetime | uuid.UUID | rbfly.types.Symbol | bytes
AMQP simple type.
The type is also exported via
rbfly.streams
module.
- rbfly.types.AMQPBody: TypeAlias = collections.abc.Sequence['AMQPBody'] | dict['AMQPBody', 'AMQPBody'] | None | str | bool | int | float | datetime.datetime | uuid.UUID | rbfly.types.Symbol | bytes
Application data sent as AMQP message.
It is sent by a publisher and received by a subscriber.
The type is also exported via
rbfly.streams
module.
1.4. Deprecated API
Deprecated since version 0.7.0:
- class rbfly.streams.PublisherBatch
Use
rbfly.streams.PublisherBatchFast
class instead.
- class rbfly.streams.PublisherBatchMem
Use
rbfly.streams.PublisherBatchLimit
class instead.