7. Concurrency and Parallelism
RbFly uses asynchronous concurrency model, and avoids using threads.
Concurrency means an application executes and switches between multiple tasks on a single CPU core, while an application using threads might run a program on multiple processors of a machine. Threads can run code in a true, parallel manner. However, they are a limited resource, and should be managed by an application.
Performance of programs written in Python language, which try to use threads for parallelism, is hampered by Global Interpreter Lock (GIL). It blocks threads from utilizing multiple processors of a machine.
Despite the limitations of asynchronous concurrency model or Python threads, an application can use RbFly and threads to execute CPU bound tasks. This is possible when part of an application is implemented as a C extension. Such extensions can release GIL, which enables parallelism of a program.
7.1. Asynchronous Concurrency
The implementation of RbFly library is based on asynchronous concurrency model. In Python, asyncio framework provides an asynchronous event loop to execute tasks of RbFly library and of an application. They are executed in a single thread of the application.
After task is started, it initiaties an asynchronous operation, and yields control to the event loop allowing execution of other tasks. The event loop resumes execution of the task, when results of the asynchronous operation are available.
Concurrent execution of tasks can be blocked by a CPU bound task of an application. Such task does not yield control to an event loop, and prevents other tasks from running. For example, a CPU bound task might block RbFly from sending heartbeats by RabbitMQ Streams broker. The broker will close active connections if it does not receive heartbeats from an application in a timely manner. This problem can be avoided with threads.
7.2. Threads
Applications could run CPU bound tasks, if RbFly used threads to execute its tasks like sending heartbeats to RabbitMQ Streams broker.
However, RbFly avoids using threads. They bring additional cost like context switching and increased memory consumption, therefore threads should be managed like other limited resources. The use and management of threads is left up to an application.
A common pattern to maintain threads is to use a thread pool. This, in turn, limits concurrency and parallelism of an application, because the number of parallel tasks is limited by the size of a thread pool (Figure 7.1 and Figure 7.2). Finally, in Python language, the limitation is even more pronounced by GIL.
However, Python’s GIL enables easy integration of libraries written in C language. Such libraries might release GIL when starting calculations, and enable parallelism of an application.
7.3. Example of Parallelism
NumPy is an example of a Python library, which might release GIL when executing calculations.
The demo presented in this subsection combines RbFly library with NumPy to demonstrate parallel execution of tasks.
The script consumes messages from a stream, and performs matrix multiplication with NumPy. The calculation lasts few minutes, and is executed in asyncio thread executor. During execution of the calculations in threads, RbFly is able to execute its tasks on asynchronous event loop
heartbeats are sent RabbitMQ Streams broker
additional task slowly consumes messages from the stream
Note
When script is run on a faster CPU, then increase size of a matrix with -n parameter of the script. A single matrix multiplication should run for 3 minutes, at least. This is to have overlap between the calculation and sending heartbeats to RabbitMQ Streams broker.
32 GB of RAM is required to run the script with the default parameters.
Alternatively, the script can be run with -s parameter. This starts NumPy calculations in the same thread, which is used by tasks of RbFly library. The calculations block RbFly from sending heartbeats to RabbitMQ Streams broker, and the broker eventually closes the current connection. Once application yields control to the event loop, then RbFly attempts reconnection to the server.
import asyncio
import logging
import numpy as np
from datetime import datetime
from functools import partial
import rbfly.streams as rbs
STREAM = 'rbfly-demo-stream-cpubound'
# send data to the stream
async def send_data(client: rbs.StreamsClient) -> None:
async with client.publisher(STREAM) as publisher:
for _ in range(3600):
await publisher.send('hello')
# an additional task receiving data from the stream during calculations
async def receive_data(client: rbs.StreamsClient) -> None:
num_msg = 0
print('{} start receiving messages'.format(datetime.now()))
offset = rbs.Offset.FIRST
async for msg in client.subscribe(STREAM, offset=offset):
num_msg += 1
await asyncio.sleep(1)
if num_msg % 60 == 0:
print('{} done receiving messages, num={}'.format(
datetime.now(), num_msg
))
# read messages from the stream and perform CPU bound calculations
async def read_and_calculate(
client: rbs.StreamsClient,
use_threads: bool,
size: int
) -> None:
loop = asyncio.get_event_loop()
offset = rbs.Offset.FIRST
async for msg in client.subscribe(STREAM, offset=offset):
if use_threads:
await loop.run_in_executor(None, calculate, size)
else:
calculate(size)
await asyncio.sleep(1)
def calculate(size: int):
print('{} starting calculation'.format(datetime.now()))
matrix = np.random.rand(size, size)
result = matrix @ matrix @ matrix
print('{} done calculating'.format(datetime.now()))
@rbs.connection
async def demo(client: rbs.StreamsClient, use_threads: bool, size: int) -> None:
try:
await client.create_stream(STREAM)
await send_data(client)
await asyncio.gather(
read_and_calculate(client, use_threads, size),
receive_data(client)
)
finally:
await client.delete_stream(STREAM)
parser = argparse.ArgumentParser()
parser.add_argument(
'-s', '--no-threads', action='store_true', default=False,
help='use threads to perform cpu bound calculation'
)
parser.add_argument(
'-n', '--size', default=24 * 1024, type=int,
help='size of matrix used for calculation'
)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
client = rbs.streams_client('rabbitmq-stream://guest:guest@localhost')
use_threads = not args.no_threads
asyncio.run(demo(client, use_threads, args.size))