From fa30aa7f24353268d9f9673353eaac3a813f4229 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:07:34 -0500 Subject: [PATCH 01/15] Fix async UDF event loop starvation under heavy load in Jupyter Async UDFs were running directly in uvicorn's event loop via asyncio.create_task, competing with connection handling under heavy concurrent load. This caused unresponsiveness when running from Jupyter notebooks where the event loop is shared. The fix introduces a dedicated event loop in a background thread for async UDF execution. Coroutines are submitted via run_coroutine_threadsafe() and awaited from the server loop, isolating UDF work from HTTP I/O while preserving cooperative async scheduling between UDFs. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 38 +++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index d9090b38f..468c3de91 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1000,6 +1000,15 @@ def __init__( self.log_level = log_level self.disable_metrics = disable_metrics + # Dedicated event loop for async UDF execution, isolated from the server loop + self._udf_loop = asyncio.new_event_loop() + self._udf_thread = threading.Thread( + target=self._udf_loop.run_forever, + daemon=True, + name='async-udf-loop', + ) + self._udf_thread.start() + # Configure logging self._configure_logging() @@ -1033,6 +1042,11 @@ def _configure_logging(self) -> None: # Prevent propagation to avoid duplicate or differently formatted messages self.logger.propagate = False + def shutdown(self) -> None: + """Shut down the dedicated UDF event loop.""" + self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) + self._udf_thread.join(timeout=5) + def get_uvicorn_log_config(self) -> Dict[str, Any]: """ Create uvicorn log config that matches the Application's logging format. @@ -1189,15 +1203,23 @@ async def __call__( func_info['colspec'], b''.join(data), ) - func_task = asyncio.create_task( - func(cancel_event, call_timer, *inputs) - if func_info['is_async'] - else to_thread( - lambda: asyncio.run( - func(cancel_event, call_timer, *inputs), + func_task: 'asyncio.Task[Any]' + if func_info['is_async']: + future = asyncio.run_coroutine_threadsafe( + func(cancel_event, call_timer, *inputs), + self._udf_loop, + ) + func_task = asyncio.create_task( + asyncio.wrap_future(future), # type: ignore[arg-type] + ) + else: + func_task = asyncio.create_task( + to_thread( + lambda: asyncio.run( + func(cancel_event, call_timer, *inputs), + ), ), - ), - ) + ) disconnect_task = asyncio.create_task( asyncio.sleep(int(1e9)) if ignore_cancel else cancel_on_disconnect(receive), From c0d280a985f03b0f396bcf7e225a90eac4f0773b Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:11:01 -0500 Subject: [PATCH 02/15] Ensure proper cancellation of async UDFs in dedicated loop Cancel the concurrent.futures.Future in the UDF loop on disconnect/timeout so the coroutine is interrupted promptly, not just at the next cancel_on_event row check. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 468c3de91..5be68f6c4 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -24,6 +24,7 @@ """ import argparse import asyncio +import concurrent.futures import contextvars import dataclasses import datetime @@ -1204,13 +1205,14 @@ async def __call__( ) func_task: 'asyncio.Task[Any]' + udf_future: 'Optional[concurrent.futures.Future[Any]]' = None if func_info['is_async']: - future = asyncio.run_coroutine_threadsafe( + udf_future = asyncio.run_coroutine_threadsafe( func(cancel_event, call_timer, *inputs), self._udf_loop, ) func_task = asyncio.create_task( - asyncio.wrap_future(future), # type: ignore[arg-type] + asyncio.wrap_future(udf_future), # type: ignore[arg-type] ) else: func_task = asyncio.create_task( @@ -1240,12 +1242,16 @@ async def __call__( for task in done: if task is disconnect_task: cancel_event.set() + if udf_future is not None: + udf_future.cancel() raise asyncio.CancelledError( 'Function call was cancelled by client disconnect', ) elif task is timeout_task: cancel_event.set() + if udf_future is not None: + udf_future.cancel() raise asyncio.TimeoutError( 'Function call was cancelled due to timeout', ) From 6f617d749cc1349ddb65607ccc2dc7b441307cb6 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:19:46 -0500 Subject: [PATCH 03/15] Fix create_task expecting coroutine, use ensure_future for wrapped future asyncio.create_task() requires a coroutine but asyncio.wrap_future() returns a Future. Use asyncio.ensure_future() which accepts both. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 5be68f6c4..40520e9bc 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1211,8 +1211,8 @@ async def __call__( func(cancel_event, call_timer, *inputs), self._udf_loop, ) - func_task = asyncio.create_task( - asyncio.wrap_future(udf_future), # type: ignore[arg-type] + func_task = asyncio.ensure_future( + asyncio.wrap_future(udf_future), ) else: func_task = asyncio.create_task( From efee0a2e4fe424d28c05dd219b197152c7faf919 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:29:05 -0500 Subject: [PATCH 04/15] Fix pandas dtype assertions for newer pandas StringDtype Newer pandas versions use StringDtype ('str') instead of 'object' for string columns. Detect the actual dtype at import time and use it in test assertions. Binary columns remain 'object'. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/tests/test_connection.py | 34 ++++++++++++++------------ 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/singlestoredb/tests/test_connection.py b/singlestoredb/tests/test_connection.py index ee392d06a..2ae5cf1d2 100755 --- a/singlestoredb/tests/test_connection.py +++ b/singlestoredb/tests/test_connection.py @@ -22,8 +22,10 @@ try: import pandas as pd has_pandas = True + _pd_str_dtype = str(pd.DataFrame({'a': ['x']}).dtypes['a']) except ImportError: has_pandas = False + _pd_str_dtype = 'object' class TestConnection(unittest.TestCase): @@ -1124,21 +1126,21 @@ def test_alltypes_pandas(self): ('timestamp', 'datetime64[us]'), ('timestamp_6', 'datetime64[us]'), ('year', 'float64'), - ('char_100', 'object'), + ('char_100', _pd_str_dtype), ('binary_100', 'object'), - ('varchar_200', 'object'), + ('varchar_200', _pd_str_dtype), ('varbinary_200', 'object'), - ('longtext', 'object'), - ('mediumtext', 'object'), - ('text', 'object'), - ('tinytext', 'object'), + ('longtext', _pd_str_dtype), + ('mediumtext', _pd_str_dtype), + ('text', _pd_str_dtype), + ('tinytext', _pd_str_dtype), ('longblob', 'object'), ('mediumblob', 'object'), ('blob', 'object'), ('tinyblob', 'object'), ('json', 'object'), - ('enum', 'object'), - ('set', 'object'), + ('enum', _pd_str_dtype), + ('set', _pd_str_dtype), ('bit', 'object'), ] @@ -1266,21 +1268,21 @@ def test_alltypes_no_nulls_pandas(self): ('timestamp', 'datetime64[us]'), ('timestamp_6', 'datetime64[us]'), ('year', 'int16'), - ('char_100', 'object'), + ('char_100', _pd_str_dtype), ('binary_100', 'object'), - ('varchar_200', 'object'), + ('varchar_200', _pd_str_dtype), ('varbinary_200', 'object'), - ('longtext', 'object'), - ('mediumtext', 'object'), - ('text', 'object'), - ('tinytext', 'object'), + ('longtext', _pd_str_dtype), + ('mediumtext', _pd_str_dtype), + ('text', _pd_str_dtype), + ('tinytext', _pd_str_dtype), ('longblob', 'object'), ('mediumblob', 'object'), ('blob', 'object'), ('tinyblob', 'object'), ('json', 'object'), - ('enum', 'object'), - ('set', 'object'), + ('enum', _pd_str_dtype), + ('set', _pd_str_dtype), ('bit', 'object'), ] From e4cc4c4f61819b3325618c12c7804fac6f3a9450 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:43:14 -0500 Subject: [PATCH 05/15] Call Application.shutdown() when replacing UDF server Prevents UDF event loop thread leaks when run_udf_app() is called repeatedly in Jupyter notebooks. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/apps/_python_udfs.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index e45718dec..7835307cb 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -10,8 +10,9 @@ if typing.TYPE_CHECKING: from ._uvicorn_util import AwaitableUvicornServer -# Keep track of currently running server +# Keep track of currently running server and app _running_server: 'typing.Optional[AwaitableUvicornServer]' = None +_running_app: typing.Optional[Application] = None # Maximum number of UDFs allowed MAX_UDFS_LIMIT = 10 @@ -21,7 +22,7 @@ async def run_udf_app( log_level: str = 'error', kill_existing_app_server: bool = True, ) -> UdfConnectionInfo: - global _running_server + global _running_server, _running_app from ._uvicorn_util import AwaitableUvicornServer try: @@ -38,6 +39,9 @@ async def run_udf_app( if _running_server is not None: await _running_server.shutdown() _running_server = None + if _running_app is not None: + _running_app.shutdown() + _running_app = None # Kill if any other process is occupying the port kill_process_by_port(app_config.listen_port) @@ -72,6 +76,7 @@ async def run_udf_app( if app_config.running_interactively: app.register_functions(replace=True) + _running_app = app _running_server = AwaitableUvicornServer(config) asyncio.create_task(_running_server.serve()) await _running_server.wait_for_startup() From a2087db3e240f8a03f29bc9649c433be5957b70d Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 11:01:30 -0500 Subject: [PATCH 06/15] Add pre-release tag builds with GitHub Release assets Tags matching v*-rc*, v*-test*, v*-alpha*, v*-beta* now trigger the full wheel build pipeline and create a pre-release GitHub Release with all wheels attached. Production releases also attach wheels to the existing release before publishing to PyPI. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/publish.yml | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index fe684d0c7..a77917107 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -7,6 +7,12 @@ name: Publish packages on: + push: + tags: + - 'v*-rc*' + - 'v*-test*' + - 'v*-alpha*' + - 'v*-beta*' release: types: [published] workflow_dispatch: @@ -157,9 +163,9 @@ jobs: runs-on: ubuntu-latest permissions: - id-token: write # Required for OIDC trusted publishing - actions: read # Required for actions/download-artifact - contents: read # Required for repository access + id-token: write # Required for OIDC trusted publishing + actions: read # Required for actions/download-artifact + contents: write # Required for gh release create/upload environment: name: publish @@ -184,8 +190,26 @@ jobs: name: artifacts-macOS path: dist + - name: Create GitHub Release (test tag) + if: github.event_name == 'push' + env: + GH_TOKEN: ${{ github.token }} + run: | + gh release create "${{ github.ref_name }}" \ + --prerelease \ + --generate-notes \ + --title "${{ github.ref_name }}" \ + dist/* + + - name: Upload assets to existing Release + if: github.event_name == 'release' + env: + GH_TOKEN: ${{ github.token }} + run: | + gh release upload "${{ github.event.release.tag_name }}" dist/* --clobber + - name: Publish to PyPI - if: ${{ github.event_name == 'release' || github.event.inputs.publish_pypi == 'true' }} + if: ${{ github.event_name == 'release' || (github.event_name == 'workflow_dispatch' && github.event.inputs.publish_pypi == 'true') }} uses: pypa/gh-action-pypi-publish@release/v1 # - name: Publish Conda package From 0d8ef25c05e64b261294b4551c96ca046689d6a1 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 11:23:28 -0500 Subject: [PATCH 07/15] Propagate cancellation to UDF loop and prevent thread leak on failure - Cancel udf_future when func_task is in pending set after asyncio.wait - Cancel udf_future in finally block to ensure cleanup on any exit path - Wrap post-construction code in try/except to call app.shutdown() if validation, config, or registration fails after Application is created Co-Authored-By: Claude Opus 4.6 --- singlestoredb/apps/_python_udfs.py | 44 ++++++++++++++++------------- singlestoredb/functions/ext/asgi.py | 6 ++++ 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index 7835307cb..1039565c4 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -58,28 +58,32 @@ async def run_udf_app( log_level=log_level, ) - if not app.endpoints: - raise ValueError('You must define at least one function.') - if len(app.endpoints) > MAX_UDFS_LIMIT: - raise ValueError( - f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.', - ) - - config = uvicorn.Config( - app, - host='0.0.0.0', - port=app_config.listen_port, - log_config=app.get_uvicorn_log_config(), - ) + try: + if not app.endpoints: + raise ValueError('You must define at least one function.') + if len(app.endpoints) > MAX_UDFS_LIMIT: + raise ValueError( + f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.', + ) - # Register the functions only if the app is running interactively. - if app_config.running_interactively: - app.register_functions(replace=True) + config = uvicorn.Config( + app, + host='0.0.0.0', + port=app_config.listen_port, + log_config=app.get_uvicorn_log_config(), + ) - _running_app = app - _running_server = AwaitableUvicornServer(config) - asyncio.create_task(_running_server.serve()) - await _running_server.wait_for_startup() + # Register the functions only if the app is running interactively. + if app_config.running_interactively: + app.register_functions(replace=True) + + _running_app = app + _running_server = AwaitableUvicornServer(config) + asyncio.create_task(_running_server.serve()) + await _running_server.wait_for_startup() + except Exception: + app.shutdown() + raise print(f'Python UDF registered at {base_url}') diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 40520e9bc..8f4535d4d 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1238,6 +1238,9 @@ async def __call__( ) await cancel_all_tasks(pending) + if func_task in pending and udf_future is not None: + cancel_event.set() + udf_future.cancel() for task in done: if task is disconnect_task: @@ -1314,6 +1317,9 @@ async def __call__( await send(self.error_response_dict) finally: + if udf_future is not None: + cancel_event.set() + udf_future.cancel() await cancel_all_tasks(all_tasks) # Handle api reflection From 72b2e3e1ac1efd98350f4ec7f3c81adb404c2a0e Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 11:26:50 -0500 Subject: [PATCH 08/15] Remove checkout and --generate-notes from publish job gh release create doesn't need a git repo when not generating notes. Use --notes "" for empty release body with just assets attached. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index a77917107..c2165da7e 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -197,8 +197,8 @@ jobs: run: | gh release create "${{ github.ref_name }}" \ --prerelease \ - --generate-notes \ --title "${{ github.ref_name }}" \ + --notes "" \ dist/* - name: Upload assets to existing Release From 35e92d5329fc328379acdc8efd0a3d8bd8659ac0 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 12:00:30 -0500 Subject: [PATCH 09/15] Trigger fusion-docs only on release, not pre-release tags Changed from push tag trigger (v*.*.*) to release event so it only runs on published releases, not rc/test/alpha/beta tags. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/fusion-docs.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/fusion-docs.yml b/.github/workflows/fusion-docs.yml index c82840512..75a74ffa6 100644 --- a/.github/workflows/fusion-docs.yml +++ b/.github/workflows/fusion-docs.yml @@ -1,7 +1,6 @@ on: - push: - tags: - - 'v*.*.*' + release: + types: [published] name: Generate Fusion docs @@ -36,6 +35,6 @@ jobs: - name: Upload release asset run: | - gh release upload ${{ github.ref_name }} fusion-docs.zip + gh release upload ${{ github.event.release.tag_name }} fusion-docs.zip env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From b84d9ead3d191e9b7869a0d98ebadd923d77a001 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 12:03:30 -0500 Subject: [PATCH 10/15] Fix udf_future NameError and lazily initialize UDF event loop - Move udf_future initialization before input_handler['load']() to prevent NameError in finally block if parsing raises - Lazily create UDF event loop on first async UDF invocation instead of unconditionally in __init__, avoiding wasted resources for sync-only or metadata-only usage - Guard shutdown() against None loop/thread Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 32 ++++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 8f4535d4d..431f814f7 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1001,14 +1001,8 @@ def __init__( self.log_level = log_level self.disable_metrics = disable_metrics - # Dedicated event loop for async UDF execution, isolated from the server loop - self._udf_loop = asyncio.new_event_loop() - self._udf_thread = threading.Thread( - target=self._udf_loop.run_forever, - daemon=True, - name='async-udf-loop', - ) - self._udf_thread.start() + self._udf_loop: Optional[asyncio.AbstractEventLoop] = None + self._udf_thread: Optional[threading.Thread] = None # Configure logging self._configure_logging() @@ -1043,10 +1037,24 @@ def _configure_logging(self) -> None: # Prevent propagation to avoid duplicate or differently formatted messages self.logger.propagate = False + def _get_udf_loop(self) -> asyncio.AbstractEventLoop: + """Get or create the dedicated UDF event loop.""" + if self._udf_loop is None: + self._udf_loop = asyncio.new_event_loop() + self._udf_thread = threading.Thread( + target=self._udf_loop.run_forever, + daemon=True, + name='async-udf-loop', + ) + self._udf_thread.start() + return self._udf_loop + def shutdown(self) -> None: """Shut down the dedicated UDF event loop.""" - self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) - self._udf_thread.join(timeout=5) + if self._udf_loop is not None: + self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) + if self._udf_thread is not None: + self._udf_thread.join(timeout=5) def get_uvicorn_log_config(self) -> Dict[str, Any]: """ @@ -1196,6 +1204,7 @@ async def __call__( try: all_tasks = [] result = [] + udf_future: 'Optional[concurrent.futures.Future[Any]]' = None cancel_event = threading.Event() @@ -1205,11 +1214,10 @@ async def __call__( ) func_task: 'asyncio.Task[Any]' - udf_future: 'Optional[concurrent.futures.Future[Any]]' = None if func_info['is_async']: udf_future = asyncio.run_coroutine_threadsafe( func(cancel_event, call_timer, *inputs), - self._udf_loop, + self._get_udf_loop(), ) func_task = asyncio.ensure_future( asyncio.wrap_future(udf_future), From 48f148a8eb0b7ad48913a7cbe75904fced2190c4 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 14:08:01 -0500 Subject: [PATCH 11/15] Add checkout to publish job for gh release create gh release create requires a git repo to determine the repository context even without --generate-notes. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/publish.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index c2165da7e..1a0e6d0a6 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -172,6 +172,8 @@ jobs: url: https://pypi.org/p/singlestoredb steps: + - uses: actions/checkout@v3 + - name: Download Linux wheels and sdist uses: actions/download-artifact@v4 with: From 35a66587eb1ef73ac694747cec99366909a2c95d Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 15:18:49 -0500 Subject: [PATCH 12/15] Reset UDF loop state in shutdown() to allow safe reuse After stopping the event loop and joining the thread, set both _udf_loop and _udf_thread back to None so that _get_udf_loop() can safely recreate them if called after shutdown. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 431f814f7..ea4db0bd2 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1055,6 +1055,8 @@ def shutdown(self) -> None: self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) if self._udf_thread is not None: self._udf_thread.join(timeout=5) + self._udf_loop = None + self._udf_thread = None def get_uvicorn_log_config(self) -> Dict[str, Any]: """ From f605217d9c83740d09367f36a9321c56e9765511 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Mon, 18 May 2026 09:50:57 -0500 Subject: [PATCH 13/15] Use thread-per-request for async UDFs instead of shared event loop The dedicated shared event loop still caused starvation under concurrent async UDF calls. Switch to the same model used by sync UDFs: each request gets its own thread with asyncio.run(), eliminating loop contention. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/apps/_python_udfs.py | 51 ++++++++++--------------- singlestoredb/functions/ext/asgi.py | 59 ++++------------------------- 2 files changed, 29 insertions(+), 81 deletions(-) diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index 1039565c4..e45718dec 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -10,9 +10,8 @@ if typing.TYPE_CHECKING: from ._uvicorn_util import AwaitableUvicornServer -# Keep track of currently running server and app +# Keep track of currently running server _running_server: 'typing.Optional[AwaitableUvicornServer]' = None -_running_app: typing.Optional[Application] = None # Maximum number of UDFs allowed MAX_UDFS_LIMIT = 10 @@ -22,7 +21,7 @@ async def run_udf_app( log_level: str = 'error', kill_existing_app_server: bool = True, ) -> UdfConnectionInfo: - global _running_server, _running_app + global _running_server from ._uvicorn_util import AwaitableUvicornServer try: @@ -39,9 +38,6 @@ async def run_udf_app( if _running_server is not None: await _running_server.shutdown() _running_server = None - if _running_app is not None: - _running_app.shutdown() - _running_app = None # Kill if any other process is occupying the port kill_process_by_port(app_config.listen_port) @@ -58,32 +54,27 @@ async def run_udf_app( log_level=log_level, ) - try: - if not app.endpoints: - raise ValueError('You must define at least one function.') - if len(app.endpoints) > MAX_UDFS_LIMIT: - raise ValueError( - f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.', - ) - - config = uvicorn.Config( - app, - host='0.0.0.0', - port=app_config.listen_port, - log_config=app.get_uvicorn_log_config(), + if not app.endpoints: + raise ValueError('You must define at least one function.') + if len(app.endpoints) > MAX_UDFS_LIMIT: + raise ValueError( + f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.', ) - # Register the functions only if the app is running interactively. - if app_config.running_interactively: - app.register_functions(replace=True) - - _running_app = app - _running_server = AwaitableUvicornServer(config) - asyncio.create_task(_running_server.serve()) - await _running_server.wait_for_startup() - except Exception: - app.shutdown() - raise + config = uvicorn.Config( + app, + host='0.0.0.0', + port=app_config.listen_port, + log_config=app.get_uvicorn_log_config(), + ) + + # Register the functions only if the app is running interactively. + if app_config.running_interactively: + app.register_functions(replace=True) + + _running_server = AwaitableUvicornServer(config) + asyncio.create_task(_running_server.serve()) + await _running_server.wait_for_startup() print(f'Python UDF registered at {base_url}') diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index ea4db0bd2..3f3e71652 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -24,7 +24,6 @@ """ import argparse import asyncio -import concurrent.futures import contextvars import dataclasses import datetime @@ -1001,9 +1000,6 @@ def __init__( self.log_level = log_level self.disable_metrics = disable_metrics - self._udf_loop: Optional[asyncio.AbstractEventLoop] = None - self._udf_thread: Optional[threading.Thread] = None - # Configure logging self._configure_logging() @@ -1037,27 +1033,6 @@ def _configure_logging(self) -> None: # Prevent propagation to avoid duplicate or differently formatted messages self.logger.propagate = False - def _get_udf_loop(self) -> asyncio.AbstractEventLoop: - """Get or create the dedicated UDF event loop.""" - if self._udf_loop is None: - self._udf_loop = asyncio.new_event_loop() - self._udf_thread = threading.Thread( - target=self._udf_loop.run_forever, - daemon=True, - name='async-udf-loop', - ) - self._udf_thread.start() - return self._udf_loop - - def shutdown(self) -> None: - """Shut down the dedicated UDF event loop.""" - if self._udf_loop is not None: - self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) - if self._udf_thread is not None: - self._udf_thread.join(timeout=5) - self._udf_loop = None - self._udf_thread = None - def get_uvicorn_log_config(self) -> Dict[str, Any]: """ Create uvicorn log config that matches the Application's logging format. @@ -1206,7 +1181,6 @@ async def __call__( try: all_tasks = [] result = [] - udf_future: 'Optional[concurrent.futures.Future[Any]]' = None cancel_event = threading.Event() @@ -1215,23 +1189,13 @@ async def __call__( func_info['colspec'], b''.join(data), ) - func_task: 'asyncio.Task[Any]' - if func_info['is_async']: - udf_future = asyncio.run_coroutine_threadsafe( - func(cancel_event, call_timer, *inputs), - self._get_udf_loop(), - ) - func_task = asyncio.ensure_future( - asyncio.wrap_future(udf_future), - ) - else: - func_task = asyncio.create_task( - to_thread( - lambda: asyncio.run( - func(cancel_event, call_timer, *inputs), - ), + func_task = asyncio.create_task( + to_thread( + lambda: asyncio.run( + func(cancel_event, call_timer, *inputs), ), - ) + ), + ) disconnect_task = asyncio.create_task( asyncio.sleep(int(1e9)) if ignore_cancel else cancel_on_disconnect(receive), @@ -1248,23 +1212,18 @@ async def __call__( ) await cancel_all_tasks(pending) - if func_task in pending and udf_future is not None: + if func_task in pending: cancel_event.set() - udf_future.cancel() for task in done: if task is disconnect_task: cancel_event.set() - if udf_future is not None: - udf_future.cancel() raise asyncio.CancelledError( 'Function call was cancelled by client disconnect', ) elif task is timeout_task: cancel_event.set() - if udf_future is not None: - udf_future.cancel() raise asyncio.TimeoutError( 'Function call was cancelled due to timeout', ) @@ -1327,9 +1286,7 @@ async def __call__( await send(self.error_response_dict) finally: - if udf_future is not None: - cancel_event.set() - udf_future.cancel() + cancel_event.set() await cancel_all_tasks(all_tasks) # Handle api reflection From 6e84863cbee93a0a837a38a15c1116b795b4dd4e Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Mon, 18 May 2026 10:12:09 -0500 Subject: [PATCH 14/15] Add cancellable wrapper for responsive async UDF cancellation Wraps the inner coroutine in _cancellable_run which polls cancel_event and raises CancelledError at the next await (~100ms), ensuring vector UDFs respect disconnect/timeout signals without waiting for completion. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 3f3e71652..f70b1ec16 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -113,6 +113,28 @@ async def to_thread( return await loop.run_in_executor(None, func_call) +async def _poll_cancel(cancel_event: threading.Event) -> None: + while not cancel_event.is_set(): + await asyncio.sleep(0.1) + + +async def _cancellable_run( + cancel_event: threading.Event, + coro: Any, +) -> Any: + task = asyncio.create_task(coro) + cancel_check = asyncio.create_task(_poll_cancel(cancel_event)) + done, pending = await asyncio.wait( + [task, cancel_check], return_when=asyncio.FIRST_COMPLETED, + ) + for p in pending: + p.cancel() + if cancel_check in done: + task.cancel() + raise asyncio.CancelledError() + return task.result() + + # Use negative values to indicate unsigned ints / binary data / usec time precision rowdat_1_type_map = { 'bool': ft.LONGLONG, @@ -1192,7 +1214,10 @@ async def __call__( func_task = asyncio.create_task( to_thread( lambda: asyncio.run( - func(cancel_event, call_timer, *inputs), + _cancellable_run( + cancel_event, + func(cancel_event, call_timer, *inputs), + ), ), ), ) From 2f819f53b55339ea54d587649fa027b1dcb01490 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Tue, 19 May 2026 09:00:21 -0500 Subject: [PATCH 15/15] Fix event loop closed error and add comprehensive UDF dispatch tests Replace asyncio.run() with _run_with_graceful_shutdown() that drains pending callbacks before closing the loop, preventing RuntimeError from httpx/anyio TLS cleanup in async UDFs calling OpenAI/LangChain APIs. Add 17 unit tests covering graceful shutdown, cancellation timing, exception propagation, context variable isolation, and concurrent safety. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 30 ++- singlestoredb/tests/test_udf_event_loop.py | 296 +++++++++++++++++++++ 2 files changed, 325 insertions(+), 1 deletion(-) create mode 100644 singlestoredb/tests/test_udf_event_loop.py diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index f70b1ec16..ca46134d3 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -135,6 +135,34 @@ async def _cancellable_run( return task.result() +def _run_with_graceful_shutdown(coro: Any) -> Any: + """Run a coroutine in a new event loop, draining callbacks before close. + + Unlike asyncio.run(), this prevents 'Event loop is closed' errors from + libraries (httpx/anyio) that schedule cleanup callbacks during teardown. + """ + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + return loop.run_until_complete(coro) + finally: + try: + pending = asyncio.all_tasks(loop) + if pending: + for task in pending: + task.cancel() + loop.run_until_complete( + asyncio.gather(*pending, return_exceptions=True), + ) + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.run_until_complete(loop.shutdown_default_executor()) + finally: + loop.call_soon(loop.stop) + loop.run_forever() + asyncio.set_event_loop(None) + loop.close() + + # Use negative values to indicate unsigned ints / binary data / usec time precision rowdat_1_type_map = { 'bool': ft.LONGLONG, @@ -1213,7 +1241,7 @@ async def __call__( func_task = asyncio.create_task( to_thread( - lambda: asyncio.run( + lambda: _run_with_graceful_shutdown( _cancellable_run( cancel_event, func(cancel_event, call_timer, *inputs), diff --git a/singlestoredb/tests/test_udf_event_loop.py b/singlestoredb/tests/test_udf_event_loop.py new file mode 100644 index 000000000..279f4a184 --- /dev/null +++ b/singlestoredb/tests/test_udf_event_loop.py @@ -0,0 +1,296 @@ +"""Tests for async UDF event loop graceful shutdown.""" +import asyncio +import contextvars +import threading +import time +import unittest +from typing import Any +from typing import List + +from ..functions.ext.asgi import _cancellable_run +from ..functions.ext.asgi import _run_with_graceful_shutdown +from ..functions.ext.asgi import to_thread + + +class TestRunWithGracefulShutdown(unittest.TestCase): + """Test _run_with_graceful_shutdown handles loop cleanup properly.""" + + def test_basic_coroutine(self) -> None: + async def simple() -> int: + return 42 + + result = _run_with_graceful_shutdown(simple()) + self.assertEqual(result, 42) + + def test_callbacks_drained_before_close(self) -> None: + """Simulate httpx/anyio scheduling call_soon during teardown. + + This is the exact pattern that causes 'Event loop is closed' with + asyncio.run() -- a library schedules a callback in its __del__ or + aclose() that fires after the loop is closed. + """ + callback_executed: List[bool] = [] + + async def coroutine_with_cleanup_callback() -> str: + loop = asyncio.get_running_loop() + loop.call_soon(lambda: callback_executed.append(True)) + return 'done' + + result = _run_with_graceful_shutdown(coroutine_with_cleanup_callback()) + self.assertEqual(result, 'done') + self.assertEqual(callback_executed, [True]) + + def test_no_event_loop_closed_error(self) -> None: + """Verify no RuntimeError when cleanup schedules on the loop.""" + errors: List[RuntimeError] = [] + + async def simulate_httpx_teardown() -> str: + loop = asyncio.get_running_loop() + + def deferred_cleanup() -> None: + try: + loop.call_soon(lambda: None) + except RuntimeError as e: + errors.append(e) + + loop.call_soon(deferred_cleanup) + return 'ok' + + result = _run_with_graceful_shutdown(simulate_httpx_teardown()) + self.assertEqual(result, 'ok') + self.assertEqual(errors, []) + + def test_exception_propagates(self) -> None: + async def failing() -> None: + raise ValueError('test error') + + with self.assertRaises(ValueError) as ctx: + _run_with_graceful_shutdown(failing()) + self.assertEqual(str(ctx.exception), 'test error') + + def test_callbacks_drained_even_on_exception(self) -> None: + """Cleanup callbacks still run even if coroutine raises.""" + callback_executed: List[bool] = [] + + async def failing_with_callback() -> None: + loop = asyncio.get_running_loop() + loop.call_soon(lambda: callback_executed.append(True)) + raise ValueError('boom') + + with self.assertRaises(ValueError): + _run_with_graceful_shutdown(failing_with_callback()) + self.assertEqual(callback_executed, [True]) + + def test_pending_tasks_cancelled(self) -> None: + """Background tasks are cancelled during shutdown.""" + async def background() -> None: + await asyncio.sleep(999) + + async def main_with_background_task() -> str: + asyncio.create_task(background()) + return 'done' + + result = _run_with_graceful_shutdown(main_with_background_task()) + self.assertEqual(result, 'done') + + def test_isolation_between_calls(self) -> None: + """Each call gets its own event loop that is closed after use.""" + loops: List[asyncio.AbstractEventLoop] = [] + + async def capture_loop() -> bool: + loops.append(asyncio.get_running_loop()) + return True + + _run_with_graceful_shutdown(capture_loop()) + first_loop = loops[0] + self.assertTrue(first_loop.is_closed()) + + _run_with_graceful_shutdown(capture_loop()) + second_loop = loops[1] + self.assertTrue(second_loop.is_closed()) + + def test_cancellable_run_integration(self) -> None: + """Verify _cancellable_run works inside _run_with_graceful_shutdown.""" + cancel_event = threading.Event() + + async def slow_func() -> str: + return 'completed' + + result = _run_with_graceful_shutdown( + _cancellable_run(cancel_event, slow_func()), + ) + self.assertEqual(result, 'completed') + + def test_cancellation_via_event(self) -> None: + """Verify cancellation propagates through the full stack.""" + cancel_event = threading.Event() + cancel_event.set() + + async def blocked_func() -> str: + await asyncio.sleep(999) + return 'should not reach' + + with self.assertRaises(asyncio.CancelledError): + _run_with_graceful_shutdown( + _cancellable_run(cancel_event, blocked_func()), + ) + + +class TestUDFDispatchEdgeCases(unittest.TestCase): + """Test edge cases in the UDF dispatch stack.""" + + def test_timeout_cancels_running_function(self) -> None: + """Cancel event set from timer thread cancels a blocked coroutine.""" + cancel_event = threading.Event() + + async def long_running() -> str: + await asyncio.sleep(999) + return 'should not reach' + + def set_cancel_after_delay() -> None: + time.sleep(0.2) + cancel_event.set() + + timer = threading.Thread(target=set_cancel_after_delay) + timer.start() + + start = time.monotonic() + with self.assertRaises(asyncio.CancelledError): + _run_with_graceful_shutdown( + _cancellable_run(cancel_event, long_running()), + ) + elapsed = time.monotonic() - start + timer.join() + # 0.2s delay + up to 0.1s poll interval + margin + self.assertLess(elapsed, 0.5) + + def test_exception_propagates_through_full_stack(self) -> None: + """User exception propagates unwrapped through the entire dispatch.""" + cancel_event = threading.Event() + + class CustomUDFError(Exception): + pass + + async def failing_udf() -> None: + raise CustomUDFError('embedding service unavailable') + + with self.assertRaises(CustomUDFError) as ctx: + _run_with_graceful_shutdown( + _cancellable_run(cancel_event, failing_udf()), + ) + self.assertEqual(str(ctx.exception), 'embedding service unavailable') + + def test_cancel_event_detected_within_poll_interval(self) -> None: + """Cancellation is detected within one poll cycle (0.1s).""" + cancel_event = threading.Event() + + async def blocked() -> str: + await asyncio.sleep(999) + return 'unreachable' + + def set_cancel() -> None: + time.sleep(0.05) + cancel_event.set() + + timer = threading.Thread(target=set_cancel) + timer.start() + + start = time.monotonic() + with self.assertRaises(asyncio.CancelledError): + _run_with_graceful_shutdown( + _cancellable_run(cancel_event, blocked()), + ) + elapsed = time.monotonic() - start + timer.join() + # 0.05s delay + 0.1s poll interval + margin + self.assertLess(elapsed, 0.25) + + def test_context_vars_propagate_through_to_thread(self) -> None: + """Context variables are visible inside to_thread executor.""" + test_var: contextvars.ContextVar[str] = contextvars.ContextVar( + 'test_var', + ) + test_var.set('hello_from_parent') + captured: List[str] = [] + + def read_context_var() -> str: + val = test_var.get('NOT_FOUND') + captured.append(val) + return val + + async def run_in_thread() -> str: + return await to_thread(read_context_var) + + result = _run_with_graceful_shutdown(run_in_thread()) + self.assertEqual(result, 'hello_from_parent') + self.assertEqual(captured, ['hello_from_parent']) + + def test_concurrent_requests_isolated(self) -> None: + """Parallel executions don't share state.""" + results: List[Any] = [None, None, None] + + def run_isolated(index: int) -> None: + async def compute() -> int: + await asyncio.sleep(0.05) + return index * 10 + + results[index] = _run_with_graceful_shutdown(compute()) + + threads = [ + threading.Thread(target=run_isolated, args=(i,)) + for i in range(3) + ] + for t in threads: + t.start() + for t in threads: + t.join() + + self.assertEqual(results, [0, 10, 20]) + + def test_sync_function_through_async_wrapper(self) -> None: + """Synchronous function works when wrapped as async coroutine.""" + cancel_event = threading.Event() + + async def sync_as_async() -> int: + # Simulates what decorator.py's async_wrapper does for sync UDFs + return 42 + 1 + + result = _run_with_graceful_shutdown( + _cancellable_run(cancel_event, sync_as_async()), + ) + self.assertEqual(result, 43) + + def test_cancel_event_not_set_on_success(self) -> None: + """Cancel event remains unset after successful execution.""" + cancel_event = threading.Event() + + async def quick() -> str: + return 'fast' + + result = _run_with_graceful_shutdown( + _cancellable_run(cancel_event, quick()), + ) + self.assertEqual(result, 'fast') + self.assertFalse(cancel_event.is_set()) + + def test_callbacks_from_cancelled_tasks_still_drain(self) -> None: + """Background task callbacks drain even when task is cancelled.""" + drained: List[bool] = [] + + async def bg_with_callback() -> None: + loop = asyncio.get_running_loop() + loop.call_soon(lambda: drained.append(True)) + await asyncio.sleep(999) + + async def main() -> str: + asyncio.create_task(bg_with_callback()) + await asyncio.sleep(0.05) # Let background task start + return 'done' + + result = _run_with_graceful_shutdown(main()) + self.assertEqual(result, 'done') + self.assertEqual(drained, [True]) + + +if __name__ == '__main__': + unittest.main()