## @file
# Copyright (c) 2023, The OCE Build Authors. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
##
"""Methods for handling and resolving lock files."""
from collections import OrderedDict
from itertools import chain
from os import getcwd
from typing import Dict, Generator, Iterator, List, Optional, Tuple, Union
from ocebuild.parsers.dict import merge_dict, nested_del, nested_get, nested_set
from ocebuild.parsers.regex import re_match, re_search
from ocebuild.parsers.yaml import parse_yaml, write_yaml
from ocebuild.sources.resolver import *
"""The current metadata for the lockfile system."""
"""The warning comment generated for new lockfiles."""
def _category_extension(category: str) -> Tuple[str, str]:
"""Determine the file extension for the category.
Args:
category: The category to determine the file extension for.
Returns:
A tuple containing:
- The file extension for the category.
- The kind of entry for the category.
"""
if category == 'ACPI':
ext = '.aml'; kind = 'SSDT'
elif category == 'Kexts':
ext = '.kext'; kind = 'Kext'
else:
ext = '.efi'; kind = 'Binary'
return ext, kind
def _format_resolver(resolver: Union[ResolverType, None],
                     base_path: str=getcwd(),
                     as_specifier: bool=False) -> str:
    """Builds the lockfile resolution (or specifier) string for a resolver.

    The string is assembled as `<name>@<source>`, followed by optional
    `:<tag>` / `:<version or relative path>` segments and an optional
    `#commit=...` or `#checksum=...` fragment.

    Args:
      resolver: The resolver to format.
      base_path: The base path that local file paths are made relative to.
      as_specifier: Whether to instead format the specifier. (Optional)

    Returns:
      The formatted string. When the resolver is none of the handled resolver
      classes, '*' is returned if `as_specifier` is set and None otherwise.
    """
    props = {} if resolver is None else dict(resolver)
    is_path_resolver = isinstance(resolver, PathResolver)

    # Leading segment: the entry name and where it is sourced from
    if isinstance(resolver, GitHubResolver):
        head = f'{resolver.repository}@github'
    elif isinstance(resolver, DortaniaResolver):
        head = f'acidanthera/{resolver.__name__}@github'
    elif is_path_resolver:
        head = f'{resolver.__name__}@file'
    elif as_specifier:
        return '*'
    else:
        return None
    segments = [head]

    # Version segments: the tag is only included when formatting a specifier
    if as_specifier and 'tag' in props:
        segments.append(f":{props['tag']}")
    if 'version' in props:
        segments.append(f":{props['version']}")
    elif is_path_resolver:
        segments.append(f":{resolver.path.relative_to(base_path)}")

    # Integrity fragment: a commit takes precedence over a checksum
    if 'commit' in props:
        segments.append(f"#commit={props['commit']}")
    elif 'checksum' in props:
        segments.append(f"#checksum={props['checksum']}")

    return ''.join(segments)
def _format_dependency_entry(entry: Dict[str, any]) -> dict:
"""Formats a lockfile entry for writing.
Args:
entry: The entry to format.
Returns:
The formatted entry dictionary.
"""
exclude_keys = ('name',)
def _is_private_key(k: str) -> bool:
"""Determines whether a key is private."""
return k[0] == '_' or k in exclude_keys
sorted_keys = (
# Resolver properties
'build',
'version',
'url',
'path',
# Revalidation metadata
'resolution',
'specifier',
'kind',
'revision',
)
def _try_index(k: str) -> int:
"""Attempts to find the index of the key in the sorted keys list."""
try: return sorted_keys.index(k)
except ValueError: return len(sorted_keys) + 1
def sorted_public_entry(d: dict) -> Generator[tuple, any, None]:
"""Iterates over a dictionary in sorted order."""
for k,v in sorted(d.items(), key=lambda e: _try_index(e[0])):
if not _is_private_key(k): yield k,v
public_keys = OrderedDict((k,v) for k,v in sorted_public_entry(entry))
return public_keys
#TODO: Handle resolving packages from lockfile
# def _format_package_entry()
def parse_semver_params(entry: Union[str, dict],
                        specifier: str,
                        parameters: Optional[dict]=None
                        ) -> Dict[str, str]:
    """Extracts resolver parameters from a semver entry or specifier.

    Parameters are collected from the specifier string first and are then
    overridden by any matching keys given directly on a dictionary entry.

    Args:
      entry: The entry to parse additional properties from.
      specifier: The specifier string to parse.
      parameters: The parameters dictionary to mutate. (Optional)

    Returns:
      The parameters dictionary.
    """
    if parameters is None:
        parameters = {}

    # Release tag
    tag_prefix = re_match('^=', specifier)
    if tag_prefix:
        parameters['tag'] = specifier[len(tag_prefix):]

    param_keys = ('tag', 'branch', 'workflow', 'commit')
    if re_match('^#[a-zA-Z\\-]+=', specifier):
        # (Priority: 1) Named specifier parameters
        for key in param_keys:
            named_prefix = re_match(f'#.*?{key}=', specifier)
            if named_prefix:
                #TODO: Add separation for multiple parameters (,)
                parameters[key] = specifier[len(named_prefix):]
    else:
        # (Priority: 2) Unnamed specifier parameters
        hash_prefix = re_match('^#', specifier)
        if hash_prefix:
            fragment = specifier[len(hash_prefix):]
            # Test for a full-length hash first, then for a short hash
            sha = (re_search(r'\b[0-9a-fA-F]{40}\b', fragment)
                   or re_search(r'\b[0-9a-fA-F]{7}\b', fragment))
            if sha:
                parameters['commit'] = sha
            else:
                # Fall back to assignment as branch name
                parameters['branch'] = fragment

    # (Override) Named entry parameters
    if isinstance(entry, dict):
        overrides = {key: entry[key] for key in param_keys if key in entry}
        parameters.update(overrides)

    return parameters
# NOTE(review): the `base_path` default calls `getcwd()` once, when the function
# is defined, not on each call - a later change of working directory is not seen.
[docs]def parse_specifier(name: str,
entry: Union[str, Dict[str, any]],
base_path: Optional[str]=getcwd()
) -> Union[GitHubResolver, PathResolver, DortaniaResolver, None]:
"""Parses a specifier string for a resolver class.
The checks are attempted in order: a GitHub repository, a Dortania build
(looked up by entry name), then a local file path.
Args:
name: The name of the entry to parse.
entry: The entry to parse; either a specifier string or a dictionary of
entry properties.
base_path: The base path to use for relative paths. (Optional)
Returns:
A resolver instance for the specifier, or None if no resolver matched.
"""
# Read the specifier from the entry, falling back to the entry itself
# (presumably `nested_get` returns `default` when the key is absent - confirm).
specifier = nested_get(entry, ['specifier'], default=entry)
# Anything that is not a string (e.g. a dictionary entry) becomes an empty specifier
if not isinstance(specifier, str): specifier = ''
parameters: Dict[str, str]={}
resolver_props = { '__name__': name, '__specifier__': specifier }
# Specifier points to a github repository
if isinstance(entry, dict) and 'repository' in entry:
# Add repository name to specifier if provided as an object parameter
# A specifier starting with '#' is appended directly; otherwise '=' separates them
delimiter = '=' if not specifier.startswith('#') else ''
if specifier:
specifier = delimiter.join([entry['repository'], specifier])
else:
specifier = entry['repository']
if (repository := re_match(r'[a-zA-Z0-9\-]+\/[a-zA-Z0-9\-]+', specifier)):
parameters['repository'] = repository
# The remainder after the repository name holds the version parameters
semver_specifier = specifier[len(repository):]
parameters = parse_semver_params(entry, semver_specifier, parameters)
# Handle optional flags
# NOTE(review): `entry` may be a plain string here; `'tarball' in entry` is then
# a substring test and `entry['tarball']` would raise a TypeError - consider
# guarding with `isinstance(entry, dict)`.
if 'tarball' in entry: parameters['tarball'] = entry['tarball']
return GitHubResolver(**parameters, **resolver_props)
# Specifier points to a Dortania build (or latest)
if DortaniaResolver.has_build(name):
parameters = parse_semver_params(entry, specifier, parameters)
return DortaniaResolver(**parameters, **resolver_props)
# Specifier points to a local file
if specifier.startswith('file:'):
# NOTE(review): `str.replace` removes every occurrence, not only the leading scheme
specifier = specifier \
.replace('file://', '') \
.replace('file:', '')
if (filepath := PathResolver(base_path, specifier)).exists():
parameters['path'] = filepath
return PathResolver(**parameters, **resolver_props)
# No resolver matched
return None
def read_lockfile(lockfile_path: str,
                  metadata: bool=False
                  ) -> Union[dict, Tuple[dict, dict]]:
    """Reads and parses the lockfile at the specified path.

    Args:
      lockfile_path: The path of the lockfile to read.
      metadata: Whether to also return the lockfile metadata. (Optional)

    Returns:
      The parsed lockfile; or, when `metadata` is set, a tuple containing:
        - The parsed lockfile.
        - The `metadata` entry of the lockfile frontmatter (defaulting to an
          empty dictionary).
    """
    with open(lockfile_path, 'r', encoding='UTF-8') as lockfile_io:
        lockfile, frontmatter = parse_yaml(lockfile_io, frontmatter=True)

    if not metadata:
        return lockfile

    # Include lockfile metadata if specified
    lockfile_metadata = nested_get(frontmatter, ['metadata'], default={})
    return lockfile, lockfile_metadata
def write_lockfile(lockfile_path: str,
                   lockfile: dict,
                   resolvers: List[dict],
                   metadata: Optional[dict]=None,
                   ) -> dict:
    """Writes a lockfile to the specified path.

    The metadata version is incremented on every write. A caller-supplied
    `metadata` dictionary is incremented in place; when none is given, a copy
    of the default lockfile metadata is used instead.

    Args:
      lockfile_path: The path to write the lockfile to.
      lockfile: The lockfile to write.
      resolvers: The resolved entries to write to the lockfile. Each entry must
        provide the `__category` and `name` keys.
      metadata: The lockfile metadata to write. (Optional)

    Returns:
      The lockfile merged with the formatted resolver entries.
    """
    # Format lockfile metadata
    if not metadata:
        # Copy the default, so that bumping the version below does not
        # permanently modify the shared module-level constant.
        metadata = dict(LOCKFILE_METADATA)
    metadata['version'] += 1

    # Format lockfile header
    file_header = ['---', *write_yaml(metadata), '---', LOCKFILE_WARNING_COMMENT]

    # Merge existing lockfile with new dependency entries
    root = 'dependencies'
    for resolver in resolvers:
        entry = _format_dependency_entry(resolver)
        category = resolver['__category']
        key = resolver['name']
        lockfile = merge_dict(lockfile, { root: { category: { key: entry } } })

    # Parse lockfile into YAML
    lockfile_entry = write_yaml(lockfile, lines=file_header)
    with open(lockfile_path, 'w', encoding='UTF-8') as f:
        f.write("\n".join(lockfile_entry))

    return lockfile
def prune_lockfile(build_config: dict, lockfile: dict) -> List[dict]:
    """Removes lockfile entries that are absent from the build configuration.

    Only the categories present in the build configuration are inspected.

    Args:
      build_config: The build configuration to prune against.
      lockfile: The lockfile to prune.

    Returns:
      A list of removed lockfile entries, each carrying its lockfile path
      under the `__tree` key.
    """
    removed = []
    # Collect every stale entry first, so nothing is deleted while iterating
    for category in build_config.keys():
        locked_entries = nested_get(lockfile, ['dependencies', category], {})
        for key, locked_entry in locked_entries.items():
            if nested_get(build_config, [category, key]):
                continue
            removed.append({ '__tree': ['dependencies', category, key],
                             **locked_entry })

    for stale_entry in removed:
        nested_del(lockfile, stale_entry['__tree'])

    return removed
def prune_resolver_entry(resolvers: List[dict], key: str, value: any) -> None:
    """Prunes matching resolver entries from the list of resolvers.

    Every entry that contains `key` with a value equal to `value` is removed.
    The list is modified in place and the remaining entries keep their relative
    order. Nothing is raised when no entry matches.

    Args:
      resolvers: The list of resolvers to prune from.
      key: The key to prune by.
      value: The value to prune by.
    """
    # Slice assignment keeps the caller's list object while filtering in one pass
    resolvers[:] = [e for e in resolvers if not (key in e and e[key] == value)]
def _iterate_build_entries(build_config: dict) -> List[Tuple[str, str, dict]]:
"""Iterate over the entries in the build configuration."""
def group_entries(category: str, entries: dict):
return [(category, name, entry) for name, entry in entries.items()]
return list(chain(*[group_entries(c,d) for c,d in build_config.items()]))
[docs]def resolve_specifiers(build_config: dict,
lockfile: dict,
base_path: str=getcwd(),
update: bool=False,
force: bool=False,
*args,
__wrapper: Optional[Iterator]=None,
**kwargs
) -> List[dict]:
"""Resolves the specifiers for each entry in the build configuration.
Both `build_config` and `lockfile` are passed to `nested_set` / `nested_del`
and so may be modified by this call.
Args:
build_config: The build configuration to resolve specifiers for.
lockfile: The lockfile to resolve specifiers against.
base_path: The base path to use for relative paths. (Optional)
update: Whether to update outdated entries in the lockfile. (Optional)
force: Whether to force resolve all entries in the build configuration. (Optional)
*args: Additional arguments to pass to the optional iterator wrapper.
__wrapper: A wrapper function to apply to the iterator. (Optional)
**kwargs: Additional keyword arguments to pass to the optional iterator wrapper.
Raises:
ValueError: If a resolver or the build configuration is invalid.
Returns:
A list of resolver property dictionaries for the resolved entries.
"""
resolvers = []
# Build type of the OpenCore entry; used below as the fallback `build` value
default_build = nested_get(build_config, ['OpenCorePkg', 'OpenCore', 'build'])
# Handle interactive mode for iterator
iterator = _iterate_build_entries(build_config)
if __wrapper is not None: iterator = __wrapper(iterator, *args, **kwargs)
# Resolve the specifiers for each entry in the build configuration
for category, name, entry in iterator:
entry_path = ['dependencies', category, name]
lockfile_entry = nested_get(lockfile, entry_path)
# Handle any necessary resolver preprocessing
resolver = parse_specifier(name, entry, base_path=base_path)
if resolver is None:
# No resolver matched: record the wildcard specifier on the build configuration
specifier = '*'
nested_set(build_config, [category, name, 'specifier'], specifier)
else:
specifier = _format_resolver(resolver, base_path, as_specifier=True)
# Extract additional properties from the entry
ext, kind_ = _category_extension(category)
# `__filepath` and `__kind` on the entry take precedence over the category defaults
filepath = nested_get(entry, ['__filepath'],
default=f'EFI/OC/{category}/{name}{ext}')
kind = nested_get(entry, ['__kind'], default=kind_)
# Assign default resolver properties
resolver_props = {
"__category": category,
'__filepath': filepath,
"__resolver": resolver,
"name": name,
"specifier": specifier,
"kind": kind
}
# Skip updating entries if not specified
if lockfile_entry and not (force or update):
# Reserve the `__resolver` key for revalidated entries
resolver_props['__resolver'] = None
# Values from the existing lockfile entry override the defaults above
resolvers.append({ **resolver_props, **lockfile_entry })
# Otherwise, prune matching resolvers and outdated entries from lockfile
elif specifier == '*':
resolvers.append(resolver_props)
elif resolver is not None:
try:
if isinstance(resolver, PathResolver):
# Resolve the path for the specifier
path = resolver.resolve(strict=True) #pylint: disable=E1123
resolver_props['path'] = f'./{path.relative_to(base_path)}'
elif isinstance(resolver, (GitHubResolver, DortaniaResolver)):
# Extract the build type (default to OpenCore build type)
build = nested_get(entry, ['build'], default=default_build)
resolver_props['build'] = build
# Resolve the URL for the specifier
url = resolver.resolve(build=build)
resolver_props['url'] = url
# Extract the version or commit from the resolver
if 'version' in (props := dict(resolver)):
resolver_props['version'] = props['version']
else:
raise ValueError(f'Invalid resolver: {resolver}')
# Format the resolution
resolver_props['resolution'] = _format_resolver(resolver, base_path)
# NOTE(review): both handlers below skip the entry without reporting anything
except ValueError:
continue #TODO: Add warning
except FileNotFoundError:
continue #TODO: Add warning
else:
# Check if the resolution is already in the lockfile
if force and lockfile_entry:
nested_del(lockfile, entry_path)
elif update and lockfile_entry:
if resolution := nested_get(resolver_props, ['resolution']):
# NOTE(review): raises KeyError if the lockfile entry has no 'resolution' key
if resolution == lockfile_entry['resolution']: continue
else: nested_del(lockfile, entry_path)
# Extract revision key
if resolver_props['__resolver'] is not None:
#pylint: disable=cell-var-from-loop
props_ = dict(resolver_props['__resolver'])
# Returns None (implicitly) when `key` is missing, so that the `or` below
# falls through from the commit to the checksum
def format_revision(key, algorithm='SHA256'):
if key in props_:
return " ".join(["{", f"{algorithm}: {props_.get(key)}", "}"])
resolver_props['revision'] = \
format_revision('commit', 'SHA1') or format_revision('checksum')
# Add the resolver to the list of resolvers
resolvers.append(resolver_props)
return resolvers
def validate_dependencies(lockfile: dict, build_config: dict) -> None:
    """Verifies that the lockfile is consistent with the build file.

    The entry names in the lockfile are compared against the entry names in the
    build configuration, ignoring build entries with the wildcard specifier
    (`*`). Names are compared across all categories at once.

    Args:
      lockfile: The lockfile dictionary.
      build_config: The build configuration dictionary.

    Raises:
      AssertionError: If the lockfile does not match the build file.
    """
    dependencies = nested_get(lockfile, ['dependencies'], default={})
    if not dependencies:
        raise AssertionError('Lockfile contains no build configuration entries.')

    def _entry_specifier(entry):
        # A build entry is either a dictionary of properties or a plain
        # specifier string; a string has no `.get` and is its own specifier.
        return entry.get('specifier') if isinstance(entry, dict) else entry

    lockfile_keys = {key
                     for entries in dependencies.values()
                     for key in entries.keys()}
    buildcfg_keys = {key
                     for entries in build_config.values()
                     for key, entry in entries.items()
                     if _entry_specifier(entry) != '*'}

    if lockfile_keys == buildcfg_keys:
        return # Pass: Lockfile is consistent with the build file.
    if lockfile_keys.issubset(buildcfg_keys):
        raise AssertionError('Lockfile is missing new build configuration entries.')
    if buildcfg_keys.issubset(lockfile_keys):
        raise AssertionError('Lockfile contains outdated build configuration entries.')
    raise AssertionError('Lockfile is inconsistent with the build file.')
#TODO: Handle resolving and caching dependencies
# def resolve_dependencies(resolvers: dict, ...) -> dict:
# """Resolves the dependencies for each entry in the build configuration.
# Names exported by a wildcard import (`from <module> import *`) of this module.
__all__ = [
# Constants (2)
"LOCKFILE_METADATA",
"LOCKFILE_WARNING_COMMENT",
# Functions (8)
"parse_semver_params",
"parse_specifier",
"read_lockfile",
"write_lockfile",
"prune_lockfile",
"prune_resolver_entry",
"resolve_specifiers",
"validate_dependencies"
]