Skip to content

Commit ca637f1

Browse files
authored
feat: planner client and pressure reconciler with unit tests (#734)
* feat: planner client and pressure reconciler with unit tests Add the library-side components for planner-driven runner management: - PlannerClient for communicating with the planner service - PressureReconciler for scaling runners based on pressure signals - Configuration fields for planner URL and token - Unit tests for both components * bump application version to 0.13.0 * revert aproxy_address fix (moved to fix/aproxy-requirement branch) * change arrange act format in test doc strings * address copilot review comment * address copilot review comment * address copilot review comment * address review comment * use configured min_pressure * remove dead code * remove early exit on error optimization * _handle_create -> _handle_create_runners * use ints instead of floats for pressure * fix stream pressure client * change pressure to ints in unit test * remove ruff noqa * use arrange/act/assert docstrings * use min_pressure instead of fallback runners * use last pressure for fallback * add MissingServerConfigError * rework unit test * lint
1 parent 4ab4bf4 commit ca637f1

File tree

10 files changed

+687
-3
lines changed

10 files changed

+687
-3
lines changed

github-runner-manager/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
[project]
55
name = "github-runner-manager"
6-
version = "0.12.0"
6+
version = "0.13.0"
77
authors = [
88
{ name = "Canonical IS DevOps", email = "is-devops-team@canonical.com" },
99
]

github-runner-manager/src/github_runner_manager/configuration/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ class ApplicationConfiguration(BaseModel):
5050
non_reactive_configuration: Configuration for non-reactive mode.
5151
reactive_configuration: Configuration for reactive mode.
5252
openstack_configuration: Configuration for authorization to a OpenStack host.
53+
planner_url: Base URL of the planner service.
54+
planner_token: Bearer token to authenticate against the planner service.
5355
reconcile_interval: Seconds to wait between reconciliation.
5456
"""
5557

@@ -61,6 +63,8 @@ class ApplicationConfiguration(BaseModel):
6163
non_reactive_configuration: "NonReactiveConfiguration"
6264
reactive_configuration: "ReactiveConfiguration | None"
6365
openstack_configuration: OpenStackConfiguration
66+
planner_url: Optional[AnyHttpUrl] = None
67+
planner_token: Optional[str] = None
6468
reconcile_interval: int
6569

6670
@staticmethod
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
# Copyright 2026 Canonical Ltd.
2+
# See LICENSE file for licensing details.
3+
4+
"""Planner-driven pressure reconciler.
5+
6+
Creates or deletes runners based on pressure signals from the planner
7+
service. Runs in two independent loops (create/delete) and coordinates
8+
access to the underlying RunnerManager via the provided lock.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import logging
14+
import time
15+
from dataclasses import dataclass
16+
from threading import Event, Lock
17+
from typing import Optional
18+
19+
from github_runner_manager.errors import MissingServerConfigError
20+
from github_runner_manager.manager.runner_manager import RunnerManager, RunnerMetadata
21+
from github_runner_manager.planner_client import PlannerApiError, PlannerClient
22+
23+
logger = logging.getLogger(__name__)
24+
25+
26+
@dataclass(frozen=True)
27+
class PressureReconcilerConfig:
28+
"""Configuration for pressure reconciliation.
29+
30+
Attributes:
31+
flavor_name: Name of the planner flavor to reconcile.
32+
reconcile_interval: Seconds between timer-based delete reconciliations.
33+
min_pressure: Minimum desired runner count (floor) for the flavor.
34+
Also used as fallback when the planner is unavailable.
35+
"""
36+
37+
flavor_name: str
38+
reconcile_interval: int = 5 * 60
39+
min_pressure: int = 0
40+
41+
42+
class PressureReconciler: # pylint: disable=too-few-public-methods
43+
"""Continuously reconciles runner count against planner pressure.
44+
45+
This reconciler keeps the total number of runners near the desired level
46+
indicated by the planner's pressure for a given flavor. It operates in two
47+
threads:
48+
- create loop: scales up when desired exceeds current total
49+
- delete loop: scales down when current exceeds desired
50+
51+
Concurrency with any other reconcile loop is protected by a shared lock.
52+
53+
The delete loop uses the last pressure seen by the create loop rather than
54+
fetching a fresh value, so it may act on a stale reading if pressure changed
55+
between stream events. This is an accepted trade-off: the window is bounded
56+
by the stream update frequency, and any over-deletion is self-correcting
57+
because the create loop will scale back up on the next pressure event.
58+
59+
Attributes:
60+
_manager: Runner manager used to list, create, and clean up runners.
61+
_planner: Client used to stream pressure updates.
62+
_config: Reconciler configuration.
63+
_lock: Shared lock to serialize operations with other reconcile loops.
64+
_stop: Event used to signal streaming loops to stop gracefully.
65+
_last_pressure: Last pressure value seen in the create stream.
66+
"""
67+
68+
def __init__(
69+
self,
70+
manager: RunnerManager,
71+
planner_client: PlannerClient,
72+
config: PressureReconcilerConfig,
73+
lock: Lock,
74+
) -> None:
75+
"""Initialize reconciler state and dependencies.
76+
77+
Args:
78+
manager: Runner manager interface for creating, cleaning up,
79+
and listing runners.
80+
planner_client: Client used to stream pressure updates.
81+
config: Reconciler configuration.
82+
lock: Shared lock to serialize operations with other reconcile loops.
83+
"""
84+
self._manager = manager
85+
self._planner = planner_client
86+
self._config = config
87+
self._lock = lock
88+
89+
self._stop = Event()
90+
self._last_pressure: Optional[int] = None
91+
92+
def start_create_loop(self) -> None:
93+
"""Continuously create runners to satisfy planner pressure."""
94+
while not self._stop.is_set():
95+
try:
96+
for update in self._planner.stream_pressure(self._config.flavor_name):
97+
if self._stop.is_set():
98+
return
99+
self._handle_create_runners(update.pressure)
100+
except PlannerApiError:
101+
fallback = max(self._last_pressure or 0, self._config.min_pressure)
102+
logger.exception(
103+
"Error in pressure stream loop, falling back to %s runners.",
104+
fallback,
105+
)
106+
self._handle_create_runners(fallback)
107+
time.sleep(5)
108+
109+
def start_delete_loop(self) -> None:
110+
"""Continuously delete runners using last seen pressure on a timer."""
111+
logger.debug("Delete loop: starting, interval=%ss", self._config.reconcile_interval)
112+
while not self._stop.wait(self._config.reconcile_interval):
113+
logger.debug("Delete loop: woke up, _last_pressure=%s", self._last_pressure)
114+
if self._last_pressure is None:
115+
logger.debug("Delete loop: no pressure seen yet, skipping.")
116+
continue
117+
self._handle_timer_reconcile(self._last_pressure)
118+
119+
def stop(self) -> None:
120+
"""Signal the reconciler loops to stop gracefully."""
121+
self._stop.set()
122+
123+
def _handle_create_runners(self, pressure: int) -> None:
124+
"""Create runners when desired exceeds current total.
125+
126+
Args:
127+
pressure: Current pressure value used to compute desired total.
128+
"""
129+
desired_total = self._desired_total_from_pressure(pressure)
130+
logger.debug(
131+
"Create loop: pressure=%s, desired=%s, updating _last_pressure",
132+
pressure,
133+
desired_total,
134+
)
135+
self._last_pressure = pressure
136+
with self._lock:
137+
current_total = len(self._manager.get_runners())
138+
to_create = max(desired_total - current_total, 0)
139+
if to_create <= 0:
140+
logger.info(
141+
"Create loop: nothing to do (desired=%s current=%s)",
142+
desired_total,
143+
current_total,
144+
)
145+
return
146+
logger.info(
147+
"Create loop: creating %s runners (desired=%s current=%s)",
148+
to_create,
149+
desired_total,
150+
current_total,
151+
)
152+
try:
153+
self._manager.create_runners(num=to_create, metadata=RunnerMetadata())
154+
except MissingServerConfigError:
155+
logger.exception(
156+
"Unable to create runners due to missing server configuration (image/flavor)."
157+
)
158+
159+
def _handle_timer_reconcile(self, pressure: int) -> None:
160+
"""Clean up stale runners, then converge toward the desired count.
161+
162+
Scales down (deletes) when current exceeds desired, and scales up
163+
(creates) when current falls below desired after cleanup.
164+
165+
Args:
166+
pressure: Current pressure value used to compute desired total.
167+
"""
168+
desired_total = self._desired_total_from_pressure(pressure)
169+
with self._lock:
170+
self._manager.cleanup()
171+
current_total = len(self._manager.get_runners())
172+
if current_total > desired_total:
173+
to_delete = current_total - desired_total
174+
logger.info(
175+
"Timer: scaling down %s runners (desired=%s current=%s)",
176+
to_delete,
177+
desired_total,
178+
current_total,
179+
)
180+
self._manager.delete_runners(num=to_delete)
181+
elif current_total < desired_total:
182+
to_create = desired_total - current_total
183+
logger.info(
184+
"Timer: scaling up %s runners (desired=%s current=%s)",
185+
to_create,
186+
desired_total,
187+
current_total,
188+
)
189+
try:
190+
self._manager.create_runners(num=to_create, metadata=RunnerMetadata())
191+
except MissingServerConfigError:
192+
logger.exception(
193+
"Unable to create runners due to missing server configuration"
194+
" (image/flavor)."
195+
)
196+
else:
197+
logger.info(
198+
"Timer: no changes needed (desired=%s current=%s)",
199+
desired_total,
200+
current_total,
201+
)
202+
203+
def _desired_total_from_pressure(self, pressure: int) -> int:
204+
"""Compute desired runner total from planner pressure.
205+
206+
Ensures non-negative totals and respects the configured `min_pressure`
207+
floor.
208+
209+
Args:
210+
pressure: Current pressure value from planner.
211+
212+
Returns:
213+
The desired total number of runners.
214+
"""
215+
return max(pressure, self._config.min_pressure, 0)

github-runner-manager/src/github_runner_manager/manager/runner_manager.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,15 @@ def get_runners(self) -> tuple[RunnerInstance, ...]:
264264
)
265265

266266
def delete_runners(self, num: int) -> IssuedMetricEventsStats:
267-
"""Delete runners.
267+
"""Delete up to `num` runners, preferring idle ones over busy.
268+
269+
Runners are selected in order: deletable → idle → busy. Busy runners
270+
are only targeted when there are not enough idle runners to satisfy
271+
`num`, and the GitHub API will reject deletion of runners actively
272+
executing a job, so the actual number deleted may be less than `num`.
268273
269274
Args:
270-
num: The number of runner to delete.
275+
num: The maximum number of runners to delete.
271276
272277
Returns:
273278
Stats on metrics events issued during the deletion of runners.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Copyright 2026 Canonical Ltd.
2+
# See LICENSE file for licensing details.
3+
4+
"""Client to interact with the planner service."""
5+
6+
import json
7+
import logging
8+
from dataclasses import dataclass
9+
from typing import Iterable
10+
from urllib.parse import urljoin
11+
12+
import requests
13+
import requests.adapters
14+
import urllib3
15+
from pydantic import AnyHttpUrl, BaseModel
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
@dataclass
21+
class PressureInfo:
22+
"""Pressure information for a flavor returned by the planner service.
23+
24+
Attributes:
25+
pressure: Desired total runner count for the flavor.
26+
"""
27+
28+
pressure: int
29+
30+
31+
class PlannerConfiguration(BaseModel):
32+
"""Configuration inputs for the PlannerClient.
33+
34+
Attributes:
35+
base_url: Base URL of the planner service.
36+
token: Bearer token used to authenticate against the planner service.
37+
timeout: Default timeout in seconds for HTTP requests.
38+
"""
39+
40+
base_url: AnyHttpUrl
41+
token: str
42+
timeout: int = 5 * 60
43+
44+
45+
class PlannerApiError(Exception):
46+
"""Represents an error while interacting with the planner service."""
47+
48+
49+
class PlannerClient: # pylint: disable=too-few-public-methods
50+
"""An HTTP client for the planner service."""
51+
52+
def __init__(self, config: PlannerConfiguration) -> None:
53+
"""Initialize client with planner configuration.
54+
55+
Args:
56+
config: Planner service configuration containing base URL,
57+
authentication token, and default request timeout.
58+
"""
59+
self._session = self._create_session()
60+
self._config = config
61+
62+
def stream_pressure(self, name: str) -> Iterable[PressureInfo]:
63+
"""Stream pressure updates for the given flavor.
64+
65+
Args:
66+
name: Flavor name.
67+
68+
Yields:
69+
Parsed pressure updates.
70+
71+
Raises:
72+
PlannerApiError: On HTTP or stream errors.
73+
"""
74+
url = urljoin(str(self._config.base_url), f"/api/v1/flavors/{name}/pressure?stream=true")
75+
try:
76+
with self._session.get(
77+
url,
78+
headers={"Authorization": f"Bearer {self._config.token}"},
79+
timeout=self._config.timeout,
80+
stream=True,
81+
) as response:
82+
response.raise_for_status()
83+
for line in response.iter_lines(decode_unicode=True):
84+
if not line:
85+
continue
86+
try:
87+
data = json.loads(line)
88+
if not isinstance(data, dict) or name not in data:
89+
logger.debug("Skipping non-pressure stream line: %s", line)
90+
continue
91+
yield PressureInfo(pressure=int(data[name]))
92+
except json.JSONDecodeError:
93+
logger.warning("Skipping malformed stream line: %s", line)
94+
continue
95+
except requests.RequestException as exc:
96+
logger.exception("Error while streaming pressure for flavor '%s' from planner.", name)
97+
raise PlannerApiError from exc
98+
99+
@staticmethod
100+
def _create_session() -> requests.Session:
101+
"""Create a requests session with retries and no env proxies.
102+
103+
Returns:
104+
A configured `requests.Session` instance.
105+
"""
106+
adapter = requests.adapters.HTTPAdapter(
107+
max_retries=urllib3.Retry(
108+
total=3,
109+
backoff_factor=0.3,
110+
status_forcelist=[500, 502, 503, 504],
111+
)
112+
)
113+
114+
session = requests.Session()
115+
session.mount("http://", adapter)
116+
session.mount("https://", adapter)
117+
return session
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Copyright 2026 Canonical Ltd.
2+
# See LICENSE file for licensing details.

0 commit comments

Comments
 (0)