Source code for shapeflow.db

import os
import json
from typing import Optional, Tuple, List, Dict, Type
from pathlib import Path
import datetime
import sqlite3

from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime, ForeignKey

import pandas as pd

from shapeflow.api import api
from shapeflow.core import RootInstance
from shapeflow.core.db import Base, DbModel, SessionWrapper, FileModel, BaseAnalysisModel
from shapeflow import settings, get_logger, ResultSaveMode
from shapeflow.core.config import __meta_sheet__
from shapeflow.config import normalize_config, VideoAnalyzerConfig
from shapeflow.core.streaming import EventStreamer

from shapeflow.core.backend import BaseAnalyzer, BaseAnalyzerConfig


log = get_logger(__name__)


[docs]class VideoFileModel(FileModel): """Database model of a video file. """ __tablename__ = 'video_file'
[docs] def resolve(self) -> 'VideoFileModel': video = super().resolve() assert isinstance(video, VideoFileModel) return video
[docs]class DesignFileModel(FileModel): """Database model of a design file. """ __tablename__ = 'design_file'
[docs] def resolve(self) -> 'DesignFileModel': design = super().resolve() assert isinstance(design, DesignFileModel) return design
[docs]class ConfigModel(DbModel): """Database model of a configuration. """ __tablename__ = 'config' id = Column(Integer, primary_key=True) video = Column(Integer, ForeignKey('video_file.id')) design = Column(Integer, ForeignKey('design_file.id')) analysis = Column(Integer, ForeignKey('analysis.id')) json = Column(String) added = Column(DateTime)
[docs]class ResultModel(DbModel): """Database model of a result. """ __tablename__ = 'results' id = Column(Integer, primary_key=True) analysis = Column(Integer, ForeignKey('analysis.id')) run = Column(Integer) config = Column(Integer, ForeignKey('config.id')) feature = Column(String) """The feature that was analyzed""" data = Column(String) """Results of the analysis. In JSON, formatted ~ ``pandas.DataFrame.to_json(orient='split')``""" started = Column(DateTime) finished = Column(DateTime) elapsed = Column(Float)
[docs]class AnalysisModel(BaseAnalysisModel): """Database model of an analysis. Contains a reference to a :class:`~shapeflow.core.backend.BaseAnalyzer` instance. """ __tablename__ = 'analysis' _analyzer: Optional[BaseAnalyzer] _video: Optional[VideoFileModel] _design: Optional[DesignFileModel] _config: Optional[ConfigModel] _added_by_context: Dict[str, datetime.datetime] id = Column(Integer, primary_key=True) runs = Column(Integer) video = Column(Integer, ForeignKey('video_file.id')) design = Column(Integer, ForeignKey('design_file.id')) config = Column(Integer, ForeignKey('config.id')) results = Column(Integer, ForeignKey('results.id')) name = Column(String) description = Column(String) added = Column(DateTime) modified = Column(DateTime) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._resolve_attributes() def _resolve_attributes(self): for attr in ['_analyzer', '_video', '_design', '_config']: if not hasattr(self, attr): setattr(self, attr, None)
[docs] def get_name(self) -> str: """ Returns ------- str Name of the analysis from the database. Empty names are reset to '#{id}' """ with self.session(add=False): if self.name is None: self.name = f"#{self.id}" return self.name
[docs] def get_runs(self) -> int: """ Returns ------- int Number of completed runs for the analysis """ with self.session(add=False): if self.runs is None: self.runs = 0 return self.runs
[docs] def get_id(self) -> int: """ Returns ------- int The database id of the analysis """ with self.session(add=False): return self.id
def _set_analyzer(self, analyzer: BaseAnalyzer): self._analyzer = analyzer def _add_video(self, path: str) -> VideoFileModel: model = VideoFileModel(path=path) model.connect(self) return model def _add_design(self, path: str) -> DesignFileModel: model = DesignFileModel(path=path) model.connect(self) return model def _resolve_files(self): if self._analyzer.config.video_path and ( self._video is None or self._video.get('path') != self._analyzer.config.video_path): try: self._video = self._add_video( path=self._analyzer.config.video_path) except ValueError as e: pass if self._analyzer.config.design_path and ( self._design is None or self._design.get('path') != self._analyzer.config.design_path): try: self._design = self._add_design( path=self._analyzer.config.design_path) except ValueError as e: pass if self._video is not None: self._video = self._video.resolve() if self._design is not None: self._design = self._design.resolve() with self.session() as s: if self._video is not None: self.video = self._video.id if self._design is not None: self.design = self._design.id def _add_config(self, json: str) -> Optional[ConfigModel]: with self.session(): video = self.video design = self.design analysis = self.id if video is not None or design is not None: model = ConfigModel( video=video, design=design, analysis=analysis, json=json, added=datetime.datetime.now(), ) model.connect(self) return model else: return None
[docs] def store(self): # todo: consider passing analyzer to store() instead of keeping a reference """Store analysis information from the :class:`~shapeflow.core.backend.BaseAnalyzer` to the database. """ self._resolve_attributes() if self._analyzer is not None: config_json = json.dumps(self._analyzer.get_config(do_tag=True)) self._resolve_files() if self._config is None: self._config = self._add_config(json=config_json) else: if config_json != self._config.get('json'): self._config = self._add_config(json=config_json) with self.session() as s: if self._analyzer.config.name is not None: if not self._analyzer.config.name.strip(): self._analyzer.config.name = f"#{self.id}" else: self.name = self._analyzer.config.name.strip() self._analyzer.config.name = self.name if self._analyzer.config.description is not None: self.description = self._analyzer.config.description self.runs = self._analyzer.runs s.commit() if self._config is not None: self.config = self._config.id # Store results for k, df in self._analyzer.results.items(): # Add columnsfe if not df.isnull().all().all(): # todo: doesn't save results if there's *one* NaN? model = ResultModel( analysis=self.id, run=self.runs, config=self.config, feature=k, data=df.to_json(orient='split'), ) # todo: should have a _results: Dict[ <?>, ResultsModel] so these don't spawn new results each time s.add(model) # Store timing info t = self._analyzer.timing if t is not None: model.started = datetime.datetime.fromtimestamp(t.t0) model.finished = datetime.datetime.fromtimestamp(t.t1) model.elapsed = t.elapsed s.commit() self.results = model.id
[docs] def export_result(self, run: int = None, manual: bool = False): """Export a result to disk Parameters ---------- run : int The run to export manual : bool Whether this export request is manual (i.e. explicitly requested by the user). This setting determines whether to follow ``settings.app.save_result_manual`` or ``settings.app.save_result_auto`` when choosing where or whether to actually save. """ with self.session() as s: if self.runs is None or self.runs < 1: raise ValueError(f"'{self}' has no runs to export!") # If no run specified, export the latest if run is None: run = self.runs results = list( s.query(ResultModel).filter_by(analysis=self.id).filter_by(run=run) ) config = json.loads( s.query(ConfigModel).filter_by(id=results[0].config).first().json ) video = s.query(VideoFileModel).filter_by(id=self.video).first() design = s.query(DesignFileModel).filter_by(id=self.design).first() base_f = None if manual: if settings.app.save_result_manual == ResultSaveMode.next_to_video: base_f = str(os.path.splitext(config['video_path'])[0]) elif settings.app.save_result_manual == ResultSaveMode.next_to_design: base_f = str(os.path.splitext(config['design_path'])[0]) elif settings.app.save_result_manual == ResultSaveMode.directory: base_f = os.path.join( str(settings.app.result_dir), f"{self.name} run {run}" ) else: if settings.app.save_result_auto == ResultSaveMode.next_to_video: base_f = str(os.path.splitext(config['video_path'])[0]) elif settings.app.save_result_auto == ResultSaveMode.next_to_design: base_f = str(os.path.splitext(config['design_path'])[0]) elif settings.app.save_result_auto == ResultSaveMode.directory: base_f = os.path.join( str(settings.app.result_dir), f"{self.name} run {run}" ) if base_f is not None: f = base_f + ' ' + datetime.datetime.now().strftime( settings.format.datetime_format_fs ) + '.xlsx' w = pd.ExcelWriter(f) # Features to separate sheets for result in results: df = pd.read_json(result.data, orient='split') df.to_excel(w, sheet_name=result.feature) # Metadata in a separate sheet pd.DataFrame([json.dumps({ 'config': config, 'video_hash': video.hash, 'design_hash': design.hash, }, indent=2)]).to_excel( w, sheet_name=__meta_sheet__ ) w.save() w.close() log.info(f"'{self.id}' results exported to {f}") else: log.warning(f"'{self.id}' results were not exported!")
[docs] def load_config(self, video_path: str = None, design_path: str = None, include: List[str] = None) -> Optional[dict]: """Load configuration from the database. Parameters ---------- video_path : str Path to video file design_path : str Path to design file include : List[str] List of fields which must be included in the configuration. If a matching ConfigModel doesn't provide all of these, the other matches will be parsed to complete it. Returns ------- dict Configuration dict, if a matching config is found. Otherwise, returns ``None`` """ if include is None: include = ['transform', 'masks'] # Check whether all fields in include are valid for field in include: assert field in VideoAnalyzerConfig.__fields__, \ f"'{field}' in `include` is not a `VideoAnalyzerConfig` field." if video_path is not None: self._video = self._add_video(path=video_path) if design_path is not None: self._design = self._add_design(path=design_path) if self._video is not None: self._video = self._video.resolve() if self._design is not None: self._design = self._design.resolve() # Query for latest usages of video.id & design.id) with self.session() as s: q = s.query(ConfigModel) q = q.filter(ConfigModel.video == self._video.id) if self._design is not None: q = q.filter(ConfigModel.design == self._design.id) q = q.filter(ConfigModel.analysis != self.id) config = {} for match in q.order_by(ConfigModel.id.desc()): match_config = normalize_config(json.loads(match.json)) # Assimilate `include` fields from match for field in include: if field in match_config: config[field] = match_config[field] # Check if enough info in ìncluded config ok = [] if 'transform' in config and 'transform' in include: # 'transform' field should contain ROI if 'roi' in config['transform']: if config['transform']['roi'] is not {}: ok.append(True) include.remove('transform') if 'masks' in config and 'masks' in include: # 'masks' field should not be empty if len(config['masks']) > 0: ok.append(True) include.remove('masks') if len(ok) > 0 and all(ok): break return config else: return None
[docs] def get_config_json(self) -> Optional[str]: with self.session() as s: return s.query(ConfigModel.json).\ filter(ConfigModel.id == self.config).first()[0] # todo: why does it return a tuple of length 1?
def _fetch_latest_config(self) -> Optional[ConfigModel]: with self.session() as s: return s.query(ConfigModel). \ order_by(ConfigModel.added.desc()). \ first() # todo: check if ordering by datetime works properly def _added(self, context: str = None) -> datetime.datetime: if self._config is None: self._config = self._fetch_latest_config() if not hasattr(self, '_added_by_context'): self._added_by_context = {} with self.session() as s: if context not in self._added_by_context: if self._config is not None: assert isinstance(self._config.added, datetime.datetime) return self._config.added else: assert isinstance(self.added, datetime.datetime) return self.added else: return self._added_by_context[context] def _step_config(self, filter, order, context: str = None) -> Tuple[Optional[dict], Optional[int]]: with self.session() as s: q = list( s.query(ConfigModel).\ filter(ConfigModel.video == self.video).\ filter(ConfigModel.design == self.design).\ filter(filter).\ order_by(order) ) for match in q: assert isinstance(match, ConfigModel) assert isinstance(match.json, str) # todo: fail more gracefully if json is empty; skip & remove from database config = normalize_config(json.loads(match.json)) if context is None: self._config = match self._config.connect(self) s.add(self._config) return config, match.id else: assert self._analyzer is not None if context in config and config[context] != self._analyzer.get_config()[context]: self._config = None assert isinstance(match.added, datetime.datetime) self._added_by_context[context] = match.added return {context: config[context]}, match.id return None, None
[docs] def get_undo_config(self, context: str = None) -> Tuple[Optional[dict], Optional[int]]: """Undo configuration. If a ``context`` is supplied, ensure that the ``context`` field changes, but the other fields remain the same Parameters ---------- context : str Name of a ``VideoAnalyzerConfig`` field Raises ------ ValueError If ``context`` is not a ``VideoAnalyzer`` field """ if context is None or context in VideoAnalyzerConfig.__fields__: return self._step_config( ConfigModel.added < self._added(context), ConfigModel.added.desc(), context ) else: raise ValueError(f"Invalid undo context '{context}'")
[docs] def get_redo_config(self, context: str = None) -> Tuple[Optional[dict], Optional[int]]: """Redo configuration. If a ``context`` is supplied, ensure that the ``context`` field changes, but the other fields remain the same Parameters ---------- context : str Name of a ``VideoAnalyzerConfig`` field Raises ------ ValueError If ``context`` is not a ``VideoAnalyzer`` field """ if context is None or context in VideoAnalyzerConfig.__fields__: return self._step_config( ConfigModel.added > self._added(context), ConfigModel.added, context ) else: raise ValueError(f"Invalid redo context '{context}'")
[docs]class History(SessionWrapper, RootInstance): """Interface to the history database """ _eventstreamer: EventStreamer def __init__(self, path: Path = None): super().__init__() if path is None: path = Path(str(settings.db.path)) self._engine = create_engine(f'sqlite:///{str(path)}') try: Base.metadata.create_all(self._engine) except sqlite3.OperationalError as e: if "already exists" in str(e): pass else: log.error(f"could not create tables - {e.__class__.__name__}: {str(e)}") self._session_factory = scoped_session(sessionmaker(bind=self._engine)) def set_eventstreamer(self, eventstreamer: EventStreamer): self._eventstreamer = eventstreamer def notice(self, message: str, persist: bool = False): self._eventstreamer.event( 'notice', id='', data={'message': message, 'persist': persist} )
[docs] def add_video_file(self, path: str) -> VideoFileModel: """Add a video file to the database. Duplicate files are resolved to their original entry. Parameters ---------- path : str The path of the file to add Returns ------- VideoFileModel A database model of the file. Will reference the original entry if the user tried to add a previously used file again. """ file = VideoFileModel(path=path) file.connect(self) file.resolve() return file
[docs] def add_design_file(self, path: str) -> DesignFileModel: """Add a design file to the database. Duplicate files are resolved to their original entry. Parameters ---------- path : str The path of the file to add Returns ------- DesignFileModel A database model of the file. Will reference the original entry if the user tried to add a previously used file again. """ file = DesignFileModel(path=path) file.connect(self) file.resolve() return file
[docs] def add_analysis(self, analyzer: BaseAnalyzer, model: AnalysisModel = None) -> AnalysisModel: """Add a new analysis to the database. Parameters ---------- analyzer : BaseAnalyzer The analyzer object to add to the database model : AnalysisModel Optionally, an existing model can be specified. Defaults to ``None`` Returns ------- AnalysisModel If a ``model`` is provided, no new :class:`~AnalysisModel` will be created and the ``analyzer`` will be linked to the existing ``model`` instead. """ if model is None: with self.session() as s: model = AnalysisModel() s.add(model) model.connect(self) model._set_analyzer(analyzer) analyzer.set_model(model) return model
[docs] def fetch_analysis(self, id: int) -> Optional[AnalysisModel]: """Fetch an analysis model from the database. Parameters ---------- id : int Database id of the analysis to fetch """ with self.session() as s: return s.query(AnalysisModel).filter(AnalysisModel.id == id).\ first()
# @history.expose(history.get_recent_paths)
[docs] @api.db.get_recent_paths.expose() def get_paths(self) -> Dict[str, List[str]]: """Fetch the latest video and design file paths from the database. :attr:`shapeflow.api._DatabaseDispatcher.get_recent_paths` Number of paths is limited by ``settings.app.recent_files`` """ with self.session() as s: return { 'video_path': [r[0] for r in s.query(VideoFileModel.path).\ order_by(VideoFileModel.used.desc()).\ limit(settings.app.recent_files).all()], 'design_path': [r[0] for r in s.query(DesignFileModel.path). \ order_by(DesignFileModel.used.desc()). \ limit(settings.app.recent_files).all()] }
# @history.expose(history.get_result_list)
[docs] @api.db.get_result_list.expose() def get_result_list(self, analysis: int) -> dict: """Fetch the result list for a given analysis :attr:`shapeflow.api._DatabaseDispatcher.get_result_list` Parameters ---------- analysis : int Database id of the analysis Returns ------- dict A ``dict`` mapping run id ``int`` to result id ``int`` """ with self.session() as s: runs = s.query(AnalysisModel).\ filter(AnalysisModel.id == analysis).first().runs return { run: s.query(ResultModel).\ filter(ResultModel.analysis == analysis).\ filter(ResultModel.run == run).first().finished for run in range(1, runs+1) }
# @history.expose(history.get_result)
[docs] @api.db.get_result.expose() def get_result(self, analysis: int, run: int) -> dict: """Fetch the result for a given analysis and run :attr:`shapeflow.api._DatabaseDispatcher.get_result` Parameters ---------- analysis : int Database id of the analysis run : int Run number of the result to fetch Returns ------- dict Analysis results, as a ``dict`` formatted ~ ``pandas.DataFrame.to_json(orient='split')`` """ with self.session() as s: return { r.feature: json.loads(r.data) for r in s.query(ResultModel).\ filter(ResultModel.analysis == analysis).\ filter(ResultModel.run == run) }
# @history.expose(history.export_result)
[docs] @api.db.export_result.expose() def export_result(self, analysis: int, run: int = None) -> bool: """Export the result for a given analysis and run :attr:`shapeflow.api._DatabaseDispatcher.export_result` Parameters ---------- analysis : int Database id of the analysis run : int Run number of the result to fetch Returns ------- bool ``True`` if exported, ``False`` if something went wrong. """ with self.session() as s: try: a = s.query(AnalysisModel).filter_by( id=analysis ).first() a.connect(self) a.export_result(run=run, manual=True) return True except Exception as e: log.error(f"{e.__class__.__name__}: {e}") self.notice( f"could not export analysis '{analysis}' run '{run}'" ) return False
[docs] def check(self) -> bool: """Check the database's integrity (somewhat). Makes sure that the required tables exist and that their columns match. Returns ------- bool ``True`` if everything's fine, ``False`` if the database is messed up """ log.debug('checking history') ok = [] models = [ VideoFileModel, DesignFileModel, ConfigModel, ResultModel, AnalysisModel ] with self.session() as s: db = s.bind.connect() cursor = db.connection.cursor() cursor.execute( "SELECT name FROM sqlite_master WHERE type='table';") tables = [name[0] for name in cursor.fetchall()] for model in models: table = model.__tablename__ # type: ignore if table in tables: ok.append(True) else: log.warning(f"table '{table}' is missing from the database.") ok.append(False) cursor.execute(f"PRAGMA table_info({table})") columns = {column[1]: column[2] for column in cursor.fetchall()} model_columns = { column.name: str(column.type) for column in model.__table__.columns # type: ignore } if columns == model_columns: ok.append(True) else: log.warning(f"table '{table}' columns don't match.") ok.append(False) db.close() return all(ok)
# @history.expose(history.clean)
[docs] @api.db.clean.expose() def clean(self) -> None: """Clean the database. :attr:`shapeflow.api._DatabaseDispatcher.clean` * remove 'video_file & 'design_file' entries with <null> path * resolve entries with ``<null>`` hash * remove 'analysis' entries with ``<null>`` config * remove 'config' entries with ``<null>`` json * for 'analysis' entries older than ``settings.db.cleanup_interval`` * remove all non-primary 'config' entries * remove all non-primary 'results' entries """ log.debug(f"cleaning history") threshold = datetime.datetime.now() - datetime.timedelta( days=settings.db.cleanup_interval ) with self.session() as s: s.query(VideoFileModel).filter_by(path=None).delete() s.query(DesignFileModel).filter_by(path=None).delete() unhashed = list(s.query(VideoFileModel).filter_by(hash=None)) \ + list(s.query(DesignFileModel).filter_by(hash=None)) for f in unhashed: f._queue_hash(f.path) s.query(ConfigModel).filter_by(json=None).delete() s.query(AnalysisModel).filter_by(config=None).delete() for old in s.query(AnalysisModel).\ filter(AnalysisModel.modified < threshold): s.query(ConfigModel). \ filter(ConfigModel.analysis == old.id). \ filter(ConfigModel.id != old.config).delete() s.query(ResultModel). \ filter(ResultModel.analysis == old.id). \ filter(ResultModel.id != old.results).delete() for f in unhashed: f.connect(self) f.resolve()
# @history.expose(history.forget)
[docs] @api.db.forget.expose() def forget(self) -> None: """Remove everything. :attr:`shapeflow.api._DatabaseDispatcher.forget` """ log.info(f"clearing history") models = [ AnalysisModel, VideoFileModel, DesignFileModel, ConfigModel, ResultModel ] with self.session() as s: for model in models: s.query(model).delete()