Async Programming in Python
Python’s async programming model is a little bit complex and is constantly evolving. It is not easy to decide which programming model to choose without deep knowledge of python internals.
Python provide three async programming models: multithread, multiprocess libraries and coroutine.
When we consider async programming, we should always keep in mind what is the bottleneck of our syncronous system, which means it is necessary to distinguish between “Blocking IO” and “Computation Resource”. Blocking IO is the case when we need to wait for the external system to respond. For example, when reading/writing a file we need to access disk memory, and that can take many CPU cycles. RPC call is another great example. Computation Resource is the case that we have too much to do in one CPU, and in order to improve throughput, we want to distribute the work evenly among multiple CPUs. There’s no “waiting” in this scenario (just the opposite, the CPU works like a dog.)
To put it in a simple way, assume that you are doing some chores today. There are three jobs to complete: wash the clothes, mop the floor and throw the garbage. The naive (synchronous) way is to complete jobs one by one, i.e. after the clothes are washed, then mop the floor, and then throw the garbage. However, since you have a washing machine, you can simply “submit” the job to the washing machine and start mopping the floor right away. You just need to make sure that by the end of the day you collect the clothes from the machine. This approach is better than sitting there waiting for the machine to finish, and the time saved is by reducing the blocking IO. However, after you began mopping the floor, you may found the place is so big (say, you are a thrifty millionaire) that it’s impossible for you to finish in a day. In this case, the only way is to distribute the workload to other people so everybody gets an area they can handle. That’s what we call leveraging parallelism in computation resources.
Multithread
Python’s standard way to convert blocking-IO to non-blocking ones are via threads
. For example, I have an RPC call that takes 1 second to respond, and I need to call 10 times. The syncrounous version could be:
def rpc(x: int):
print(f'calling {x}')
time.sleep(1)
def main():
for i in range(10):
rpc(i)
We can run the profiler to check the performance:
from pstats import Stats
from cProfile import Profile
def profile_cpu(fn):
profiler = Profile()
profiler.runcall(fn)
stats = Stats(profiler)
stats.strip_dirs()
stats.sort_stats('cumulative')
stats.print_stats()
profile_cpu(main)
And as you can see the main program takes about 10 seconds.
272 function calls in 10.004 seconds
To use thread there are several different ways. The most straightforward way is to extend the Thread
class:
from threading import Thread
class RPCThread(Thread):
def __init__(self, x):
super().__init__()
self.x = x
def run(self):
rpc(self.x)
def rpc_with_thread_class():
threads = []
for i in range(10):
thread = RPCThread(i)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
profile_cpu(rpc_with_thread_class)
This reduces our run time to a decent 1 second.
555 function calls in 1.005 seconds
Seems pretty nice so far. However, things can be trickier in real-life scenario. For the case above, we have a fixed time of RPC call (10
). In real life, we may not know how many RPC call should be concurrently issued. The two factors are:
- What triggers the RPC. For example, if the service is listening to certain events to trigger RPC, the frequency of events can vary, so does the concurrency of RPC can vary in response.
- The SLA of the RPC server. Maybe we have a strict limit of QPS the server can handle, and the concurrency should not exceed that limit.
Dynamic Thread
For the first case, we are actually looking at a producer-consumer model (very popular in the computer science world.) The setup could be:
from queue import Queue
queue = Queue()
# our service
def service():
threads = []
event_cnt = 0
while True:
event = queue.get()
thread = RPCThread(event)
thread.start()
threads.append(thread)
# this loop should be running forever, but for the sake of benchmarking, we set a constant loop time
event_cnt += 1
if event_cnt == 1000:
break
for thread in threads:
thread.join()
The naive approach is: get an event from the queue and create a thread for the event. We can easily build a test case for this set up.
def event_provider():
# simulate event source
for i in range(1000):
queue.put(i)
time.sleep(0.01)
# run the provider and consumer in parallel
class ThreadRunner(Thread):
def __init__(self, func):
super().__init__()
self.func = func
def run(self):
self.func()
def benchmark_dynamic_thread():
consumer_thread = ThreadRunner(service)
consumer_thread.start()
event_provider()
consumer_thread.join()
profile_cpu(benchmark_dynamic_thread)
# profile_memory(benchmark_dynamic_thread)
The time is 11.28 seconds and the memory usage is 1.42 MB. Time-wise, the program is pretty decent. However, the memory usage is a little bit high. The reason is we dynamically create a new thread upon each event. Since we have 1000 events, we are creating 1000 threads on the fly. We can improve this approach by reusing a fixed number of threads:
Thread Pooling
event_cnt = 0
THREAD_COUNT = 100
class ConsumerThread(Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
global event_cnt
while True:
event = self.queue.get()
rpc(event)
event_cnt += 1
def benchmark_fixed_thread():
threads = []
for _ in range(THREAD_COUNT):
thread = ConsumerThread(queue)
thread.start()
threads.append(thread)
event_provider()
while event_cnt < 1000:
# busy waiting while the events have not finished
time.sleep(0.1)
profile_cpu(benchmark_fixed_thread)
# profile_memory(benchmark_fixed_thread)
In the approach above we did two modifications:
- We let the thread itself fetch from the queue, so that the thread can be reused.
- We create 100 threads at the start of the program and no more threads dynamically created.
The time is 11.33 seconds, which is slightly over the dynamic thread approach, but the memory usage is 0.41 MB, which is much lower. This means by reusing a fixed number of threads, we effectively reduce the memory overhead of thread creation.
This pattern is so common that python support it natively. We can use concurrent.futures.ThreadPoolExecutor
to achieve the exact same effect with clearer code path. What you need to do is to change the creation of thread into pool.submit
and use the future
object it returns the join all the threads.
from concurrent.futures import ThreadPoolExecutor
event_cnt = 0
queue = Queue()
def thread_pool_service(pool):
futures = []
event_cnt = 0
while True:
event = queue.get()
future = pool.submit(rpc, event)
futures.append(future)
# this loop should be running forever, but for the sake of benchmarking, we set a constant loop time
event_cnt += 1
if event_cnt == 1000:
break
for future in futures:
future.result()
def benchmark_thread_pool():
with ThreadPoolExecutor(max_workers=THREAD_COUNT) as pool:
provider_thread = ThreadRunner(event_provider) # run provider in different thread
provider_thread.start()
thread_pool_service(pool) # start service
profile_memory(benchmark_thread_pool)
The time cost is 11.25 seconds, which is better than the dynamic thread approach. The memory is 0.42 MB, on par with our hand-crafted thread pool. So far, this native support has the best performance both throughput-wise and memory-wise.
Coroutine
The multi-thread parallelism can achieve quite good performance. However, as mentioned above, creating a thread requires non-trivial memory cost. Using a thread pool alleviates this issue, but the pattern is not easily integrated into the programming flow. To address the thread overhead issue and make async programming more generic in python, coroutine is created.
As said in “Effective Python”, Coroutines let you have a very large number of seemingly simultaneous functions in your Python programs. They’re implemented using the async
and await
keywords along with the same infrastructure that powers generators. The function pauses at the await
keyword and resume when the pending result is available.
Coroutine is essentially one thread but with predefined yield points, where the program tells the CPU to suspend the execution and switch to other jobs. In contrast, thread is powered by interrupts, which means the yield time is random. Therefore, while multi-thread has to take context switching overhead and racing condition into consideration, coroutine is free of those concerns.
It is very similar to Nodejs programming model in Javascript. Basically you divide all tasks into syncrounous tasks and asyncrounous tasks, execute the syncrounous tasks in order while submitting asyncrounous tasks to a queue. Check if the asyncrounous tasks are fulfilled after all syncrounous tasks are finished.
async def async_rpc(x):
await asyncio.sleep(1)
async def async_service():
tasks = []
global event_cnt
while True:
event = queue.get()
task = asyncio.create_task(async_rpc(event))
tasks.append(task)
event_cnt += 1
if event_cnt == 1000:
break
await asyncio.gather(*tasks)
async def benchmark_coroutine():
provider_thread = ThreadRunner(event_provider) # run provider in different thread
provider_thread.start()
await async_service()
The coroutine approach costs 11.28 seconds, and the memory usage is about 0.3 MB.
Python Coroutine vs JS Promise
If you have used Javascript before, you may find the async-await programming syntax very familiar. Indeed they share many similarities. For example, python’s async function returns a coroutine
object that you can await and join, and in Javascript async function returns a Promise
object that has similar effect.
The core concepts are similar in both languages - async-await provides a way to write asynchronous code in a synchronous-looking style.
In Javascript, the event loop is built-in. So you can use async functions just like a normal function:
// JavaScript: Built-in event loop
async function jsExample() {
await fetch('https://api.example.com');
// Other I/O operations automatically use the event loop
}
In python, you need to create an event loop to run coroutine.
# Python: Requires explicit event loop
import asyncio
async def py_example():
await asyncio.create_task(some_coroutine())
# Must run in an event loop
# Must explicitly run the event loop
asyncio.run(py_example())
Multiprocess
Everything we talked before is about addressing the blocking IO issue. We have not touched the computation resource part yet. Multi-threading is not able to utilize multiple CPU cores because of the mechanism of global interpreter lock.
Global interpreter lock (GIL) is a kind of mutex that helps python interpreter from messing up its own state. When python code is executed, it is first transformed into bytecode and runs with a stack-based interpreter. The state needs to be maintained for the sake of tasks like garbage collection. The negative side effect of GIL is that it means while multiple threads can be issued at the same time, only one of them is allowed to execute. Therefore, GIL prevents thread from utilizing multiple cores.
Python gets around with this GIL issue and support multiple process execution in a smart way: it simply runs another interpreter (on another core) as child process. since child processes are separate from the main interpreter, their GIL are also separate.
The best part is the syntax for multiprocess is similar to the multi-thread:
# multi thread
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
pool.submit(do_something)
# multi process
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
pool.submit(do_something)
What ProcessPoolExecutor
does is, like I said, create a new interpreter child process. It also need to serialize the items into binary data using pickle
and copies the data from the main process to the child process. Therefore, it is impossible to execute non-serializable objects (e.g. closures) with ProcessPoolExecutor
.
def bad_example():
multiplier = 10
def closure_function(x):
return x * multiplier
with ProcessPoolExecutor() as executor:
try:
# This will raise a pickle error
results = list(executor.map(closure_function, range(5)))
except Exception as e:
print(f"Error: {e}")