"""General utility functions & classes.
"""
import os
import subprocess
import sys
import time
from pathlib import Path
import json
from logging import Logger
from distutils.util import strtobool
from functools import wraps, lru_cache
from typing import Any, Generator, Optional, Union
from collections import namedtuple
import threading
import queue
import hashlib
from contextlib import contextmanager
import numpy as np
def ndarray2str(array: np.ndarray) -> str:
return str(json.dumps(array.tolist()))
def str2ndarray(string: str) -> np.ndarray:
return np.array(json.loads(str(string)))
Timing = namedtuple('Timing', ('t0', 't1', 'elapsed'))
[docs]def timed(f, logger: Logger):
"""Function decorator to measure elapsed time.
Useful for profiling and debugging.
Parameters
----------
f
Any function or method
logger: Logger
A logger to log to
"""
@wraps(f)
def wrap(*args, **kwargs):
ts = time.time()
result = f(*args, **kwargs)
te = time.time()
logger.info(f"{f.__qualname__}() --> {te-ts} s elapsed.")
return result
return wrap
[docs]def logged(f, logger: Logger):
"""Function decorator to log before & after a function call.
Useful for profiling and debugging.
Parameters
----------
f
Any function or method
logger: Logger
A logger to log to
"""
"""Function decorator to log before & after call
"""
@wraps(f)
def wrap(*args, **kwargs):
logger.debug(f"{f.__qualname__}() --> call...")
result = f(*args, **kwargs)
logger.debug(f"{f.__qualname__}() --> done")
return result
return wrap
[docs]class Timer(object):
"""A timer context manager. Starts timing on ``__enter__`` and stops on
``__exit__``.
Timing info can be retrieved with :attr:`~shapeflow.util.Timer.timing`.
"""
_t0: float
_t1: float
_parent: object
_logger: Logger
_message: str
_elapsed: Optional[float]
def __init__(self, parent: object, logger: Logger):
self._parent = parent
self._elapsed = None
self._logger = logger
def __enter__(self, message: str = ''):
self._message = message
self._t0 = time.time()
self._logger.info(f"{self._message}...")
def __exit__(self):
if hasattr(self, '_t0'):
self._t1 = time.time()
self._elapsed = self._t1 - self._t0
self._logger.info(f"{self._message}: {self._elapsed} s. elapsed ")
@property
def timing(self) -> Optional[tuple]:
"""Optional[tuple]: start time, end time, elapsed time.
"""
if all([hasattr(self, attr) for attr in ('_t0', '_t1', '_elapsed')]):
return self._t0, self._t1, self._elapsed
else:
return None
[docs]def restrict(val, minval, maxval):
"""Clamp a value between a minimum and a maximum.
"""
if val < minval:
return minval
if val > maxval:
return maxval
return val
[docs]def frame_number_iterator(total: int,
Nf: int = None,
dt: float = None, fps: float = None) \
-> Generator[int, None, None]:
"""Get an iterator of frame numbers, based on either a number of frames
``Nf`` or a frame interval ``dt``.
Parameters
----------
total: int
The total number of frames.
Nf: int
The number of frames to return. Defaults to ``None``.
dt: float
The frame interval to return. Defaults to ``None``.
If using ``dt``, ``fps`` must also be provided.
fps: float
The framerate. Defaults to ``None``
Raises
------
ValueError
When both ``Nf`` and ``dt`` are ``None``
Returns
-------
Generator
An iterator that returns the requested frame numbers.
"""
if Nf is not None and (dt is None and fps is None): # todo: very awkward, make two methods instead? also, this should be in shapeflow.video instead of here
Nf = min(Nf, total)
for f in np.linspace(0, total, Nf):
yield int(f)
elif (dt is not None and fps is not None) and Nf is None:
df = restrict(dt * fps, 1, total)
for f in np.arange(0, total, df):
yield int(f)
else:
ValueError()
[docs]def before_version(version_a, version_b):
"""Check whether ``version_a`` precedes ``version_b``.
.. note::
Only handles numerics, i.e. ``"1.25b.3v7"`` won't work.
"""
return tuple(int(s) for s in version_a.split('.')) \
< tuple(int(s) for s in version_b.split('.'))
[docs]def after_version(version_a, version_b):
"""Check whether ``version_a`` supercedes ``version_b``.
.. note::
Only handles numerics, i.e. ``"1.25b.3v7"`` won't work.
"""
return not before_version(version_a, version_b)
[docs]def hash_file(path: str, blocksize: int = 1024) -> queue.Queue:
"""Start hashing a file without blocking the current thread.
Parameters
----------
path: str
The path of the file to hash.
blocksize: int
The blocksize step to take when reading the file.
Defaults to 1024.
Returns
-------
queue.Queue
A new ``queue.Queue`` object.
Once the hash is ready, it will be pushed to this queue.
"""
if os.path.isfile:
q: queue.Queue = queue.Queue()
def _hash_file():
nonlocal q
m = hashlib.sha1()
with open(path, 'rb') as f:
while True:
buf = f.read(blocksize)
if not buf:
break
m.update(buf)
hash = m.hexdigest()
q.put(hash)
threading.Thread(target=_hash_file, daemon=True).start()
return q
[docs]@contextmanager
def suppress_stdout():
"""Suppress ``stdout`` within a context.
https://stackoverflow.com/questions/2125702/
"""
with open(os.devnull, "w") as devnull:
old_stdout = sys.stdout
sys.stdout = devnull
try:
yield
finally:
sys.stdout = old_stdout
[docs]def sizeof_fmt(size, suffix='B'):
"""Get a file size in bytes as a human-readable string.
For example::
>>> sizeof_fmt(10**3)
"1 KB"
>>> sizeof_fmt(10**9)
"1 GB"
Parameters
----------
num
A file size in bytes
suffix: str
The suffix to use. Defaults to ``"B"``.
Returns
-------
str
The file size as a human-readable string in decimal bytes.
"""
for unit in ['','K','M','G','T','P','E','Z']:
if abs(size) < 1000.0:
return "%.0f %s%s" % (size, unit, suffix)
size /= 1000.0
return "%.1f %s%s" % (size, 'Y', suffix)
[docs]class Singleton(type):
"""A metaclass for singletons.
"""
_instances: dict = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
[docs]def open_path(path: str) -> None:
"""Open a path in the file browser.
"""
if os.path.isfile(path):
path = os.path.dirname(os.path.realpath(path))
if os.name == 'nt': # Windows
os.startfile(path) # type: ignore
elif os.name == 'darwin': # MacOS
subprocess.Popen(['open', path])
else: # Something else, probably has xdg-open
subprocess.Popen(['xdg-open', path])
[docs]@contextmanager
def ensure_path(path: Union[str, Path]):
""" A hacky way to allow imports from arbitrary directories.
Only use this for testing ``sf.py`` please :(
"""
if isinstance(path, str):
path = Path(path)
path = Path(path).absolute()
path_str = str(path)
try:
assert path.is_dir()
sys.path.insert(0, path_str)
yield
except NotADirectoryError:
raise
finally:
sys.path.remove(path_str)