TQDM for an async Queue
After working through the Real Python tutorial for async functions, I was curious about whether I could monitor the status of the async Queue used in one of the examples.
The example works by creating a number of producer and consumer coroutines that add and remove 'jobs' from a shared queue. You can vary the number of producers and consumers to change the rate at which jobs are produced and consumed, which in turn affects the size of the queue. You can get an intuitive feel for this by watching the standard output, but I wondered whether TQDM could show it directly.
TQDM is a popular Python library for creating progress bars; it is fast and comes with a lot of functionality.
Overall, my objective was to track how many queue items had been created in total and how many were finished. This is a bit like the MS Windows file copy dialog, which starts copying immediately and gradually increases the total number of files as it scans the hard drive.
Original Example
The toy example is broken up into a few small sub-functions: makeitem for making queue items and randsleep for creating a random sleep time, which simulates variable levels of computational work. An asynchronous produce method adds between 0 and 10 of these items to the Queue with a random sleep between each item. The consume method pulls an item from the queue and then also performs a random sleep before checking the queue for the next item to consume. The example finishes when the pool of consumers completes the last item.
Example Script
#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time


async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()


async def randsleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)


async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")


async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()


async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()


if __name__ == "__main__":
    import argparse
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")
A counting asyncio.Queue
Queues in Python do not keep any history. An asyncio.Queue knows its current size, the order of its items, whether any tasks are still unfinished and its maximum size, but that is all. To know how many items have passed through a Queue, we need to extend Queue with a tracking (counting) attribute.
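To make that concrete, here is a small sketch that is not part of the original tutorial. The function name inspect_queue and the maxsize of 10 are assumptions chosen for illustration. It puts three items on a queue, consumes two, and returns everything the queue can report about itself.

```python
import asyncio


async def inspect_queue() -> dict:
    """Put three items, take two, and report what the queue knows about itself."""
    q: asyncio.Queue = asyncio.Queue(maxsize=10)
    for job in ("a", "b", "c"):
        await q.put(job)
    for _ in range(2):
        await q.get()
        q.task_done()
    # The queue reports its current state only; nothing records that
    # three items have passed through it in total.
    return {
        "qsize": q.qsize(),
        "maxsize": q.maxsize,
        "empty": q.empty(),
        "full": q.full(),
    }


if __name__ == "__main__":
    print(asyncio.run(inspect_queue()))
```

Only one item is left in the queue at the end, and none of these values reveals that three were ever added.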
By sub-classing asyncio.Queue we can add a count instance variable; when items are put in the queue, we increment count before calling the parent put method. The parent put is asynchronous, so the child class has to await it, which means the child put must also be asynchronous.
import asyncio
from typing import Any


class AsyncCountingQueue(asyncio.Queue):
    def __init__(self):
        self.count = 0  # total number of items ever put on the queue
        super().__init__()

    async def put(self, item: Any) -> None:
        self.count += 1
        await super().put(item)
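As a quick check of the idea, the sketch below repeats the counting queue so that it runs on its own and then compares count with qsize() after some items have been consumed. The demo_counting function and the numbers of items are assumptions for illustration only.

```python
import asyncio
from typing import Any


class AsyncCountingQueue(asyncio.Queue):
    """The counting queue again, repeated so this snippet is self-contained."""

    def __init__(self) -> None:
        super().__init__()
        self.count = 0

    async def put(self, item: Any) -> None:
        self.count += 1
        await super().put(item)


async def demo_counting() -> tuple[int, int]:
    q = AsyncCountingQueue()
    for job in range(4):
        await q.put(job)
    for _ in range(3):
        await q.get()
        q.task_done()
    # count remembers every put(); qsize() only knows what is still waiting.
    return q.count, q.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo_counting()))
```

One caveat with this approach: items added with put_nowait() bypass the overridden put and so would not be counted.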
Initial failures
I tried a few different patterns for incorporating tqdm.asyncio into the original example. TQDM progress bars are typically defined around iterators of known length, and because the queue changes size dynamically, it's a bit more difficult to track.
Initially I had thought that I could introduce a TQDM progress bar object into the main function and update it manually using an async function. The function was essentially an endless loop that waited for the Queue to be emptied. This worked OK, but there were some issues with the final completion of the bar: the queue would empty before the bar was fully updated, leaving a small number of tasks apparently unfinished.
The reason for this was a difficulty in structuring the order of async tasks under this pattern. Specifically, awaiting asyncio.Queue.join blocks the calling coroutine until the queue has been fully processed, so it has to be the last async operation, whereas we want the TQDM bar to essentially wrap the whole async loop. It might be possible to conditionally check the status of the Queue instead of using join, but this feels clunky and potentially prone to edge-case errors where the queue empties temporarily before more items are added.
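The edge case is easy to reproduce without any progress bar. The sketch below is an illustration under assumed timings, and the names slow_producer, fast_consumer and count_false_finishes are hypothetical. It polls q.empty() while a slow producer is still running and counts how often the queue looks finished when it is not.

```python
import asyncio


async def slow_producer(q: asyncio.Queue, n_items: int) -> None:
    for job in range(n_items):
        await asyncio.sleep(0.05)  # stand-in for real work between items
        await q.put(job)


async def fast_consumer(q: asyncio.Queue) -> None:
    while True:
        await q.get()
        q.task_done()


async def count_false_finishes() -> int:
    """Count how often the queue looks empty while the producer is still running."""
    q: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(slow_producer(q, n_items=3))
    consumer = asyncio.create_task(fast_consumer(q))
    false_finishes = 0
    while not producer.done():
        if q.empty():
            false_finishes += 1  # a naive monitor would stop the bar here
        await asyncio.sleep(0.01)
    await q.join()
    consumer.cancel()
    return false_finishes


if __name__ == "__main__":
    print(asyncio.run(count_false_finishes()))
```

Because the consumer drains the queue faster than the producer fills it, an "is the queue empty?" check would report completion many times before the work is actually done.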
In the second pattern, I opted to use TQDM as a context manager. The manual updating of the progress bar was incorporated into a sub-class of tqdm.asyncio.tqdm, and the context manager wrapped the async loop and the blocking asyncio.Queue.join.
In this case the progress bar does not update until the very end of the context manager block. It is effectively the same as the original example, but places the progress bar update after the blocking join.
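The ordering problem can be shown with a plain list standing in for the progress bar. This is a sketch of the behaviour, not the code from that second attempt; the worker and update_after_join names are assumptions. Anything placed after the awaited join runs only once every job has been marked done.

```python
import asyncio


async def worker(q: asyncio.Queue, log: list[str]) -> None:
    while True:
        job = await q.get()
        await asyncio.sleep(0.01)  # stand-in for real work
        log.append(f"finished {job}")
        q.task_done()


async def update_after_join() -> list[str]:
    log: list[str] = []
    q: asyncio.Queue = asyncio.Queue()
    for job in range(3):
        await q.put(job)
    consumer = asyncio.create_task(worker(q, log))
    await q.join()  # suspends this coroutine until every job is task_done()
    log.append("progress update")  # only reached once the queue is drained
    consumer.cancel()
    return log


if __name__ == "__main__":
    print(asyncio.run(update_after_join()))
```

The "progress update" entry always lands last, which matches a bar that jumps to its final state only when the block exits.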
Solution
In the end, the solution was to incorporate TQDM into the sub-classed asyncio.Queue. Doing this allowed changes to the Queue to be tied to changes to the progress bar. In __init__ the tqdm args and kwargs are saved to be used with the async context manager __aenter__. A _refresh method is introduced that updates the progress bar with the queue status.
Additionally, the put and get methods are extended to call _refresh when changes are made to the queue. And finally, when the context manager is exited the progress bar is refreshed before being closed. By tying _refresh to put and get we can side-step the issue of ensuring the progress bar is updated correctly when changes to the queue are made.
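import asyncio
from typing import Any

from tqdm.asyncio import tqdm


class AsyncTQDMQueue(asyncio.Queue):
    def __init__(self, *args, **kwargs):
        # All arguments are kept for tqdm; none are passed on to Queue.
        self.count = 0  # items put on the queue so far
        self.got = 0  # items taken off the queue so far
        self.args = args
        self.kwargs = kwargs
        self.progress = None
        super().__init__()

    def _refresh(self):
        # Mirror the queue counters onto the bar once it has been created.
        if self.progress is not None:
            self.progress.total = self.count
            self.progress.n = self.got
            self.progress.refresh()

    async def put(self, item: Any, msg: str | None = None) -> None:
        self.count += 1
        self._refresh()
        await super().put(item)
        if msg:
            tqdm.write(msg)

    async def get(self) -> Any:
        # Count the item only after it has been retrieved, so a consumer
        # waiting on an empty queue does not advance the bar.
        item = await super().get()
        self.got += 1
        self._refresh()
        return item

    async def __aenter__(self) -> Any:
        self.progress = tqdm(*self.args, **self.kwargs)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        self.progress.refresh()
        self.progress.close()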
With the queue-tqdm hybrid class, a context manager can be introduced to the main() async loop that correctly tracks the queue status. Using a context manager here ensures the progress bar is created before the queue is used and is refreshed and closed once the queue has been processed.
async with AsyncTQDMQueue(
    bar_format='{desc}: {bar}{n_fmt}/{total_fmt} [{elapsed}]',
    desc="Async Queue"
) as q:
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()
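For readers less familiar with the async context manager protocol, the following sketch shows the guarantee being relied on. TrackedResource is a hypothetical stand-in for the queue and has nothing to do with tqdm itself: __aenter__ runs before the block and __aexit__ runs afterwards, even when the block raises.

```python
import asyncio


class TrackedResource:
    """Hypothetical stand-in for the queue: records when setup and cleanup run."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def __aenter__(self) -> "TrackedResource":
        self.events.append("open")  # where the progress bar would be created
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        self.events.append("close")  # where the bar would be refreshed and closed
        # Returning None lets any exception continue to propagate.


async def run_with_failure() -> list[str]:
    resource = TrackedResource()
    try:
        async with resource:
            resource.events.append("work")
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        resource.events.append("error seen by caller")
    return resource.events


if __name__ == "__main__":
    print(asyncio.run(run_with_failure()))
```

The cleanup step is recorded before the caller sees the error, which is why closing the bar in __aexit__ is a reasonable place for it.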
Full Solution Script
from typing import Any
import asyncio
import itertools as it
import os
import random
import time

from tqdm.asyncio import tqdm


class AsyncTQDMQueue(asyncio.Queue):
    def __init__(self, *args, **kwargs):
        # All arguments are kept for tqdm; none are passed on to Queue.
        self.count = 0  # items put on the queue so far
        self.got = 0  # items taken off the queue so far
        self.args = args
        self.kwargs = kwargs
        self.progress = None
        super().__init__()

    def _refresh(self):
        # Mirror the queue counters onto the bar once it has been created.
        if self.progress is not None:
            self.progress.total = self.count
            self.progress.n = self.got
            self.progress.refresh()

    async def put(self, item: Any, msg: str | None = None) -> None:
        self.count += 1
        self._refresh()
        await super().put(item)
        if msg:
            tqdm.write(msg)

    async def get(self) -> Any:
        # Count the item only after it has been retrieved, so a consumer
        # waiting on an empty queue does not advance the bar.
        item = await super().get()
        self.got += 1
        self._refresh()
        return item

    async def __aenter__(self) -> Any:
        self.progress = tqdm(*self.args, **self.kwargs)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        self.progress.refresh()
        self.progress.close()


async def make_item(size: int = 5) -> str:
    return os.urandom(size).hex()


async def rand_sleep(caller: str | None = None, scaler: float = 1.0) -> None:
    i = random.randint(0, 10)
    if caller:
        tqdm.write(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i * scaler)


async def produce(name: int, q: AsyncTQDMQueue) -> None:
    n = random.randint(10, 50)
    for _ in it.repeat(None, n):
        await rand_sleep(caller=f"Producer {name}", scaler=0.5)
        i = await make_item()
        t = time.perf_counter()
        await q.put((i, t), msg=f"Producer {name} added <{i}> to queue.")


async def consume(name: int, q: AsyncTQDMQueue) -> None:
    while True:
        await rand_sleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        tqdm.write(f"Consumer {name} got element <{i}> in {now-t:0.5f} seconds.")
        q.task_done()


async def main(nprod: int, ncon: int) -> None:
    async with AsyncTQDMQueue(
        bar_format="{desc}: {bar}{n_fmt}/{total_fmt} [{elapsed}]", desc="Async Queue"
    ) as q:
        producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
        consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
        await asyncio.gather(*producers)
        await q.join()
    for c in consumers:
        c.cancel()


if __name__ == "__main__":
    import argparse

    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    tqdm.write(f"Program completed in {elapsed:0.5f} seconds.")
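One possible variation, offered as a sketch rather than as part of the solution above: the script counts an item when it is taken off the queue, not when the consumer has finished with it. If "finished" should mean that task_done() has been called, the counter could be moved there instead. The class name FinishedCountingQueue and its finished attribute are assumptions, and the progress bar is left out to keep the example dependent on the standard library only.

```python
import asyncio
from typing import Any


class FinishedCountingQueue(asyncio.Queue):
    """Hypothetical variant that counts completed items rather than retrieved ones."""

    def __init__(self) -> None:
        super().__init__()
        self.count = 0  # items put on the queue so far
        self.finished = 0  # items whose processing has been marked done

    async def put(self, item: Any) -> None:
        self.count += 1
        await super().put(item)

    def task_done(self) -> None:
        # Let the parent validate the call first; it raises ValueError
        # if task_done() is called more often than items were retrieved.
        super().task_done()
        self.finished += 1


async def demo_finished() -> tuple[int, int, int]:
    q = FinishedCountingQueue()
    for job in range(3):
        await q.put(job)
    await q.get()
    q.task_done()
    await q.get()  # retrieved, but still being "processed"
    return q.count, q.finished, q.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo_finished()))
```

Here two items have left the queue but only one is reported as finished, which is the distinction a progress bar built on task_done() would show.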