## Source code for ocebuild.pipeline.lock

## @file
# Copyright (c) 2023, The OCE Build Authors. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
##
"""Methods for handling and resolving lock files."""

from collections import OrderedDict
from itertools import chain
from os import getcwd

from typing import Dict, Generator, Iterator, List, Optional, Tuple, Union

from ocebuild.parsers.dict import merge_dict, nested_del, nested_get, nested_set
from ocebuild.parsers.regex import re_match, re_search
from ocebuild.parsers.yaml import parse_yaml, write_yaml
from ocebuild.sources.resolver import *


LOCKFILE_METADATA = {
  # The version key denotes the revision of the resolved dependencies.
  'version': 0,
  # The cache key is incremented when the dependency cache is revalidated.
  'cacheKey': 0
}
"""The current metadata for the lockfile system."""

# NOTE(review): this literal is emitted verbatim into generated lockfiles (see
# `write_lockfile`); the leading/trailing newlines are part of the output.
LOCKFILE_WARNING_COMMENT = '''
# This file is generated by running "ocebuild" inside your project.
# Manual changes might be lost - proceed with caution!
'''
"""The warning comment generated for new lockfiles."""


def _category_extension(category: str) -> Tuple[str, str]:
  """Determine the file extension for the category.

  Args:
    category: The category to determine the file extension for.

  Returns:
    A tuple containing:
      - The file extension for the category.
      - The kind of entry for the category.
  """
  if category == 'ACPI':
    ext = '.aml'; kind = 'SSDT'
  elif category == 'Kexts':
    ext = '.kext'; kind = 'Kext'
  else:
    ext = '.efi'; kind = 'Binary'
  return ext, kind

def _format_resolver(resolver: Union['ResolverType', None],
                     base_path: Optional[str]=None,
                     as_specifier: bool=False
                     ) -> Optional[str]:
  """Formats a resolver string for lockfile resolution.

  Args:
    resolver: The resolver to format.
    base_path: The base path to use for relative paths. Defaults to the
      current working directory at call time. (Optional)
    as_specifier: Whether to instead format the specifier. (Optional)

  Returns:
    The formatted resolver string; the catch-all specifier ('*') when
    `as_specifier` is set and no resolver matched; otherwise None when no
    resolver matched.
  """
  # Resolve the default lazily so the current working directory is read at
  # call time, not frozen when the module was first imported (the previous
  # `base_path: str=getcwd()` default captured the import-time cwd).
  if base_path is None:
    base_path = getcwd()

  resolution: str = ''
  resolver_props = dict(resolver) if resolver is not None else {}

  # Add the resolver or specifier name
  if isinstance(resolver, GitHubResolver):
    resolution += f'{resolver.repository}@github'
  elif isinstance(resolver, DortaniaResolver):
    resolution += f'acidanthera/{resolver.__name__}@github'
  elif isinstance(resolver, PathResolver):
    resolution += f'{resolver.__name__}@file'
  else:
    return '*' if as_specifier else None

  # Add the resolver or specifier version
  if as_specifier and 'tag' in resolver_props:
    resolution += f":{resolver_props['tag']}"
  if 'version' in resolver_props:
    resolution += f":{resolver_props['version']}"
  elif isinstance(resolver, PathResolver):
    resolution += f":{resolver.path.relative_to(base_path)}"

  # Add the resolver checksum
  if 'commit' in resolver_props:
    resolution += f"#commit={resolver_props['commit']}"
  elif 'checksum' in resolver_props:
    resolution += f"#checksum={resolver_props['checksum']}"

  return resolution

def _format_dependency_entry(entry: Dict[str, any]) -> dict:
  """Formats a lockfile entry for writing.

  Args:
    entry: The entry to format.

  Returns:
    The formatted entry dictionary, with private ('_'-prefixed) and excluded
    keys removed and public keys emitted in canonical order.
  """
  exclude_keys = ('name',)
  def _is_private_key(k: str) -> bool:
    """Determines whether a key is private."""
    return k[0] == '_' or k in exclude_keys
  sorted_keys = (
    # Resolver properties
    'build', 'version', 'url', 'path',
    # Revalidation metadata
    'resolution', 'specifier', 'kind', 'revision',
  )
  def _try_index(k: str) -> int:
    """Attempts to find the index of the key in the sorted keys list."""
    try:
      return sorted_keys.index(k)
    except ValueError:
      # Unknown keys sort after every canonical key (stable w.r.t. input order).
      return len(sorted_keys) + 1
  def sorted_public_entry(d: dict) -> Generator[tuple, any, None]:
    """Iterates over a dictionary in sorted order."""
    for k,v in sorted(d.items(), key=lambda e: _try_index(e[0])):
      if not _is_private_key(k):
        yield k,v
  public_keys = OrderedDict((k,v) for k,v in sorted_public_entry(entry))
  return public_keys

#TODO: Handle resolving packages from lockfile
# def _format_package_entry()
def parse_semver_params(entry: Union[str, dict],
                        specifier: str,
                        parameters: Optional[dict]=None
                        ) -> Dict[str, str]:
  """Parses a semver version entry or specifier for parameters.

  Args:
    entry: The entry to parse additional properties from.
    specifier: The specifier string to parse.
    parameters: The parameters dictionary to mutate. (Optional)

  Returns:
    The parameters dictionary (the same object when one was provided).
  """
  params = {} if parameters is None else parameters
  known_keys = ('tag', 'branch', 'workflow', 'commit')

  # A leading '=' pins an explicit release tag.
  if (tag_prefix := re_match('^=', specifier)):
    params['tag'] = specifier[len(tag_prefix):]

  if re_match('^#[a-zA-Z\\-]+=', specifier):
    # (Priority: 1) Named specifier parameters, e.g. '#branch=main'.
    for key in known_keys:
      if (named_prefix := re_match(f'#.*?{key}=', specifier)):
        #TODO: Add separation for multiple parameters (,)
        params[key] = specifier[len(named_prefix):]
  elif (hash_prefix := re_match('^#', specifier)):
    # (Priority: 2) Unnamed specifier parameters, e.g. '#main' or '#<sha>'.
    value = specifier[len(hash_prefix):]
    # A 40-char (long) or 7-char (short) hex run is treated as a commit sha.
    if (sha_long := re_search(r'\b[0-9a-fA-F]{40}\b', value)):
      params['commit'] = sha_long
    elif (sha_short := re_search(r'\b[0-9a-fA-F]{7}\b', value)):
      params['commit'] = sha_short
    else:
      # Fall back to assignment as branch name.
      params['branch'] = value

  # (Override) Named entry parameters always win over the specifier.
  if isinstance(entry, dict):
    for key in known_keys:
      if key in entry:
        params[key] = entry[key]

  return params
def parse_specifier(name: str,
                    entry: Union[str, Dict[str, any]],
                    base_path: Optional[str]=getcwd()
                    ) -> Union[GitHubResolver, PathResolver, DortaniaResolver, None]:
  """Parses a specifier string for a resolver class.

  Args:
    name: The name of the entry to parse.
    entry: The entry to parse.
    base_path: The base path to use for relative paths. (Optional)
      NOTE(review): the `getcwd()` default is evaluated once at import time,
      so it is the cwd of whichever process first imported this module.

  Returns:
    The resolver class for the specifier, or None when no resolver matched.
  """
  # The specifier is either the entry itself (string form) or its
  # 'specifier' key (object form); anything non-string is treated as empty.
  specifier = nested_get(entry, ['specifier'], default=entry)
  if not isinstance(specifier, str): specifier = ''
  parameters: Dict[str, str]={}
  # Dunder props are forwarded to every resolver for bookkeeping.
  resolver_props = { '__name__': name, '__specifier__': specifier }
  # Specifier points to a github repository
  if isinstance(entry, dict) and 'repository' in entry:
    # Add repository name to specifier if provided as an object parameter
    delimiter = '=' if not specifier.startswith('#') else ''
    if specifier:
      specifier = delimiter.join([entry['repository'], specifier])
    else:
      specifier = entry['repository']
  if (repository := re_match(r'[a-zA-Z0-9\-]+\/[a-zA-Z0-9\-]+', specifier)):
    parameters['repository'] = repository
    # Everything after 'owner/repo' is semver/param syntax.
    semver_specifier = specifier[len(repository):]
    parameters = parse_semver_params(entry, semver_specifier, parameters)
    # Handle optional flags
    # NOTE(review): when `entry` is a str this is a substring test, not a key
    # test — presumably flag-bearing entries are always dicts; confirm.
    if 'tarball' in entry:
      parameters['tarball'] = entry['tarball']
    return GitHubResolver(**parameters, **resolver_props)
  # Specifier points to a Dortania build (or latest)
  if DortaniaResolver.has_build(name):
    parameters = parse_semver_params(entry, specifier, parameters)
    return DortaniaResolver(**parameters, **resolver_props)
  # Specifier points to a local file
  if specifier.startswith('file:'):
    specifier = specifier \
      .replace('file://', '') \
      .replace('file:', '')
  # Any specifier naming an existing path (with or without the stripped
  # 'file:' scheme) resolves to a local path resolver.
  if (filepath := PathResolver(base_path, specifier)).exists():
    parameters['path'] = filepath
    return PathResolver(**parameters, **resolver_props)
  # No resolver matched
  return None
def read_lockfile(lockfile_path: str,
                  metadata: bool=False
                  ) -> Union[dict, Tuple[dict, dict]]:
  """Reads a lockfile from the specified path.

  Args:
    lockfile_path: The path to read the lockfile from.
    metadata: When True, also return the frontmatter metadata. (Optional)

  Returns:
    The lockfile dictionary, or a (lockfile, metadata) tuple when `metadata`
    is set.
  """
  with open(lockfile_path, 'r', encoding='UTF-8') as lockfile_handle:
    lockfile, frontmatter = parse_yaml(lockfile_handle, frontmatter=True)
  # Include lockfile metadata if specified
  if metadata:
    return lockfile, nested_get(frontmatter, ['metadata'], default={})
  return lockfile
def write_lockfile(lockfile_path: str,
                   lockfile: dict,
                   resolvers: List[dict],
                   metadata: Optional[dict]=None,
                   ) -> dict:
  """Writes a lockfile to the specified path.

  Args:
    lockfile_path: The path to write the lockfile to.
    lockfile: The lockfile to write.
    resolvers: The resolved entries to write to the lockfile.
    metadata: The lockfile metadata to write. (Optional)

  Returns:
    The merged lockfile dictionary.
  """
  # Format lockfile metadata
  if not metadata:
    # Copy the template: previously the module-level LOCKFILE_METADATA dict
    # was bound directly, so the version bump below silently mutated the
    # shared constant on every default write.
    metadata = dict(LOCKFILE_METADATA)
  metadata['version'] += 1
  # Format lockfile header
  file_header = ['---', *write_yaml(metadata), '---', LOCKFILE_WARNING_COMMENT]
  # Merge existing lockfile with new dependency entries
  root = 'dependencies'
  for resolver in resolvers:
    entry = _format_dependency_entry(resolver)
    category = resolver['__category']
    key = resolver['name']
    lockfile = merge_dict(lockfile, { root: { category: { key: entry } } })
  # Parse lockfile into YAML
  lockfile_entry = write_yaml(lockfile, lines=file_header)
  with open(lockfile_path, 'w', encoding='UTF-8') as f:
    f.write("\n".join(lockfile_entry))
  return lockfile
def prune_lockfile(build_config: dict, lockfile: dict) -> List[dict]:
  """Prunes the lockfile of entries that are not in the build configuration.

  Args:
    build_config: The build configuration to prune against.
    lockfile: The lockfile to prune (mutated in place).

  Returns:
    A list of removed lockfile entries, each tagged with its '__tree' path.
  """
  removed = []
  for category in build_config.keys():
    entries = nested_get(lockfile, ['dependencies', category], {})
    for entry_name, entry in entries.items():
      # Keep entries that still exist in the build configuration.
      if nested_get(build_config, [category, entry_name]):
        continue
      removed.append({ '__tree': ['dependencies', category, entry_name], **entry })
  # Delete the stale entries from the lockfile tree.
  for stale in removed:
    nested_del(lockfile, stale['__tree'])
  return removed
def prune_resolver_entry(resolvers: List[dict], key: str, value: any) -> None:
  """Prunes matching resolver entries in-place from the list of resolvers.

  Removes every entry whose `key` is present and equal to `value`. The list
  is mutated in place; entries without the key are left untouched. No error
  is raised when nothing matches. (The previous docstring claimed a
  ValueError was raised for missing entries — the implementation never did.)

  Args:
    resolvers: The list of resolvers to prune from (mutated in place).
    key: The key to prune by.
    value: The value to prune by.
  """
  # Slice assignment keeps the same list object (callers hold references)
  # and avoids the O(n^2) copy-then-remove of the previous implementation.
  resolvers[:] = [e for e in resolvers if not (key in e and e[key] == value)]
def _iterate_build_entries(build_config: dict) -> List[Tuple[str, str, dict]]:
  """Flattens the build configuration into (category, name, entry) tuples.

  Args:
    build_config: The build configuration, keyed by category then entry name.

  Returns:
    A list of (category, name, entry) tuples in configuration order.
  """
  return [(category, entry_name, entry)
          for category, entries in build_config.items()
          for entry_name, entry in entries.items()]
def resolve_specifiers(build_config: dict,
                       lockfile: dict,
                       base_path: str=getcwd(),
                       update: bool=False,
                       force: bool=False,
                       *args,
                       __wrapper: Optional[Iterator]=None,
                       **kwargs
                       ) -> List[dict]:
  """Resolves the specifiers for each entry in the build configuration.

  Args:
    build_config: The build configuration to resolve specifiers for.
    lockfile: The lockfile to resolve specifiers against.
    base_path: The base path to use for relative paths. (Optional)
      NOTE(review): the `getcwd()` default is evaluated once at import time.
    update: Whether to update outdated entries in the lockfile. (Optional)
    force: Whether to force resolve all entries in the build configuration.
      (Optional)
    *args: Additional arguments to pass to the optional iterator wrapper.
    __wrapper: A wrapper function to apply to the iterator. (Optional)
    **kwargs: Additional keyword arguments to pass to the optional iterator
      wrapper.

  Raises:
    ValueError: If a resolver or the build configuration is invalid.

  Returns:
    The resolved build configuration.
  """
  resolvers = []
  # The OpenCore entry's build type is the fallback for every other entry.
  default_build = nested_get(build_config, ['OpenCorePkg', 'OpenCore', 'build'])
  # Handle interactive mode for iterator
  iterator = _iterate_build_entries(build_config)
  if __wrapper is not None: iterator = __wrapper(iterator, *args, **kwargs)
  # Resolve the specifiers for each entry in the build configuration
  for category, name, entry in iterator:
    entry_path = ['dependencies', category, name]
    lockfile_entry = nested_get(lockfile, entry_path)
    # Handle any necessary resolver preprocessing
    resolver = parse_specifier(name, entry, base_path=base_path)
    if resolver is None:
      # No resolver matched: record the catch-all specifier on the config.
      specifier = '*'
      nested_set(build_config, [category, name, 'specifier'], specifier)
    else:
      specifier = _format_resolver(resolver, base_path, as_specifier=True)
    # Extract additional properties from the entry
    ext, kind_ = _category_extension(category)
    filepath = nested_get(entry, ['__filepath'],
                          default=f'EFI/OC/{category}/{name}{ext}')
    kind = nested_get(entry, ['__kind'], default=kind_)
    # Assign default resolver properties
    resolver_props = { "__category": category,
                       '__filepath': filepath,
                       "__resolver": resolver,
                       "name": name,
                       "specifier": specifier,
                       "kind": kind }
    # Skip updating entries if not specified
    if lockfile_entry and not (force or update):
      # Reserve the `__resolver` key for revalidated entries
      resolver_props['__resolver'] = None
      resolvers.append({ **resolver_props, **lockfile_entry })
    # Otherwise, prune matching resolvers and outdated entries from lockfile
    elif specifier == '*':
      resolvers.append(resolver_props)
    elif resolver is not None:
      try:
        if isinstance(resolver, PathResolver):
          # Resolve the path for the specifier
          path = resolver.resolve(strict=True) #pylint: disable=E1123
          resolver_props['path'] = f'./{path.relative_to(base_path)}'
        elif isinstance(resolver, (GitHubResolver, DortaniaResolver)):
          # Extract the build type (default to OpenCore build type)
          build = nested_get(entry, ['build'], default=default_build)
          resolver_props['build'] = build
          # Resolve the URL for the specifier
          url = resolver.resolve(build=build)
          resolver_props['url'] = url
          # Extract the version or commit from the resolver
          if 'version' in (props := dict(resolver)):
            resolver_props['version'] = props['version']
        else:
          raise ValueError(f'Invalid resolver: {resolver}')
        # Format the resolution
        resolver_props['resolution'] = _format_resolver(resolver, base_path)
      except ValueError: continue #TODO: Add warning
      except FileNotFoundError: continue #TODO: Add warning
      else:
        # Runs only when resolution succeeded (no exception raised above).
        # Check if the resolution is already in the lockfile
        if force and lockfile_entry:
          nested_del(lockfile, entry_path)
        elif update and lockfile_entry:
          if resolution := nested_get(resolver_props, ['resolution']):
            # Unchanged resolutions are skipped entirely; outdated ones are
            # removed from the lockfile so the new entry replaces them.
            if resolution == lockfile_entry['resolution']: continue
            else: nested_del(lockfile, entry_path)
      # Extract revision key
      if resolver_props['__resolver'] is not None:
        #pylint: disable=cell-var-from-loop
        props_ = dict(resolver_props['__resolver'])
        def format_revision(key, algorithm='SHA256'):
          # Returns None when `key` is absent, so `or` falls through below.
          if key in props_:
            return " ".join(["{", f"{algorithm}: {props_.get(key)}", "}"])
        resolver_props['revision'] = \
          format_revision('commit', 'SHA1') or format_revision('checksum')
      # Add the resolver to the list of resolvers
      resolvers.append(resolver_props)
  return resolvers
def validate_dependencies(lockfile: dict, build_config: dict) -> None:
  """Verifies that the lockfile is consistent with the build file.

  Args:
    lockfile: The lockfile dictionary.
    build_config: The build configuration dictionary.

  Raises:
    AssertionError: If the lockfile does not match the build file.
  """
  dependencies = nested_get(lockfile, ['dependencies'], default={})
  if not dependencies:
    raise AssertionError('Lockfile contains no build configuration entries.')
  # Entry names present in the lockfile vs. those the build file expects
  # (catch-all '*' specifiers are never locked, so they are excluded).
  locked = {entry_name for category in dependencies.values()
                       for entry_name in category.keys()}
  configured = {entry_name for _, entries in build_config.items()
                           for entry_name, entry in entries.items()
                           if entry.get('specifier') != '*'}
  # Pass: Lockfile is consistent with the build file.
  if locked == configured:
    return
  if locked.issubset(configured):
    raise AssertionError('Lockfile is missing new build configuration entries.')
  if configured.issubset(locked):
    raise AssertionError('Lockfile contains outdated build configuration entries.')
  raise AssertionError('Lockfile is inconsistent with the build file.')
#TODO: Handle resolving and caching dependencies # def resolve_dependencies(resolvers: dict, ...) -> dict: # """Resolves the dependencies for each entry in the build configuration. __all__ = [ # Constants (2) "LOCKFILE_METADATA", "LOCKFILE_WARNING_COMMENT", # Functions (8) "parse_semver_params", "parse_specifier", "read_lockfile", "write_lockfile", "prune_lockfile", "prune_resolver_entry", "resolve_specifiers", "validate_dependencies" ]