# Copyright (c) 2023, The OCE Build Authors. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
"""Methods for formatting and retrieving GitHub source URLs."""

#pylint: disable=consider-using-f-string,bare-except

from datetime import datetime, timedelta
from functools import partial
from typing import Any, List, Optional, Tuple, Union
from urllib.request import Request

from ._lib import request

from ocebuild.constants import ENV
from ocebuild.errors import disable_exception_traceback, GitHubRateLimit
from ocebuild.parsers.dict import nested_get

def github_api_request(endpoint: Optional[str]=None,
                       url: Optional[str]=None
                       ) -> Any:
    """Sends a request to the GitHub API.

    If a ``GITHUB_TOKEN`` entry is present in the environment, it is attached
    to the request as an ``Authorization`` header.

    Args:
        endpoint: GitHub API endpoint path (e.g. ``/rate_limit``). It is
            appended to the API base URL and ignored when `url` is given.
        url: Fully-qualified URL to request instead of an endpoint.

    Returns:
        Whatever `request` returns for the prepared request.

    Raises:
        ValueError: If neither `endpoint` nor `url` is provided.
    """
    if not url:
        if endpoint is None:
            raise ValueError('Either an endpoint or a url must be provided.')
        # Endpoints are paths relative to the REST API host
        url = f'https://api.github.com{endpoint}'

    req = Request(url)
    if ENV.has('GITHUB_TOKEN'):
        req.add_header('Authorization', f'token {ENV.GITHUB_TOKEN}')
    return request(req)
################################################################################
#                              API Request Guards                              #
################################################################################
def github_rate_limit(kind: str='core',
                      raise_error: bool=False
                      ) -> Optional[dict]:
    """Queries the GitHub API rate limit.

    Args:
        kind: Key of the rate-limit resource to look up under ``resources``
            (e.g. ``core``).
        raise_error: If True, raise when no requests remain for `kind`
            instead of returning the rate-limit data.

    Returns:
        When `raise_error` is False, the ``resources`` entry for `kind`, or
        the whole rate-limit payload if `kind` is empty. When `raise_error`
        is True and requests remain, None.

    Raises:
        GitHubRateLimit: If `raise_error` is True and the remaining request
            count for `kind` is 0.
    """
    rate_limit = github_api_request('/rate_limit').json()

    if not raise_error:
        if kind:
            return nested_get(rate_limit, ['resources', kind])
        return rate_limit

    if nested_get(rate_limit, ['resources', kind, 'remaining']) != 0:
        return None

    # Time left until the quota resets; clamped so a reset timestamp that has
    # just passed never produces a negative wait.
    current_time = datetime.now()
    reset_time = datetime.fromtimestamp(
        nested_get(rate_limit, ['resources', kind, 'reset']))
    remaining = max(reset_time - current_time, timedelta(0))

    # Format remaining time in a friendly way for the error message
    amount = round(remaining / timedelta(minutes=1))
    unit = 'minute'
    if not amount:
        amount = round(remaining / timedelta(seconds=1))
        unit = 'second'
    if amount != 1:
        unit += 's'
    msg = '{} requests exceeded. Try again in {} {}.'.format(
        kind.capitalize(), amount, unit)

    # Raise error without stacktrace
    with disable_exception_traceback():
        raise GitHubRateLimit(msg, rate_limit)
def get_latest_commit(repository: str, branch: str='main'):
    """Looks up the SHA of the most recent commit on a repository branch.

    Args:
        repository: GitHub repository name.
        branch: Branch name.

    Returns:
        The ``sha`` field of the commit payload, or None when the payload is
        empty.
    """
    endpoint = f'/repos/{repository}/commits/{branch}'
    payload = github_api_request(endpoint).json()
    if not payload:
        return None
    return payload['sha']
################################################################################
#                   Parameter formatting/retrieval functions                   #
################################################################################
def github_suite_id(repository: str,
                    commit: str,
                    workflow_id: int,
                    status: Optional[str]='completed'
                    ) -> Optional[int]:
    """Gets the GitHub check suite ID for a given commit.

    Args:
        repository: GitHub repository name.
        commit: Commit hash.
        workflow_id: Identifier matched against each check run: a run matches
            when its ``details_url`` contains ``/runs/<workflow_id>/job``.
        status: Only consider check suites with this status. A falsy value
            disables the filter.

    Returns:
        Check suite ID of the first matching check run, or None if no check
        run matches.

    Raises:
        GitHubRateLimit: If a lookup fails and the rate limit is exhausted.
    """
    try:
        suites_endpoint = f'/repos/{repository}/commits/{commit}/check-suites'
        check_suites = github_api_request(suites_endpoint).json()['check_suites']
        for suite in check_suites:
            if status and suite['status'] != status:
                continue
            # Enumerate the suite's check runs for a matching run id
            check_runs_url = suite['check_runs_url']
            check_runs = github_api_request(url=check_runs_url).json()['check_runs']
            for run in check_runs:
                if f'/runs/{workflow_id}/job' in run['details_url']:
                    return nested_get(run, ['check_suite', 'id'])
        # No matching suite found
        return None
    except Exception:
        # Surface an exhausted rate limit as the cause; otherwise re-raise the
        # original error untouched.
        if not github_rate_limit(raise_error=True):
            raise
def github_tag_names(repository: str,
                     get_commits: bool=False
                     ) -> Union[List[str], Tuple[List[str], List[str]]]:
    """Returns a list of all repository tags.

    Args:
        repository: GitHub repository name.
        get_commits: If True, additionally returns a list of commit hashes.

    Returns:
        List of repository tag names. When `get_commits` is True, a tuple of
        (tag names, commit hashes) with entries in matching order.

    Raises:
        GitHubRateLimit: If the lookup fails and the rate limit is exhausted.
    """
    try:
        tags_endpoint = f'/repos/{repository}/tags'
        tags_catalog = github_api_request(tags_endpoint).json()
        tag_names = [tag['name'] for tag in tags_catalog]
        if not get_commits:
            return tag_names
        tag_commits = [nested_get(tag, ['commit', 'sha'])
                       for tag in tags_catalog]
        return tag_names, tag_commits
    except Exception:
        # Surface an exhausted rate limit as the cause; otherwise re-raise the
        # original error untouched.
        if not github_rate_limit(raise_error=True):
            raise
def github_release_catalog(url: str) -> dict:
    """Gets the catalog entry for a given release.

    Args:
        url: GitHub release page URL of the form
            ``https://github.com/<owner>/<repo>/releases/tag/<tag>``.

    Returns:
        The release catalog entry whose ``tag_name`` equals the URL's tag.

    Raises:
        ValueError: If `url` has no ``/releases/tag/<tag>`` suffix, or no
            catalog entry matches the tag.
        GitHubRateLimit: If the lookup fails and the rate limit is exhausted.
    """
    # Split on the full marker so an owner, repository or tag that happens to
    # contain '/tag/' cannot confuse the parse.
    page_url, marker, tag = url.partition('/releases/tag/')
    if not marker or not tag:
        raise ValueError(f'Not a GitHub release URL: {url}')

    # Translate the web URL into its REST API counterpart
    base_url = page_url.replace(
        'https://github.com', 'https://api.github.com/repos', 1) + '/releases'

    release_entry = None
    try:
        release_catalog = github_api_request(base_url + '?per_page=100').json()
        # Default of None avoids leaking StopIteration when nothing matches
        release_entry = next(
            (e for e in release_catalog if e['tag_name'] == tag), None)
    except Exception:
        # Surface an exhausted rate limit as the cause; otherwise re-raise the
        # original error untouched.
        if not github_rate_limit(raise_error=True):
            raise

    if not release_entry:
        raise ValueError(f'No release catalog entry found for {tag}.')
    return release_entry
################################################################################
#                      URL formatting/retrieval functions                      #
################################################################################
def github_file_url(repository: str,
                    path: str,
                    branch: str='main',
                    tag: Optional[str]=None,
                    commit: Optional[str]=None,
                    raw: bool=False
                    ) -> str:
    """Formats a GitHub file URL.

    The ref is chosen in order of precedence: `commit`, then `tag`, then
    `branch`.

    Args:
        repository: GitHub repository name.
        path: Relative path to file.
        branch: Branch name.
        tag: Tag name.
        commit: Commit hash.
        raw: If True, returns the raw URL.

    Returns:
        URL of the file.

    Example:
        >>> github_file_url('foo/bar', path='file.json')
        'https://github.com/foo/bar/blob/main/file.json'
        >>> github_file_url('foo/bar', path='file.json', branch='dev')
        'https://github.com/foo/bar/blob/dev/file.json'
        >>> github_file_url('foo/bar', path='file.json', tag='v1.0.0')
        'https://github.com/foo/bar/blob/v1.0.0/file.json'
        >>> github_file_url('foo/bar', path='file.json', commit='c0ffee')
        'https://github.com/foo/bar/blob/c0ffee/file.json'
        >>> github_file_url('foo/bar', path='file.json', raw=True)
        'https://raw.githubusercontent.com/foo/bar/main/file.json'
    """
    if raw:
        # Raw content is served from a separate host with no 'blob/' segment
        host, stem = 'raw.githubusercontent.com', ''
    else:
        host, stem = 'github.com', 'blob/'
    ref = commit or tag or branch
    return f'https://{host}/{repository}/{stem}{ref}/{path}'
def github_archive_url(repository: str,
                       branch: str='main',
                       tag: Optional[str]=None,
                       commit: Optional[str]=None
                       ) -> str:
    """Formats a GitHub archive URL.

    The ref is chosen in order of precedence: `commit`, then `tag`, then
    `branch`.

    Args:
        repository: GitHub repository name.
        branch: Branch name.
        tag: Tag name.
        commit: Commit hash.

    Returns:
        URL of the ``.tar.gz`` archive.

    Example:
        >>> github_archive_url('foo/bar')
        'https://github.com/foo/bar/archive/refs/heads/main.tar.gz'
        >>> github_archive_url('foo/bar', branch='dev')
        'https://github.com/foo/bar/archive/refs/heads/dev.tar.gz'
        >>> github_archive_url('foo/bar', tag='v1.0.0')
        'https://github.com/foo/bar/archive/refs/tags/v1.0.0.tar.gz'
        >>> github_archive_url('foo/bar', commit='c0ffee')
        'https://github.com/foo/bar/archive/c0ffee.tar.gz'
    """
    if commit:
        ref = commit
    elif tag:
        ref = f'refs/tags/{tag}'
    else:
        ref = f'refs/heads/{branch}'
    return f'https://github.com/{repository}/archive/{ref}.tar.gz'
def github_release_url(repository: str,
                       tag: Optional[str]=None
                       ) -> str:
    """Formats a GitHub release URL.

    When no tag is given, the name of the first entry returned by the
    repository's tags endpoint is used.

    Args:
        repository: GitHub repository name.
        tag: Tag name.

    Returns:
        URL of the release.

    Raises:
        GitHubRateLimit: If the tag lookup fails and the rate limit is
            exhausted.

    Example:
        >>> github_release_url('foo/bar', tag='v1.0.0')
        'https://github.com/foo/bar/releases/tag/v1.0.0'
    """
    if not tag:
        # Only the tag lookup touches the network, so only it is guarded
        try:
            tags_catalog = github_api_request(f'/repos/{repository}/tags').json()
            tag = tags_catalog[0]['name']
        except Exception:
            # Surface an exhausted rate limit as the cause; otherwise re-raise
            # the original error untouched.
            if not github_rate_limit(raise_error=True):
                raise
    return f'https://github.com/{repository}/releases/tag/{tag}'
def github_artifacts_url(repository: str,
                         branch: Optional[str]=None,
                         workflow: Optional[str]=None,
                         commit: Optional[str]=None,
                         get_commit=False
                         ) -> Union[str, Tuple[str, str], None]:
    """Formats a GitHub artifacts URL.

    Scans the repository's artifact listing and returns the URL of the first
    unexpired artifact that passes every given filter and for which a check
    suite can be found.

    Args:
        repository: GitHub repository name.
        branch: Branch name; only artifacts built from this branch match.
        workflow: Workflow name; only artifacts produced by this workflow
            match. If the name is not found, no workflow filter is applied.
        commit: Commit hash; only artifacts built from this commit match.
        get_commit: If True, additionally returns the commit hash.

    Returns:
        URL of the artifacts archive, or a tuple of (URL, commit hash) when
        `get_commit` is True. None if no artifact matches.

    Raises:
        GitHubRateLimit: If a lookup fails and the rate limit is exhausted.
    """
    try:
        # Get workflow id (if workflow name is provided)
        workflow_id: Optional[int] = None
        if workflow is not None:
            workflows_endpoint = f'/repos/{repository}/actions/workflows'
            listing = github_api_request(workflows_endpoint).json()
            # The listing wraps its entries in a 'workflows' key
            for entry in listing['workflows']:
                if workflow == entry['name']:
                    workflow_id = entry['id']
                    break

        # Workflow id of each run already looked up, to spare API calls when
        # several artifacts belong to the same run.
        run_workflow_ids = {}

        # Filter artifact urls
        artifacts_endpoint = f'/repos/{repository}/actions/artifacts'
        catalog = github_api_request(artifacts_endpoint).json()
        for artifact in catalog['artifacts']:
            # Skip expired artifacts
            if artifact['expired']:
                continue

            # Extract artifact and run properties
            artifact_id = artifact['id']
            run_id = nested_get(artifact, ['workflow_run', 'id'])
            head_branch = nested_get(artifact, ['workflow_run', 'head_branch'])
            head_sha = nested_get(artifact, ['workflow_run', 'head_sha'])

            # Filter run by given parameters (cheapest checks first)
            if branch and branch != head_branch:
                continue
            if commit and commit != head_sha:
                continue
            if workflow_id:
                # The artifact only carries its run id, so the owning workflow
                # has to be resolved from the run itself.
                if run_id not in run_workflow_ids:
                    run_endpoint = f'/repos/{repository}/actions/runs/{run_id}'
                    run = github_api_request(run_endpoint).json()
                    run_workflow_ids[run_id] = run.get('workflow_id')
                if run_workflow_ids[run_id] != workflow_id:
                    continue

            # Return the first matching artifact url. github_suite_id matches
            # its third argument against '/runs/<id>/job', i.e. the run id.
            if (suite_id := github_suite_id(repository, head_sha, run_id)):
                url = (f'https://github.com/{repository}'
                       f'/suites/{suite_id}/artifacts/{artifact_id}')
                if get_commit:
                    return url, head_sha
                return url
    except Exception:
        # Surface an exhausted rate limit as the cause; otherwise re-raise the
        # original error untouched.
        if not github_rate_limit(raise_error=True):
            raise
    return None
# Public API of this module
__all__ = [
    # Functions (10)
    "github_api_request",
    "github_rate_limit",
    "get_latest_commit",
    "github_suite_id",
    "github_tag_names",
    "github_release_catalog",
    "github_file_url",
    "github_archive_url",
    "github_release_url",
    "github_artifacts_url",
]