# Source code for ocebuild.parsers.yaml

## @file
# Copyright (c) 2023, The OCE Build Authors. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
##
"""Parser for converting annotated YAML to a Python dictionary."""

#pylint: disable=cell-var-from-loop

from datetime import datetime
from shlex import split

from typing import List, Literal, Optional, Tuple, Union

from ._lib import _append_tags, _apply_macro, update_cursor
from .dict import flatten_dict, nested_get, nested_set
from .regex import re_search
from .types import decode_data, encode_data


def parse_yaml_types(stype: str,
                     value: str,
                     schema: Literal['annotated', 'yaml']='yaml'
                     ) -> Optional[object]:
    """Parse an annotated YAML value into its native Python value.

    Args:
      stype: Annotated type literal ('Date', 'Boolean', 'Data', 'Dict',
        'Number', 'Array' or 'String').
      value: Raw YAML value text.
      schema: Flag to control input schema.

    Returns:
      The parsed Python value. Only the value is returned (not a
      `(type, value)` tuple). Unrecognized type literals, and schemas other
      than 'annotated' or 'yaml', yield None.

    Raises:
      NotImplementedError: If `schema` is 'yaml' (not supported yet).
      ValueError: If a 'Date' or 'Number' value cannot be converted.
    """
    if schema == 'yaml':
        raise NotImplementedError() #TODO
    if schema != 'annotated':
        return None

    # Parse annotated types
    svalue = None
    if stype == 'Date':
        # `fromisoformat` only accepts a trailing 'Z' on newer Python versions
        svalue = datetime.fromisoformat(value.replace("Z", "+00:00"))
    elif stype == 'Boolean':
        svalue = value.lower() == 'true'
    elif stype == 'Data':
        svalue = encode_data(value)
    elif stype == 'Dict':
        svalue = {}
    elif stype == 'Number':
        svalue = float(value) if '.' in value else int(value)
    elif stype == 'Array':
        svalue = []
    elif stype == 'String':
        svalue = value

    # Strip one pair of surrounding quotes, but only when the closing quote
    # matches the opening one; otherwise a real trailing character is lost.
    if (isinstance(svalue, str)
            and len(svalue) >= 2
            and svalue[0] in ('"', "'")
            and svalue[-1] == svalue[0]):
        svalue = svalue[1:-1]

    return svalue
def write_yaml_types(value: Union[Tuple[str, any], any],
                     schema: Literal['annotated', 'yaml']='yaml'
                     ) -> Tuple[str, any]:
    """Convert a Python value into its YAML type literal and value text.

    Args:
      value: Either a native Python value, or a `(type, value)` tuple whose
        first item names the type explicitly.
      schema: Flag to control output schema.

    Returns:
      Tuple of type (literal) and serialized value. For the 'annotated'
      schema the type literal is right-padded to the width of 'Boolean' so
      that the type column lines up. For any other schema value the unpacked
      `(type, value)` pair is returned unchanged.
    """
    # Unpack native types
    stype, svalue = type(value).__name__, value
    if isinstance(value, tuple):
        stype, svalue = value

    def _format_data(data, pad=8):
        """Formats data into space-separated groups of hex characters."""
        string = decode_data(data, enc='hex')
        hex_fmt = ' '.join(string[i:i+pad] for i in range(0, len(string), pad))
        return f"<{ hex_fmt }>"

    if schema == 'annotated':
        # Parse native types
        if stype in ('date', 'datetime'):
            # Native `datetime` objects report the type name 'datetime'
            label = 'Date'
            svalue = str(svalue).replace(' ', 'T').replace('+00:00', 'Z')
        elif stype in ('bool', 'boolean'):
            label = 'Boolean'
            svalue = str(svalue).lower()
        elif stype in ('bytes', 'data'):
            label = 'Data'
            svalue = _format_data(svalue)
        elif stype in ('dict', 'dictionary'):
            label = 'Dict'
            svalue = '(empty)'
        elif stype == 'float':
            label = 'Number'
            svalue = str(float(svalue))
        elif stype in ('int', 'integer'):
            label = 'Number'
            svalue = str(int(svalue))
        elif stype in ('list', 'array'):
            label = 'Array'
            svalue = '(empty)'
        elif stype in ('str', 'string'):
            label = 'String'
            svalue = f'"{svalue}"'
        else:
            label = stype.capitalize()
            svalue = str(svalue)
        # Pad on the right so every label shares the width of 'Boolean'
        stype = label.ljust(len('Boolean'))
    elif schema == 'yaml':
        # Parse native types
        if stype in ('bool', 'boolean'):
            svalue = str(svalue).lower()
        elif stype in ('dict', 'object'):
            svalue = ''
        elif stype in ('list', 'array'):
            svalue = ''
        elif stype in ('str', 'string'):
            svalue = f'"{svalue}"'
        else:
            svalue = str(svalue)

        # Strings are already double-quoted above and numbers are plain YAML
        # scalars; quoting either again would change the value or its type.
        if stype not in ('str', 'string', 'int', 'integer', 'float'):
            # Escape control and reserved characters
            # @see https://symfony.com/doc/current/reference/formats/yaml.html
            reserve_chars = (':', '{', '}', '[', ']', ',', '&', '*', '#', '?',
                             '|', '-', '<', '>', '=', '!', '%', '@', '`')
            control_chars = (
                # C0 control characters (including ESC, '\x1b')
                *(chr(code) for code in range(0x20)),
                # Next line, non-breaking space, line/paragraph separators
                '\x85', '\xa0', '\u2028', '\u2029',
                # The same characters spelled as literal YAML escape sequences
                r'\e', r'\N', r'\_', r'\L', r'\P',
            )
            if any(c in svalue for c in control_chars):
                svalue = f'"{svalue}"'
            elif any(c in svalue for c in reserve_chars):
                svalue = f'\'{svalue}\''

    return stype, svalue
def parse_yaml(lines: List[str],
               config: Optional[dict]=None,
               flags: Optional[List[str]]=None,
               frontmatter: bool=False
               ) -> Union[dict, Tuple[dict, dict]]:
    """Parses YAML (optionally type annotated) into a Python dictionary.

    Blank lines and lines whose first non-whitespace character is '#' are
    ignored. Lines starting with '---' toggle frontmatter mode, in which
    entries are stored as frontmatter variables instead of config entries.
    Lines whose first token starts with '@' are handed to the preprocessor
    macro handler.

    Args:
      lines: YAML lines.
      config: Dictionary to be populated (a new one is created when omitted).
      flags: List of preprocessor flags. A list passed in here is appended to
        in place when the frontmatter declares a 'build' or 'target' value.
      frontmatter: Flag to control frontmatter parsing.

    Raises:
      ValueError: If YAML parser reaches an invalid line (no parsable tokens,
        or quoting that cannot be tokenized).

    Returns:
      Dictionary populated from YAML entries. When `frontmatter` is set, a
      tuple of that dictionary and the frontmatter dictionary (with keys
      'variables', 'tags' and 'flags') is returned instead.
    """
    if config is None:
        config = {}
    if flags is None:
        flags = []

    frontmatter_dict = { 'variables': {}, 'tags': [] }

    cursor = {
        'keys': [],
        'level': 0,
        'indent': 0,
        'skip': False,
        'upshift': False,
        'is_frontmatter': False,
        'has_tag': None,
        'tag_tree': None
    }
    for i, line_ in enumerate(lines, start=1):
        line = line_.rstrip()
        lnorm = line.lstrip()
        # Skip empty lines
        if not lnorm:
            continue
        # Check if first non-whitespace character is a comment
        if lnorm.startswith('#'):
            continue
        # Check if crossing frontmatter
        if lnorm.startswith('---'):
            cursor['is_frontmatter'] = not cursor['is_frontmatter']
            continue

        # Extract tokens from line
        try:
            raw_tokens = split(lnorm)
        except ValueError as err:
            # shlex rejects unbalanced quotes; report which line was at fault
            raise ValueError(f'Invalid line at position {i}:\n\n{line}') from err
        tokens = [p for p in raw_tokens if p not in ('|', '-')]
        # Reached invalid line (nothing left to parse, e.g. a bare '-')
        if not tokens:
            raise ValueError(f'Invalid line at position {i}:\n\n{line}')
        num_tokens = len(tokens)
        key = tokens[0][:-1]

        # Handle parsing frontmatter variables
        if cursor['is_frontmatter']:
            value = ' '.join(tokens[1:])
            nested_set(frontmatter_dict, ['variables', key], value)
            # Add OpenCore build type as global flag
            if key in ('build', 'target'):
                flags.append(value)
            continue
        # Handle preprocessor macros
        if tokens[0].startswith('@'):
            macro = tokens[0]
            # Fix comma-separated macro flags
            if macro[-1] == ',':
                macro += ''.join(tokens[1:])
            _apply_macro(macro, flags, cursor, frontmatter_dict)
            continue
        # Skip through macro checking scope
        if cursor['skip']:
            continue

        #TODO: Handle parsing frontmatter variables
        # def get_frontmatter():
        #   """Replaces variables with frontmatter values"""

        # Handle non-dict yaml arrays
        if lnorm.startswith('- ') and ': ' not in lnorm:
            # Extract correct value from tokens
            key = tokens[0]
            cursor['upshift'] = True
        # Fix subsequent non-array entries
        elif cursor['upshift']:
            # Treat cursor as if it's in the same level as normal dict keys
            cursor['keys'] = cursor['keys'][:-1]
            cursor['level'] -= cursor['indent']
            cursor['upshift'] = False

        # Update cursor position (level is the width of the leading whitespace)
        level = len(line) - len(lnorm)
        if num_tokens == 1 and tokens[0].endswith(':'):
            update_cursor(level, key, cursor, upshift=1)
            cursor['tag_tree'] = cursor['keys']
        # Update dictionary values
        else:
            # Extract schema and entry value
            if num_tokens >= 3:
                entry = parse_yaml_types(tokens[1],
                                         ' '.join(tokens[2:]),
                                         'annotated')
            else:
                entry = ' '.join(tokens[1:])

            # Extract and validate parent tree level
            tree = cursor['keys']
            while len(tree) > level / max(1, cursor['indent']):
                tree.pop(-1)
                cursor['level'] -= cursor['indent']
            prev_value = nested_get(config, tree)

            # Handle inline objects or arrays
            if len(tokens) >= 2:
                if len(tokens) > 3 and tokens[1] in ('{', '['):
                    tokens = [tokens[0], ' '.join(tokens[1:])]
                    entry = tokens[1]
                # `startswith`/`endswith` tolerate an empty value (`key: ""`)
                elif tokens[1].startswith('{') and tokens[1].endswith('}'):
                    entry = tokens[1]

            # Handle initial array values
            if lnorm.startswith('-'):
                obj = { key: entry } if num_tokens > 1 else key
                if isinstance(prev_value, list):
                    prev_value.append(obj)
                    nested_set(config, tree, prev_value)
                    cursor['tag_tree'] = tree + [len(prev_value) - 1]
                else:
                    nested_set(config, tree, [obj])
                    cursor['tag_tree'] = tree + [0]
            else:
                cursor['tag_tree'] = tree + [key]
                # Handle object and array traversal
                if isinstance(prev_value, dict) or prev_value is None:
                    nested_set(config, [*tree, key], entry)
                elif isinstance(prev_value, list):
                    # Add new key to last dictionary in array
                    if isinstance(prev_value[-1], dict):
                        prev_value[-1][key] = entry
                    # Always append dictionaries to arrays
                    else:
                        prev_value.append(entry)
                    # Update array
                    nested_set(config, tree, prev_value)

        _append_tags(cursor, frontmatter_dict)

    if frontmatter:
        frontmatter_dict['flags'] = flags
        return config, frontmatter_dict

    return config
def write_yaml(config: dict,
               lines: Optional[List[str]]=None,
               schema: Literal['annotated', 'yaml']='yaml'
               ) -> List[str]:
    """Writes a Python dictionary to YAML.

    Args:
      config: Dictionary to be written.
      lines: YAML lines to append to (a new list is created when omitted).
      schema: Flag to control output schema.

    Raises:
      ValueError: If `schema` is neither 'annotated' nor 'yaml'.

    Returns:
      YAML lines populated from dictionary entries.
    """
    # Reject unknown schemas up front; otherwise no entry text is built for
    # leaf keys and `None` placeholders end up in the output lines.
    if schema not in ('annotated', 'yaml'):
        raise ValueError(f'Unsupported schema: {schema!r}')

    if lines is None:
        lines = []

    cursor = { 'keys': [], 'indent': 2 }
    flat_dict = flatten_dict(config)

    # Pre-process and prettify tree indentations
    trees = []
    max_tree_len = 0
    for keys in flat_dict:
        # Seek or create head index for current tree level
        # (the tree may grow while it is being enumerated, see below)
        for j, key in enumerate(tree := str(keys).split('.')):
            # Avoid parsing literal array indices
            if isinstance(tree[j], int):
                continue
            # Insert array indices as additional keys
            elif (re_match := re_search(r'(.*)\[([0-9]+)\]', key, group=None)):
                key, idx = re_match.groups()
                tree[j] = key
                tree[j+1:j+1] = [int(idx)]
            # Update trees entries
            if j == len(tree)-1:
                trees.append(tree)
            else:
                continue
            # Update max tree length
            if schema == 'annotated':
                tree_len = cursor['indent']*j + len(f"{key}:")
                max_tree_len = max(max_tree_len, tree_len)

    # Write entries to lines
    for (tree, value) in zip(trees, flat_dict.values()):
        # Seek or create head index for current tree level
        for j, key in enumerate(tree):
            # Avoid inserting literal array indices
            if isinstance(tree[j], int):
                cursor['keys'].append(key)
                continue
            # Avoid inserting duplicate keys
            elif tree[:j+1] == cursor['keys'][:j+1]:
                continue

            # Add new key entry to last dict
            padding = (" "*cursor['indent'])*j
            entry = None
            if is_root_key := j == len(tree)-1:
                # Handle indentation for first array item
                if isinstance(tree[j-1], int) and tree[:j] != cursor['keys'][:j]:
                    padding = f'{padding[:-2]}- '
                # Append value to entry
                stype, svalue = write_yaml_types(value, schema)
                if schema == 'annotated':
                    as_literal = False
                    indent = max_tree_len - (cursor['indent']*j + len(f"{key}:"))
                    # Add indentation for comments
                    if key.startswith('#'):
                        # A key holding only '#' has no words after the marker
                        words = key[1:].split()
                        comment_key = words[0].lower() if words else ''
                        if comment_key in ('warning', 'comment'):
                            as_literal = True
                        else:
                            indent += 2 if key[:2] == '# ' else 1
                    # Format indentation-aligned entries
                    if as_literal:
                        entry = f'{padding}{key}: {value}'
                    else:
                        entry = f'{padding}{key}:{" ".rjust(indent + 1)}{stype} | {svalue}'
                elif schema == 'yaml':
                    entry = f'{padding}{key}: {svalue}'.rstrip()
            # Add new dict
            else:
                if key.startswith('#'):
                    key = f"'{key}'"
                entry = f'{padding}{key}:'
            lines.append(entry)
            # Update cursor position
            if is_root_key:
                cursor['keys'] = tree[:j+1]

    return lines
# Public API of this module.
__all__ = [
    # Functions (4)
    "parse_yaml_types",
    "write_yaml_types",
    "parse_yaml",
    "write_yaml"
]