"""
Lightweight progress bar utilities with optional tqdm integration.
This module provides a minimal yet practical progress bar implementation that
mimics the core behavior of :mod:`tqdm`. When the real tqdm library is
available, it is used directly; otherwise, a built-in fallback is provided.
The module contains the following main components:
* :class:`SimpleTqdm` - Lightweight progress bar with timing and ETA support
* :func:`tqdm` - Unified progress bar factory using tqdm when installed
* :func:`trange` - Convenience wrapper for iterating over ranges with progress
.. note::
The fallback implementation is intentionally lightweight and does not
support all tqdm features. Unsupported keyword arguments are ignored
with a warning.
Example::
>>> from hbutils.logging.progress import tqdm, trange
>>> for i in tqdm(range(3), desc="Processing"):
... pass
Processing: 100.0% |====================| 3/3 [00:00<00:00, 3.0it/s]
>>> for i in trange(2, desc="Counting"):
... pass
Counting: 100.0% |====================| 2/2 [00:00<00:00, 2.0it/s]
"""
import os
import shutil
import sys
import threading
import time
import warnings
from typing import Any, Iterable, Iterator, List, Optional, TextIO, Union
try:
from tqdm import tqdm as _origin_tqdm
except (ImportError, ModuleNotFoundError):
_origin_tqdm = None
__all__ = [
'SimpleTqdm',
'tqdm',
'trange',
]
[docs]
class SimpleTqdm:
"""
A lightweight implementation of tqdm progress bar.
This class provides basic progress bar functionality similar to tqdm,
with support for iteration tracking, time estimation, and customizable display.
:ivar _instances: Global list of all active SimpleTqdm instances
:vartype _instances: list
:ivar _lock: Thread lock for managing concurrent progress bar updates
:vartype _lock: threading.Lock
:ivar _last_print_time: Timestamp of the last global print operation
:vartype _last_print_time: float
:ivar _update_interval: Minimum interval between display updates
:vartype _update_interval: float
"""
# Global manager for handling multiple progress bars
_instances: List["SimpleTqdm"] = []
_lock = threading.Lock()
_last_print_time = 0.0
_update_interval = 0.1 # Update interval
[docs]
def __init__(
self,
iterable: Optional[Iterable[Any]] = None,
desc: Optional[str] = None,
total: Optional[int] = None,
leave: bool = True,
file: Optional[TextIO] = None,
ncols: Optional[int] = None,
mininterval: float = 0.1,
ascii: Optional[bool] = None,
disable: bool = False,
unit: str = 'it',
unit_scale: bool = False,
initial: int = 0,
position: Optional[int] = None,
unit_divisor: int = 1000,
**kwargs: Any,
):
"""
Initialize a SimpleTqdm progress bar.
:param iterable: Iterable to decorate with a progress bar
:type iterable: iterable, optional
:param desc: Prefix for the progress bar
:type desc: str, optional
:param total: The number of expected iterations
:type total: int, optional
:param leave: If True, keep all traces of the progress bar upon termination
:type leave: bool
:param file: Specifies where to output the progress messages
:type file: file-like object, optional
:param ncols: The width of the entire output message
:type ncols: int, optional
:param mininterval: Minimum progress display update interval in seconds
:type mininterval: float
:param ascii: If True, use ASCII characters for the progress bar
:type ascii: bool, optional
:param disable: Whether to disable the entire progress bar
:type disable: bool
:param unit: String that will be used to define the unit of each iteration
:type unit: str
:param unit_scale: If True, the number of iterations will be scaled automatically
:type unit_scale: bool
:param initial: The initial counter value
:type initial: int
:param position: Specify the line offset to print this bar
:type position: int, optional
:param unit_divisor: Divisor for unit scaling (1000 or 1024)
:type unit_divisor: int
:param kwargs: Additional keyword arguments (will trigger a warning if provided)
.. warning::
Extra keyword arguments are ignored by this lightweight implementation.
Example::
>>> with SimpleTqdm(range(5), desc="Processing") as pbar:
... for item in pbar:
... time.sleep(0.01)
Processing: 100.0% |====================| 5/5 [00:00<00:00, 5.0it/s]
"""
if kwargs:
warnings.warn(
f'You are using {self.__class__.__name__} provided by hbutils library, '
f'which is an lightweight alternative of real tqdm. '
f'Arguments {kwargs!r} are ignored because they are not supported. '
f'If you really need them, we suggest you can use tqdm by installing it with `pip install tqdm`.'
)
self.iterable = iterable
self.desc = desc or ""
self.total = total
self.leave = leave
self.file = file or sys.stderr
self.disable = disable
self.unit = unit
self.unit_scale = unit_scale
self.unit_divisor = unit_divisor
self.mininterval = mininterval
self.position = position
self.ncols = ncols
self.ascii = ascii
# Internal state
self.n = initial
self.start_time: Optional[float] = None
self.last_print_time = 0.0
self.last_print_n = 0
# If an iterable is provided and total is not specified, try to get its length
if iterable is not None and total is None:
try:
self.total = len(iterable) # type: ignore[arg-type]
except (TypeError, AttributeError):
self.total = None
# Register to global manager
if not self.disable:
with SimpleTqdm._lock:
SimpleTqdm._instances.append(self)
def _format_sizeof(self, num: float, suffix: str = "", divisor: Optional[int] = None) -> str:
"""
Format number size with unit scaling support.
:param num: The number to format
:type num: float
:param suffix: Suffix to append to the formatted number
:type suffix: str
:param divisor: Divisor for unit scaling (1000 or 1024)
:type divisor: int, optional
:return: Formatted string with appropriate unit
:rtype: str
Example::
>>> pbar = SimpleTqdm(total=1000, unit_scale=True)
>>> pbar._format_sizeof(1500, "B")
'1.5kB'
"""
if divisor is None:
divisor = self.unit_divisor
# Define units
if divisor == 1024:
units = ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']
else: # divisor == 1000
units = ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']
if not self.unit_scale:
return f"{num:.0f}{suffix}"
for unit in units:
if abs(num) < divisor:
if unit == '':
return f"{num:.0f}{unit}{suffix}"
else:
return f"{num:.1f}{unit}{suffix}"
num /= divisor
return f"{num:.1f}{units[-1]}{suffix}"
def _get_terminal_width(self) -> int:
"""
Get the terminal width.
:return: Terminal width in characters
:rtype: int
Example::
>>> pbar = SimpleTqdm()
>>> width = pbar._get_terminal_width()
>>> width >= 80
True
"""
if self.ncols:
return self.ncols
try:
# Try using shutil.get_terminal_size()
return shutil.get_terminal_size().columns
except (AttributeError, OSError):
try:
# Try getting from environment variable
return int(os.environ.get('COLUMNS', 80))
except (ValueError, TypeError):
# Default width
return 80
def _create_progress_bar(self, percentage: float, bar_width: int) -> str:
"""
Create a progress bar string.
:param percentage: Completion percentage (0-100)
:type percentage: float
:param bar_width: Width of the progress bar in characters
:type bar_width: int
:return: Formatted progress bar string
:rtype: str
Example::
>>> pbar = SimpleTqdm()
>>> pbar._create_progress_bar(50, 20)
'|==========>---------|'
"""
if bar_width <= 0:
return ""
filled_width = int(bar_width * percentage / 100)
if self.ascii:
# Use ASCII characters
filled_char = '='
empty_char = '-'
tip_char = '>' if filled_width < bar_width and percentage > 0 else ''
else:
# Use simple characters
filled_char = '='
empty_char = '-'
tip_char = '>' if filled_width < bar_width and percentage > 0 else ''
# Build progress bar
if tip_char and filled_width > 0:
bar = filled_char * (filled_width - 1) + tip_char + empty_char * (bar_width - filled_width)
else:
bar = filled_char * filled_width + empty_char * (bar_width - filled_width)
return f"|{bar}|"
[docs]
def __enter__(self) -> "SimpleTqdm":
"""
Enter the context manager.
:return: Self instance
:rtype: SimpleTqdm
Example::
>>> with SimpleTqdm(range(10)) as pbar:
... for item in pbar:
... pass
"""
if self.start_time is None:
self.start_time = time.time()
return self
[docs]
def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None:
"""
Exit the context manager.
:param exc_type: Exception type if an exception occurred
:type exc_type: type, optional
:param exc_val: Exception value if an exception occurred
:type exc_val: Exception, optional
:param exc_tb: Exception traceback if an exception occurred
:type exc_tb: traceback, optional
Example::
>>> with SimpleTqdm(range(10)) as pbar:
... for item in pbar:
... pass
"""
self.close()
[docs]
def __iter__(self) -> Iterator[Any]:
"""
Iterate over the wrapped iterable with progress tracking.
:return: Iterator yielding items from the wrapped iterable
:rtype: iterator
:raises TypeError: If no iterable was provided during initialization
Example::
>>> for item in SimpleTqdm(range(5)):
... print(item)
0
1
2
3
4
"""
if self.iterable is None:
raise TypeError("'NoneType' object is not iterable")
if self.start_time is None:
self.start_time = time.time()
try:
for item in self.iterable:
yield item
self.update(1)
finally:
if not self.leave:
self.close()
[docs]
def update(self, n: int = 1) -> None:
"""
Update the progress bar by incrementing the counter.
:param n: Increment to add to the internal counter
:type n: int
Example::
>>> pbar = SimpleTqdm(total=100)
>>> pbar.update(10)
>>> pbar.n
10
"""
if self.disable:
return
if self.start_time is None:
self.start_time = time.time()
self.n += n
# Check if display needs to be updated
current_time = time.time()
if (current_time - self.last_print_time) >= self.mininterval:
self.refresh()
[docs]
def refresh(self) -> None:
"""
Force refresh of the progress bar display.
Example::
>>> pbar = SimpleTqdm(total=100)
>>> pbar.n = 50
>>> pbar.refresh() # Force display update
"""
if self.disable:
return
current_time = time.time()
self.last_print_time = current_time
# Use global lock to ensure multiple progress bars don't conflict
with SimpleTqdm._lock:
self._display()
def _display(self) -> None:
"""
Display the current progress bar state.
This internal method formats and outputs the progress bar to the file stream.
It calculates the terminal width, builds the progress bar components including
description, percentage, progress numbers, time information, ETA, and speed,
then outputs the formatted string to the specified file.
Example::
>>> pbar = SimpleTqdm(total=100, desc="Processing")
>>> pbar.n = 50
>>> pbar._display() # Outputs: Processing: 50.0% |=========>---------| 50/100 [00:05<00:05, 10.0it/s]
"""
if self.start_time is None:
return
current_time = time.time()
elapsed = current_time - self.start_time
# Get terminal width
terminal_width = self._get_terminal_width()
# Build display string parts
parts = []
# Description
desc_part = f"{self.desc}: " if self.desc else ""
# Percentage and progress numbers
if self.total is not None:
percentage = (self.n / self.total) * 100 if self.total > 0 else 0
percent_part = f"{percentage:6.1f}%"
# Use formatted number display
current_formatted = self._format_sizeof(self.n, self.unit)
total_formatted = self._format_sizeof(self.total, self.unit)
progress_part = f"{current_formatted}/{total_formatted}"
else:
percentage = 0
percent_part = ""
# Use formatted number display
progress_part = self._format_sizeof(self.n, self.unit)
# Time information
time_part = f"[{self._format_time(elapsed)}"
# Estimated time remaining
eta_part = ""
if self.total is not None and self.n > 0 and elapsed > 0:
rate = self.n / elapsed
if rate > 0:
eta = (self.total - self.n) / rate
eta_part = f"<{self._format_time(eta)}"
# Speed
speed_part = ""
if elapsed > 0:
rate = self.n / elapsed
# Use formatted speed display
rate_formatted = self._format_sizeof(rate, f"{self.unit}/s")
speed_part = f", {rate_formatted}"
# Combine fixed parts
fixed_parts = [desc_part, percent_part, progress_part, time_part]
if eta_part:
fixed_parts.append(eta_part)
if speed_part:
fixed_parts.append(speed_part)
fixed_parts.append("]")
# Calculate fixed parts length
fixed_text = "".join(fixed_parts)
fixed_length = len(fixed_text)
# Reserve space for progress bar
# Progress bar format: |====>----|
min_bar_width = 10 # Minimum progress bar width
bar_overhead = 2 # Characters used by | and |
available_width = terminal_width - fixed_length - bar_overhead - 1 # -1 for safety margin
bar_width = max(min_bar_width, min(available_width, 50)) # Limit maximum width to 50
# Create progress bar
if self.total is not None:
progress_bar = self._create_progress_bar(percentage, bar_width)
else:
# Display simple activity indicator when total is unknown
progress_bar = self._create_activity_bar()
# Combine final output
if self.total is not None:
line = f"{desc_part}{percent_part} {progress_bar} {progress_part} {time_part}"
if eta_part:
line += eta_part
if speed_part:
line += speed_part
line += "]"
else:
line = f"{desc_part}{progress_bar} {progress_part} {time_part}"
if speed_part:
line += speed_part
line += "]"
# Ensure it doesn't exceed terminal width
if len(line) > terminal_width:
line = line[:terminal_width - 3] + "..."
# Clear current line and output
self.file.write(f"\r{line}")
self.file.flush()
_ACTIVITY = ['|', '/', '-', '\\', '|', '/', '-', '\\']
def _create_activity_bar(self, speed: float = 3.0) -> str:
"""
Create an activity indicator for cases without a known total.
:param speed: Speed of the animation (cycles per second)
:type speed: float
:return: Current activity indicator character
:rtype: str
Example::
>>> pbar = SimpleTqdm()
>>> indicator = pbar._create_activity_bar()
>>> indicator in ['|', '/', '-', '\\\\']
True
"""
if self.start_time is None:
self.start_time = time.time()
idx = int((time.time() - self.start_time) * speed) % len(self._ACTIVITY)
return self._ACTIVITY[idx]
def _format_time(self, seconds: float) -> str:
"""
Format time display.
:param seconds: Time in seconds
:type seconds: float
:return: Formatted time string in format HH:MM:SS or MM:SS or SS.SSs
:rtype: str
Example::
>>> pbar = SimpleTqdm()
>>> pbar._format_time(65)
'01:05'
>>> pbar._format_time(3665)
'01:01:05'
"""
if seconds < 60:
return f"{seconds:05.2f}s"
elif seconds < 3600:
mins, secs = divmod(int(seconds), 60)
return f"{mins:02d}:{secs:02d}"
else:
hours, remainder = divmod(int(seconds), 3600)
mins, secs = divmod(remainder, 60)
return f"{hours:02d}:{mins:02d}:{secs:02d}"
[docs]
def set_description(self, desc: Optional[str] = None, refresh: bool = True) -> None:
"""
Set the progress bar description.
:param desc: New description text
:type desc: str, optional
:param refresh: Whether to refresh the display immediately
:type refresh: bool
Example::
>>> pbar = SimpleTqdm(range(100))
>>> pbar.set_description("Processing files")
"""
self.desc = desc or ""
if refresh:
self.refresh()
[docs]
def set_postfix(self, ordered_dict: Optional[dict] = None, refresh: bool = True, **kwargs: Any) -> None:
"""
Set postfix information (simplified implementation, not actually displayed).
This method exists for API compatibility with tqdm but does not display
the postfix information in SimpleTqdm.
:param ordered_dict: Dictionary of postfix key-value pairs
:type ordered_dict: dict, optional
:param refresh: Whether to refresh the display immediately
:type refresh: bool
:param kwargs: Additional postfix key-value pairs
Example::
>>> pbar = SimpleTqdm(range(100))
>>> pbar.set_postfix(loss=0.5, accuracy=0.95)
"""
if refresh:
self.refresh()
[docs]
def close(self) -> None:
"""
Close the progress bar and clean up resources.
This method performs a final refresh of the progress bar, adds a newline
if leave is True, or clears the line if leave is False, and removes the
instance from the global manager.
Example::
>>> pbar = SimpleTqdm(range(100))
>>> for item in pbar:
... pass
>>> pbar.close()
"""
if self.disable:
return
# Final refresh
self.refresh()
# If leave is True, add newline; otherwise clear current line
if self.leave:
self.file.write("\n")
else:
# Clear current line
terminal_width = self._get_terminal_width()
self.file.write("\r" + " " * terminal_width + "\r")
self.file.flush()
# Remove from global manager
with SimpleTqdm._lock:
try:
SimpleTqdm._instances.remove(self)
except ValueError:
pass
[docs]
def clear(self) -> None:
"""
Clear the current display.
This method clears the progress bar from the terminal by overwriting
the current line with spaces.
Example::
>>> pbar = SimpleTqdm(range(100))
>>> pbar.clear() # Clear the progress bar from terminal
"""
if not self.disable:
terminal_width = self._get_terminal_width()
self.file.write("\r" + " " * terminal_width + "\r")
self.file.flush()
[docs]
def write(self, s: str, file: Optional[TextIO] = None, end: str = "\n", nolock: bool = False) -> None:
"""
Write text without disrupting the progress bar display.
This method allows writing messages to the output stream without
interfering with the progress bar. It temporarily clears the progress
bar, writes the message, and then refreshes the progress bar.
:param s: String to write
:type s: str
:param file: File object to write to
:type file: file-like object, optional
:param end: String appended after the last value
:type end: str
:param nolock: If True, don't use the global lock
:type nolock: bool
Example::
>>> pbar = SimpleTqdm(range(100))
>>> pbar.write("Processing complete")
"""
fp = file or self.file
if not nolock:
with SimpleTqdm._lock:
self.clear()
fp.write(str(s) + end)
fp.flush()
self.refresh()
else:
fp.write(str(s) + end)
fp.flush()
[docs]
def tqdm(
iterable: Optional[Iterable[Any]] = None,
desc: Optional[str] = None,
total: Optional[int] = None,
leave: bool = True,
file: Optional[TextIO] = None,
ncols: Optional[int] = None,
mininterval: float = 0.1,
ascii: Optional[bool] = None,
disable: bool = False,
unit: str = 'it',
unit_scale: bool = False,
initial: int = 0,
position: Optional[int] = None,
unit_divisor: int = 1000,
**kwargs: Any,
) -> Union["SimpleTqdm", Any]:
"""
tqdm-compatible interface for creating progress bars.
This function provides a unified interface that uses the real tqdm library if available,
otherwise falls back to SimpleTqdm. It maintains API compatibility with the standard
tqdm library while providing a lightweight alternative when tqdm is not installed.
:param iterable: Iterable to decorate with a progress bar
:type iterable: iterable, optional
:param desc: Prefix for the progress bar
:type desc: str, optional
:param total: The number of expected iterations
:type total: int, optional
:param leave: If True, keep all traces of the progress bar upon termination
:type leave: bool
:param file: Specifies where to output the progress messages
:type file: file-like object, optional
:param ncols: The width of the entire output message
:type ncols: int, optional
:param mininterval: Minimum progress display update interval in seconds
:type mininterval: float
:param ascii: If True, use ASCII characters for the progress bar
:type ascii: bool, optional
:param disable: Whether to disable the entire progress bar
:type disable: bool
:param unit: String that will be used to define the unit of each iteration
:type unit: str
:param unit_scale: If True, the number of iterations will be scaled automatically
:type unit_scale: bool
:param initial: The initial counter value
:type initial: int
:param position: Specify the line offset to print this bar
:type position: int, optional
:param unit_divisor: Divisor for unit scaling (1000 or 1024)
:type unit_divisor: int
:param kwargs: Additional keyword arguments passed to the underlying implementation
:return: Progress bar instance (either real tqdm or SimpleTqdm)
:rtype: tqdm or SimpleTqdm
Example::
>>> for i in tqdm(range(3), desc="Processing"):
... time.sleep(0.01)
Processing: 100.0% |====================| 3/3 [00:00<00:00, 3.0it/s]
"""
if _origin_tqdm:
return _origin_tqdm(
iterable=iterable,
desc=desc,
total=total,
leave=leave,
file=file,
ncols=ncols,
mininterval=mininterval,
ascii=ascii,
disable=disable,
unit=unit,
unit_scale=unit_scale,
initial=initial,
position=position,
unit_divisor=unit_divisor,
**kwargs,
)
else:
return SimpleTqdm(
iterable=iterable,
desc=desc,
total=total,
leave=leave,
file=file,
ncols=ncols,
mininterval=mininterval,
ascii=ascii,
disable=disable,
unit=unit,
unit_scale=unit_scale,
initial=initial,
position=position,
unit_divisor=unit_divisor,
**kwargs,
)
[docs]
def trange(*args: int, **kwargs: Any) -> Union["SimpleTqdm", Any]:
"""
Shortcut for tqdm(range(*args), **kwargs).
This is a convenience function that creates a progress bar for a range iterator.
It's equivalent to calling tqdm(range(*args), **kwargs).
:param args: Positional arguments passed to range()
:type args: int
:param kwargs: Keyword arguments passed to tqdm()
:return: Progress bar instance wrapping a range iterator
:rtype: tqdm or SimpleTqdm
Example::
>>> for i in trange(3, desc="Counting"):
... time.sleep(0.01)
Counting: 100.0% |====================| 3/3 [00:00<00:00, 3.0it/s]
"""
return tqdm(range(*args), **kwargs)