Skip to main content

TQDM for an async Queue

· 8 min read
Tony Hallam
Application Consultant @ EPCC

After working through the Real Python tutorial for async functions I was curious about whether I could monitor the status of the async Queue used in one of the examples.

The example works by creating a number of producer coroutines and consumer coroutines that systematically add and remove 'jobs' from the shared queue. You can play with the number of producers and consumers to adjust the speed at which these jobs are produced and consumed which affects the size and status of the queue. You can get a feeling for it intrinsically by watching the standard output but I wondered if you could use TQDM.

note

TQDM is a popular python library for creating progress bars and is fast with a bunch of functionality.

Overall, my objective was to track how many total queue items had been created and how many were finished. This is a bit like the MS Windows file copy model which starts copying immediately and slowly increases the number of files as it scans the hard-drive.

Original Example

The toy example is broken up into a few small sub-functions for making queue items makeitem and for creating random sleep time randsleep which simulates variable levels of computational work. An asynchronous produce method adds between 0 and 10 of these items to the Queue with a random sleep between each item. The consume method pulls an item from the queue and then also performs a random sleep before checking the queue for the next item to consume. The example finishes when the consumer method pool completes its last task item.

Example Script
asyncq.py
#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
return os.urandom(size).hex()

async def randsleep(caller=None) -> None:
i = random.randint(0, 10)
if caller:
print(f"{caller} sleeping for {i} seconds.")
await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
n = random.randint(0, 10)
for _ in it.repeat(None, n): # Synchronous loop for each single producer
await randsleep(caller=f"Producer {name}")
i = await makeitem()
t = time.perf_counter()
await q.put((i, t))
print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
while True:
await randsleep(caller=f"Consumer {name}")
i, t = await q.get()
now = time.perf_counter()
print(f"Consumer {name} got element <{i}>"
f" in {now-t:0.5f} seconds.")
q.task_done()

async def main(nprod: int, ncon: int):
q = asyncio.Queue()
producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers, too
for c in consumers:
c.cancel()

if __name__ == "__main__":
import argparse
random.seed(444)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--nprod", type=int, default=5)
parser.add_argument("-c", "--ncon", type=int, default=10)
ns = parser.parse_args()
start = time.perf_counter()
asyncio.run(main(**ns.__dict__))
elapsed = time.perf_counter() - start
print(f"Program completed in {elapsed:0.5f} seconds.")

A counting asyncio.Queue

Queues do not have any history attributes in Python. They understand their overall current size, the order of tasks, the status of tasks and their maximum size but that is it. To know how many items have passed through a Queue we will need to extend Queue with a tracking (counting) parameter.

Sub-classing asyncio.Queue we can add a count instance variable and then when items are put in the queue, we increment count before calling the parent put method. The asynchronous put of the parent class must be propagated to the child class through the use of await which necessitates that the child put is also asynchronous.

class AsyncCountingQueue(asyncio.Queue):

def __init__(self):
self.count = 0
super().__init__()

async def put(self, item: Any) -> None:
self.count += 1
await super().put(item)

Initial failures

I tried a few different patterns of incorporating tqdm.asyncio into the original example. TQDM progress bars are typically defined around known length iterators and because the queue is of dynamic size, its a bit more difficult to track.

Initially I had thought that I could introduce a TQDM progress bar object into the main function and update it manually using an async function. The function was essentially an endless loop that waited for the Queue to be emptied. This worked OK but there were some issues with the final completion of the bar, as the queue would empty before the bar was fully completed leaving a small number of tasks apparently left.

The reason for this was a difficulty in structuring the order of async tasks under this pattern. Specifically, asyncio.Queue.join is a blocking method and has to be the last async operation whilst we want the TQDM bar to essentially wrap the whole async loop. It might be possible to conditionally check the status of the Queue instead of using join but this feels clunky and potentially prone to edge case errors where the queue may empty prematurely before more items are added.

In the second pattern, I opted to use TQDM as a context manager. The manual updating of the progress bar was incorporated into a sub-classing of tqdm.async.tqmd and the context manager wrapped the async loop and the blocking asyncio.Queue.join.

In this case the progress bar does not update until the very end of the context manager block. It is effectively the same as the original example but places the progress bar update after the blocking join.

Solution

In the end, the solution became to incorporate TQDM into the sub-classed asyncio.Queue. Doing this allowed changes to the Queue to be tied to changes to the progress bar. In __init__ the tqdm args and kwargs are saved to be used with the async context manager __aenter__. A _refresh method is introduced that updates the progress progress bar with the queue status.

Additionally, the put and get methods are extended to call _refresh when changes are made to the queue. And finally, when the context manager is exited the progress bar is refreshed before being closed. By tying _refresh to put and get we can side-step the issue of ensuring the progress bar is updated correctly when changes to the queue are made.

class AsyncTQDMQueue(asyncio.Queue):

def __init__(self, *args, **kwargs):
self.count = 0
self.got = 0
self.args = args
self.kwargs = kwargs
self.progress = None
super().__init__()

def _refresh(self):
if self.progress is not None:
self.progress.total = self.count
self.progress.n = self.got
self.progress.refresh

async def put(self, item: Any, msg: str = None) -> None:
self.count += 1
self._refresh()
await super().put(item)
if msg:
tqdm.write(msg)

async def get(self) -> Any:
self.got += 1
self._refresh()
return await super().get()

async def __aenter__(self) -> Any:
self.progress = tqdm(*self.args, **self.kwargs)
return self

async def __aexit__(self, type, value, traceback) -> None:
self.progress.refresh()
self.progress.close()

With the queue-tqdm hybrid class a context manager can be introduced to the main() async loop that correctly tracks the queue status. The use of a context manager here ensures the queue is wrapped by correct logic from the progress bar.

async with AsyncTQDMQueue(
bar_format='{desc}: {bar}{n_fmt}/{total_fmt} [{elapsed}]',
desc="Async Queue"
) as q:
producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]

await asyncio.gather(*producers)
await q.join()
Full Solution Script
asyncq.py
from typing import Any

import asyncio
import itertools as it
import os
import random
import time

from tqdm.asyncio import tqdm


class AsyncTQDMQueue(asyncio.Queue):

def __init__(self, *args, **kwargs):
self.count = 0
self.got = 0
self.args = args
self.kwargs = kwargs
self.progress = None
super().__init__()

def _refresh(self):
if self.progress is not None:
self.progress.total = self.count
self.progress.n = self.got
self.progress.refresh

async def put(self, item: Any, msg: str = None) -> None:
self.count += 1
self._refresh()
await super().put(item)
if msg:
tqdm.write(msg)

async def get(self) -> Any:
self.got += 1
self._refresh()
return await super().get()

async def __aenter__(self) -> Any:
self.progress = tqdm(*self.args, **self.kwargs)
return self

async def __aexit__(self, type, value, traceback) -> None:
self.progress.refresh()
self.progress.close()


async def make_item(size: int = 5) -> str:
return os.urandom(size).hex()


async def rand_sleep(caller: str | None = None, scaler: int = 1) -> None:
i = random.randint(0, 10)
if caller:
tqdm.write(f"{caller} sleeping for {i} seconds.")
await asyncio.sleep(i * scaler)


async def produce(name: int, q: asyncio.Queue) -> None:
n = random.randint(10, 50)
for _ in it.repeat(None, n):
await rand_sleep(caller=f"Producer {name}", scaler=0.5)
i = await make_item()
t = time.perf_counter()
await q.put((i, t), msg=f"Producer {name} added <{i}> to queue.")


async def consume(name: int, q: asyncio.Queue) -> None:
while True:
await rand_sleep(caller=f"Consumer {name}")
i, t = await q.get()
now = time.perf_counter()
tqdm.write(f"Consumer {name} got element <{i}> in {now-t:0.5f} seconds.")
q.task_done()


async def main(nprod: int, ncon: int) -> None:

async with AsyncTQDMQueue(
bar_format="{desc}: {bar}{n_fmt}/{total_fmt} [{elapsed}]", desc="Async Queue"
) as q:
producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]

await asyncio.gather(*producers)
await q.join()

for c in consumers:
c.cancel()


if __name__ == "__main__":
import argparse

random.seed(444)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--nprod", type=int, default=5)
parser.add_argument("-c", "--ncon", type=int, default=10)
ns = parser.parse_args()
start = time.perf_counter()
asyncio.run(main(**ns.__dict__))
elapsed = time.perf_counter() - start
tqdm.write(f"Program completed in {elapsed:0.5f} seconds.")