"""
Protoc compiler wrapper CLI (using `fire`_)
.. _fire:
https://python-fire.readthedocs.io/en/latest/
"""
from __future__ import annotations
import distutils.log
import distutils.spawn
import functools
import os
import pathlib
import re
import subprocess
import warnings
from typing import List, Optional, Dict
import itertools
import sys
from . import find_proto_files
from .options import CompileOption
package_regex = r'(\w+)((?:\.\w+)*)'
check_packages = functools.partial(re.match, package_regex)
_DEFAULT = object()
[docs]class Compiler:
"""
Wrapper around protobuf compiler.
See ``compile-proto OPTIONS`` for all possible compilation options.
See ``compile-proto`` (no args) and ``compile-proto --help`` for more help
"""
OPTIONS = [o.formatted_argument for o in CompileOption if o.formatted_argument]
"""
Supported compile options, multiple options may be specified.
"""
def __init__(self, protoc=distutils.spawn.find_executable('protoc')):
self.protoc = protoc or self._find_protoc()
def _find_protoc(self) -> Optional[str]:
"""
Searches for a protoc executable respecting the PROTOC
environment variable
Returns:
path to protoc executable or None
"""
protoc = os.environ.get('PROTOC', distutils.spawn.find_executable('protoc'))
if not protoc:
warnings.warn(f"protoc is not found in $PATH."
" Please install it or set the PROTOC environment variable")
return protoc
[docs] def call(self, *proto_files,
includes: str = None,
protohelp=False,
dry_run=False,
**options):
"""
Just a wrapper around the `protoc` compiler.
See ``compile-proto call --help`` to print help of protoc command
See ``compile-proto call -- --help`` for additonal help with the call command
See ``compile-proto protoc`` for the path of the used protoc compiler executable
Args:
includes: Directories to use as includes
protohelp: Use this flag to pass ``--help`` to protoc invocation
options: Will be passed as flags to the protoc command (only ``--flag`` syntax supported, no single dash)
dry_run: If True, don't do anything except printing the command
"""
if 'help' in options:
print("To pass `--help` to protoc, use flag `--protohelp`. "
"Make sure you use only options with --{option} syntax when calling the protoc compiler "
"from `compile-proto call`. i.e. options with single dashes are not supported."
" If you want to get more help about the `call` command, use `compile-proto -- --help`")
return
if protohelp:
options['help'] = protohelp
protoc_command: List[str] = [os.fspath(self.protoc)] # protoc executable
protoc_command += [f"-I{os.fspath(include)}" for include in includes or ()] # includes
protoc_command += [f'--{k}={v}' for k, v in options.items()] # protoc arguments
protoc_command += [os.fspath(f) for f in proto_files] # .proto files
result = 0
distutils.log.debug(" ".join(str(c) for c in protoc_command))
if not dry_run:
print(' '.join(protoc_command))
result = subprocess.call(protoc_command)
if result != 0:
sys.exit(result)
[docs] def compile(self, *protoc_files,
options=_DEFAULT,
quiet=False,
plugin_params: str = '',
output=os.getcwd(),
**kwargs) -> Optional[str]:
"""
Compile for given options, see ``compile-proto compile -- --help``
Args:
quiet: Don't print output from protoc plugin if possible
output: output directory (default: working directory)
options: one or multiple options see `compile-proto OPTIONS` default: [py]
protoc_files: files to compile, passed through to protoc
plugin_params: mapping of additional parameters for the protoc plugin (not the compiler itself)
kwargs: Passed to protoc invocation, see compile-proto call -- --help.
Returns:
the output path
"""
params = (plugin_params,) if plugin_params else ()
if options is _DEFAULT:
options = ['py']
if not options:
warnings.warn("No options specified, no compilation will take place.")
return
for option in CompileOption.from_string_list(options).disjunct:
params += ('quiet',) if quiet else ()
protoc_args = {
f'{option.protoc_plugin_name}_out': option.format_out(output, *params)
}
distutils.log.info(f"Compiling with {protoc_args}")
self.call(*protoc_files, **kwargs, **protoc_args)
[docs]class Rewriter:
"""
Rewrite proto files to force python package structure
which mirrors protobuf package declarations.
"""
_IMPORT = re.compile(r'^import "((?:\w+/)*)(\w+\.proto)";$', flags=re.MULTILINE)
_PACKAGE = re.compile(r'^package {pkg};$'.format(pkg=package_regex), flags=re.MULTILINE)
def __init__(self,
root_package: str,
output_root: os.PathLike = './'):
self._contents: Optional[Dict[pathlib.Path, str]] = None
self._roots: Optional[Dict[pathlib.Path, pathlib.Path]] = None
self.root_package = root_package
self.output_root = output_root
[docs] def read(self, *dirs: os.PathLike):
"""
Reads .proto files from directories, manipulate them with the other commands afterwards.
"""
dirs = [pathlib.Path(s) for s in dirs]
found = {s: find_proto_files(s) for s in dirs if s.is_dir()}
no_proto_dirs = [s for s in dirs if not s in found or not found[s]]
assert not no_proto_dirs, f"Check path[s] {no_proto_dirs}: Not a directory or does not contain a .proto file."
# invert dictionary to lookup parents
self._roots = {path: root for root, paths in found.items() for path in paths}
self._contents = {path: path.read_text(encoding='utf-8') for path in self._roots}
return self
@property
def root_package(self):
"""
root package for generated directory structure
Returns:
str: root package which is enforced
"""
return self._root_package
@root_package.setter
def root_package(self, value):
if value is None:
return
valid = check_packages(value)
assert valid, f"{value} not a valid package name, please use only .-separated package names"
self._root_package = value
@property
def output_root(self):
"""
Root directory of new directory structure.
"""
return self._out_root
@output_root.setter
def output_root(self, value):
self._out_root = pathlib.Path(value)
def _get_package(self, path):
relative_path = path.relative_to(self._roots[path])
package = self._PACKAGE.search(self._contents[path])
return self._fix_package(package) if package else '.'.join(relative_path.parts)
def _fix_package(self, match):
# replace first part of package with self._package
# applying fix_package multiple times is ok since it doesn't change the package
root_pkg, sub_pkgs = match.groups()
sub_pkgs = sub_pkgs[1:].split('.') # skip first char, since it's a dot anyways
pkg_iterator = itertools.dropwhile(lambda p: p in self._root_package, itertools.chain([root_pkg], sub_pkgs))
return '.'.join(itertools.chain([self._root_package], pkg_iterator))
@property
def calculated_packages(self):
"""
Returns:
dict: Dictionary mapping file contents to package names
"""
return {p: self._get_package(p) for p in self._contents or ()}
def _fix_import(self, root: pathlib.Path, match):
path, file = match.groups()
import_package = self.calculated_packages.get(root / path / file)
return '/'.join(itertools.chain(import_package.split('.'), [file])) if import_package else None
def _fix_import_declaration(self, root, match):
return f'import "{self._fix_import(root, match)}";'
[docs] def fix_imports(self):
"""
Modify internal file content representation to match internal file locations, so that imports are importing
the right files.
Returns:
Rewriter: Reference to self to chain commands with `fire`_
"""
process_imports = {p: {statement: self._fix_import(self._roots[p], statement)}
for p, content in self._contents.items()
for statement in self._IMPORT.finditer(content)}
failed = {p: imports for p, imports in process_imports.items() if any(v is None for v in imports.values())}
if any(failed):
warnings.warn('\n'.join("Can't resolve imports {imports} from file {path}".format(
path=path, imports=[k.group(0) for k, v in imports.items() if v is None]
) for path, imports in failed.items()) +
" Make sure the respective files are also included for rewriting")
return self
self._contents = {f: self._IMPORT.sub(functools.partial(self._fix_import_declaration, self._roots[f]), content)
for f, content in self._contents.items()}
return self
[docs] def fix_packages(self):
"""
Modify internal content representation such that ``package`` declarations in ``.proto`` source files match the
internal directory structure
Returns:
Rewriter: Reference to self to chain commands with `fire`_
"""
def make_regex(declared_package):
root_package = declared_package[1]
sub_packages = declared_package[2].replace('.', r'\.') if declared_package[2] else ""
regex_str = r"(?:{}\.)?({})({})".format(self._root_package, root_package, sub_packages)
return re.compile(regex_str)
unique_package_regexes = set(map(
make_regex,
itertools.chain.from_iterable(
self._PACKAGE.finditer(content) for content in self._contents.values()
)
))
for regex in unique_package_regexes:
self._contents = {
f: regex.sub(self._fix_package, content) for f, content in self._contents.items()
}
return self
[docs] def content(self, filename: os.PathLike) -> str:
"""
View the internal file representations
Args:
filename: Input file path
Returns:
str: contents of internal file representation
"""
return self._contents.get(pathlib.Path(filename), f"File for filename {filename} not found. Available"
f" filenames: {', '.join(map(str, self._contents))}")
[docs] def write(self, dry_run=True) -> List[str]:
"""
Write internal content representation to :attr:`.output_root` according to
internal :attr:`package mapping <.calculated_packages>`
Args:
dry_run (bool): if True don't actually write outputs.
Returns:
Rewriter: Reference to self to chain commands with `fire`_
"""
written = []
for file, content in self._contents.items():
out_dir = self.output_root / '/'.join(self._get_package(file).split('.'))
out_dir.mkdir(parents=True, exist_ok=True)
with open(out_dir / file.name, 'w') as output:
if dry_run:
print(f"Fake writing {file.name} in {out_dir} since dry_run is {dry_run}.")
continue
output.write(content)
written += [os.fspath(out_dir / file.name)]
return written
def check_fire():
try:
import fire
except ImportError:
distutils.log.error("Can't use CLI for compiler if python-fire is not installed!"
"Did you install the package with [cli]?")
else:
return fire
[docs]def compile_proto():
"""
Entry point for CLI.
`fire`_ :class:`Compiler`
"""
fire = check_fire()
if fire:
fire.Fire(Compiler)
[docs]def rewrite_proto():
"""
Entry point for CLI
`fire`_ :class:`Rewriter`
"""
fire = check_fire()
if fire:
fire.Fire(Rewriter)