# coding: utf-8
"""
Helpful functions and utilities.
"""
from __future__ import annotations
# names exported by a wildcard import of this module
__all__ = [
    "is_lazy_iterable", "make_list", "tmp_file", "tmp_dir", "MockModule",
]
import os
import time
import shutil
import tempfile
import contextlib
import subprocess
import signal
import importlib
import six
from collections.abc import MappingView
from types import GeneratorType, ModuleType
from typing import Any
# types that is_lazy_iterable() treats as lazily iterable
lazy_iter_types = (GeneratorType, MappingView, range, map, enumerate)


def is_lazy_iterable(obj: Any) -> bool:
    """
    Checks if *obj* is an instance of one of the types in *lazy_iter_types*, i.e., a generator, a
    mapping view, or a range, map or enumerate object, and returns the result.
    """
    is_lazy = isinstance(obj, lazy_iter_types)
    return is_lazy
def make_list(obj: Any, cast: bool = True) -> list[Any]:
    """
    Returns a new list built from *obj*. Lists and lazy iterables (see
    :py:func:`is_lazy_iterable`) are always expanded into a new list, whereas objects of types
    *tuple* and *set* are only expanded when *cast* is *True*. Any other object, including tuples
    and sets when *cast* is *False*, becomes the single item of a new list.
    """
    # lists and lazy iterables are expanded regardless of *cast*, tuples and sets only on request
    expand = (
        isinstance(obj, list) or
        is_lazy_iterable(obj) or
        (isinstance(obj, (tuple, set)) and cast)
    )
    return list(obj) if expand else [obj]
def verbose_import(
    module_name: str,
    user: str | None = None,
    package: str | None = None,
    pip_name: str | None = None,
) -> ModuleType:
    """
    Imports the module *module_name* via :py:func:`importlib.import_module`, with *package* being
    forwarded as the anchor for relative module names, and returns it. When the import fails, the
    *ImportError* is re-raised with an extended message: when *user* is set, it is named as the
    component that requires the module, and when *pip_name* is set, a hint on how to install that
    package with pip is appended.
    """
    try:
        return importlib.import_module(module_name, package=package)
    except ImportError as e:
        hints = ""
        if user:
            hints += f" but is required by {user}"
        if pip_name:
            hints += f" (you may want to try 'pip install --user {pip_name}')"

        if hints:
            # errors created without a message have their msg set to None, so fall back to the
            # string representation to keep the concatenation from failing with a TypeError that
            # would hide the actual import problem
            base_msg = e.msg if isinstance(e.msg, str) else str(e)
            e.msg = base_msg + hints

        # re-raise the very same exception, traceback untouched
        raise
@contextlib.contextmanager
def tmp_file(create=False, delete=True, **kwargs):
    """
    Prepares a temporary file and opens a context yielding its path. The path is obtained from
    :py:func:`tempfile.mkstemp`, to which all *kwargs* are forwarded. When *create* is *True*, the
    file exists when the context is opened. Otherwise, it is removed again beforehand so that only
    the path is provided. Upon closing the context, the file is deleted if *delete* is *True* and
    it exists at that time.
    """
    # mkstemp returns an open OS-level file descriptor along with the path, but only the path is
    # needed here, so close the descriptor right away instead of leaking it
    fd, path = tempfile.mkstemp(**kwargs)
    os.close(fd)

    # bring the file into the requested initial state
    exists = os.path.exists(path)
    if not create and exists:
        os.remove(path)
    elif create and not exists:
        open(path, "a").close()

    try:
        yield path
    finally:
        # the file might have been created, moved or removed inside the context
        if delete and os.path.exists(path):
            os.remove(path)
@contextlib.contextmanager
def tmp_dir(create=True, delete=True, **kwargs):
    """
    Context manager that yields the path of a temporary directory obtained from
    :py:func:`tempfile.mkdtemp`, to which all *kwargs* are forwarded. When *create* is *True*, the
    directory exists when the context is opened. Otherwise, it is removed again beforehand so that
    only the path is provided. When the context is closed, the directory and its content are
    deleted if *delete* is *True* and the path exists at that time.
    """
    path = tempfile.mkdtemp(**kwargs)

    # bring the directory into the requested initial state
    present = os.path.exists(path)
    if create:
        if not present:
            os.makedirs(path)
    elif present:
        shutil.rmtree(path)

    try:
        yield path
    finally:
        # the directory might have been created or removed inside the context
        if delete and os.path.exists(path):
            shutil.rmtree(path)
# color codes by name, inserted into the shell escape sequence built by colored()
_shell_colors = dict(
    default=39,
    black=30,
    red=31,
    green=32,
    yellow=33,
    blue=34,
    magenta=35,
    cyan=36,
)


def colored(s: str, color: str = "white") -> str:
    """
    Wraps the string *s* into shell escape sequences so that it is shown in *color* and returns
    the result. *color* is looked up by name in the table of known colors. Names without an entry
    fall back to code 39, which is also the code of the ``"default"`` color. The returned string
    ends with a reset sequence.
    """
    # NOTE(review): the default "white" has no entry in the table and thus resolves to code 39
    code = _shell_colors[color] if color in _shell_colors else 39
    template = "\033[{}m{}\033[0m"
    return template.format(code, s)
def _signal_process_group(p, sig):
    """
    Sends the signal *sig* to the process group of the subprocess *p*. Lookup errors, raised when
    the process or its group vanished in the meantime, are ignored.
    """
    try:
        os.killpg(os.getpgid(p.pid), sig)
    except ProcessLookupError:
        pass


def interruptable_popen(*args, **kwargs):
    """ interruptable_popen(*args, stdin_callback=None, stdin_delay=0, interrupt_callback=None, kill_timeout=None, **kwargs)  # noqa
    Shorthand to :py:class:`Popen` followed by :py:meth:`Popen.communicate` which can be interrupted
    by *KeyboardInterrupt*. The return code, standard output and standard error are returned in a
    3-tuple. Outputs received as bytes are decoded as UTF-8, outputs that are already strings (text
    mode) or *None* (stream not captured) are returned as they are.

    *stdin_callback* can be a function accepting no arguments. It is called after a delay of
    *stdin_delay* seconds and its return value is passed to ``communicate`` to feed data input to
    the subprocess (which requires ``stdin=subprocess.PIPE`` to be among the *kwargs*). When it
    returns a string, a newline is appended, and the string is encoded as UTF-8 unless text mode
    is requested via *text*, *universal_newlines*, *encoding* or *errors*.

    *interrupt_callback* can be a function, accepting the process instance as an argument, that is
    called immediately after a *KeyboardInterrupt* occurs. After that, a SIGTERM signal is sent to
    the process group of the subprocess, in case it is still alive, to allow it to gracefully
    shutdown.

    When *kill_timeout* is set, and the process is still alive after that period (in seconds), a
    SIGKILL signal is sent to force the process termination. In any case, the *KeyboardInterrupt*
    is re-raised.

    All other *args* and *kwargs* are forwarded to the :py:class:`Popen` constructor, except for
    *preexec_fn* which is discarded and *start_new_session* which is always set to *True*.
    """
    # get kwargs not being passed to Popen
    stdin_callback = kwargs.pop("stdin_callback", None)
    stdin_delay = kwargs.pop("stdin_delay", 0)
    interrupt_callback = kwargs.pop("interrupt_callback", None)
    kill_timeout = kwargs.pop("kill_timeout", None)

    # start the subprocess in a new session, and thus in a new process group, so that signals can
    # be sent to it and all its children at once; start_new_session makes the child call setsid()
    # without relying on preexec_fn, which is not safe in the presence of threads; as before, a
    # preexec_fn passed by the caller is not used
    kwargs.pop("preexec_fn", None)
    kwargs["start_new_session"] = True

    # any of these options lets Popen exchange strings rather than bytes
    text_mode = any(
        kwargs.get(key)
        for key in ("text", "universal_newlines", "encoding", "errors")
    )

    p = subprocess.Popen(*args, **kwargs)

    # get stdin
    stdin_data = None
    if callable(stdin_callback):
        if stdin_delay > 0:
            time.sleep(stdin_delay)
        stdin_data = stdin_callback()
        if isinstance(stdin_data, str):
            stdin_data += "\n"
            # communicate expects bytes unless the streams are opened in text mode
            if not text_mode:
                stdin_data = stdin_data.encode("utf-8")

    # handle interrupts
    try:
        out, err = p.communicate(stdin_data)
    except KeyboardInterrupt:
        # allow the interrupt_callback to perform a custom process termination
        if callable(interrupt_callback):
            interrupt_callback(p)

        # when the process is still alive, send SIGTERM to gracefully terminate it; the process
        # group is only looked up after this check as the lookup fails for processes that were
        # already reaped, e.g. by the interrupt_callback
        if p.poll() is None:
            _signal_process_group(p, signal.SIGTERM)

            # when a kill_timeout is set, and the process is still running after that period,
            # send SIGKILL to force its termination
            if kill_timeout is not None:
                target_time = time.perf_counter() + kill_timeout
                while target_time > time.perf_counter():
                    time.sleep(0.05)
                    if p.poll() is not None:
                        # the process terminated, exit the loop
                        break
                else:
                    # the timeout elapsed, check the status again to avoid race conditions
                    if p.poll() is None:
                        _signal_process_group(p, signal.SIGKILL)

        # transparently reraise
        raise

    # decode outputs, which are already strings in text mode and None when not captured
    if isinstance(out, bytes):
        out = out.decode("utf-8")
    if isinstance(err, bytes):
        err = err.decode("utf-8")

    return p.returncode, out, err
class MockModule(object):
    """
    Mockup object that resembles a module with arbitrarily deep structure such that, e.g.,

    .. code-block:: python

        tf = MockModule("tensorflow")
        print(tf.Graph)
        # -> "<MockModule 'tensorflow.Graph' at 0x7f3a5c0b2d90>"

    will always succeed at declaration time. Instances evaluate to *False* in boolean contexts,
    raise an exception when called, and forward the ``|`` operator to their class so that they can
    be used on either side of union type hints.

    .. py:attribute:: _name

        type: str

        The name of the mock module.
    """

    def __init__(self, name: str):
        super().__init__()

        self._name = name

    def __getattr__(self, attr: str) -> "MockModule":
        # this method is only invoked when the normal attribute lookup fails; when even _name is
        # missing, as on instances created through __new__ without __init__, raise a proper
        # AttributeError since the lookup of self._name below would recurse infinitely otherwise
        if attr == "_name":
            raise AttributeError(attr)

        # any other missing attribute resolves to a nested mock with a dotted name
        return type(self)(f"{self._name}.{attr}")

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} '{self._name}' at {hex(id(self))}>"

    def __call__(self, *args, **kwargs) -> None:
        raise Exception(f"{self._name} is a mock module and cannot be called")

    def __nonzero__(self) -> bool:
        # Python 2 name of __bool__
        return False

    def __bool__(self) -> bool:
        return False

    def __or__(self, other) -> Any:
        # forward union type hints
        return type(self) | other

    def __ror__(self, other) -> Any:
        # forward union type hints where the mock is the right operand
        return other | type(self)