Source code for biomappings.utils

# -*- coding: utf-8 -*-

"""Utilities."""

import os
import re
from pathlib import Path
from subprocess import CalledProcessError, check_output  # noqa: S404
from typing import Any, Mapping, Optional, Tuple

import bioregistry

__all__ = [
    "get_git_hash",
    "get_script_url",
    "get_canonical_tuple",
    "UnregisteredPrefix",
    "UnstandardizedPrefix",
    "InvalidIdentifier",
    "InvalidNormIdentifier",
    "InvalidIdentifierPattern",
    "check_valid_prefix_id",
    "get_curie",
    "CMapping",
]

HERE = Path(__file__).parent.resolve()
ROOT = HERE.parent.parent.resolve()
RESOURCE_PATH = HERE.joinpath("resources")
DOCS = ROOT.joinpath("docs")
IMG = DOCS.joinpath("img")
DATA = DOCS.joinpath("_data")

OVERRIDE_MIRIAM = {
    # ITO is very messy (combines mostly numbers with a few
    # text based labels for top-level terms), has weird bananas,
    # and also not very enjoyable to use so don't worry about them
    "ito",
}


[docs] def get_git_hash() -> Optional[str]: """Get the git hash. :return: The git hash, equals 'UNHASHED' if encountered CalledProcessError, signifying that the code is not installed in development mode. """ rv = _git("rev-parse", "HEAD") if not rv: return None return rv[:6]
def commit(message: str) -> Optional[str]: """Make a commit with the following message.""" return _git("commit", "-m", message, "-a") def push(branch_name: Optional[str] = None) -> Optional[str]: """Push the git repo.""" if branch_name is not None: return _git("push", "origin", branch_name) else: return _git("push") def not_main() -> bool: """Return if on the master branch.""" return "master" != _git("rev-parse", "--abbrev-ref", "HEAD") def get_branch() -> str: """Return current git branch.""" rv = _git("branch", "--show-current") if rv is None: raise RuntimeError return rv def _git(*args: str) -> Optional[str]: with open(os.devnull, "w") as devnull: try: ret = check_output( # noqa: S603,S607 ["git", *args], cwd=os.path.dirname(__file__), stderr=devnull, ) except CalledProcessError as e: print(e) # noqa:T201 return None else: return ret.strip().decode("utf-8")
[docs] def get_script_url(fname: str) -> str: """Get the source path for this script. :param fname: Pass ``__file__`` as the argument to this function. :return: The script's URL to GitHub """ commit_hash = get_git_hash() script_name = os.path.basename(fname) return f"https://github.com/biomappings/biomappings/blob/{commit_hash}/scripts/{script_name}"
[docs] def get_canonical_tuple(mapping: Mapping[str, Any]) -> Tuple[str, str, str, str]: """Get the canonical tuple from a mapping entry.""" source = mapping["source prefix"], mapping["source identifier"] target = mapping["target prefix"], mapping["target identifier"] if source > target: source, target = target, source return (*source, *target)
[docs] class UnregisteredPrefix(ValueError): """Raised for an invalid prefix."""
[docs] class UnstandardizedPrefix(ValueError): """Raised for an unstandardized prefix.""" def __init__(self, prefix: str, norm_prefix: str): """Initialize the error. :param prefix: A CURIE's prefix :param norm_prefix: The normalized prefid """ self.prefix = prefix self.norm_prefix = norm_prefix def __str__(self) -> str: # noqa:D105 return f"{self.prefix} should be standardized to {self.norm_prefix}"
[docs] class InvalidIdentifier(ValueError): """Raised for an invalid identifier.""" def __init__(self, prefix: str, identifier: str): """Initialize the error. :param prefix: A CURIE's prefix :param identifier: A CURIE's identifier """ self.prefix = prefix self.identifier = identifier
[docs] class InvalidIdentifierPattern(InvalidIdentifier): """Raised for an identifier that doesn't match the pattern.""" def __init__(self, prefix: str, identifier: str, pattern): """Initialize the error. :param prefix: A CURIE's prefix :param identifier: A CURIE's identifier :param pattern: A regular expression pattern """ super().__init__(prefix, identifier) self.pattern = pattern def __str__(self) -> str: # noqa:D105 return f"{self.prefix}:{self.identifier} does not match pattern {self.pattern}"
[docs] class InvalidNormIdentifier(InvalidIdentifier): """Raised for an invalid normalized identifier.""" def __init__(self, prefix: str, identifier: str, norm_identifier: str): """Initialize the error. :param prefix: A CURIE's prefix :param identifier: A CURIE's identifier :param norm_identifier: The normalized version of the identifier """ super().__init__(prefix, identifier) self.norm_identifier = norm_identifier def __str__(self) -> str: # noqa:D105 return f"{self.prefix}:{self.identifier} does not match normalized CURIE {self.prefix}:{self.norm_identifier}"
[docs] def check_valid_prefix_id(prefix: str, identifier: str): """Check the prefix/identifier pair is valid. :param prefix: The prefix from a CURIE :param identifier: The local unique identifier from a CURIE :raises UnregisteredPrefix: if the prefix is not registered with the Bioregistry :raises UnstandardizedPrefix: if the prefix is not standardized w.r.t. the Bioregistry :raises InvalidNormIdentifier: if the identifier is not standardized, either against the MIRIAM standard, if available, or against the Bioregistry standard :raises InvalidIdentifierPattern: if the does not match the appropriate regular expression for MIRIAM (if available) or for the Bioregistry. If no regular expression is available, then this check is not applied. :raises RuntimeError: If the preconditions for miriam standardization aren't met. However, this shouldn't be possible in practice, and this documentation is merely a formality. """ resource = bioregistry.get_resource(prefix) if resource is None: raise UnregisteredPrefix(prefix) if prefix != resource.prefix: raise UnstandardizedPrefix(prefix, resource.prefix) miriam_prefix = resource.get_miriam_prefix() if miriam_prefix in OVERRIDE_MIRIAM: return # If this resource has a mapping to MIRIAM, the MIRIAM-specific # normalization will be applied, which e.g., adds missing # redundant prefixes into the local unique identifiers if miriam_prefix is not None: norm_id = resource.miriam_standardize_identifier(identifier) if norm_id is None: raise RuntimeError( "should not be possible since we check for miriam prefix" " before running miriam_standardize_identifier" ) if norm_id != identifier: raise InvalidNormIdentifier(prefix, identifier, norm_id) if prefix == "pr": pattern = None # identifiers.org is broken for uniprot in PR elif prefix == "obi": pattern = re.compile(r"^OBI:\d{7,8}$") # identifiers.org is broken for OBI else: pattern = re.compile(resource.miriam["pattern"]) # If this resource does not have a mapping to MIRIAM, then # the Bioregistry normalization will be applied, which e.g., # strips potential redundant prefixes in local unique identifiers # or any other "bananas" else: norm_id = resource.standardize_identifier(identifier) if norm_id != identifier: raise InvalidNormIdentifier(prefix, identifier, norm_id) pattern = resource.get_pattern_re() if pattern is not None and not pattern.match(identifier): raise InvalidIdentifierPattern(prefix, identifier, pattern)
[docs] def get_curie(prefix: str, identifier: str, *, preferred: bool = False) -> str: """Get a normalized curie from a pre-parsed prefix/identifier pair.""" prefix_norm, identifier_norm = bioregistry.normalize_parsed_curie(prefix, identifier) if prefix_norm is None or identifier_norm is None: raise ValueError(f"could not normalize {prefix}:{identifier}") if preferred: prefix_norm = bioregistry.get_preferred_prefix(prefix_norm) or prefix_norm return f"{prefix_norm}:{identifier_norm}"
#: A filter 3-dictionary of source prefix to target prefix to source identifier to target identifier CMapping = Mapping[str, Mapping[str, Mapping[str, str]]]