hbutils.concurrent.parallel

Parallel execution utilities with bounded thread pools and progress tracking.

This module provides tools for executing tasks in parallel while limiting the number of pending tasks and optionally displaying progress information. Submitting too many tasks at once can cause memory pressure, so the number of pending tasks is bounded by a semaphore that blocks further submissions once the limit is reached.

The module contains the following public components:

  • BoundedThreadPoolExecutor - A thread pool executor that limits the number of pending tasks using a semaphore.

  • parallel_call() - High-level helper for parallel processing of an iterable with progress reporting and exception logging.

Note

The progress display relies on hbutils.logging.tqdm(), which provides a tqdm-compatible interface and falls back to a lightweight progress bar if the real tqdm package is unavailable.

Example:

>>> from hbutils.concurrent.parallel import parallel_call
>>> def square(x):
...     return x * x
>>> parallel_call(range(5), square, desc="Squaring")
Squaring: 100%|██████████| 5/5 [00:00<00:00, ...it/s]
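The fallback behaviour mentioned in the note above can be illustrated with a common import pattern. This is only a sketch of the general idea, not the code behind hbutils.logging.tqdm(); the names SimpleProgress and progress are hypothetical.

```python
import sys


class SimpleProgress:
    """Hypothetical lightweight stand-in accepting a few tqdm-style arguments."""

    def __init__(self, iterable, total=None, desc=None, disable=False):
        self.iterable = iterable
        self.total = total
        self.desc = desc or ""
        self.disable = disable

    def __iter__(self):
        for count, item in enumerate(self.iterable, start=1):
            if not self.disable:
                # Rewrite a single status line on stderr for each item.
                shown_total = self.total if self.total is not None else "?"
                print(f"\r{self.desc}: {count}/{shown_total}", end="", file=sys.stderr)
            yield item
        if not self.disable:
            print(file=sys.stderr)


try:
    from tqdm import tqdm as progress  # use the real tqdm when it is installed
except ImportError:
    progress = SimpleProgress  # otherwise fall back to the stand-in


squares = [x * x for x in progress(range(5), total=5, desc="Squaring", disable=True)]
```

Either branch yields the same items, so calling code does not need to know which implementation is active.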

__all__

hbutils.concurrent.parallel.__all__ = ['BoundedThreadPoolExecutor', 'parallel_call']

BoundedThreadPoolExecutor

class hbutils.concurrent.parallel.BoundedThreadPoolExecutor(max_workers: int | None = None, max_pending: int | None = None, **kwargs: Any)

A ThreadPoolExecutor with bounded pending tasks.

This executor extends ThreadPoolExecutor to limit the number of pending tasks in the queue. The limit is enforced by a semaphore that blocks submission when the maximum number of pending tasks is reached, providing backpressure to avoid unbounded memory growth.

Parameters:
  • max_workers (Optional[int]) – Maximum number of worker threads. Defaults to None, which leaves the choice to ThreadPoolExecutor.

  • max_pending (Optional[int]) – Maximum number of pending tasks. If None, no limit is applied.

  • kwargs (Any) – Additional keyword arguments passed to ThreadPoolExecutor.

Example:

>>> executor = BoundedThreadPoolExecutor(max_workers=4, max_pending=10)
>>> future = executor.submit(lambda x: x * 2, 5)
>>> future.result()
10

__init__(max_workers: int | None = None, max_pending: int | None = None, **kwargs: Any) -> None

Initialize the bounded thread pool executor.

Parameters:
  • max_workers (Optional[int]) – Maximum number of worker threads.

  • max_pending (Optional[int]) – Maximum number of pending tasks. If None, no limit is applied.

  • kwargs (Any) – Additional keyword arguments for ThreadPoolExecutor.

submit(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future

Submit a callable to be executed with the given arguments.

This method acquires the semaphore before submitting the task to limit the number of pending tasks. If the pending task limit is reached, submission blocks until a slot becomes available. The semaphore is released after task completion in an internal wrapper.

Parameters:
  • fn (Callable[..., Any]) – The callable to execute.

  • args (Any) – Positional arguments for the callable.

  • kwargs (Any) – Keyword arguments for the callable.

Returns:

A future representing the pending execution.

Return type:

concurrent.futures.Future

Raises:

Exception – Re-raises any exception that occurs during submission.

Example:

>>> executor = BoundedThreadPoolExecutor(max_workers=2, max_pending=5)
>>> future = executor.submit(print, "Hello, World!")
>>> future.result()
Hello, World!
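The acquire-on-submit, release-on-completion pattern described for submit() can be sketched with the standard library alone. The class name SketchBoundedExecutor and the attribute _pending_slots are hypothetical; this is an illustration under those assumptions, not the hbutils source.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SketchBoundedExecutor(ThreadPoolExecutor):
    """Hypothetical stand-in showing semaphore-based backpressure."""

    def __init__(self, max_workers=None, max_pending=None, **kwargs):
        super().__init__(max_workers=max_workers, **kwargs)
        # No semaphore means no limit, mirroring max_pending=None.
        self._pending_slots = (
            threading.Semaphore(max_pending) if max_pending is not None else None
        )

    def submit(self, fn, *args, **kwargs):
        if self._pending_slots is None:
            return super().submit(fn, *args, **kwargs)

        # Blocks the caller while max_pending tasks are queued or running.
        self._pending_slots.acquire()

        def wrapper():
            try:
                return fn(*args, **kwargs)
            finally:
                # Free the slot once the task finishes, even if it raised.
                self._pending_slots.release()

        try:
            return super().submit(wrapper)
        except BaseException:
            # Submission itself failed, so hand the slot back before re-raising.
            self._pending_slots.release()
            raise


def demo():
    # With max_pending=3, the list comprehension below is throttled:
    # at most three tasks are queued or running at any moment.
    with SketchBoundedExecutor(max_workers=2, max_pending=3) as pool:
        futures = [pool.submit(lambda x: x * 2, i) for i in range(10)]
        return [f.result() for f in futures]
```

In this sketch a slot stays occupied from submission until the task finishes, so the limit covers queued and running tasks together.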

parallel_call

hbutils.concurrent.parallel.parallel_call(iterable: Iterable[Any], fn: Callable[[Any], Any], total: int | None = None, desc: str | None = None, max_workers: int | None = None, max_pending: int | None = None, disable_tqdm: bool = False) -> None

Execute a callable in parallel for each item in an iterable with progress tracking.

This function processes items in parallel using a bounded thread pool, displays a progress bar, and logs any exceptions that occur during processing before re-raising them.

Parameters:
  • iterable (Iterable[Any]) – The iterable containing items to process.

  • fn (Callable[[Any], Any]) – The callable to execute for each item. It should accept a single argument.

  • total (Optional[int]) – Total number of items. If None, attempts to determine the length from the iterable.

  • desc (Optional[str]) – Description for the progress bar. Defaults to a description that includes the function representation.

  • max_workers (Optional[int]) – Maximum number of worker threads. Defaults to min(os.cpu_count(), 16).

  • max_pending (Optional[int]) – Maximum number of pending tasks. If -1, no limit is applied. Defaults to max_workers * 5.

  • disable_tqdm (bool) – Whether to disable the progress bar.

Returns:

This function does not return a value.

Return type:

None

Raises:

Exception – Re-raises exceptions from the worker callable after logging them.

Example:

>>> def process_item(item):
...     print(f"Processing {item}")
>>> parallel_call([1, 2, 3, 4, 5], process_item, desc="Processing items")
Processing items: 100%|██████████| 5/5 [00:00<00:00, ...it/s]
Processing 1
Processing 2
Processing 3
Processing 4
Processing 5
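Because parallel_call() returns None, results have to be gathered through side effects in the worker callable. The sketch below shows one way to do that, together with the documented log-then-re-raise behaviour. It uses a simplified, hypothetical stand-in named parallel_call_sketch (no progress bar and no pending limit) so that it runs with the standard library only; it is not the hbutils implementation.

```python
import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallel_call_sketch(iterable, fn, max_workers=None):
    """Hypothetical simplification: run fn on each item, log and re-raise errors."""
    if max_workers is None:
        max_workers = min(os.cpu_count() or 1, 16)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its item so failures can be reported.
        futures = {pool.submit(fn, item): item for item in iterable}
        for future in as_completed(futures):
            try:
                future.result()
            except Exception:
                logging.exception("Error while processing %r", futures[future])
                raise


results = {}
results_lock = threading.Lock()


def square_into_results(x):
    value = x * x
    # Guard the shared dict explicitly; several workers write to it.
    with results_lock:
        results[x] = value


parallel_call_sketch(range(5), square_into_results)
```

Items complete in no particular order, so keying the results by input item, as done here, avoids relying on completion order.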