from enum import IntEnum, Enum
import diskcache
import sys
import abc
import time
import threading
from contextlib import contextmanager
from typing import Any, List, Optional, Tuple, Dict, Type, Mapping
import numpy as np
import pandas as pd
from pydantic import Field
from shapeflow import settings, get_logger, get_cache
from shapeflow.api import api
from shapeflow.core import RootException, RootInstance, Described
from shapeflow.maths.colors import Color, HsvColor, as_hsv
from shapeflow.util.meta import describe_function
from shapeflow.util import Timer, Timing
from shapeflow.core.db import BaseAnalysisModel
from shapeflow.core.config import Factory, BaseConfig, Instance, Configurable
from shapeflow.core.streaming import EventStreamer
from shapeflow.core.interface import InterfaceType
log = get_logger(__name__)
[docs]class BackendSetupError(RootException):
msg = 'Error while setting up backend'
[docs]class BackendError(RootException):
msg = 'Error in backend'
[docs]class CacheAccessError(RootException):
msg = 'Trying to access cache out of context'
_BLOCKED = 'BLOCKED'
[docs]class PushEvent(Enum):
"""Categories of server-pushed events.
"""
STATUS = 'status'
CONFIG = 'config'
NOTICE = 'notice'
[docs]class AnalyzerState(IntEnum):
"""The state of an analyzer
"""
UNKNOWN = 0
INCOMPLETE = 1
CAN_LAUNCH = 2
LAUNCHED = 3
CAN_FILTER = 4
CAN_ANALYZE = 5
ANALYZING = 6
DONE = 7
CANCELED = 8
ERROR = 9
[docs] @classmethod
def can_launch(cls, state: int) -> bool:
"""Returns ``True`` if an analyzer can launch from this state
"""
return state in [
cls.CAN_LAUNCH,
cls.LAUNCHED,
cls.CAN_ANALYZE,
cls.DONE,
cls.CANCELED,
]
[docs] @classmethod
def is_launched(cls, state: int) -> bool:
"""Returns ``True`` if an analyzer is launched in this state
"""
return state in [
cls.LAUNCHED,
cls.CAN_FILTER,
cls.CAN_ANALYZE,
cls.DONE,
cls.ANALYZING,
cls.CANCELED,
]
[docs]class QueueState(IntEnum):
"""The state of the analysis queue
"""
STOPPED = 0
RUNNING = 1
PAUSED = 2
[docs]class CachingInstance(Instance): # todo: consider a waterfall cache: e.g. 2 GB in-memory, 4GB on-disk, finally the actual video
"""Cache method results with ``diskcache.Cache``
"""
_cache: Optional[diskcache.Cache]
def __init__(self, config: BaseConfig = None):
super(CachingInstance, self).__init__(config)
self._open_cache()
def __del__(self):
self._close_cache()
def _get_key(self, method, *args) -> str:
# Key should be instance-independent to handle multithreading
# and caching between application runs.
# Hashing the string is a significant performance hit.
return f"{describe_function(method)}{args}"
def _to_cache(self, key: str, value: Any):
if self._cache is None:
raise CacheAccessError
self._cache.set(key, value)
def _from_cache(self, key: str) -> Optional[Any]:
if self._cache is None:
raise CacheAccessError
return self._cache.get(key)
def _block(self, key: str):
if self._cache is None:
raise CacheAccessError
self._cache.set(key, _BLOCKED)
def _is_blocked(self, key: str) -> bool:
if self._cache is None:
raise CacheAccessError
return key in self._cache \
and isinstance(self._cache[key], str) \
and self._from_cache(key) == _BLOCKED
def _touch_keys(self, keys: List[str]):
if self._cache is None:
raise CacheAccessError
for key in keys:
if key in self._cache:
self._cache.touch(key)
def _drop(self, key: str):
if self._cache is None:
raise CacheAccessError
del self._cache[key]
def _is_cached(self, method, *args):
if self._cache is None:
raise CacheAccessError
return self._get_key(method, *args) in self._cache
[docs] def cached_call(self, method, *args, **kwargs): # todo: kwargs necessary?
"""Call a method or get the result from the cache if available.
Parameters
----------
method
The method to call
args
Any positional arguments to pass on to the method
kwargs
Any keyword arguments to pass on to the method
Returns
-------
Any
Whatever the method returns.
"""
key = self._get_key(method, *args)
if self._cache is not None:
# Check if the file's already cached
if key in self._cache:
t0 = time.time()
while self._is_blocked(key) and time.time() < t0 + settings.cache.block_timeout:
# Some other thread is currently reading the same frame
# Wait a bit and try to get from cache again
log.debug(f'{self.__class__.__qualname__}: '
f'waiting for {key} to be released...', 5)
time.sleep(0.01)
value = self._from_cache(key)
if isinstance(value, str) and value == _BLOCKED:
log.warning(f'{self.__class__.__qualname__}: '
f'timed out waiting for {key}.')
else:
log.debug(f"{self.__class__.__qualname__}: "
f"read cached {key}.")
return value
# Cache a temporary string to 'block' the key
log.debug(f"{self.__class__.__qualname__}: caching {key}")
log.vdebug(f"{self.__class__.__qualname__}: block {key}.")
self._block(key)
log.vdebug(f"{self.__class__.__qualname__}: execute {key}.")
value = method(*args, **kwargs)
log.vdebug(f"{self.__class__.__qualname__}: write {key}.")
self._to_cache(key, value)
return value
else:
log.vdebug(f"Execute {key}.")
return method(*args, **kwargs)
def _open_cache(self, override: bool = False):
if settings.cache.do_cache or override:
log.debug(f"{self.__class__.__qualname__}: "
f"opening cache @ {settings.cache.dir}")
self._cache = get_cache()
else:
self._cache = None
def _close_cache(self):
if self._cache is not None:
log.debug(f"{self.__class__.__qualname__}: "
f"closing cache @ {settings.cache.dir}")
self._cache.close()
self._cache = None
[docs]class FeatureConfig(BaseConfig):
"""Abstract :class:`~shapeflow.core.backend.Feature` parameters"""
pass
[docs]class Feature(Configurable, metaclass=abc.ABCMeta): # todo: should probably use Config for parameters after all :)
"""A feature implements interactions between BackendElements to
produce a certain value
"""
_color: Optional[Color]
_state: Optional[np.ndarray]
_label: str = '' # todo: keep these in the config instead?
"""Label string, to be used in exported data and the user interface
"""
_unit: str = ''
"""Unit string, to be used in exported data and the user interface
"""
_elements: Tuple[Instance, ...] = ()
_config: Optional[FeatureConfig]
_global_config: FeatureConfig
_config_class: Type[FeatureConfig] = FeatureConfig
def __init__(self, elements: Tuple[Instance, ...], global_config: FeatureConfig, config: Optional[dict] = None):
self._skip = False
self._ready = False
self._elements = elements
self._global_config = global_config
if config is not None:
self._config = global_config.__class__(**config)
else:
self._config = None
self._color = HsvColor(h=0,s=200,v=255) # start out as red
[docs] def calculate(self, frame: np.ndarray, state: np.ndarray = None) \
-> Tuple[Any, Optional[np.ndarray]]:
"""Calculate the feature for the given frame
Parameters
----------
frame : np.ndarray
A video frame
state : Optional[np.ndarray]
An ``np.ndarray`` for the state image. Should have the same
dimensions as the ``frame``
Returns
-------
Any
The calculated feature value
Optional[np.ndarray]
If a state image was provided, return this state frame with the
feature's state frame added onto it. If not, return ``None``.
"""
"""Calculate the feature for given frame
"""
if state is not None:
state = self.state(frame, state)
return self.value(frame), state
[docs] @classmethod
def label(cls) -> str:
"""The label of this feature. Used in the user interface and results.
"""
return cls._label
[docs] @classmethod
def unit(cls) -> str:
"""The unit of this feature. Used in the user interface and results.
"""
return cls._unit
@property
@abc.abstractmethod
def skip(self) -> bool:
"""Whether this feature should be skipped
"""
@property
@abc.abstractmethod
def ready(self) -> bool:
"""Whether this feature is ready to be calculated
"""
def set_color(self, color: Color):
self._color = color
@property
def color(self) -> Color:
"""Color of the feature in figures.
A feature's color is handled by its :class:`~shapeflow.core.backend.FeatureSet`
as not to overlap with any other features in that set.
"""
if self._color is not None:
return self._color
else:
raise ValueError
@abc.abstractmethod
def _guideline_color(self) -> Color:
"""Returns the 'guideline color' of a feature, which is used to resolve
to the final color within a feature set.
"""
[docs] @abc.abstractmethod # todo: we're dealing with frames explicitly, so maybe this should be an shapeflow.video thing...
def state(self, frame: np.ndarray, state: np.ndarray) -> np.ndarray:
"""Return the feature's state image for a given frame
"""
[docs] @abc.abstractmethod
def value(self, frame: np.ndarray) -> Any:
"""Compute the value of the feature for a given frame
"""
@property
def config(self):
"""The configuration of the feature.
Default to the global configuration if no specific one is provided.
"""
if self._config is not None:
return self._config
else:
return self._global_config
[docs]class FeatureSet(Configurable):
"""A set of :class:`~shapeflow.core.backend.Feature` instances
"""
_feature: Tuple[Feature, ...]
_colors: Tuple[Color, ...]
_config_class = FeatureConfig
def __init__(self, features: Tuple[Feature, ...]):
self._features = features
[docs] def resolve_colors(self) -> Tuple[Color, ...]:
"""Resolve the colors of all features in this set so that none of them
overlap.
"""
guideline_colors = [
as_hsv(f._guideline_color()) for f in self._features
]
min_v = 20.0
max_v = 255.0
tolerance = 15
bins: list = []
# todo: clean up binning
for index, color in enumerate(guideline_colors):
if not bins:
bins.append([index])
else:
in_bin = False
for bin in bins:
if abs(
float(color.h) -
np.mean([guideline_colors[i].h for i in bin])
) < tolerance:
bin.append(index)
in_bin = True
break
if not in_bin:
bins.append([index])
for bin in bins:
if len(bin) < 4:
increment = 60.0
else:
increment = (max_v - min_v) / len(bin)
for repetition, index in enumerate(bin):
self._features[index].set_color(
HsvColor(
h=guideline_colors[index].h,
s=220,
v=int(max_v - repetition * increment)
)
)
self._colors = tuple([feature.color for feature in self._features])
return self.colors
@property
def colors(self) -> Tuple[Color, ...]:
"""The resolved colors in this feature set
"""
return self._colors
@property
def features(self) -> Tuple[Feature, ...]:
"""The features in this feature set
"""
return self._features
[docs] def calculate(self, frame: np.ndarray, state: Optional[np.ndarray]) -> Tuple[List[Any], Optional[np.ndarray]]:
"""Calculate all features in this set for a given frame
Parameters
----------
frame : np.ndarray
An image
state : Optional[np.ndarray]
An empty ``np.ndarray`` for the state image. Should have the same
dimensions as the ``frame``
Returns
-------
List[Any]
The calculated feature values
Optional[np.ndarray]
If a state image was provided, return the composite state image of
this feature set. If not, return ``None``.
"""
values = []
for feature in self.features:
value, state = feature.calculate(frame=frame, state=state)
values.append(value)
return values, state
[docs]class FeatureType(InterfaceType):
""":class:`~shapeflow.core.backend.Feature` factory
"""
_type = Feature
_mapping: Mapping[str, Type[Feature]] = {}
_config_type = FeatureConfig
[docs] def get(self) -> Type[Feature]:
"""Get the :class:`~shapeflow.core.backend.Feature` for this feature type
"""
feature = super().get()
assert issubclass(feature, Feature)
return feature
[docs] def config_schema(self) -> dict:
"""The ``pydantic`` configuration schema for
this type of :class:`~shapeflow.core.backend.Feature`
"""
return self.get().config_schema()
[docs] @classmethod
def __modify_schema__(cls, field_schema):
"""Modify ``pydantic`` schema to include units and labels.
"""
super().__modify_schema__(field_schema)
field_schema.update(
units={ k:v._unit for k,v in cls._mapping.items() },
labels={ k:v._label for k,v in cls._mapping.items() }
)
[docs]class BaseAnalyzerConfig(BaseConfig):
"""Abstract analyzer configuration.
"""
video_path: Optional[str] = Field(default=None)
design_path: Optional[str] = Field(default=None)
name: Optional[str] = Field(default=None)
description: Optional[str] = Field(default=None)
[docs]class BaseAnalyzer(Instance, RootInstance, metaclass=abc.ABCMeta):
"""Abstract analyzer.
"""
_config: BaseAnalyzerConfig
_state: int
_busy: bool
_progress: float
_cancel: threading.Event
_error: threading.Event
results: Dict[str, pd.DataFrame]
_timer: Timer
_video_hash: Optional[str]
_design_hash: Optional[str]
_model: Optional[BaseAnalysisModel]
_eventstreamer: Optional[EventStreamer]
_runs: int
def __init__(self, config: BaseAnalyzerConfig = None, eventstreamer: EventStreamer = None):
self.set_eventstreamer(eventstreamer)
super().__init__(config)
self._timer = Timer(self, log)
self._launched = False
self._hash_video = None
self._hash_design = None
self._state: AnalyzerState = AnalyzerState.INCOMPLETE
self._busy = False
self._progress = 0.0
self._model = None
self._cancel = threading.Event()
self._error = threading.Event()
def set_model(self, model: BaseAnalysisModel):
self._model = model
if self.config.name is None:
self.config(name=self.get_name())
self._runs = self.model.get_runs()
@property
def model(self):
"""The database model associated with this analyzer.
"""
return self._model
@property
def runs(self):
"""The number of runs performed for this analysis.
"""
return self._runs
def _new_run(self):
self._runs += 1
@property
def eventstreamer(self):
"""Return the server-sent event streamer.
"""
return self._eventstreamer
[docs] def set_eventstreamer(self, eventstreamer: EventStreamer = None):
"""Set the server-sent event streamer.
"""
self._eventstreamer = eventstreamer
[docs] def event(self, category: PushEvent, data: dict) -> None:
"""Push an event.
Parameters
----------
category : PushEvent
The category of event to push
data : dict
The data to push
"""
if self.eventstreamer is not None:
self.eventstreamer.event(category.value, self.id, data)
[docs] def notice(self, message: str, persist: bool = False) -> None:
"""Push a notice.
Parameters
----------
message : str
The notice message to push
persist : bool
Whether the notice should be persistent (i.e. stay up on the user
interface until dismissed manually)
"""
self.event(
PushEvent.NOTICE,
data={'message': message, 'persist': persist}
)
log.warning(f"'{self.id}': {message}")
[docs] @api.va.__id__.commit.expose()
def commit(self) -> bool:
"""Save video analysis configuration to history database
:attr:`shapeflow.api._VideoAnalyzerDispatcher.commit`
"""
if self._model is not None:
log.debug(f"committing '{self.id}'")
self._model.store()
return True
else:
return False
[docs] @abc.abstractmethod
def can_launch(self) -> bool:
"""
Returns
-------
bool
Whether this analyzer can launch
"""
[docs] @abc.abstractmethod
def can_filter(self) -> bool:
"""
Returns
-------
bool
Whether this analyzer can filter
"""
[docs] @abc.abstractmethod
def can_analyze(self) -> bool:
"""
Returns
-------
bool
Whether this analyzer can analyze
"""
@property
def launched(self):
return AnalyzerState.is_launched(self.state)
def set_state(self, state: int, push: bool = True):
self._state = state
if push:
self.push_status()
@property
def state(self) -> AnalyzerState:
assert isinstance(self._state, AnalyzerState) # todo: fix int / AnalyzerState typing
return self._state
@property
def done(self) -> bool:
return self.state == AnalyzerState.DONE
[docs] @api.va.__id__.state_transition.expose()
def state_transition(self, push: bool = True) -> int:
"""Handle state transitions.
:attr:`shapeflow.api._VideoAnalyzerDispatcher.state_transition`
Parameters
----------
push : bool
Whether to push an event once the state is set.
Returns
-------
int
The resulting state ~ :class:`~shapeflow.backend.AnalyzerState`
"""
# todo: actually type as AnalyzerState
if self.state == AnalyzerState.INCOMPLETE and self.can_launch():
self.set_state(AnalyzerState.CAN_LAUNCH, push)
elif self.state == AnalyzerState.LAUNCHED or self.state == AnalyzerState.CAN_FILTER:
if self.can_analyze():
self.set_state(AnalyzerState.CAN_ANALYZE, push)
elif self.can_filter():
self.set_state(AnalyzerState.CAN_FILTER, push)
else:
self.set_state(AnalyzerState.LAUNCHED, push)
elif self.state == AnalyzerState.DONE or self.state == AnalyzerState.CANCELED:
self.set_progress(0.0, push=False)
if self.can_analyze():
self.set_state(AnalyzerState.CAN_ANALYZE, push)
elif self.can_filter():
self.set_state(AnalyzerState.CAN_FILTER, push)
elif self.launched:
self.set_state(AnalyzerState.LAUNCHED, push)
else:
if self.can_analyze():
self.set_state(AnalyzerState.CAN_ANALYZE, push)
elif self.can_filter():
self.set_state(AnalyzerState.CAN_FILTER, push)
elif self.launched:
self.set_state(AnalyzerState.LAUNCHED, push)
elif self.can_launch():
self.set_state(AnalyzerState.CAN_LAUNCH, push)
else:
self.set_state(AnalyzerState.ERROR)
return int(self.state)
def set_busy(self, busy: bool, push: bool = True):
self._busy = busy
if push:
self.push_status()
@property
def busy(self) -> bool:
return self._busy
@contextmanager
def busy_context(self, busy_state: AnalyzerState = None, done_state: AnalyzerState = None):
if done_state is None:
done_state = self.state
try:
if busy_state is not None:
self.set_state(busy_state)
self.set_busy(True)
yield
finally:
self.set_busy(False)
self.set_state(done_state)
[docs] @api.va.__id__.cancel.expose()
def cancel(self) -> None:
"""Cancel a running analysis.
:attr:`shapeflow.api._VideoAnalyzerDispatcher.cancel`
"""
super().cancel()
self.set_state(AnalyzerState.CANCELED)
[docs] def error(self) -> None:
super().error()
self.set_state(AnalyzerState.ERROR)
def set_progress(self, progress: float, push: bool = True):
self._progress = progress
if push:
self.push_status()
@property
def progress(self) -> float:
return self._progress
@abc.abstractmethod
def _launch(self):
"""Initializes nested objects
"""
@property
def config(self) -> BaseAnalyzerConfig:
return self._config
@abc.abstractmethod
def _new_results(self):
"""
"""
[docs] @abc.abstractmethod
def analyze(self) -> bool:
"""
"""
@property
@abc.abstractmethod
def position(self) -> float:
"""
"""
@property
@abc.abstractmethod
def cached(self) -> bool:
"""
"""
@property
@abc.abstractmethod
def has_results(self) -> bool:
"""
"""
[docs] @api.va.__id__.status.expose()
def status(self) -> dict:
"""Get the analyzer's status.
:attr:`shapeflow.api._VideoAnalyzerDispatcher.status`
"""
status = {
'state': self.state,
'busy': self.busy,
'cached': self.cached,
'results': self.has_results,
'position': self.position,
'progress': self.progress,
}
return status
def push_status(self):
self.event(PushEvent.STATUS, self.status())
[docs] @api.va.__id__.get_config.expose()
def get_config(self, do_tag=False) -> dict:
"""Get the analyzer's configuration.
:attr:`shapeflow.api._VideoAnalyzerDispatcher.get_config`
"""
self._gather_config()
config = self.config.to_dict(do_tag)
return config
[docs] @abc.abstractmethod
def set_config(self, config: dict, silent: bool = False) -> dict:
"""Set the analyzer's configuration
:attr:`shapeflow.api._VideoAnalyzerDispatcher.set_config`
Parameters
----------
config: dict
A configuration ``dict``
silent: bool
If ``True``, don't push server-sent events. ``False`` by default.
Returns
-------
dict
The updated configuration ``dict``. Some fields may have been
changed due to incompatibility.
"""
@abc.abstractmethod
def _gather_config(self):
"""
"""
[docs] @api.va.__id__.launch.expose()
def launch(self) -> bool:
"""Launch the analyzer.
:attr:`shapeflow.api._VideoAnalyzerDispatcher.launch`
If the analyzer's configuration is sufficiently filled out,
the analyzer will instantiate any other objects it needs to start
configuring the analysis further.
To launch, the analyzer needs at least
* A valid video file
* A valid design file
* At least one valid feature configuration
Returns
-------
bool
Whether the launch was successful or not.
"""
with self.lock():
if self.can_launch():
self._launch()
# Commit to history
self.commit()
# Push events
self.set_state(AnalyzerState.LAUNCHED)
self.event(PushEvent.CONFIG, self.get_config())
# State transition (may change from LAUNCHED ~ config)
self.state_transition()
return self.launched
else:
log.warning(f"{self.__class__.__qualname__} can not be launched.") # todo: try to be more verbose
return False
def get_name(self) -> str:
try:
return self.model.get_name()
except AttributeError:
return self.id
[docs] @api.va.__id__.get_db_id.expose()
def get_db_id(self) -> int:
"""Get the database id of this analyzer.
:attr:`shapeflow.api._VideoAnalyzerDispatcher.get_db_id`
"""
return self.model.get_id()
[docs] @contextmanager
def time(self, message: str = ''):
"""A timing context.
"""
try:
self._timer.__enter__(message)
yield self
finally:
self._timer.__exit__()
@property
def timing(self) -> Optional[Timing]:
"""Get the timing info from the latest run of :func:`~shapeflow.core.backend.BaseAnalyzer.time`
"""
if self._timer.timing is not None:
return Timing(*self._timer.timing)
else:
return None
def export(self):
raise NotImplementedError
@property
def description(self):
return self._description
[docs]class AnalyzerType(Factory):
"""Analyzer type factory
"""
_type = BaseAnalyzer
_mapping: Mapping[str, Type[Described]] = {}
[docs] def get(self) -> Type[BaseAnalyzer]:
"""Return the analyzer type.
"""
t = super().get()
assert issubclass(t, self._type)
return t
[docs] def config_schema(self) -> dict:
"""Return the config schema of the analyzer type.
"""
return self.get().config_class()().schema()