Source code for semantic_release.helpers

from __future__ import annotations

import importlib.util
import logging
import os
import re
import string
import sys
from functools import lru_cache, reduce, wraps
from pathlib import Path, PurePosixPath
from re import IGNORECASE, compile as regexp
from typing import TYPE_CHECKING, Any, Callable, NamedTuple, TypeVar
from urllib.parse import urlsplit

if TYPE_CHECKING:  # pragma: no cover
    from re import Pattern
    from typing import Iterable


# Module-level logger named after this module
log = logging.getLogger(__name__)

# Captures the first run of decimal digits (commas allowed after the first
# digit) as ``number`` and any non-whitespace text directly before it as ``prefix``
number_pattern = regexp(r"(?P<prefix>\S*?)(?P<number>\d[\d,]*)\b")
# Case-insensitive variant for hexadecimal digits: an optional "0x" marker is
# skipped and the hex digits that follow are captured as ``number``
hex_number_pattern = regexp(
    r"(?P<prefix>\S*?)(?:0x)?(?P<number>[0-9a-f]+)\b", IGNORECASE
)


def get_number_from_str(
    string: str, default: int = -1, interpret_hex: bool = False
) -> int:
    """
    Extract the first number embedded in a string.

    :param string: Text to search for a number.
    :param default: Value returned when no number is found.
    :param interpret_hex: When True, first try to read the number as
        hexadecimal (with or without a leading ``0x``) before falling back
        to a decimal search.
    :return: The parsed number, or ``default`` if the text contains none.
    """
    if interpret_hex and (match := hex_number_pattern.search(string)):
        return abs(int(match.group("number"), 16))

    if match := number_pattern.search(string):
        # number_pattern accepts thousands separators (e.g. "1,000") but int()
        # rejects them, so drop the commas before converting
        return int(match.group("number").replace(",", ""))

    return default
def sort_numerically(
    iterable: Iterable[str], reverse: bool = False, allow_hex: bool = False
) -> list[str]:
    """
    Sort strings by their textual prefix, then by the number that follows it.

    Items are grouped by the prefix that precedes their first number. Groups
    are ordered alphabetically by prefix, and the members of each group are
    ordered by their numeric value. Items that contain no number are kept and
    placed alphabetically among the prefixes.

    :param iterable: Strings to sort.
    :param reverse: Reverse the numeric order *within* each prefix group; the
        alphabetical order of the groups themselves is not reversed.
    :param allow_hex: Also recognise hexadecimal numbers when grouping and
        ordering.
    :return: A new, sorted list containing every input item exactly once.
    """
    unmatched_items: list[str] = []
    groups: dict[str, list[str]] = {}

    # Alphabetise first so that ties on the numeric key keep a stable,
    # alphabetical order inside each group
    for item in sorted(iterable):
        pattern_match = (
            hex_number_pattern.search(item) if allow_hex else None
        ) or number_pattern.search(item)

        if not pattern_match:
            unmatched_items.append(item)
            continue

        groups.setdefault(pattern_match.group("prefix") or "", []).append(item)

    def _numeric_key(value: str) -> int:
        return get_number_from_str(value, default=-1, interpret_hex=allow_hex)

    # Each bucket is (sort text, rank, items). The rank keeps an unmatched item
    # distinct from a prefix group that happens to share the same text, which
    # would otherwise drop the item and emit the group twice.
    buckets: list[tuple[str, int, list[str]]] = [
        *((item, 0, [item]) for item in unmatched_items),
        *(
            (prefix, 1, sorted(members, key=_numeric_key, reverse=reverse))
            for prefix, members in groups.items()
        ),
    ]
    buckets.sort(key=lambda bucket: bucket[:2])

    return [item for _, _, members in buckets for item in members]
def text_reducer(text: str, filter_pair: tuple[Pattern[str], str]) -> str:
    """
    Apply a single (pattern, replacement) filter to a piece of text.

    Shaped for use as the function argument of ``functools.reduce`` so that a
    sequence of filters can be folded over one string.

    :param text: The text to filter.
    :param filter_pair: A compiled pattern and the replacement to substitute
        for each of its matches.
    :return: The filtered text; empty text is returned untouched.
    """
    if text:
        pattern, substitute = filter_pair
        return pattern.sub(substitute, text)

    # nothing to filter in an empty paragraph
    return text
def format_arg(value: Any) -> str:
    """
    Format an argument for logging.

    :param value: The value to render.
    :return: For strings (including ``str`` subclasses), the value stripped of
        surrounding whitespace and wrapped in single quotes; for anything
        else, ``str(value)``.
    """
    # isinstance (rather than an exact type comparison) so that str subclasses
    # are quoted the same way as plain strings
    if isinstance(value, str):
        return f"'{value.strip()}'"
    return str(value)
def check_tag_format(tag_format: str) -> None:
    """
    Validate that a tag format string references the ``version`` key.

    :param tag_format: A ``str.format``-style template, e.g. ``"v{version}"``.
    :raises ValueError: If no replacement field in the template is named
        ``version``.
    """
    parsed_fields = string.Formatter().parse(tag_format)

    # Consume the parser lazily and stop at the first matching field
    if any(field_name == "version" for _, field_name, _, _ in parsed_fields):
        return

    raise ValueError(
        f"Invalid tag_format {tag_format!r}, must use 'version' as a format key"
    )
# Return type of a decorated callable
_R = TypeVar("_R")
# A callable with arbitrary parameters that returns ``_R``
_FuncType = Callable[..., _R]
def logged_function(logger: logging.Logger) -> Callable[[_FuncType[_R]], _FuncType[_R]]:
    """
    Build a decorator that debug-logs a function's arguments and return value.

    One debug record with the call arguments is emitted before the wrapped
    function runs, and a second one with the result after it returns.

    :param logger: Logger to send output to.
    :return: A decorator that wraps a callable with the logging behaviour.
    """

    def _decorate(func: _FuncType[_R]) -> _FuncType[_R]:
        @wraps(func)
        def _traced(*args: Any, **kwargs: Any) -> _R:
            func_name = func.__name__
            positional = ", ".join(format_arg(arg) for arg in args)
            keyword = ", ".join(
                f"{key}={format_arg(val)}" for key, val in kwargs.items()
            )
            logger.debug("%s(%s, %s)", func_name, positional, keyword)

            # Run the wrapped function, then report what it produced
            outcome = func(*args, **kwargs)
            logger.debug("%s -> %s", func.__qualname__, str(outcome))
            return outcome

        return _traced

    return _decorate
@logged_function(log)
def dynamic_import(import_path: str) -> Any:
    """
    Dynamically import an object from a conventionally formatted
    "module:attribute" or "path/to/module.py:attribute" string.

    If the part before the colon resolves to an existing file, that file is
    loaded as a module (and registered in ``sys.modules``); otherwise it is
    imported as a regular dotted module name.

    :param import_path: The import specification.
    :return: The attribute looked up on the imported module.
    :raises ValueError: If ``import_path`` does not contain a ":" separator.
    :raises ImportError: If the file or module cannot be imported.
    :raises AttributeError: If the module has no such attribute.
    """
    # Split on the LAST colon so a Windows drive letter ("C:/dir/mod.py:attr")
    # stays part of the module location rather than acting as the separator
    module_name, separator, attr = import_path.rpartition(":")
    if not separator:
        raise ValueError(
            f"Invalid import path {import_path!r}, expected the format "
            "'module:attribute' or 'path/to/module:attribute'"
        )

    # Only treat the location as a file import when it is an actual file; a
    # directory that merely shares the module's name (e.g. a package in the
    # working directory) must fall through to the regular import below
    module_filepath = Path(module_name).resolve()
    if module_filepath.is_file():
        module_path = (
            module_filepath.stem
            if Path(module_name).is_absolute()
            else str(Path(module_name).with_suffix(""))
            .replace(os.sep, ".")
            .lstrip(".")
        )

        if module_path not in sys.modules:
            log.debug("Loading '%s' from file '%s'", module_path, module_filepath)
            spec = importlib.util.spec_from_file_location(
                module_path, str(module_filepath)
            )
            if spec is None or spec.loader is None:
                raise ImportError(f"Could not import {module_filepath}")

            module = importlib.util.module_from_spec(spec)
            # Register before executing so the module can be found while its
            # own body runs
            sys.modules[spec.name] = module
            try:
                spec.loader.exec_module(module)
            except BaseException:
                # Do not leave a half-initialised module behind for later calls
                sys.modules.pop(spec.name, None)
                raise

        return getattr(sys.modules[module_path], attr)

    # Otherwise, import as a module
    try:
        log.debug("Importing module '%s'", module_name)
        module = importlib.import_module(module_name)
        log.debug("Loading '%s' from module '%s'", attr, module_name)
        return getattr(module, attr)
    except TypeError as err:
        raise ImportError(
            str.join(
                "\n",
                [
                    str(err.args[0]),
                    "Verify the import format matches 'module:attribute' or 'path/to/module:attribute'",
                ],
            )
        ) from err
class ParsedGitUrl(NamedTuple):
    """
    Container for the elements parsed from a git URL.

    Instances are produced by ``parse_git_url``.
    """

    # URL scheme reported by urlsplit once the URL has been normalized
    scheme: str
    # Network location reported by urlsplit; may be empty for file:// URLs
    netloc: str
    # Part of the URL path before its final "/", without leading slashes
    namespace: str
    # Final segment of the URL path with a trailing ".git" removed
    repo_name: str
@lru_cache(maxsize=512)
def parse_git_url(url: str) -> ParsedGitUrl:
    """
    Parse a git remote location into its scheme, host, namespace and repo name.

    Accepts http[s]://, git://, file:// and ssh style locations, including the
    implicit forms that git itself understands. Supported examples:

        http://git.mycompany.com/username/myproject.git
        https://github.com/username/myproject.git
        https://gitlab.com/group/subgroup/myproject.git
        https://git.mycompany.com:4443/username/myproject.git
        git://host.xz/path/to/repo.git/
        git://host.xz:9418/path/to/repo.git/
        git@github.com:username/myproject.git           (assumed to be ssh://)
        ssh://git@github.com:3759/myproject.git         (non-standard, 3759 is
                                                         assumed to be a user)
        ssh://git@github.com:username/myproject.git
        ssh://git@bitbucket.org:7999/username/myproject.git
        git+ssh://git@github.com:username/myproject.git
        /Users/username/dev/remote/myproject.git        (POSIX file path)
        file:///Users/username/dev/remote/myproject.git
        C:/Users/username/dev/remote/myproject.git      (Windows file path)
        file:///C:/Users/username/dev/remote/myproject.git

    REFERENCE: https://stackoverflow.com/questions/31801271/what-are-the-supported-git-url-formats

    Results are memoized per input string.

    :param url: The git URL or path to parse.
    :return: The parsed components.
    :raises ValueError: If the url can't be parsed.
    """
    log.debug("Parsing git url %r", url)

    # Ordered (pattern, replacement) rewrites that bring every accepted
    # spelling into a form urlsplit understands
    rewrite_rules = (
        # implicit ssh urls -> explicit ssh://
        (r"^([\w._-]+@)", r"ssh://\1"),
        # git+ssh:// -> ssh://
        (r"^git\+ssh://", "ssh://"),
        # scp-like "host:path" -> "host/path", leaving port definitions
        # (:#####) alone while still allowing numeric usernames
        (r"(ssh://(?:[\w._-]+@)?[\w.-]+):(?!\d{1,5}/\w+/)(.*)$", r"\1/\2"),
        # implicit file paths (windows || posix) -> explicit file:// urls
        (r"^([C-Z]:/)|^/(\w)", r"file:///\1\2"),
    )
    for regex, substitute in rewrite_rules:
        url = re.sub(regex, substitute, url)

    parts = urlsplit(url)

    # Without a scheme there is nothing recognisable to work with. The scheme
    # itself is deliberately not validated beyond being present.
    if not parts.scheme:
        raise ValueError(f"Cannot parse {url!r}")

    # PurePosixPath normalizes the path (e.g. trailing slashes) before the
    # last segment is separated from everything in front of it
    normalized_path = str(PurePosixPath(parts.path)).lstrip("/")
    namespace, _, repo_name = normalized_path.rpartition("/")
    if repo_name.endswith(".git"):
        repo_name = repo_name[:-4]

    # file:// urls legitimately have no network location; all others need one
    has_location = parts.scheme == "file" or bool(parts.netloc)
    if not (has_location and namespace and repo_name):
        raise ValueError(f"Bad url: {url!r}")

    return ParsedGitUrl(
        scheme=parts.scheme,
        netloc=parts.netloc,
        namespace=namespace,
        repo_name=repo_name,
    )