# Source code for shapeflow.main

"""Main functionality of the ``shapeflow`` server.

Implements ``shapeflow.api`` endpoints related to the global application and
analyzer management.

The classes defined below should not be instantiated individually, but as a
whole using :func:`shapeflow.main.load`, because their setup process is not
very intuitive.
"""

import os
import abc
import time
import pickle
from typing import Dict, List, Optional
from threading import Event, Lock, Thread
import datetime

from flask import Response
import shortuuid
import diskcache
import cv2

from shapeflow.util import open_path, sizeof_fmt
from shapeflow.util.filedialog import filedialog
from shapeflow import get_logger, get_cache, settings, update_settings, ROOTDIR
from shapeflow.core import stream_off, Endpoint, RootException
from shapeflow.api import api, _FilesystemDispatcher, _DatabaseDispatcher, _VideoAnalyzerManagerDispatcher, _VideoAnalyzerDispatcher, _CacheDispatcher, ApiDispatcher
from shapeflow.core.streaming import streams, EventStreamer, PlainFileStreamer, BaseStreamer
from shapeflow.design import check_design
from shapeflow.core.backend import QueueState, AnalyzerState, BaseAnalyzer
from shapeflow.config import schemas, normalize_config, loads, BaseAnalyzerConfig
from shapeflow.video import init, VideoAnalyzer
import shapeflow.plugins

from shapeflow.db import History


log = get_logger(__name__)


class ShapeflowServerInterface(metaclass=abc.ABCMeta):
    """Abstract interface of the ``shapeflow`` server.

    The endpoint implementations in this module only interact with the
    server through the methods and properties declared here.
    """

    @abc.abstractmethod
    def serve(self, host: str, port: int, open: bool) -> None:
        """Run the application server.

        A new :class:`~shapeflow.server.ServerThread` is started and, when
        requested, a browser window/tab is opened.

        Serving continues until one of the following happens:

        * :func:`~shapeflow.main._Main.quit` is called

        * :func:`~shapeflow.main._Main.unload` is called and there is no
          incoming traffic for 5 seconds

        * The process is interrupted by the user with ``Ctrl+C``

        Parameters
        ----------
        host: str
            Address of the host
        port: int
            Port of the host
        open: bool
            If ``True``, open a browser window/tab once the server is started
        """

    @abc.abstractmethod
    def call_api(self, address: str) -> Response:
        """Forward a request to the API.

        The arguments of the call are taken from ``Flask``'s
        ``request.data`` or ``request.args``

        Several kinds of return data are supported:

        * ``bytes`` data go through ``flask.make_response``

        * :class:`shapeflow.core.streaming.BaseStreamer` instances are
          wrapped in custom ``flask.Response`` objects built from their
          :func:`shapeflow.core.streaming.BaseStreamer.stream` generator

        * anything else goes through ``flask.jsonify``

        Parameters
        ----------
        address: str
            Address of the endpoint the request should be dispatched to
        """

    @abc.abstractmethod
    def unload(self) -> None:
        """Flag the server for unloading.
        """

    @abc.abstractmethod
    def quit(self) -> None:
        """Flag the server for quitting.
        """

    @abc.abstractmethod
    def restart(self) -> None:
        """Restart the server.
        """

    @abc.abstractmethod
    def active(self) -> None:
        """Cancel ``_unload`` if it has been set. Meant to be called
        whenever there is incoming traffic.
        """

    @property
    @abc.abstractmethod
    def api(self) -> ApiDispatcher:
        """A reference to :data:`shapeflow.api.api`, guaranteed to be
        initialized with :mod:`shapeflow.main` and bound to this
        :class:`~shapeflow.server.ShapeflowServer` instance.

        :data:`shapeflow.api.api` is lazy-loaded through this property on
        the first request. Because loading takes a while, requests that
        arrive in the meantime are locked out until initialization is done.
        Later requests bypass this lock.

        Returns
        -------
        ApiDispatcher
            A reference to :data:`~shapeflow.api.api`
        """

    @property
    @abc.abstractmethod
    def eventstreamer(self) -> EventStreamer:
        """The :class:`shapeflow.core.streaming.EventStreamer` instance
        that belongs to the server
        """

class _Main(object):
    """Implements root-level :data:`~shapeflow.api.api` endpoints.
    """
    _lock: Lock
    _server: ShapeflowServerInterface
    _log: Optional[PlainFileStreamer]

    def __init__(self, server: ShapeflowServerInterface):
        """
        Parameters
        ----------
        server: ShapeflowServerInterface
            The server this instance forwards unload/quit/restart requests
            to and gets its event streamer from
        """
        self._server = server
        self._lock = Lock()
        self._log = None  # no log stream exists until log() is called

    @api.ping.expose()
    def ping(self) -> bool:
        """Ping the server

        :attr:`shapeflow.api.ApiDispatcher.ping`

        Returns
        -------
        bool
            If the server is up, ``True``. If the server is down,
            a ``/api/ping`` request will not get any response.
        """
        # A ping counts as incoming traffic
        self._server.active()
        return True

    @api.map.expose()
    def map(self) -> Dict[str, List[str]]:
        """Get the URL map of the API

        :attr:`shapeflow.api.ApiDispatcher.map`

        Returns
        -------
        Dict[str, List[str]]
            A flat ``dict`` mapping each available URL to a list of
            accepted HTTP methods
        """
        # todo: this is a hacky replacement of Flask map
        url_map = {
            '/api/' + k: ['GET', 'PUT', 'POST', 'OPTIONS']
            for k in api.address_space.keys()
        }
        # Analyzer endpoints are listed once, under the '__id__' placeholder
        va_id_map = {
            '/api/va/__id__/' + k: ['GET', 'PUT', 'POST', 'OPTIONS']
            for k in api.va.__id__.__dir__()
            if isinstance(getattr(api.va.__id__, k), Endpoint)
        }
        return {**url_map, **va_id_map}

    @api.schemas.expose()
    def schemas(self) -> dict:
        """Get the application schemas

        :attr:`shapeflow.api.ApiDispatcher.schemas`

        Returns
        -------
        dict
            A ``dict`` with schemas
        """
        return schemas()

    @api.normalize_config.expose()
    def normalize_config(self, config: dict) -> dict:
        """Normalize a configuration ``dict`` with
        :func:`shapeflow.config.normalize_config`

        :attr:`shapeflow.api.ApiDispatcher.normalize_config`

        Parameters
        ----------
        config: dict
            A configuration ``dict``

        Returns
        -------
        dict
            A normalized configuration ``dict``
        """
        return normalize_config(config)

    @api.get_settings.expose()
    def get_settings(self) -> dict:
        """Get the application settings

        :attr:`shapeflow.api.ApiDispatcher.get_settings`

        Returns
        -------
        dict
            The application settings as a ``dict``
        """
        return settings.to_dict()

    @api.set_settings.expose()
    def set_settings(self, settings: dict) -> dict:
        """Set the application settings and restart the server

        :attr:`shapeflow.api.ApiDispatcher.set_settings`

        Parameters
        ----------
        settings: dict
            New application settings as a ``dict``

        Returns
        -------
        dict
            The new application settings ``dict``, which may have been
            modified.
        """
        new_settings = update_settings(settings)
        self.restart()
        return new_settings

    @api.events.expose()
    def events(self) -> EventStreamer:
        """Get a server-sent event stream

        :attr:`shapeflow.api.ApiDispatcher.events`

        Returns
        -------
        EventStreamer
            A :class:`~shapeflow.core.streaming.BaseStreamer` object, to be
            streamed by ``Flask``
        """
        # todo: at Flask server level, should catch & convert to stream response
        return self._server.eventstreamer

    @api.stop_events.expose()
    def stop_events(self) -> None:
        """Stop streaming server-sent events

        :attr:`shapeflow.api.ApiDispatcher.stop_events`
        """
        self._server.eventstreamer.stop()

    @api.log.expose()
    def log(self) -> PlainFileStreamer:
        """Start streaming log file

        If a log stream was started before, it is stopped first.

        :attr:`shapeflow.api.ApiDispatcher.log`

        Returns
        -------
        PlainFileStreamer
            A :class:`~shapeflow.core.streaming.BaseStreamer` object, to be
            streamed by ``Flask``
        """
        # Inside this method, ``log`` is the module-level logger
        if self._log is not None:
            self.stop_log()

        log.debug("streaming log file")
        self._log = PlainFileStreamer(path=str(settings.log.path))
        return self._log

    @api.stop_log.expose()
    def stop_log(self) -> None:
        """Stop streaming log file

        Does nothing if no log stream has been started.

        :attr:`shapeflow.api.ApiDispatcher.stop_log`
        """
        log.debug("stopping log file stream")
        if self._log is not None:
            self._log.stop()

    @api.unload.expose()
    def unload(self) -> bool:
        """Unload the application. Called when the user closes or refreshes
        a tab with the user interface.

        :attr:`shapeflow.api.ApiDispatcher.unload`

        Returns
        -------
        bool
            Always ``True``
        """
        self._server.unload()
        return True

    @api.quit.expose()
    def quit(self) -> bool:
        """Quit the API server.

        :attr:`shapeflow.api.ApiDispatcher.quit`

        Returns
        -------
        bool
            Always ``True``
        """
        self._server.quit()
        return True

    @api.restart.expose()
    def restart(self) -> bool:
        """Restart the API server

        :attr:`shapeflow.api.ApiDispatcher.restart`

        Returns
        -------
        bool
            Always ``True``
        """
        self._server.restart()
        return True

    @api.pid_hash.expose()
    def pid_hash(self) -> str:
        """Get the current ``pid`` hash of the API server.

        :attr:`shapeflow.api.ApiDispatcher.pid_hash`

        Returns
        -------
        str
            The SHA-1 hex digest of the current ``pid``, followed by
            a newline. The actual ``pid`` is not given to avoid cheekiness.
        """
        import hashlib

        # Hash the decimal representation of the pid. ``bytes(<int>)`` would
        # instead allocate a zero-filled buffer of that many bytes.
        pid_bytes = str(os.getpid()).encode('ascii')
        return hashlib.sha1(pid_bytes).hexdigest() + '\n'
class _Cache(object):
    """Implements ``cache`` endpoints in :data:`~shapeflow.api.api`.
    """
    _cache: diskcache.Cache

    def __init__(self):
        # The cache object is obtained once and reused by all endpoints
        self._cache = get_cache()

    @api.cache.clear.expose()
    def clear_cache(self) -> None:
        """Empty the cache

        :attr:`shapeflow.api._CacheDispatcher.clear`
        """
        log.info("clearing cache")
        self._cache.clear()

    @api.cache.size.expose()
    def cache_size(self) -> str:
        """Report how large the cache is

        :attr:`shapeflow.api._CacheDispatcher.size`

        Returns
        -------
        str
            The cache size, formatted with :func:`shapeflow.util.sizeof_fmt`
        """
        return sizeof_fmt(self._cache.size)
class _Filesystem(object):
    """Implements ``fs`` endpoints in :data:`~shapeflow.api.api`.
    """
    _history: History

    def __init__(self):
        self._history = History()

    @api.fs.select_video.expose()
    def select_video(self) -> Optional[str]:
        """Open a video selection dialog

        :attr:`shapeflow.api._FilesystemDispatcher.select_video`

        Returns
        -------
        Optional[str]
            The path of the selected video file, if any.
        """
        return filedialog.load(
            title='Select a video file...',
            pattern=settings.app.video_pattern,
            pattern_description='Video files',
        )

    @api.fs.select_design.expose()
    def select_design(self) -> Optional[str]:
        """Open a design selection dialog

        :attr:`shapeflow.api._FilesystemDispatcher.select_design`

        Returns
        -------
        Optional[str]
            The path of the selected design file, if any.
        """
        return filedialog.load(
            title='Select a design file...',
            pattern=settings.app.design_pattern,
            pattern_description='Design files',
        )

    @api.fs.check_video.expose()
    def check_video(self, path: str) -> bool:
        """Check if a video file path is valid:

        * Whether it exists

        * Whether it's a valid video file that can be opened with ``OpenCV``

        Valid video files are added to the history.

        :attr:`shapeflow.api._FilesystemDispatcher.check_video`

        Parameters
        ----------
        path: str
            A path

        Returns
        -------
        bool
            Whether the path is a valid video file
        """
        log.debug(f"checking video file '{path}'")

        if not os.path.isfile(path):
            return False

        try:
            capture = cv2.VideoCapture(path)
            try:
                frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
            finally:
                # Always give the file handle back, also for invalid files
                capture.release()

            if frame_count <= 0:
                return False

            self._history.add_video_file(path)  # todo: overhead?
            return True
        except Exception as e:
            # Best-effort check: any failure means 'not a valid video file'.
            # KeyboardInterrupt / SystemExit are deliberately not caught.
            log.debug(f"video file check failed for '{path}': "
                      f"{e.__class__.__name__}: {e}")
            return False

    @api.fs.check_design.expose()
    def check_design(self, path: str) -> bool:
        """Check if a design file path is valid:

        * Whether it exists

        * Whether it's a valid design file

        Valid design files are added to the history.

        :attr:`shapeflow.api._FilesystemDispatcher.check_design`

        Parameters
        ----------
        path: str
            A path

        Returns
        -------
        bool
            Whether the path is a valid design file
        """
        log.debug(f"checking design file '{path}'")

        if not os.path.isfile(path):
            return False

        try:
            # Inside this method, ``check_design`` is the module-level
            # function imported from shapeflow.design
            check_design(path)
            self._history.add_design_file(path)  # todo: overhead?
            return True
        except Exception as e:
            # Best-effort check: any failure means 'not a valid design file'.
            # KeyboardInterrupt / SystemExit are deliberately not caught.
            log.debug(f"design file check failed for '{path}': "
                      f"{e.__class__.__name__}: {e}")
            return False

    @api.fs.open_root.expose()
    def open_root(self) -> None:
        """Open :data:`shapeflow.ROOTDIR` in the file explorer

        Failures are logged as errors and not re-raised.

        :attr:`shapeflow.api._FilesystemDispatcher.open_root`
        """
        try:
            open_path(str(ROOTDIR))
        except Exception as e:
            log.error(f"Could not open {ROOTDIR}: "
                      f"{e.__class__.__name__}: {e}")
class _Database(object):
    """Wraps a :class:`~shapeflow.db.History` instance that is connected to
    the server's event streamer.
    """
    _server: ShapeflowServerInterface
    _history: History

    def __init__(self, server: ShapeflowServerInterface):
        self._server = server

        self._history = History()
        self._history.set_eventstreamer(server.eventstreamer)

    def check_history(self):
        """Clean the history database if it passes its check; otherwise
        rename the database file to a timestamped ``_broken_`` backup.
        """
        # todo: move to shapeflow.db
        if self._history.check():
            self._history.clean()
            return

        now = datetime.datetime.fromtimestamp(time.time())
        stamp = now.strftime(settings.format.datetime_format_fs)
        destination = f"{settings.db.path}_broken_{stamp}"

        log.warning(f"backing up old history database @ {destination}")
        os.rename(settings.db.path, destination)
class _VideoAnalyzerManager(object):
    """Implements ``va`` endpoints in :data:`~shapeflow.api.api`.

    Manages :class:`~shapeflow.core.backend.BaseAnalyzer` instances

    * Adds / removes instances

    * Handles saving / loading of application state

    * Handles analysis queueing

    * Handles analyzer-specific streams
    """
    _server: ShapeflowServerInterface
    _history: History

    _lock: Lock

    _q_thread: Thread
    _stop_q: Event
    _pause_q: Event
    _q_state: QueueState

    _dispatcher: _VideoAnalyzerManagerDispatcher

    __analyzers__: Dict[str, BaseAnalyzer] = {}  # todo: analyzer manager should register analyzers with api.va on init
    """The currently active analyzers.
    """

    ID_LENGTH = 6
    """Length of ``id`` strings. Kept relatively short for readable URLs.
    """

    def __init__(self, server: ShapeflowServerInterface):
        """
        Parameters
        ----------
        server: ShapeflowServerInterface
            The server whose event streamer is handed to the history and
            to every analyzer managed by this instance
        """
        self._server = server
        self._history = History()
        self._history.set_eventstreamer(server.eventstreamer)

        self._lock = Lock()

        self._stop_q = Event()
        self._pause_q = Event()
        self._q_state = QueueState.STOPPED

    def _set_dispatcher(self, dispatcher: _VideoAnalyzerManagerDispatcher):
        """Set the dispatcher that analyzer dispatchers are registered with.
        """
        self._dispatcher = dispatcher

    def _add(self, analyzer: BaseAnalyzer) -> str:
        """Register an analyzer and its dispatcher.

        Analyzers that don't have an ``id`` attribute yet get a new random
        one of length :attr:`ID_LENGTH`.

        Returns
        -------
        str
            The ``id`` of the analyzer
        """
        if not hasattr(analyzer, 'id'):
            id = shortuuid.ShortUUID().random(length=self.ID_LENGTH)
            # ensure that the id doesn't start with a number
            # and that there are no collisions
            while id[0].isdigit() or id in self.__analyzers__:
                id = shortuuid.ShortUUID().random(length=self.ID_LENGTH)
            analyzer._set_id(id)

        self.__analyzers__[analyzer.id] = analyzer
        self._dispatcher._add_dispatcher(
            analyzer.id, _VideoAnalyzerDispatcher(instance=analyzer)
        )
        assert self._dispatcher._update is not None
        self._dispatcher._update(self._dispatcher)  # todo: lame signature; also should be a part of _add_dispatcher probably

        return analyzer.id

    def _remove(self, id: str):
        """Unregister an analyzer and drop its dispatcher addresses.
        """
        del self.__analyzers__[id]

        # Work on a copy of the keys since entries are deleted while looping
        for k in filter(lambda k: id in k, list(self._dispatcher._address_space.keys())):  # todo: this is a bit lame
            del self._dispatcher._address_space[k]  # todo: there should be a _remove_dispatcher probably

        assert self._dispatcher._update is not None
        self._dispatcher._update(self._dispatcher)  # todo: lame signature

    def _commit(self):
        """Call ``commit()`` on every active analyzer.
        """
        for analyzer in self.__analyzers__.values():
            analyzer.commit()

    def _valid(self, id: str):
        """Check that ``id`` belongs to an active analyzer.

        Raises
        ------
        KeyError
            If there is no active analyzer with this ``id``
        """
        if id not in self.__analyzers__:
            raise KeyError(f"no such id: '{id}'")

    def notice(self, message: str, persist: bool = False):
        """Push a notice to the server's ``EventStreamer``

        Parameters
        ----------
        message: str
            The message to push
        persist: bool
            Passed along with the message in the event data
        """
        self._server.eventstreamer.event(
            'notice', id='', data={'message': message, 'persist': persist}
        )

    @api.va.init.expose()
    def init(self) -> str:
        """Initialize a new analyzer.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.init`

        A short unique ``id`` will be generated. For example, a new
        analyzer with ``rG7bgH`` as its ``id`` can be addressed via
        ``/api/va/rG7bgH``.

        Returns
        -------
        str
            The ``id`` string of the newly added analyzer
        """
        with self._lock:
            analyzer = VideoAnalyzer()
            analyzer.set_eventstreamer(self._server.eventstreamer)
            self._history.add_analysis(analyzer)
            self._add(analyzer)

            log.info(f"init '{analyzer.id}'")

            self.save_state()
            return analyzer.id

    @api.va.close.expose()
    def close(self, id: str) -> bool:
        """Close an analyzer.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.close`

        Parameters
        ----------
        id: str
            The ``id`` string of the analyzer to remove

        Returns
        -------
        bool
            Whether the analyzer was removed successfully

        Raises
        ------
        KeyError
            If there is no active analyzer with this ``id``
        """
        self._valid(id)

        with self._lock:
            log.info(f"close '{id}'")

            analyzer = self.__analyzers__[id]
            with analyzer.lock():
                # Commit before the analyzer is dropped
                analyzer.commit()
                self._remove(id)

            self.save_state()
            return True

    @api.va.start.expose()
    def q_start(self, queue: List[str]) -> bool:
        """Start analyzing a queue.

        The queue is processed on a separate thread, but this call only
        returns once that thread has finished.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.start`

        Parameters
        ----------
        queue: List[str]
            List of analyzer ``id`` to queue.

        Returns
        -------
        bool
            ``False`` if a stop was requested, ``True`` otherwise
        """
        with self._lock:
            def target():
                if self._q_state != QueueState.STOPPED:
                    log.info("already started analyzing queue!")
                    return

                self._q_state = QueueState.RUNNING
                try:
                    if not all(self.__analyzers__[id].can_analyze for id in queue):  # todo: handle non-id entries in q
                        log.info(f"Can't analyze all of {queue}")
                        return

                    log.info(f"analyzing queue: {queue}")
                    for id in queue:
                        # Hold off on the next analyzer while paused
                        while self._pause_q.is_set():
                            self._q_state = QueueState.PAUSED
                            time.sleep(0.5)
                        self._q_state = QueueState.RUNNING

                        if self._stop_q.is_set():
                            break

                        analyzer = self.__analyzers__[id]
                        if not analyzer.done:
                            analyzer.analyze()
                        else:
                            # NOTE(review): get_name is interpolated without
                            # being called — confirm that it is a property
                            analyzer.notice(
                                f"already analyzed "
                                f"'{analyzer.get_name}' "
                                f"with the current configuration."
                            )
                            log.info(f"skipping '{id}'")
                finally:
                    # Always hand the queue back, also when it can't be
                    # analyzed or an analyzer raises. Otherwise the state
                    # stays RUNNING and no queue can ever be started again.
                    self._q_state = QueueState.STOPPED

            self._q_thread = Thread(target=target)
            self._q_thread.start()
            self._q_thread.join()

            self._pause_q.clear()
            if self._stop_q.is_set():
                self._stop_q.clear()
                return False
            else:
                return True

    @api.va.stop.expose()
    def q_stop(self) -> None:
        """Stop analyzing the current queue.

        Running analyzers are cancelled if ``settings.app.cancel_on_q_stop``
        is set; otherwise a notice is pushed for each analyzer that is
        still analyzing.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.stop`
        """
        log.info('stopping analysis queue')

        # A paused queue has to resume in order to see the stop flag
        if self._pause_q.is_set():
            self._pause_q.clear()
        self._stop_q.set()

        if settings.app.cancel_on_q_stop:
            self.q_cancel()
        else:
            for analyzer in self.__analyzers__.values():
                if analyzer.state == AnalyzerState.ANALYZING:
                    self.notice(f"waiting for {analyzer.config.name} to finish")

    @api.va.cancel.expose()
    def q_cancel(self) -> None:
        """Cancel analyzing the current queue.

        Calls ``cancel()`` on every analyzer that is currently analyzing.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.cancel`
        """
        for analyzer in self.__analyzers__.values():
            if analyzer.state == AnalyzerState.ANALYZING:
                analyzer.cancel()

    @api.va.state.expose()
    def state(self) -> dict:
        """Get the queue state and the status of all analyzers.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.state`

        Example::

            {
                "q_state": 0,                   # QueueState
                "ids": ["abc123", "def456"],
                "status": [
                    {                           # status of "abc123"
                        "state": 7,             # AnalyzerState
                        "busy": True,
                        "cached": True,
                        "results": False,
                        "position": 0.7,
                        "progress": 0.75,
                    },
                    {                           # status of "def456"
                        "state": 6,             # AnalyzerState
                        "busy": False,
                        "cached": True,
                        "results": False,
                        "position": 0.0,
                        "progress": 0.0,
                    },
                ]
            }

        ``status`` is a list in the same order as ``ids``. The state
        ``int`` values are according to the ``Enum`` classes
        :class:`~shapeflow.core.backend.QueueState` and
        :class:`~shapeflow.core.backend.AnalyzerState`.

        Returns
        -------
        dict
            A state ``dict``
        """
        with self._lock:
            return {
                'q_state': self._q_state,
                'ids': list(self.__analyzers__),
                'status': [a.status() for a in self.__analyzers__.values()],
            }

    @api.va.save_state.expose()
    def save_state(self) -> None:
        """Save application state to ``shapeflow.settings.app.state_path``

        Only does something if ``settings.app.save_state`` is set. All
        analyzers are committed, after which the model ``id`` of every
        analyzer that isn't done is pickled.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.save_state`
        """
        if settings.app.save_state:
            log.debug("saving application state")

            self._commit()

            with open(settings.app.state_path, 'wb') as f:
                pickle.dump({
                    id: analyzer.model.get('id')
                    for id, analyzer in self.__analyzers__.items()
                    if not analyzer.done
                }, f)

    @api.va.load_state.expose()
    def load_state(self) -> None:
        """Load application state from ``shapeflow.settings.app.state_path``

        Only does something if ``settings.app.load_state`` is set.
        A missing or empty state file is ignored silently.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.load_state`
        """
        if settings.app.load_state:
            log.info("loading application state")

            try:
                # NOTE(security): unpickling executes arbitrary code if the
                # state file has been tampered with; the state file should
                # only ever be written by save_state().
                with open(settings.app.state_path, 'rb') as f:
                    S = pickle.load(f)

                for id, model_id in S.items():
                    assert isinstance(id, str)
                    assert isinstance(model_id, int)

                    model = self._history.fetch_analysis(model_id)
                    if model is not None:
                        model.connect(self._history)

                        config_json = model.get_config_json()
                        if config_json is not None:
                            config = loads(config_json)
                        else:
                            raise RootException('invalid config from database')
                        assert isinstance(config, BaseAnalyzerConfig)

                        # ``init`` is the module-level function imported from
                        # shapeflow.video here, not the ``init`` endpoint
                        analyzer = init(config)
                        analyzer._set_id(id)  # keep the id from the saved state
                        analyzer.set_eventstreamer(self._server.eventstreamer)
                        analyzer.launch()

                        self._add(analyzer)
                        self._history.add_analysis(analyzer, model)
            except FileNotFoundError:
                pass
            except EOFError:
                pass

    @api.va.stream.expose()
    def stream(self, id: str, endpoint: str) -> BaseStreamer:
        """Stream something.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.stream`

        Parameters
        ----------
        id: str
            The ``id`` of an analyzer
        endpoint: str
            The endpoint to stream

        Returns
        -------
        BaseStreamer
            A stream handler object

        Raises
        ------
        KeyError
            If there is no active analyzer with this ``id``
        AttributeError
            If the analyzer has no such endpoint
        ValueError
            If the endpoint doesn't stream
        """
        self._check_streaming(id, endpoint)

        with self._lock:
            log.debug(f"stream '{id}/{endpoint}'")
            method = self._dispatcher[id][endpoint].method
            return streams.register(self.__analyzers__[id], method)

    @api.va.stream_stop.expose()
    def stream_stop(self, id: str, endpoint: str) -> None:
        """Stop streaming something.

        :attr:`shapeflow.api._VideoAnalyzerManagerDispatcher.stream_stop`

        Parameters
        ----------
        id: str
            The ``id`` of an analyzer
        endpoint: str
            The endpoint to stop streaming

        Raises
        ------
        AttributeError
            If the analyzer has no such endpoint
        ValueError
            If the endpoint doesn't stream
        """
        try:
            self._check_streaming(id, endpoint)

            with self._lock:
                log.debug(f"stop stream '{id}/{endpoint}'")
                method = self._dispatcher[id][endpoint].method
                return streams.unregister(self.__analyzers__[id], method)
        except KeyError:
            # id is not valid anymore (probably already closed)
            pass

    def _check_streaming(self, id, endpoint):
        """Check that ``endpoint`` of analyzer ``id`` exists and streams.

        Raises
        ------
        KeyError
            If there is no active analyzer with this ``id``
        AttributeError
            If the analyzer has no such endpoint
        ValueError
            If the endpoint doesn't stream
        """
        self._valid(id)

        if endpoint not in (e.name for e in api.va[id].endpoints):
            raise AttributeError(f"no such endpoint: '{endpoint}'")

        if self._dispatcher[id][endpoint].streaming == stream_off:
            raise ValueError(f"endpoint '{endpoint}' doesn't stream")
def load(server: ShapeflowServerInterface) -> ApiDispatcher:
    """Set up :data:`~shapeflow.api.api` and hand back a reference to it.

    Binds a :class:`_Main` instance to the API, registers the ``db``,
    ``fs``, ``cache`` and ``va`` dispatchers and, if
    ``settings.app.load_state`` is set, dispatches ``va/load_state``
    asynchronously.

    Parameters
    ----------
    server: ShapeflowServer
        The ``shapeflow`` server object

    Returns
    -------
    ApiDispatcher
        A reference to :data:`~shapeflow.api.api`
    """
    api._set_instance(_Main(server))

    # The 'db' dispatcher works on a history connected to the event stream
    db_history = History()
    db_history.set_eventstreamer(server.eventstreamer)
    api._add_dispatcher('db', _DatabaseDispatcher(db_history))

    api._add_dispatcher('fs', _FilesystemDispatcher(_Filesystem()))
    api._add_dispatcher('cache', _CacheDispatcher(_Cache()))

    # Manager and dispatcher need a reference to each other
    manager = _VideoAnalyzerManager(server)
    manager_dispatcher = _VideoAnalyzerManagerDispatcher(manager)
    manager._set_dispatcher(manager_dispatcher)
    api._add_dispatcher('va', manager_dispatcher)

    if settings.app.load_state:
        api.dispatch_async('va/load_state')

    return api