# Source code for shapeflow.cli

"""Tiny commands to be called from sf.py

Calling from the commandline::

   python sf.py --do <command name> <arguments>
"""

import os
import sys
import time
import glob
import socket
import json
import abc
from pathlib import Path
import argparse
import shutil
from functools import lru_cache
from typing import List, Callable, Optional, Tuple
from urllib.request import urlretrieve
from zipfile import ZipFile

from distutils.util import strtobool
import git
import requests

from shapeflow import __version__, get_logger, settings
from shapeflow.util import before_version, after_version, suppress_stdout


# module-level logger, named after this module
log = get_logger(__name__)

# type aliases
# an optional list of argument strings
OptArgs = Optional[List[str]]
# a callable that takes such an argument list and returns nothing
Parsing = Callable[[OptArgs], None]


class CliError(Exception):
    """Error raised by the commands in this module when they cannot proceed."""
class IterCommand(abc.ABCMeta):
    """Command iterator metaclass.

    Iterates over its subclasses, skipping any without a ``__command__``.
    If any of these should remain abstract, they shouldn't define one.
    """

    __command__: str
    """Command name.
    This is how the command is addressed from the commandline.
    """

    # todo: nope, doesn't work
    def __str__(cls):
        """Return the command name, or the default class string if the
        class does not define ``__command__``.
        """
        try:
            return cls.__command__
        except AttributeError:
            return super().__str__()

    @property
    def sub(cls):
        """Returns True if this class is a subcommand
        """
        return hasattr(cls, '__command__')

    def __iter__(cls):
        """Iterate over the direct subclasses that are subcommands."""
        return iter([c for c in cls.__subclasses__() if c.sub])

    @property
    def dict(cls) -> dict:
        """Get a ``dict`` mapping all defined command names
        to their respective class
        """
        return {str(sub): sub for sub in cls}

    def __getitem__(cls, item: str) -> 'IterCommand':
        """Get a subcommand by its __command__

        Falls back to a plain attribute lookup on the class when ``item``
        is not a command name, so an unknown name raises ``AttributeError``.
        """
        # build the mapping once; it walks all subclasses on every access
        commands = cls.dict
        if item in commands:
            return commands[item]
        return getattr(cls, item)

    @abc.abstractmethod
    def __usage__(cls) -> str:
        """Usage string"""

    @abc.abstractmethod
    def __help__(cls) -> str:
        """Help string"""
class Command(abc.ABC, metaclass=IterCommand):
    """Abstract command.

    * handles argument parsing & execution
    * subclasses can implement their functionality in
      :meth:`Command.command()`

    Instantiating a command parses the arguments and immediately runs
    :meth:`command`.
    """

    parser: argparse.ArgumentParser
    args: argparse.Namespace
    sub_args: List[str]

    def __init__(self, args: OptArgs = None):
        """Parse ``args`` and execute the command.

        :param args: argument strings; when ``None``, ``sys.argv[1:]``
            is used instead
        :raises CliError: if parsing or running the command raises
            ``argparse.ArgumentError`` or ``TypeError``
        """
        if args is None:
            # gather commandline arguments
            args = sys.argv[1:]

        try:
            self.args, self.sub_args = self._parse(args)
            self.command()
        except argparse.ArgumentError as e:
            # chain explicitly so the original error stays attached
            raise CliError(
                f'{self.__class__}: could not parse arguments'
            ) from e
        except TypeError as e:
            raise CliError(
                f'{self.__class__}: type error'
            ) from e

    @abc.abstractmethod
    def command(self) -> None:
        """Implement the command here
        """

    @classmethod
    def __help__(cls) -> str:
        """Cleaned-up help string
        """
        return cls._fix_call(cls.parser.format_help())

    @classmethod
    def __usage__(cls) -> str:
        """Cleaned-up usage string
        """
        usage = cls.parser.format_usage()
        # drop the leading 'usage: ' that argparse puts in front
        prefix = 'usage: '
        if usage.startswith(prefix):
            usage = usage[len(prefix):]
        return cls._fix_call(usage.strip())

    @classmethod
    def _parse(cls, args: OptArgs) -> Tuple[argparse.Namespace, List[str]]:
        """Parse known arguments; unrecognized ones are returned as the
        second element so they can be handed to a subcommand.
        """
        return cls.parser.parse_known_args(args)

    @classmethod
    def _fix_call(cls, text: str) -> str:
        """Fix text by appending __command__ to the program name
        """
        if not cls.sub:
            return text
        call = ' '.join([cls.parser.prog, str(cls)])
        return text.replace(cls.parser.prog, call)
class Sf(Command):
    """Commandline entry point.

    This is the command that gets called first
    and calls any subcommands if requested.
    """

    parser = argparse.ArgumentParser(
        description=f"""https://github.com/ybnd/shapeflow v{__version__}""",
        add_help=False  # we're defining a custom --help that should override
                        # the built-in one
    )
    parser.add_argument(
        '-h', '--help',
        action='store_true',
        help="show this help message; "
             "for help with a specific command, call --help [command]"
    )
    parser.add_argument(
        '--version',
        action='store_true',
        help="show the version"
    )

    # the positional 'command' action, created on first instantiation
    _command_action: Optional[argparse.Action] = None

    def __init__(self, args: OptArgs = None):
        """Register the ``command`` positional and run.

        :param args: argument strings, passed on to :class:`Command`
        """
        # note: if the command argument is added as a class attribute,
        #       Command subclasses will be left out of the choices if they
        #       are defined _after_ this class.
        if Sf._command_action is None:
            Sf._command_action = self.parser.add_argument(
                'command',
                default=None,
                nargs="?",
                choices=Command.dict,
                metavar='command',
                help="execute one of the commands listed below, default: serve"
            )
        else:
            # the parser is shared by all instances: adding the positional
            # again would register a duplicate, so only refresh the choices
            Sf._command_action.choices = Command.dict
        super().__init__(args)

    def command(self) -> None:
        """Print help or version, or dispatch to the requested command."""
        if self.args.help:
            if self.args.command is not None:
                # print the help string of the requested command
                print(Command[self.args.command].__help__())
            else:
                # print own help string
                print(self.__help__())

                # print usage of commands
                print("commands:")
                for c in Command:
                    print(" " + c.__usage__())
                print()
        elif self.args.version:
            print(__version__)
        elif self.args.command is None:
            # default command
            Command['serve'](self.sub_args)
        else:
            # dispatch arguments to command
            Command[self.args.command](self.sub_args)
class Serve(Command):
    """Starts the ``shapeflow`` server.
    """
    # note: the class docstring doubles as the parser description

    __command__ = 'serve'
    parser = argparse.ArgumentParser(
        description=__doc__
    )

    HOST = '127.0.0.1'
    PORT = 7951

    parser.add_argument(
        '--host',
        type=str,
        default=HOST,
        help=f"the host to serve from (default: {HOST})"
    )
    parser.add_argument(
        '--port',
        type=int,
        default=PORT,
        help=f"the port to serve from (default: {PORT})"
    )
    parser.add_argument(
        '--background',
        action='store_true',
        help="don't open a browser window"
    )

    def command(self):
        """Make sure the address is free, then serve until the server exits."""
        self._replace()

        # imported here rather than at module level
        from shapeflow.server import ShapeflowServer
        server = ShapeflowServer()
        server.serve(
            host=self.args.host,
            port=self.args.port,
            open=(not self.args.background)
        )
        log.info('exit')

    def _in_use(self) -> bool:
        """Return True if something accepts connections on host:port."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex((self.args.host, self.args.port)) == 0
            # todo: look for actual return code

    def _replace(self):
        """Ask whatever is listening on host:port to quit and wait for it.

        NOTE(review): the wait has no upper bound; if the listener does not
        react to ``/api/quit`` this loops forever.
        """
        if self._in_use():
            log.info('address already in use')
            requests.post(f"http://{self.args.host}:{self.args.port}/api/quit")
            while self._in_use():
                time.sleep(0.1)
            log.info('previous server instance quit')
class Dump(Command):
    """Dump application schemas and settings to JSON
    """
    # note: the class docstring doubles as the parser description

    __command__ = 'dump'
    parser = argparse.ArgumentParser(
        description=__doc__
    )
    parser.add_argument(
        '--pretty',
        action='store_true',
        help='indent JSON'
    )
    parser.add_argument(
        '--dir',
        type=Path,
        default=Path.cwd(),
        help='directory to dump to'
    )

    def command(self):
        """Write ``schemas.json`` and ``settings.json`` into ``--dir``."""
        from shapeflow.config import schemas

        if not self.args.dir.is_dir():
            log.warning(f"making directory '{self.args.dir}'")
            # parents=True so a nested --dir works as well
            self.args.dir.mkdir(parents=True, exist_ok=True)

        self._write('schemas', schemas())
        self._write('settings', settings.to_dict())

    def _write(self, file, d):
        """Serialize ``d`` to ``<dir>/<file>.json``.

        :param file: file name without the ``.json`` extension
        :param d: JSON-serializable object
        """
        indent = 2 if self.args.pretty else None
        with open(self.args.dir / (file + '.json'), 'w') as f:
            f.write(json.dumps(d, indent=indent))
class GitMixin(abc.ABC):
    """Mixin for commands interacting with the git repository
    """
    URL = 'https://github.com/ybnd/shapeflow/'

    _repo = None
    _latest = None

    @property
    def repo(self) -> git.Repo:
        """The local repository; opened and fetched on first access."""
        if self._repo is None:
            self._repo = git.Repo()
            self._repo.remote().fetch()
        return self._repo

    @property
    def latest(self) -> str:
        """Name of the latest release, taken from the last path segment of
        the ``location`` header returned for ``releases/latest``.
        """
        if self._latest is None:
            response = requests.head(self.URL + 'releases/latest')
            self._latest = response.headers['location'].split('/')[-1]
        return self._latest

    @lru_cache()
    def is_up_to_date(self, tag) -> bool:
        """Whether there is nothing newer to update to.

        :param tag: the tag the repository is currently at, ``''`` if none
        :return: at a release: whether ``tag`` is not before the latest
            release. On a branch: whether its tracking branch has no commits
            that HEAD lacks. Otherwise ``True``.
        """
        if self.is_at_release(tag):
            return not before_version(tag, self.latest)

        if self.repo.head.is_detached:
            # neither a release nor a branch: nothing to compare against
            return True

        tracking = self.repo.active_branch.tracking_branch()
        if tracking is None:
            # local-only branch, there is no upstream to be behind of
            return True

        # ``self.repo.remote().repo`` is this same local repository, so
        # comparing head commits through it is always true; count the
        # upstream commits missing from HEAD instead
        behind = self.repo.git.rev_list('--count', f'HEAD..{tracking.path}')
        return int(behind) == 0

    @property
    def tag(self) -> str:
        """The tag HEAD is exactly at, or ``''`` if git reports none."""
        try:
            return self.repo.git.describe('--exact-match', '--tag')
        except git.GitCommandError:
            return ''

    @lru_cache()
    def is_at_release(self, tag: str) -> bool:
        """Whether a release page exists for ``tag``.

        :param tag: tag name; an empty tag is never a release
        """
        if not tag:
            # no tag, no release; skip the network round trip
            return False
        return requests.head(self.URL + 'releases/tag/' + tag).status_code == 200

    @property
    def ui_url(self) -> str:
        """Download URL of the compiled UI archive for the current tag."""
        return self.URL + f'releases/download/{self.tag}/dist-{self.tag}.tar.gz'

    def _is_a_ref(self, ref: str) -> bool:
        """Whether git can resolve ``ref`` to a commit."""
        try:
            return len(self.repo.git.rev_list('-n', '1', ref)) > 0
        except git.GitCommandError:
            return False

    def _is_after_0_4_4(self, ref: str) -> bool:
        """Whether ``ref`` is a version after 0.4.4, or a branch that is at
        or contains commits on top of the 0.4.4 tag.
        """
        try:
            return before_version('0.4.4', ref)
        except ValueError:
            # ~ https://stackoverflow.com/a/3006050/12259362
            branch = git.Head(self.repo, 'refs/heads/' + ref).commit.hexsha
            # tags live under refs/tags/, not refs/heads/
            t0_4_4 = git.TagReference(self.repo, 'refs/tags/0.4.4').commit.hexsha

            return branch == t0_4_4 or len(
                self.repo.git.rev_list('--boundary', f'{t0_4_4}..{branch}')
            ) > 0

    def _get_compiled_ui(self) -> None:
        """Download and extract the compiled UI of the current release
        into ``ui/``.

        :raises CliError: if the repository is not at a release, or the
            archive cannot be downloaded or extracted
        """
        if not self.is_at_release(self.tag):
            raise CliError(
                'No compiled UI can be downloaded because the repository '
                'is currently not at a release version.\n'
                'Please check out a release or compile the UI yourself with'
                '\n\n\tcd ui && npm run build\n'
            )

        if after_version(self.tag, '0.3'):
            import tarfile
            from urllib.request import urlopen

            try:
                # NOTE(review): extractall() trusts the member paths of the
                # downloaded archive
                with urlopen(self.ui_url) as response, \
                        tarfile.open(fileobj=response, mode='r|gz') as archive:
                    archive.extractall(path='ui/')
                print('Downloaded compiled UI.')
            except Exception as e:
                raise CliError(
                    f'No compiled UI for "{self.tag}" at {self.ui_url}.\n'
                    f'This is probably an issue with this release, '
                    f'please inform the maintainer.'
                ) from e

    def _handle_local_changes(self, force: bool) -> bool:
        """Reset a dirty working tree if allowed to.

        :param force: discard local changes without prompting
        :return: ``False`` if the user declined to discard local changes,
            ``True`` otherwise
        """
        if self.repo.is_dirty():
            if force or self._prompt_discard_changes():
                self.repo.head.reset(working_tree=True)
            else:
                return False
        return True

    def _prompt_discard_changes(self) -> bool:
        """List changed and untracked files and ask whether to continue."""
        changed = "\n\t".join(
            [item.a_path for item in self.repo.index.diff(None)]
            + self.repo.untracked_files
        )
        return bool(strtobool(input(
            f'Local changes to\n\n {changed} \n\n'
            f'will be overwritten. Continue? (y/n) '
        )))
class Update(Command, GitMixin):
    """Update the application
    """
    # note: the class docstring doubles as the parser description

    __command__ = 'update'
    parser = argparse.ArgumentParser(
        description=__doc__
    )
    parser.add_argument(
        '--discard-changes',
        action='store_true',
        help='discard local changes without prompting'
    )

    def command(self):
        """Update unless already up to date or local changes are kept."""
        if self.is_up_to_date(self.tag):
            print('Already up to date.')
            return

        may_proceed = self._handle_local_changes(self.args.discard_changes)
        if may_proceed:
            self._update()

    def _update(self) -> None:
        """Pull the current branch, or check out the latest release when
        HEAD is detached.
        """
        if not self.repo.head.is_detached:
            # repo is at a branch, pull the branch if there are new commits
            pulled = self.repo.git.pull()
            print(pulled)
            print(
                'Note: the repository is at a branch. Please recompile the UI '
                '\n\n\t cd ui && npm run build \n\n'
            )
            return

        # repo is at a tag, get the latest release
        self.repo.git.checkout(self.latest)
        self._get_compiled_ui()
class Checkout(Command, GitMixin):
    """Check out a specific version of the application.
    Please note that you will not have access to this command
    if you check out a version before 0.4.4
    """
    # note: the class docstring doubles as the parser description

    __command__ = 'checkout'
    parser = argparse.ArgumentParser(
        description=__doc__
    )
    parser.add_argument(
        'ref',
        type=str,
        help="the tag or branch to check out"
    )
    parser.add_argument(
        '--discard-changes',
        action='store_true',
        help='discard local changes without prompting'
    )

    def command(self) -> None:
        """Check out the requested ref and fetch its compiled UI.

        :raises CliError: if ``ref`` is not a valid reference
        """
        ref = self.args.ref
        if not self._is_a_ref(ref):
            raise CliError(f'Not a valid reference: "{ref}"')

        # refs from before 0.4.4 need an explicit confirmation
        if self._is_after_0_4_4(ref) or self._checkout_anyway():
            if self._handle_local_changes(self.args.discard_changes):
                self.repo.git.checkout(ref)
                print(f'Checked out "{ref}"')
                self._get_compiled_ui()

    def _checkout_anyway(self) -> bool:
        """Ask whether to check out a ref that lacks these commands."""
        return bool(strtobool(input(
            f'After checking out "{self.args.ref}" you won\'t be able to use '
            f'the "update" or "checkout" commands (they were added later, '
            f'in v0.4.4) Continue? (y/n) '
        )))
class GetCompiledUi(Command, GitMixin):
    """Get the compiled UI for the current version
    """
    # note: the class docstring doubles as the parser description

    __command__ = 'get-compiled-ui'
    parser = argparse.ArgumentParser(
        description=__doc__
    )
    parser.add_argument(
        '--replace',
        action='store_true',
        help='replace the current UI without prompting'
    )

    def command(self) -> None:
        """Remove the existing ``ui/dist/`` if allowed to and download the
        compiled UI.
        """
        if not self.is_at_release(self.tag):
            # nothing can be downloaded in this state; let
            # _get_compiled_ui() raise its explanatory error *before*
            # the existing UI is removed
            self._get_compiled_ui()
            return None

        ui_dir = Path('ui/dist/')

        if ui_dir.exists():
            if not any(ui_dir.iterdir()):
                # empty directory, nothing worth asking about
                ui_dir.rmdir()
            elif self.args.replace or self._prompt_replace_ui():
                shutil.rmtree(ui_dir)
            else:
                return None

        self._get_compiled_ui()

    def _prompt_replace_ui(self) -> bool:
        """Ask whether the current UI may be replaced."""
        return bool(strtobool(input('Replace the current UI? (y/n) ')))
class SetupCairo(Command):
    """Set up cairo DLLs (Windows)

    ~ https://github.com/preshing/cairo-windows/
    """
    # note: the class docstring doubles as the parser description

    __command__ = 'setup-cairo'
    parser = argparse.ArgumentParser(
        description=__doc__
    )

    URL = 'https://github.com/preshing/cairo-windows/releases/download/with-tee/cairo-windows-1.17.2.zip'

    def command(self) -> None:
        """Run the setup on Windows, print a notice elsewhere."""
        if self._on_windows():
            self._setup()
        else:
            print('Not on Windows.')
        return None

    def _cleanup(self) -> None:
        """Remove cairo files from the current Python environment.
        """
        for file in self.env.glob('cairo.*'):
            file.unlink()

    def _install(self, arch: str) -> bool:
        """Move the ``arch`` ('x64' or 'x86') cairo files into the
        environment and report whether cairo works afterwards.
        """
        self._cleanup()
        for file in Path().glob(f'cairo*/lib/{arch}/cairo.*'):
            file.replace(self.env / file.name)
        return self._cairo_works()

    def _setup(self) -> None:
        """Install cairo into the current Python environment.
        This DLL has been known to behave weirdly with regards to
        Windows/Python combinations of different 'bitness', so the strategy
        here is to try both versions and keep whichever works.

        :raises EnvironmentError: if neither version works
        """
        if self._cairo_works():
            print('It seems that cairo is already working properly.')
            return None

        try:
            # downloading inside the try so a partial extraction
            # is cleaned up as well
            self._get_cairo_dlls()

            for arch in ('x64', 'x86'):
                if self._install(arch):
                    print(f'Installed {arch} cairo DLL')
                    return None

            raise EnvironmentError('Could not install cairo DLL')
        finally:
            for leftover in Path().glob('cairo*'):
                # rmtree() raises on plain files, only remove directories
                if leftover.is_dir():
                    shutil.rmtree(leftover)

    @classmethod
    def _get_cairo_dlls(cls) -> None:
        """Download the cairo archive and extract it into the current
        working directory; the archive itself is removed afterwards.
        """
        urlretrieve(cls.URL, 'cairo.zip')
        try:
            with ZipFile('cairo.zip', 'r') as z:
                z.extractall()
        finally:
            Path('cairo.zip').unlink()

    @staticmethod
    def _cairo_works() -> bool:
        """Whether ``cairosvg`` can be imported without an ``OSError``."""
        try:
            with suppress_stdout():
                import cairosvg
            return True
        except OSError:
            return False

    @staticmethod
    def _on_windows() -> bool:
        """Whether ``os.name`` reports Windows."""
        return os.name == 'nt'

    @property
    def env(self) -> Path:
        """Directory containing the running Python executable."""
        return Path(sys.executable).parent
class Declutter(Command):
    """Hide clutter files from the repository
    """
    # note: the class docstring doubles as the parser description

    CLUTTER = ['mypy.ini', 'tox.ini', '__pycache__']

    __command__ = 'declutter'
    parser = argparse.ArgumentParser(
        description=__doc__
    )
    parser.add_argument(
        '--undo',
        action='store_true',
        help='unhide clutter'
    )

    def command(self) -> None:
        """Hide the clutter files, or unhide them with ``--undo``."""
        if not self.args.undo:
            self._declutter()
        else:
            self._reclutter()

    def _declutter(self) -> None:
        """Hide clutter with the mechanism of the current platform."""
        if os.name == 'nt':
            # Pre-emptively create __pycache__ so we can hide it now.
            if not os.path.isdir('__pycache__'):
                os.mkdir('__pycache__')

            for file in self.CLUTTER + glob.glob('.*'):
                if Path(file).exists():
                    os.system(f'attrib +h "{file}"')
        elif sys.platform == 'darwin':
            # os.name is 'posix' on macOS; sys.platform tells it apart
            for file in self.CLUTTER:
                if Path(file).exists():
                    os.system(f'chflags hidden "{file}"')
        else:
            with open('.hidden', 'w+') as f:
                f.write('\n'.join(self.CLUTTER))

    def _reclutter(self) -> None:
        """Undo :meth:`_declutter` for the current platform."""
        if os.name == 'nt':
            for file in self.CLUTTER + glob.glob('.*'):
                if Path(file).exists():
                    os.system(f'attrib -h "{file}"')
        elif sys.platform == 'darwin':
            # os.name is 'posix' on macOS; sys.platform tells it apart
            for file in self.CLUTTER:
                if Path(file).exists():
                    os.system(f'chflags nohidden "{file}"')
        else:
            hidden = Path('.hidden')
            # nothing to undo if the clutter was never hidden
            if hidden.exists():
                hidden.unlink()