hbutils.concurrent.parallel

Parallel execution utilities with a bounded thread pool and progress tracking.

This module provides BoundedThreadPoolExecutor, a thread pool executor that limits both the number of worker threads and the number of pending tasks. It also provides parallel_call, a convenience function that processes the items of an iterable in parallel while displaying a progress bar.

The bounded executor prevents unbounded memory growth when a large number of tasks is submitted. A semaphore caps the number of tasks that may be pending at once, so further submissions block until an earlier task completes.
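The sketch below illustrates the problem the bounded executor addresses, using only the standard library. A plain ThreadPoolExecutor accepts every submission immediately, so tasks accumulate in memory whenever the producer is faster than the workers. The gate event and blocked_task function are illustrative names, not part of hbutils.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # keeps every task blocked until it is set


def blocked_task(i: int) -> int:
    gate.wait()  # park the worker until the gate opens
    return i


executor = ThreadPoolExecutor(max_workers=2)
# submit() never blocks on a plain ThreadPoolExecutor: all 1000 tasks are
# accepted at once even though only 2 can run, so the rest pile up in its queue.
futures = [executor.submit(blocked_task, i) for i in range(1000)]
not_done = sum(1 for f in futures if not f.done())
print(f"{not_done} of {len(futures)} tasks outstanding")  # 1000 of 1000 tasks outstanding

gate.set()  # release the workers so the queue drains
executor.shutdown(wait=True)
```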

BoundedThreadPoolExecutor

class hbutils.concurrent.parallel.BoundedThreadPoolExecutor(max_workers: int | None = None, max_pending: int | None = None, **kwargs)

A ThreadPoolExecutor with bounded pending tasks using a semaphore.

This executor extends the standard concurrent.futures.ThreadPoolExecutor with a limit on the number of pending tasks. When the limit is reached, submit() blocks until an earlier task completes. This backpressure on the producer prevents unbounded memory growth when a large number of tasks is submitted.

Parameters:
  • max_workers (Optional[int]) – Maximum number of worker threads. Defaults to None, in which case ThreadPoolExecutor's own default is used.

  • max_pending (Optional[int]) – Maximum number of pending tasks. If None, no limit is applied.

  • kwargs – Additional keyword arguments passed to ThreadPoolExecutor.

Example::
>>> executor = BoundedThreadPoolExecutor(max_workers=4, max_pending=10)
>>> future = executor.submit(lambda x: x * 2, 5)
>>> result = future.result()
>>> print(result)
10
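The documented behaviour can be sketched by subclassing ThreadPoolExecutor: acquire a slot in submit() and release it from a wrapper once the task finishes. SketchBoundedExecutor is a hypothetical name, and this is an illustration rather than the hbutils source. Returning the slot when submission itself fails is an assumption of this sketch.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional


class SketchBoundedExecutor(ThreadPoolExecutor):
    """Hypothetical stand-in for BoundedThreadPoolExecutor (not the hbutils source)."""

    def __init__(self, max_workers: Optional[int] = None,
                 max_pending: Optional[int] = None, **kwargs: Any) -> None:
        super().__init__(max_workers=max_workers, **kwargs)
        if max_pending is not None and max_pending < 1:
            raise ValueError("max_pending must be a positive integer or None")
        # None means "no limit": no semaphore is created.
        self._slots = threading.BoundedSemaphore(max_pending) if max_pending is not None else None

    def submit(self, fn: Callable, /, *args: Any, **kwargs: Any) -> Future:
        if self._slots is None:
            return super().submit(fn, *args, **kwargs)

        self._slots.acquire()  # blocks while max_pending tasks are outstanding

        def wrapper() -> Any:
            try:
                return fn(*args, **kwargs)
            finally:
                self._slots.release()  # free the slot once the task finishes

        try:
            return super().submit(wrapper)
        except BaseException:
            # Assumption: if submission fails (e.g. after shutdown), give the slot back.
            self._slots.release()
            raise


with SketchBoundedExecutor(max_workers=4, max_pending=10) as ex:
    futures = [ex.submit(pow, 2, n) for n in range(5)]
    results = [f.result() for f in futures]
print(results)  # [1, 2, 4, 8, 16]
```

Wrapping the callable, rather than relying on a done-callback, keeps the release inside the worker thread that ran the task.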

__init__(max_workers: int | None = None, max_pending: int | None = None, **kwargs)

Initialize the BoundedThreadPoolExecutor.

Parameters:
  • max_workers (Optional[int]) – Maximum number of worker threads.

  • max_pending (Optional[int]) – Maximum number of pending tasks. If None, no limit is applied.

  • kwargs – Additional keyword arguments for ThreadPoolExecutor.

submit(fn, *args, **kwargs)

Submit a callable to be executed with the given arguments.

Acquires a semaphore slot before submitting, so this call blocks while max_pending tasks are already outstanding. The callable is wrapped so that its slot is released only after the task completes. Tasks that are currently running therefore still count toward the limit.

Parameters:
  • fn (Callable) – The callable to execute.

  • args – Positional arguments for the callable.

  • kwargs – Keyword arguments for the callable.

Returns:

A Future representing the pending execution.

Return type:

Future

Raises:

Re-raises any exception that occurs during submission (for example, the RuntimeError that ThreadPoolExecutor raises once shutdown() has been called).

Example::
>>> executor = BoundedThreadPoolExecutor(max_workers=2, max_pending=5)
>>> future = executor.submit(pow, 2, 10)
>>> future.result()
1024
>>> executor.shutdown()
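The blocking behaviour of submit() can be reproduced with a plain semaphore and a releasing wrapper. This is a standard-library sketch, not the hbutils code; max_pending, run_and_release, and the gate event are illustrative names. Because a slot is released only when its task finishes, the two running tasks below exhaust a limit of 2.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

max_pending = 2
slots = threading.Semaphore(max_pending)  # one slot per outstanding task
gate = threading.Event()                  # holds tasks in the "running" state


def task(i: int) -> int:
    gate.wait()
    return i


def run_and_release(fn, *args):
    """Wrapper that frees a slot when the task finishes, on success or error."""
    try:
        return fn(*args)
    finally:
        slots.release()


executor = ThreadPoolExecutor(max_workers=2)
futures = []
for i in range(max_pending):
    slots.acquire()  # succeeds immediately: slots are still free
    futures.append(executor.submit(run_and_release, task, i))

# Both slots are held by tasks that are running, so a third submission would
# block here. Probe with a timeout instead of blocking forever.
third_slot_free = slots.acquire(timeout=0.1)
print(third_slot_free)  # False

gate.set()  # let the running tasks finish; each wrapper releases its slot
wait(futures)
slot_after_completion = slots.acquire(timeout=1.0)
print(slot_after_completion)  # True
executor.shutdown()
```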

parallel_call

hbutils.concurrent.parallel.parallel_call(iterable: Iterable, fn: Callable[[Any], None], total: int | None = None, desc: str | None = None, max_workers: int | None = None, max_pending: int | None = None, disable_tqdm: bool = False)

Execute a callable in parallel for each item in an iterable with progress tracking.

This function submits each item of iterable to a bounded thread pool, displays a tqdm progress bar, and logs any exception that fn raises for an item. It offers a high-level interface to parallel processing with built-in progress tracking and error logging.

Parameters:
  • iterable (Iterable) – The iterable containing items to process.

  • fn (Callable[[Any], None]) – The callable to execute for each item. Should accept a single argument.

  • total (Optional[int]) – Total number of items, used for the progress bar. If None, the function tries to obtain it from len(iterable).

  • desc (Optional[str]) – Description for the progress bar. If None, a description based on the name of fn is used.

  • max_workers (Optional[int]) – Maximum number of worker threads. Defaults to min(cpu_count, 16).

  • max_pending (Optional[int]) – Maximum number of pending tasks. If None, defaults to max_workers * 5; pass -1 to disable the limit. Note that this differs from BoundedThreadPoolExecutor, where None means no limit.

  • disable_tqdm (bool) – Whether to disable the progress bar. Defaults to False.
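The default rules for total, max_workers, and max_pending can be summarised in a small helper. resolve_parallel_defaults is hypothetical, and reading "cpu_count" as os.cpu_count() is an assumption. In the returned tuple, a max_pending of None means "unbounded".

```python
import os
from typing import Iterable, Optional, Tuple


def resolve_parallel_defaults(iterable: Iterable, total: Optional[int] = None,
                              max_workers: Optional[int] = None,
                              max_pending: Optional[int] = None,
                              ) -> Tuple[Optional[int], int, Optional[int]]:
    """Hypothetical helper applying the documented parallel_call defaults."""
    if total is None:
        try:
            total = len(iterable)  # lists, tuples, ranges, ... support len()
        except TypeError:
            pass  # generators have no len(); the total stays unknown
    if max_workers is None:
        # Assumption: cpu_count means os.cpu_count(), which may return None.
        max_workers = min(os.cpu_count() or 1, 16)
    if max_pending is None:
        max_pending = max_workers * 5
    elif max_pending == -1:
        max_pending = None  # -1 disables the pending-task limit
    return total, max_workers, max_pending


print(resolve_parallel_defaults([1, 2, 3], max_workers=4))  # (3, 4, 20)
print(resolve_parallel_defaults((c for c in "ab"), max_workers=2, max_pending=-1))  # (None, 2, None)
```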

Example::
>>> def process_item(item):
...     print(f"Processing {item}")
>>> parallel_call([1, 2, 3, 4, 5], process_item, desc="Processing items")
Processing items: 100%|██████████| 5/5 [00:00<00:00, 10.00it/s]
Processing 1
Processing 2
Processing 3
Processing 4
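Processing 5

Because items are processed concurrently, the "Processing" lines may appear in any order. The progress bar, which tqdm writes to stderr by default, may also be interleaved with them.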
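The control flow described for parallel_call can be sketched with the standard library alone. parallel_call_sketch is a hypothetical name. It omits the tqdm progress bar (and therefore total and desc), and it returns a failure count purely so its behaviour can be checked; the documented signature makes no such promise. The error logging and max_pending defaults follow the parameter descriptions, but the hbutils implementation may differ.

```python
import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, Optional

logger = logging.getLogger("parallel_call_sketch")


def parallel_call_sketch(iterable: Iterable, fn: Callable[[Any], None],
                         max_workers: Optional[int] = None,
                         max_pending: Optional[int] = None) -> int:
    """Hypothetical simplified parallel_call: no progress bar; returns the failure count."""
    if max_workers is None:
        max_workers = min(os.cpu_count() or 1, 16)
    if max_pending is None:
        max_pending = max_workers * 5
    # -1 disables the pending-task limit, mirroring the documented convention.
    slots = None if max_pending == -1 else threading.Semaphore(max_pending)
    failures = 0
    failures_lock = threading.Lock()

    def run(item: Any) -> None:
        nonlocal failures
        try:
            fn(item)
        except Exception:
            # The sketch never calls future.result(), so log and count the
            # failure here; otherwise the exception would go unnoticed.
            logger.exception("Error while processing %r", item)
            with failures_lock:
                failures += 1
        finally:
            if slots is not None:
                slots.release()  # free the slot once this item is done

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for item in iterable:
            if slots is not None:
                slots.acquire()  # backpressure: wait while max_pending items are outstanding
            executor.submit(run, item)
    # Leaving the with-block waits for every submitted item to finish.
    return failures


collected = []
collected_lock = threading.Lock()


def square_unless_three(x: int) -> None:
    if x == 3:
        raise ValueError("three is not allowed")
    with collected_lock:
        collected.append(x * x)


failures = parallel_call_sketch(range(6), square_unless_three, max_workers=2)
print(sorted(collected), failures)  # [0, 1, 4, 16, 25] 1
```

Results are sorted before printing because completion order across threads is not guaranteed.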