2525from dvsim .report .data import IPMeta , ToolMeta
2626from dvsim .runtime .legacy import LegacyLauncherAdapter
2727from dvsim .scheduler .core import Scheduler
28+ from dvsim .scheduler .resources import ResourceManager , StaticResourceProvider
2829
2930__all__ = ()
3031
@@ -454,6 +455,16 @@ async def test_duplicate_jobs(fxt: Fxt) -> None:
454455 # Check names of all jobs are unique (i.e. no duplicates are returned).
455456 assert_that (len (names ), equal_to (len (set (names ))))
456457
458+ @staticmethod
459+ async def _parallelism_test_helper (
460+ fxt : Fxt , scheduler : Scheduler , num_jobs : int , expected_parallelism : int
461+ ) -> None :
462+ """Test helper to check that scheduler parallelism reaches the expected level."""
463+ assert_that (fxt .mock_ctx .max_concurrent , equal_to (0 ))
464+ result = await scheduler .run ()
465+ _assert_result_status (result , num_jobs )
466+ assert_that (fxt .mock_ctx .max_concurrent , equal_to (expected_parallelism ))
467+
457468 @staticmethod
458469 @pytest .mark .asyncio
459470 @pytest .mark .timeout (DEFAULT_TIMEOUT )
@@ -462,10 +473,7 @@ async def test_parallel_dispatch(fxt: Fxt, num_jobs: int) -> None:
462473 """Test that many jobs can be dispatched in parallel."""
463474 jobs = make_many_jobs (fxt .tmp_path , num_jobs )
464475 scheduler = Scheduler (jobs , fxt .backends , MOCK_BACKEND )
465- assert_that (fxt .mock_ctx .max_concurrent , equal_to (0 ))
466- result = await scheduler .run ()
467- _assert_result_status (result , num_jobs )
468- assert_that (fxt .mock_ctx .max_concurrent , equal_to (num_jobs ))
476+ await TestScheduling ._parallelism_test_helper (fxt , scheduler , num_jobs , num_jobs )
469477
470478 @staticmethod
471479 @pytest .mark .asyncio
@@ -484,10 +492,46 @@ async def test_max_parallel(
484492 else :
485493 fxt .mock_legacy_backend .max_parallelism = max_parallel
486494 scheduler = Scheduler (jobs , fxt .backends , MOCK_BACKEND )
487- assert_that (fxt .mock_ctx .max_concurrent , equal_to (0 ))
488- result = await scheduler .run ()
489- _assert_result_status (result , num_jobs )
490- assert_that (fxt .mock_ctx .max_concurrent , equal_to (min (num_jobs , max_parallel )))
495+ expected_parallel = min (num_jobs , max_parallel )
496+ await TestScheduling ._parallelism_test_helper (fxt , scheduler , num_jobs , expected_parallel )
497+
498+ @staticmethod
499+ @pytest .mark .asyncio
500+ @pytest .mark .timeout (DEFAULT_TIMEOUT )
501+ @pytest .mark .parametrize ("num_a_jobs" , [5 , 10 , 20 ])
502+ @pytest .mark .parametrize ("num_b_jobs" , [7 , 13 , 26 ])
503+ @pytest .mark .parametrize ("limit" , [2 , 20 , 35 ])
504+ async def test_resource_parallelism (
505+ fxt : Fxt , num_a_jobs : int , num_b_jobs : int , limit : int
506+ ) -> None :
507+ """Test that the parallelism limits imposed via scheduler resources are respected."""
508+ num_jobs = num_a_jobs + num_b_jobs
509+ resource = ["A" if i < num_a_jobs else "B" for i in range (num_jobs )]
510+ jobs = make_many_jobs (
511+ fxt .tmp_path , num_a_jobs + num_b_jobs , per_job = lambda i : {"resources" : {resource [i ]: 1 }}
512+ )
513+ # Ensure there are no parallelism limits in the launcher/backend.
514+ fxt .mock_legacy_backend .max_parallelism = 0
515+ resource_manager = ResourceManager (StaticResourceProvider ({"A" : limit , "B" : limit }))
516+ scheduler = Scheduler (jobs , fxt .backends , MOCK_BACKEND , resource_manager = resource_manager )
517+ expected_parallel = min (num_a_jobs , limit ) + min (num_b_jobs , limit )
518+ await TestScheduling ._parallelism_test_helper (fxt , scheduler , num_jobs , expected_parallel )
519+
520+ @staticmethod
521+ @pytest .mark .asyncio
522+ @pytest .mark .timeout (DEFAULT_TIMEOUT )
523+ @pytest .mark .parametrize ("num_resources" , [1 , 2 , 5 ])
524+ @pytest .mark .parametrize ("limit" , [5 , 16 , 33 , None ])
525+ async def test_resource_usage (fxt : Fxt , num_resources : int , limit : int | None ) -> None :
526+ """Test that job resource limits allow jobs to use multiples of resources."""
527+ num_jobs = limit * 2 if limit else num_resources * 2
528+ jobs = make_many_jobs (fxt .tmp_path , num_jobs , resources = {"TEST" : num_resources })
529+ # Ensure there are no parallelism limits in the launcher/backend.
530+ fxt .mock_legacy_backend .max_parallelism = 0
531+ resource_manager = ResourceManager (StaticResourceProvider ({"TEST" : limit }))
532+ scheduler = Scheduler (jobs , fxt .backends , MOCK_BACKEND , resource_manager = resource_manager )
533+ expected_parallel = limit // num_resources if limit else num_jobs
534+ await TestScheduling ._parallelism_test_helper (fxt , scheduler , num_jobs , expected_parallel )
491535
492536 @staticmethod
493537 @pytest .mark .asyncio
0 commit comments