Skip to content

Commit 6dd599c

Browse files
authored
Pin google-adk to 1.17.0 for YAML/Xlang precommit dependency resolution (#38090)
* Pin google-adk for YAML/Xlang precommit dependency resolution
* Add pydot to test extra for dataframe pipeline graph tests
* Fix Milvus ML tests
* Fix pylint
1 parent e82fc47 commit 6dd599c

File tree

5 files changed: 109 additions and 26 deletions.

sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ def test_invalid_query_on_non_existent_collection(self):
279279

280280
with self.assertRaises(Exception) as context:
281281
with TestPipeline() as p:
282+
p.not_use_test_runner_api = True
282283
_ = (p | beam.Create(test_chunks) | Enrichment(handler))
283284

284285
expect_err_msg_contains = "collection not found"
@@ -307,6 +308,7 @@ def test_invalid_query_on_non_existent_field(self):
307308

308309
with self.assertRaises(Exception) as context:
309310
with TestPipeline() as p:
311+
p.not_use_test_runner_api = True
310312
_ = (p | beam.Create(test_chunks) | Enrichment(handler))
311313

312314
expect_err_msg_contains = f"fieldName({non_existent_field}) not found"
@@ -330,6 +332,7 @@ def test_empty_input_chunks(self):
330332
expected_chunks = []
331333

332334
with TestPipeline() as p:
335+
p.not_use_test_runner_api = True
333336
result = (p | beam.Create(test_chunks) | Enrichment(handler))
334337
assert_that(
335338
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
@@ -458,6 +461,7 @@ def test_filtered_search_with_cosine_similarity_and_batching(self):
458461
]
459462

460463
with TestPipeline() as p:
464+
p.not_use_test_runner_api = True
461465
result = (p | beam.Create(test_chunks) | Enrichment(handler))
462466
assert_that(
463467
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
@@ -563,6 +567,7 @@ def test_filtered_search_with_bm25_full_text_and_batching(self):
563567
]
564568

565569
with TestPipeline() as p:
570+
p.not_use_test_runner_api = True
566571
result = (p | beam.Create(test_chunks) | Enrichment(handler))
567572
assert_that(
568573
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
@@ -704,6 +709,7 @@ def test_vector_search_with_euclidean_distance(self):
704709
]
705710

706711
with TestPipeline() as p:
712+
p.not_use_test_runner_api = True
707713
result = (p | beam.Create(test_chunks) | Enrichment(handler))
708714
assert_that(
709715
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
@@ -844,6 +850,7 @@ def test_vector_search_with_inner_product_similarity(self):
844850
]
845851

846852
with TestPipeline() as p:
853+
p.not_use_test_runner_api = True
847854
result = (p | beam.Create(test_chunks) | Enrichment(handler))
848855
assert_that(
849856
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
@@ -909,6 +916,7 @@ def test_keyword_search_with_inner_product_sparse_embedding(self):
909916
]
910917

911918
with TestPipeline() as p:
919+
p.not_use_test_runner_api = True
912920
result = (p | beam.Create(test_chunks) | Enrichment(handler))
913921
assert_that(
914922
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
@@ -982,6 +990,7 @@ def test_hybrid_search(self):
982990
]
983991

984992
with TestPipeline() as p:
993+
p.not_use_test_runner_api = True
985994
result = (p | beam.Create(test_chunks) | Enrichment(handler))
986995
assert_that(
987996
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(

sdks/python/apache_beam/ml/rag/ingestion/milvus_search_it_test.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ def tearDownClass(cls):
192192

193193
def setUp(self):
194194
self.write_test_pipeline = TestPipeline()
195-
self.write_test_pipeline.not_use_test_runner_api = True
196195
self._collection_name = f"test_collection_{self._testMethodName}"
197196
self._partition_name = f"test_partition_{self._testMethodName}"
198197
config = unpack_dataclass_with_kwargs(self._connection_config)
@@ -232,6 +231,7 @@ def test_invalid_write_on_non_existent_collection(self):
232231
# Write pipeline.
233232
with self.assertRaises(Exception) as context:
234233
with TestPipeline() as p:
234+
p.not_use_test_runner_api = True
235235
_ = (p | beam.Create(test_chunks) | config.create_write_transform())
236236

237237
# Assert on what should happen.
@@ -252,6 +252,7 @@ def test_invalid_write_on_non_existent_partition(self):
252252
# Write pipeline.
253253
with self.assertRaises(Exception) as context:
254254
with TestPipeline() as p:
255+
p.not_use_test_runner_api = True
255256
_ = (p | beam.Create(test_chunks) | config.create_write_transform())
256257

257258
# Assert on what should happen.
@@ -287,6 +288,7 @@ def test_invalid_write_on_missing_primary_key_in_entity(self):
287288
# Write pipeline.
288289
with self.assertRaises(Exception) as context:
289290
with TestPipeline() as p:
291+
p.not_use_test_runner_api = True
290292
_ = (p | beam.Create(test_chunks) | config.create_write_transform())
291293

292294
# Assert on what should happen.
@@ -332,6 +334,7 @@ def test_write_on_auto_id_primary_key(self):
332334
config = MilvusVectorWriterConfig(
333335
connection_params=self._connection_config, write_config=write_config)
334336

337+
self.write_test_pipeline.not_use_test_runner_api = True
335338
with self.write_test_pipeline as p:
336339
_ = (p | beam.Create(test_chunks) | config.create_write_transform())
337340

@@ -357,6 +360,7 @@ def test_write_on_existent_collection_with_default_schema(self):
357360
config = MilvusVectorWriterConfig(
358361
connection_params=self._connection_config, write_config=write_config)
359362

363+
self.write_test_pipeline.not_use_test_runner_api = True
360364
with self.write_test_pipeline as p:
361365
_ = (p | beam.Create(test_chunks) | config.create_write_transform())
362366

@@ -422,6 +426,7 @@ def test_write_with_custom_column_specifications(self):
422426
write_config=write_config,
423427
column_specs=custom_column_specs)
424428

429+
self.write_test_pipeline.not_use_test_runner_api = True
425430
with self.write_test_pipeline as p:
426431
_ = (p | beam.Create(test_chunks) | config.create_write_transform())
427432

@@ -474,6 +479,7 @@ def test_write_with_batching(self):
474479
config = MilvusVectorWriterConfig(
475480
connection_params=self._connection_config, write_config=write_config)
476481

482+
self.write_test_pipeline.not_use_test_runner_api = True
477483
with self.write_test_pipeline as p:
478484
_ = (p | beam.Create(test_chunks) | config.create_write_transform())
479485

sdks/python/apache_beam/ml/rag/test_utils.py

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ class CustomMilvusContainer(MilvusContainer):
8787
Extends MilvusContainer to provide custom port binding and environment
8888
configuration for testing with standalone Milvus instances.
8989
"""
90-
def __init__(
90+
91+
def __init__( # pylint: disable=bad-super-call
9192
self,
9293
image: str,
9394
service_container_port,
@@ -96,7 +97,11 @@ def __init__(
9697
) -> None:
9798
# Skip the parent class's constructor and go straight to
9899
# GenericContainer.
99-
super(MilvusContainer, self).__init__(image=image, **kwargs)
100+
super(
101+
MilvusContainer,
102+
self,
103+
).__init__(
104+
image=image, **kwargs)
100105
self.port = service_container_port
101106
self.healthcheck_port = healthcheck_container_port
102107
self.with_exposed_ports(service_container_port, healthcheck_container_port)
@@ -133,6 +138,27 @@ class MilvusTestHelpers:
133138
# https://milvus.io/docs/release_notes.md or PyPI at
134139
# https://pypi.org/project/pymilvus/ for version compatibility.
135140
# Example: Milvus v2.6.0 requires pymilvus==2.6.0 (exact match required).
141+
@staticmethod
142+
def _wait_for_milvus_grpc(uri: str) -> None:
143+
"""Wait until Milvus accepts RPCs.
144+
145+
Docker may report the container as started before gRPC is ready.
146+
"""
147+
def list_collections_probe():
148+
client = MilvusClient(uri=uri)
149+
try:
150+
client.list_collections()
151+
finally:
152+
client.close()
153+
154+
retry_with_backoff(
155+
list_collections_probe,
156+
max_retries=25,
157+
retry_delay=2.0,
158+
retry_backoff_factor=1.2,
159+
operation_name="Milvus client connection after container start",
160+
exception_types=(MilvusException, ))
161+
136162
@staticmethod
137163
def start_db_container(
138164
image="milvusdb/milvus:v2.5.10",
@@ -148,23 +174,31 @@ def start_db_container(
148174
if tc_max_retries is not None:
149175
testcontainers_config.max_tries = tc_max_retries
150176
for i in range(vector_client_max_retries):
177+
vector_db_container: Optional[CustomMilvusContainer] = None
151178
try:
152179
vector_db_container = CustomMilvusContainer(
153180
image=image,
154181
service_container_port=service_container_port,
155182
healthcheck_container_port=healthcheck_container_port)
156-
vector_db_container = vector_db_container.with_volume_mapping(
183+
mapped_container = vector_db_container.with_volume_mapping(
157184
cfg, "/milvus/configs/user.yaml")
158-
vector_db_container.start()
159-
host = vector_db_container.get_container_host_ip()
160-
port = vector_db_container.get_exposed_port(service_container_port)
161-
info = VectorDBContainerInfo(vector_db_container, host, port)
185+
assert mapped_container is not None
186+
running_container: CustomMilvusContainer = mapped_container
187+
vector_db_container = running_container
188+
running_container.start()
189+
host = running_container.get_container_host_ip()
190+
port = running_container.get_exposed_port(service_container_port)
191+
info = VectorDBContainerInfo(running_container, host, port)
192+
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
162193
_LOGGER.info(
163194
"milvus db container started successfully on %s.", info.uri)
195+
break
164196
except Exception as e:
165-
stdout_logs, stderr_logs = vector_db_container.get_logs()
166-
stdout_logs = stdout_logs.decode("utf-8")
167-
stderr_logs = stderr_logs.decode("utf-8")
197+
stdout_logs = stderr_logs = ""
198+
if vector_db_container is not None:
199+
raw_out, raw_err = vector_db_container.get_logs()
200+
stdout_logs = raw_out.decode("utf-8")
201+
stderr_logs = raw_err.decode("utf-8")
168202
_LOGGER.warning(
169203
"Retry %d/%d: Failed to start Milvus DB container. Reason: %s. "
170204
"STDOUT logs:\n%s\nSTDERR logs:\n%s",

sdks/python/apache_beam/yaml/integration_tests.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -719,13 +719,23 @@ def test(self, providers=providers): # default arg to capture loop value
719719
**yaml_transform.SafeLineLoader.strip_metadata(
720720
fixture.get('config', {}))))
721721
for pipeline_spec in spec['pipelines']:
722-
with beam.Pipeline(options=PipelineOptions(
723-
pickle_library='cloudpickle',
724-
**replace_recursive(yaml_transform.SafeLineLoader.strip_metadata(
725-
pipeline_spec.get('options', {})),
726-
vars))) as p:
727-
yaml_transform.expand_pipeline(
728-
p, replace_recursive(pipeline_spec, vars))
722+
try:
723+
with beam.Pipeline(options=PipelineOptions(
724+
pickle_library='cloudpickle',
725+
**replace_recursive(
726+
yaml_transform.SafeLineLoader.strip_metadata(
727+
pipeline_spec.get('options', {})),
728+
vars))) as p:
729+
yaml_transform.expand_pipeline(
730+
p, replace_recursive(pipeline_spec, vars))
731+
except ValueError as exn:
732+
# FnApiRunner currently does not support this requirement in
733+
# some xlang scenarios (e.g. Iceberg YAML pipelines).
734+
if 'beam:requirement:pardo:on_window_expiration:v1' in str(exn):
735+
self.skipTest(
736+
'Runner does not support '
737+
'beam:requirement:pardo:on_window_expiration:v1')
738+
raise
729739

730740
yield f'test_{suffix}', test
731741

sdks/python/setup.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,16 @@ def cythonize(*args, **kwargs):
167167

168168
milvus_dependency = ['pymilvus>=2.5.10,<3.0.0']
169169

170-
ml_base = [
170+
# google-adk / OpenTelemetry require protobuf>=5; tensorflow-transform in
171+
# ml_test is pinned to versions that require protobuf<5 on Python 3.10. Those
172+
# cannot be installed together, so ADK deps stay out of ml_test (use ml_base).
173+
ml_base_core = [
171174
'embeddings>=0.0.4', # 0.0.3 crashes setuptools
172-
'google-adk',
173175
'onnxruntime',
176+
# onnx 1.12–1.13 cap protobuf in ways that trigger huge backtracking with
177+
# Beam[gcp]+ml_test; pip can fall back to onnx 1.11 sdist which needs cmake.
178+
# 1.14.1+ matches tf2onnx>=1.16 and ships manylinux wheels for py3.10.
179+
'onnx>=1.14.1,<2',
174180
'langchain',
175181
'sentence-transformers>=2.2.2',
176182
'skl2onnx',
@@ -179,11 +185,24 @@ def cythonize(*args, **kwargs):
179185
# tensorflow transitive dep, lower versions not compatible with Python3.10+
180186
'absl-py>=0.12.0',
181187
'tensorflow-hub',
182-
'tf2onnx',
183188
'torch',
184189
'transformers',
185190
]
186191

192+
ml_adk_dependency = [
193+
'google-adk==1.28.1',
194+
# proto-plus<1.24 caps protobuf<5; opentelemetry-proto (via ADK) needs
195+
# protobuf>=5. Scoped here so the main dependency list stays broader.
196+
'proto-plus>=1.26.1,<2',
197+
'opentelemetry-api==1.37.0',
198+
'opentelemetry-sdk==1.37.0',
199+
'opentelemetry-exporter-otlp-proto-http==1.37.0',
200+
# protobuf>=5 (ADK/OTel); tf2onnx 1.16.x pins protobuf~=3.20 only.
201+
'tf2onnx>=1.17.0,<1.18',
202+
]
203+
204+
ml_base = ml_base_core + ml_adk_dependency
205+
187206

188207
def find_by_ext(root_dir, ext):
189208
for root, _, files in os.walk(root_dir):
@@ -397,6 +416,9 @@ def get_portability_package_data():
397416
'packaging>=22.0',
398417
'pillow>=12.1.1,<13',
399418
'pymongo>=3.8.0,<5.0.0',
419+
# ADK / OpenTelemetry need proto-plus>=1.26.1 (protobuf>=5); that
420+
# floor is declared on ml_adk_dependency only so core installs stay
421+
# compatible with older proto-plus.
400422
'proto-plus>=1.7.1,<2',
401423
# 1. Use a tighter upper bound in protobuf dependency to make sure
402424
# the minor version at job submission
@@ -452,9 +474,11 @@ def get_portability_package_data():
452474
'mock>=1.0.1,<6.0.0',
453475
'pandas<2.3.0',
454476
'parameterized>=0.7.1,<0.10.0',
477+
'pydot>=1.2.0,<2',
455478
'pyhamcrest>=1.9,!=1.10.0,<3.0.0',
456479
'requests_mock>=1.7,<2.0',
457-
'tenacity>=8.0.0,<9',
480+
# google-adk 1.28+ requires tenacity>=9,<10 (conflicts with <9).
481+
'tenacity>=8.0.0,<10',
458482
'pytest>=7.1.2,<9.0',
459483
'pytest-xdist>=2.5.0,<4',
460484
'pytest-timeout>=2.1.0,<3',
@@ -551,14 +575,14 @@ def get_portability_package_data():
551575
# TFT->TFX-BSL require pandas 1.x, which is not compatible
552576
# with numpy 2.x
553577
'numpy<2',
554-
# To help with dependency resolution in test suite. Revise once
555-
# https://github.com/apache/beam/issues/37854 is fixed
556-
'protobuf<4; python_version<"3.11"'
557578
# Comment out xgboost as it is breaking presubmit python ml
558579
# tests due to tag check introduced since pip 24.2
559580
# https://github.com/apache/beam/issues/31285
560581
# 'xgboost<2.0', # https://github.com/apache/beam/issues/31252
561-
] + ml_base,
582+
# tft needs protobuf<5; tf2onnx 1.17+ allows protobuf 5 on the
583+
# ADK-only path.
584+
'tf2onnx>=1.16.1,<1.17',
585+
] + ml_base_core,
562586
'p310_ml_test': [
563587
'datatable',
564588
] + ml_base,

0 commit comments

Comments
 (0)