Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ All versions prior to 0.9.0 are untracked.

## [Unreleased]

### Added

* `Issuer.identity_token` accepts an optional `redirect_port` argument to bind
the local OAuth redirect server to a fixed port, for OIDC providers that
require a pre-registered redirect URI without `localhost` port wildcards
([#1029](https://github.com/sigstore/sigstore-python/issues/1029))

### Fixed

* Fixed ~60s hang after completing browser-based OIDC authentication.
Expand Down
20 changes: 16 additions & 4 deletions sigstore/_internal/oidc/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,18 @@


class _OAuthFlow:
def __init__(self, client_id: str, client_secret: str, issuer: Issuer):
def __init__(
self,
client_id: str,
client_secret: str,
issuer: Issuer,
redirect_port: int = 0,
):
self._client_id = client_id
self._client_secret = client_secret
self._issuer = issuer
self._server = _OAuthRedirectServer(
self._client_id, self._client_secret, self._issuer
self._client_id, self._client_secret, self._issuer, port=redirect_port
)
self._server_thread = threading.Thread(
target=lambda server: server.serve_forever(),
Expand Down Expand Up @@ -232,8 +238,14 @@ def _auth_params(self, redirect_uri: str) -> dict[str, Any]:


class _OAuthRedirectServer(http.server.HTTPServer):
def __init__(self, client_id: str, client_secret: str, issuer: Issuer) -> None:
super().__init__(("localhost", 0), _OAuthRedirectHandler)
def __init__(
self,
client_id: str,
client_secret: str,
issuer: Issuer,
port: int = 0,
) -> None:
super().__init__(("localhost", port), _OAuthRedirectHandler)
self.oauth_session = _OAuthSession(client_id, client_secret, issuer)
self.auth_response: dict[str, list[str]] | None = None
self._is_out_of_band = False
Expand Down
10 changes: 9 additions & 1 deletion sigstore/oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def identity_token( # nosec: B107
client_id: str = _DEFAULT_CLIENT_ID,
client_secret: str = "",
force_oob: bool = False,
redirect_port: int = 0,
) -> IdentityToken:
"""
Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.
Expand All @@ -283,6 +284,11 @@ def identity_token( # nosec: B107
The `force_oob` flag controls the kind of flow performed. When `False` (the default),
this function attempts to open the user's web browser before falling back to
an out-of-band flow. When `True`, the out-of-band flow is always used.

The `redirect_port` parameter selects the port for the local OAuth redirect
server. The default of `0` requests an ephemeral port from the OS. Set this
when the OIDC provider requires a pre-registered redirect URI with a fixed
port (some providers do not allow wildcards on `localhost`).
"""

# This function and the components that it relies on are based off of:
Expand All @@ -291,7 +297,9 @@ def identity_token( # nosec: B107
from sigstore._internal.oidc.oauth import _OAuthFlow

code: str
with _OAuthFlow(client_id, client_secret, self) as server:
with _OAuthFlow(
client_id, client_secret, self, redirect_port=redirect_port
) as server:
# Launch web browser
if not force_oob and webbrowser.open(server.base_uri):
print("Waiting for browser interaction...", file=sys.stderr)
Expand Down
39 changes: 38 additions & 1 deletion test/unit/internal/oidc/test_issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest.mock import MagicMock, patch
from unittest.mock import ANY, MagicMock, patch

import pytest

Expand All @@ -38,6 +38,43 @@ def test_get_identity_token_bad_code(monkeypatch):
Issuer("https://oauth2.sigstage.dev/auth").identity_token(force_oob=True)


def test_identity_token_passes_redirect_port():
"""
Verify that identity_token() forwards redirect_port to the OAuth flow.
"""
with (
patch("sigstore._internal.oidc.oauth._OAuthFlow") as MockOAuthFlow,
patch("sigstore.oidc.requests.Session") as MockSession,
patch("sigstore.oidc.webbrowser.open"),
patch("sigstore.oidc.IdentityToken"),
):
mock_server = MagicMock()
MockOAuthFlow.return_value.__enter__.return_value = mock_server
mock_server.is_oob.return_value = False
mock_server.base_uri = "http://localhost:8080"
mock_server.redirect_uri = "http://localhost:8080/auth/callback"
mock_server.oauth_session.state = "s"
mock_server.auth_response = {"code": ["c"], "state": ["s"]}

mock_config_response = MagicMock()
mock_config_response.json.return_value = {
"authorization_endpoint": "https://auth.example.com",
"token_endpoint": "https://token.example.com",
}
mock_config_response.raise_for_status.return_value = None
MockSession.return_value.get.side_effect = [mock_config_response]

mock_token_response = MagicMock()
mock_token_response.json.return_value = {"access_token": "t"}
mock_token_response.raise_for_status.return_value = None
MockSession.return_value.post.return_value = mock_token_response

issuer = Issuer("https://issuer.example.com")
issuer.identity_token(redirect_port=8080)

MockOAuthFlow.assert_called_once_with(ANY, ANY, ANY, redirect_port=8080)


def test_identity_token_csrf_protection():
"""
Verify that identity_token() raises IdentityError when the returned state
Expand Down
Loading