Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 89 additions & 38 deletions capycli/common/github_support.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -------------------------------------------------------------------------------
# Copyright (c) 2025 Siemens
# Copyright (c) 2026 Siemens
# All Rights Reserved.
# Author: thomas.graf@siemens.com
# Author: thomas.graf@siemens.com, marvin.fette@siemens.com
#
# SPDX-License-Identifier: MIT
# -------------------------------------------------------------------------------
Expand All @@ -24,6 +24,10 @@

class GitHubSupport:
"""Support methods for accessing GitHub"""
default_wait_time = 60 # seconds
default_gh_api_timeout = 15 # seconds
last_request_error = False

def __init__(self) -> None:
self.github_project_name_regex = re.compile(r"^[a-zA-Z0-9-]+(/[a-zA-Z0-9-]+)*$")

Expand All @@ -33,57 +37,104 @@ def github_request(url: str, username: str = "", token: str = "",
allow_redirects: bool = True, # default in requests
) -> Any:
try:
headers = {}
if token:
headers["Authorization"] = "token " + token
if username:
headers["Username"] = username
response = requests.get(url, headers=headers,
allow_redirects=allow_redirects)
if response.status_code == 429 \
or 'rate limit exceeded' in response.reason \
or 'API rate limit exceeded' in response.json().get('message', ''):
response = requests.get(url, headers=GitHubSupport._gh_request_headers(token, username),
allow_redirects=allow_redirects,
timeout=GitHubSupport.default_gh_api_timeout)

# Check for rate limit errors (403 Forbidden or 429 Too Many Requests)
if GitHubSupport._blocked_by_ratelimit(response):
wait_time = GitHubSupport._calculate_ratelimit_wait_time(response)
print(
Fore.LIGHTYELLOW_EX +
" Github API rate limit exceeded - wait 60s and retry ... " +
f" Github API rate limit exceeded - wait {wait_time}s and retry ... " +
Style.RESET_ALL)
time.sleep(60)
time.sleep(wait_time)
return GitHubSupport.github_request(url, username, token, return_response=return_response)
if response.json().get('message', '').startswith("Bad credentials"):

# Check for credential issues
if GitHubSupport._credential_issue(response):
print_red("Invalid GitHub credential provided - aborting!")
sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SERVICE)

except AttributeError as err:
# response.json() did not return a dictionary
if hasattr(err, 'name'):
name = err.name
else: # Python prior to 3.10
name = err.args[0].split("'")[3]
if not name == 'get':
raise
# Check for not found (404)
if response.status_code == 404:
print(
Fore.LIGHTYELLOW_EX +
f" Resource not found at {url} - could be intended or not ... " +
Style.RESET_ALL)
return response if return_response else {}

except requests.exceptions.JSONDecodeError:
response._content = b'{}'
# Raise an exception for other HTTP errors
response.raise_for_status()

except requests.exceptions.RequestException as ex:
if GitHubSupport.last_request_error:
print_red(
f" Persistent issues accessing {url} " + repr(ex) +
"\n Aborting after retried once!")
sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SERVICE)

except requests.exceptions.ConnectionError as ex:
print(
Fore.LIGHTYELLOW_EX +
f" Connection issues accessing {url} " + repr(ex) +
"\n Retrying in 60 seconds!" +
f"\n Retrying using the same url." +
Style.RESET_ALL)
time.sleep(60)
return GitHubSupport.github_request(url, username, token, return_response=return_response)
return GitHubSupport.github_request(url, username, token, return_response=return_response,
allow_redirects=allow_redirects)

except Exception as ex:
print(
Fore.LIGHTYELLOW_EX +
" Error accessing GitHub: " + repr(ex) +
Style.RESET_ALL)
response = requests.Response()
response._content = \
b'{' + f'"exception": "{repr(ex)}"'.encode() + b'}'
# Reset the error flag on success or after handling exceptions
GitHubSupport.last_request_error = False
return response if return_response else response.json()

@staticmethod
def _gh_request_headers(token: str = "", username: str = "") -> dict:
"""Helper method to construct headers for GitHub API requests."""
headers = {"Accept": "application/vnd.github+json"}
if token:
headers["Authorization"] = "token " + token
if username:
headers["Username"] = username
return headers

@staticmethod
def _blocked_by_ratelimit(response: requests.Response) -> bool:
"""Check if the GitHub API response indicates that the rate limit has been exceeded."""
if not response.status_code == 403 and not response.status_code == 429:
return False
if 'x-ratelimit-remaining' in response.headers:
remaining = int(response.headers['x-ratelimit-remaining'])
return remaining == 0
return False

@staticmethod
def _calculate_ratelimit_wait_time(response: requests.Response) -> int:
"""Calculate the wait time until the GitHub API rate limit resets."""
if 'x-ratelimit-reset' in response.headers:
reset_time = int(response.headers['x-ratelimit-reset'])
current_time = int(time.time())
wait_time = reset_time - current_time
return max(wait_time, 0)
if 'retry-after' in response.headers:
return int(response.headers['retry-after'])
return GitHubSupport.default_wait_time

@staticmethod
def _credential_issue(response: requests.Response) -> bool:
"""Check if the response indicates a credential issue."""
if not response.ok and GitHubSupport._response_is_json(response):
message = response.json().get('message', '')
return "bad credentials" in message.lower()
return False

@staticmethod
def _response_is_json(response: requests.Response) -> bool:
"""Check if the response content is valid JSON."""
try:
response.json()
return True
except Exception:
return False

@staticmethod
def get_repositories(name: str, language: str, username: str = "", token: str = "") -> Any:
"""Query for GitHub repositories"""
Expand Down Expand Up @@ -119,4 +170,4 @@ def get_repo_name(github_url: str) -> str:
def get_repository_info(repository: str, username: str = "", token: str = "") -> Any:
"""Query for a single GitHub repository"""
url = "https://api.github.com/repos/" + repository
return GitHubSupport.github_request(url, username, token)
return GitHubSupport.github_request(url, username, token)
101 changes: 99 additions & 2 deletions tests/test_github_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,112 @@
# SPDX-License-Identifier: MIT
# -------------------------------------------------------------------------------

from unittest.mock import MagicMock

from capycli.common.github_support import GitHubSupport
from tests.test_base import TestBase


class GitHubSupportHtml(TestBase):
class GitHubSupportTest(TestBase):
"""Test class for GitHubSupport methods"""
INPUTFILE1 = "sbom.siemens.capycli.json"
OUTPUTFILE = "test.html"

def test_init(self) -> None:
"""Test GitHubSupport initialization"""
gh_support = GitHubSupport()
self.assertIsNotNone(gh_support.github_project_name_regex)
# Test regex pattern matching
self.assertIsNotNone(gh_support.github_project_name_regex.match("owner/repo"))
self.assertIsNotNone(gh_support.github_project_name_regex.match("sw360/capycli"))
self.assertIsNone(gh_support.github_project_name_regex.match("invalid url with spaces"))

def test_gh_request_headers_no_credentials(self) -> None:
"""Test header construction without credentials"""
headers = GitHubSupport._gh_request_headers()
self.assertEqual(headers, {"Accept": "application/vnd.github+json"})

def test_gh_request_headers_with_token(self) -> None:
"""Test header construction with token"""
headers = GitHubSupport._gh_request_headers(token="test_token")
self.assertEqual(headers["Accept"], "application/vnd.github+json")
self.assertEqual(headers["Authorization"], "token test_token")
self.assertNotIn("Username", headers)

def test_gh_request_headers_with_username_and_token(self) -> None:
"""Test header construction with username and token"""
headers = GitHubSupport._gh_request_headers(token="test_token", username="test_user")
self.assertEqual(headers["Accept"], "application/vnd.github+json")
self.assertEqual(headers["Authorization"], "token test_token")
self.assertEqual(headers["Username"], "test_user")

def test_blocked_by_ratelimit_not_blocked(self) -> None:
"""Test rate limit check when not blocked"""
response = MagicMock()
response.status_code = 200
response.headers = {}
self.assertFalse(GitHubSupport._blocked_by_ratelimit(response))

def test_blocked_by_ratelimit_403_with_remaining(self) -> None:
"""Test rate limit check when blocked with 403"""
response = MagicMock()
response.status_code = 403
response.headers = {'x-ratelimit-remaining': '0'}
self.assertTrue(GitHubSupport._blocked_by_ratelimit(response))

def test_blocked_by_ratelimit_429_with_remaining(self) -> None:
"""Test rate limit check when blocked with 429"""
response = MagicMock()
response.status_code = 429
response.headers = {'x-ratelimit-remaining': '0'}
self.assertTrue(GitHubSupport._blocked_by_ratelimit(response))

def test_blocked_by_ratelimit_403_with_remaining_nonzero(self) -> None:
"""Test rate limit check when 403 but remaining > 0"""
response = MagicMock()
response.status_code = 403
response.headers = {'x-ratelimit-remaining': '10'}
self.assertFalse(GitHubSupport._blocked_by_ratelimit(response))

def test_calculate_ratelimit_wait_time_with_retry_after(self) -> None:
"""Test wait time calculation with retry-after header"""
response = MagicMock()
response.headers = {'retry-after': '30'}
wait_time = GitHubSupport._calculate_ratelimit_wait_time(response)
self.assertEqual(wait_time, 30)

def test_calculate_ratelimit_wait_time_default(self) -> None:
"""Test wait time calculation with no headers"""
response = MagicMock()
response.headers = {}
wait_time = GitHubSupport._calculate_ratelimit_wait_time(response)
self.assertEqual(wait_time, GitHubSupport.default_wait_time)

def test_response_is_json_valid(self) -> None:
"""Test JSON response validation with valid JSON"""
response = MagicMock()
response.json.return_value = {"key": "value"}
self.assertTrue(GitHubSupport._response_is_json(response))

def test_response_is_json_invalid(self) -> None:
"""Test JSON response validation with invalid JSON"""
response = MagicMock()
response.json.side_effect = ValueError("No JSON")
self.assertFalse(GitHubSupport._response_is_json(response))

def test_credential_issue_bad_credentials(self) -> None:
"""Test credential issue detection with bad credentials"""
response = MagicMock()
response.ok = False
response.json.return_value = {"message": "Bad credentials"}
self.assertTrue(GitHubSupport._credential_issue(response))

def test_credential_issue_no_issue(self) -> None:
"""Test credential issue detection with valid response"""
response = MagicMock()
response.ok = True
self.assertFalse(GitHubSupport._credential_issue(response))

def test_get_repositories(self) -> None:
actual = GitHubSupport.get_repositories("capycli", "python")
self.assertIsNotNone(actual, "GitHub request failed")
Expand Down Expand Up @@ -79,5 +176,5 @@ def test_get_repository_info(self) -> None:


if __name__ == "__main__":
APP = GitHubSupportHtml()
APP = GitHubSupportTest()
APP.test_get_repository_info()
Loading