Source code for ocebuild.pipeline.opencore
## @file
# Copyright (c) 2023, The OCE Build Authors. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
##
"""Methods for retrieving and handling OpenCore packages."""
from hashlib import sha256
from shutil import copyfile, copytree
from tempfile import mkdtemp, NamedTemporaryFile
from typing import Generator, Iterator, List, Literal, Optional, Tuple, Union
from mmap import mmap, PROT_READ
from .lock import prune_resolver_entry
from ocebuild.filesystem.cache import UNPACK_DIR
from ocebuild.filesystem.posix import glob, move, remove
from ocebuild.parsers.dict import nested_get, nested_set
from ocebuild.sources.binary import get_stream_digest
#NOTE: This import was remapped from 'third_party' to 'ocebuild.third_party'.
from ocebuild.third_party.cpython.pathlib import Path
[docs]def extract_opencore_archive(pkg: Path,
target: Literal['IA32', 'X64']='X64') -> None:
"""Extracts the contents of an OpenCore archive to a temporary directory.
Args:
pkg: Path to an existing OpenCore package.
target: The desired target architecture of the OpenCore EFI.
"""
tmp_dir = mkdtemp(dir=UNPACK_DIR)
# Extract EFI binaries and tree structure
EFI_DIR = move(glob(pkg, pattern=f'**/{target}/EFI', first=True), tmp_dir)
# Extract documentation files
DOCS_DIR = EFI_DIR.joinpath('..', 'Docs')
changelog_doc = glob(pkg, pattern='**/Docs/Changelog.md', first=True)
move(changelog_doc, DOCS_DIR)
config_doc = glob(pkg, pattern='**/Docs/Configuration.pdf', first=True)
move(config_doc, DOCS_DIR)
diff_doc = glob(pkg, pattern='**/Docs/Differences.pdf', first=True)
move(diff_doc, DOCS_DIR)
# Extract sample config.plist
sample_plist = glob(pkg, pattern='**/Docs/Sample.plist', first=True)
move(sample_plist, DOCS_DIR)
# Extract ACPI samples
acpi_samples = glob(pkg, pattern='**/Docs/AcpiSamples/Binaries/*.aml')
for file in acpi_samples:
move(file, EFI_DIR.joinpath('OC', 'ACPI'))
# Extract bundled utilities
UTILITIES_DIR = EFI_DIR.joinpath('..', 'Utilities')
for dir_ in glob(pkg, pattern='**/Utilities/*/'):
move(dir_, UTILITIES_DIR)
# Overwrite package directory with changes
remove(pkg)
for dir_ in Path(tmp_dir).iterdir():
move(dir_, pkg)
[docs]def extract_ocbinary_archive(pkg: Path, oc_pkg: Path) -> None:
"""Extracts OcBinaryData resources to an existing OpenCore archive.
Args:
pkg: Path to an existing OcBinarData archive.
oc_pkg: Path to an existing OpenCore package.
"""
for dir_ in oc_pkg.joinpath('EFI', 'OC').iterdir():
extract = glob(pkg, pattern=f'**/{dir_.name}/', first=True)
if extract and extract.exists():
copytree(extract, dir_, dirs_exist_ok=True)
# Cleanup
remove(pkg)
def _iterate_entries(opencore_pkg: Path,
opencore_dir: Path
) -> Generator[Tuple[str, str], any, None]:
"""Iterate over the entries in the build configuration."""
for category in map(lambda p: p.name, opencore_dir.iterdir()):
if category not in ('ACPI', 'Drivers', 'Kexts', 'Tools'): continue
for path in map(lambda p: p.relative_to(opencore_pkg).as_posix(),
opencore_dir.joinpath(category).iterdir()):
yield category, path
[docs]def extract_build_entries(opencore_pkg: Path,
resolvers: List[dict],
*args,
__wrapper: Optional[Iterator]=None,
**kwargs
) -> None:
"""Prunes and extracts build entries from an OpenCore package.
Args:
opencore_pkg: The path to the OpenCore package.
resolvers: The build configuration resolvers.
*args: Additional arguments to pass to the optional iterator wrapper.
__wrapper: A wrapper function to apply to the iterator. (Optional)
**kwargs: Additional keyword arguments to pass to the optional iterator wrapper.
Returns:
A Path to the extracted OpenCore directory.
"""
oc_dir = opencore_pkg.joinpath('EFI', 'OC')
# Handle interactive mode for iterator
iterator = set(_iterate_entries(opencore_pkg, oc_dir))
if __wrapper is not None: iterator = __wrapper(iterator, *args, **kwargs)
# Iterate over the entries in the extracted OpenCore package
extract_entries = {}
bundled = list(filter(lambda e: e['specifier'] == '*', resolvers))
for category, path in iterator:
# Include only binaries that are specified as bundled in the build config
matches = filter(lambda e: str(e['__filepath']).find(path) > 0, bundled)
if entry := next(matches, None):
filepath = Path(entry['__filepath']).as_posix()
prune_resolver_entry(resolvers, key='__filepath', value=filepath)
# Add the entry to the extracted entries
relative = f'.{path.rsplit(category, maxsplit=1)[1]}'
name = nested_get(entry, ['name'], default=Path(path).stem)
nested_set(extract_entries, [entry['__category'], name], {
'__dest': Path(filepath),
'__extracted': Path(opencore_pkg.joinpath(path)),
'__path': relative
})
else:
remove(opencore_pkg.joinpath(path))
return extract_entries
[docs]def get_opencore_checksum(file_path: Union[str, Path],
algorithm=sha256
) -> str:
"""Computes the SHA256 checksum of the OpenCore binary.
This will compute the checksum of the `OpenCore.efi` binary, substituting the
embedded public key when using vaulting. Modifications are made to a copy of
the file within a temporary directory, which is validated and deleted after
computing the checksum.
Args:
file_path: The path to the OpenCore binary.
algorithm: The hashlib algorithm to use. Defaults to SHA256.
Raises:
AssertionError: If the file checksum does not match the expected value.
RuntimeError: If the vault header is not found (i.e. the file is malformed).
Returns:
The SHA256 checksum of the OpenCore binary.
"""
# Copy the file contents to the temporary file
file_path = Path(file_path)
with NamedTemporaryFile(mode="r+b",
suffix='-OpenCore.efi',
dir=UNPACK_DIR) as f:
copyfile(file_path.resolve(), f.name)
file_checksum = file_path.checksum
f.seek(0)
# Get the offset of the vault header
vault_header = b'=BEGIN OC VAULT='
with mmap(f.fileno(), 0, PROT_READ) as mf:
offset = mf.find(vault_header)
if offset != -1:
f.seek(offset + len(vault_header))
else:
raise RuntimeError('No vault header found in the file')
# Extract and replace the public key
public_key = f.read(528)
if has_public_key := any(byte for byte in public_key):
f.seek(offset + len(vault_header))
f.write(b'\x00' * 528)
# Compute the de-vaulted checksum
checksum = get_stream_digest(stream=f, algorithm=algorithm)
# Verify that the extraction is reversible
if has_public_key:
f.seek(offset + len(vault_header))
f.write(public_key)
if not file_checksum == get_stream_digest(stream=f, algorithm=algorithm):
raise AssertionError('Checksum extraction failed')
return checksum
__all__ = [
# Functions (4)
"extract_opencore_archive",
"extract_ocbinary_archive",
"extract_build_entries",
"get_opencore_checksum"
]