Replace Ephemeral Collections with Collection Adapters #151
Plan: Replace EphemeralCollections with CollectionAdapters
Goal
Replace the EphemeralCollection class and its associated duck-typed detection
pattern (getattr(x, "ephemeral", False) + persistent_object) with proper
CollectionAdapter subclasses that follow Galaxy's established adapter
serialization/recovery pipeline.
Background
EphemeralCollection (workflow/modules.py:3072-3097) is a workflow-only
mechanism that dynamically merges multiple step outputs into a single collection
input. It wraps an in-memory DatasetCollection plus a lazily-persisted HDCA. It is
detected via an ephemeral = True flag and unwrapped at six consumption points.
CollectionAdapters (dataset_collections/adapters.py) are a newer, more
principled framework for wrapping model objects into collection-like facades.
They have proper serialization (Pydantic models), DB persistence (adapter JSON
columns on job input tables), and recovery (recover_adapter()). Currently they
handle only promote/reshape operations, not merging.
The two concepts solve similar problems with different patterns. Unifying them
eliminates the ad-hoc duck typing, gives merge operations proper persistence
for job provenance, and simplifies the consumption code.
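The shift in detection pattern can be summarized in a short sketch. The classes below are minimal stand-ins for illustration only, not Galaxy's real implementations:

```python
# Minimal stand-ins contrasting the two detection patterns.
class EphemeralCollection:
    ephemeral = True  # duck-typed marker attribute

    def __init__(self, persistent_object):
        self.persistent_object = persistent_object


class CollectionAdapter:
    """Base class; detection becomes an explicit isinstance check."""


# Current pattern: attribute sniffing, then unwrap.
value = EphemeralCollection(persistent_object="hdca")
if getattr(value, "ephemeral", False):
    value = value.persistent_object
print(value)  # hdca

# Target pattern: explicit type check, no unwrapping needed.
adapter = CollectionAdapter()
print(isinstance(adapter, CollectionAdapter))  # True
```

The isinstance check centralizes detection in the type system rather than scattering marker-attribute checks across consumption points.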
Merge Strategies to Support
Three merge paths exist in run.py:466-557:
| Path | Input | merge_type | Result collection_type | Description |
|---|---|---|---|---|
| A | Multiple HDAs | default | list | Promote N datasets to a list |
| B | Multiple list HDCAs | merge_flattened | list | Flatten all list elements into one list |
| C | Multiple list HDCAs | merge_nested | list:&lt;input_type&gt; | Nest input lists as sub-collections |
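As a rough illustration of the three merge semantics, here is a sketch using plain Python lists in place of HDAs/HDCAs (the function names are hypothetical, used only to mirror the three paths):

```python
def merge_datasets(datasets):
    """Path A: promote N datasets to a flat list."""
    return list(datasets)


def merge_flattened(collections):
    """Path B: concatenate the elements of several lists into one list."""
    return [element for collection in collections for element in collection]


def merge_nested(collections):
    """Path C: keep each input list intact as a sub-collection (list:list)."""
    return [list(collection) for collection in collections]


print(merge_datasets(["d1", "d2"]))          # ['d1', 'd2']
print(merge_flattened([["a", "b"], ["c"]]))  # ['a', 'b', 'c']
print(merge_nested([["a", "b"], ["c"]]))     # [['a', 'b'], ['c']]
```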
Step 1: New Adapter Classes
File: lib/galaxy/model/dataset_collections/adapters.py
1a. TransientCollectionAdapterCollectionElement
New helper alongside existing TransientCollectionAdapterDatasetInstanceElement.
Wraps an HDCA (or its inner DatasetCollection) as a collection element:
```python
class TransientCollectionAdapterCollectionElement:
    def __init__(self, element_identifier: str, hdca: "HistoryDatasetCollectionAssociation"):
        self.element_identifier = element_identifier
        self._hdca = hdca

    @property
    def child_collection(self):
        return self._hdca.collection

    @property
    def element_object(self):
        return self._hdca.collection

    @property
    def dataset_instance(self):
        return None

    @property
    def is_collection(self):
        return True

    @property
    def columns(self):
        return None
```
1b. MergeDatasetsAdapter
Replaces Path A (multiple HDAs → list).
```python
class MergeDatasetsAdapter(CollectionAdapter):
    """Merge multiple datasets into a list collection."""

    def __init__(self, datasets: list["HistoryDatasetAssociation"]):
        self._datasets = datasets
        self._elements = [
            TransientCollectionAdapterDatasetInstanceElement(str(i), hda)
            for i, hda in enumerate(datasets)
        ]

    @property
    def collection_type(self) -> str:
        return "list"

    @property
    def elements(self):
        return self._elements

    @property
    def dataset_instances(self):
        return list(self._datasets)

    @property
    def dataset_action_tuples(self):
        # Aggregate from all HDAs
        ...

    @property
    def dataset_states_and_extensions_summary(self):
        # Aggregate from all HDAs
        ...

    @property
    def adapting(self):
        return self._elements  # list → matches existing list-adapting recording path

    def to_adapter_model(self):
        return AdaptedDataCollectionMergeDatasetsRequestInternal(
            src="CollectionAdapter",
            adapter_type="MergeDatasets",
            adapting=[DataRequestInternalHda(src="hda", id=hda.id) for hda in self._datasets],
        )
```
1c. MergeListsFlattenedAdapter
Replaces Path B (multiple list HDCAs → single flat list).
```python
class MergeListsFlattenedAdapter(CollectionAdapter):
    """Flatten multiple list collections into one list."""

    def __init__(self, hdcas: list["HistoryDatasetCollectionAssociation"]):
        self._hdcas = hdcas

    @property
    def collection_type(self) -> str:
        return "list"

    @property
    def elements(self):
        result = []
        idx = 0
        for hdca in self._hdcas:
            for elem in hdca.collection.elements:
                result.append(
                    TransientCollectionAdapterDatasetInstanceElement(str(idx), elem.dataset_instance)
                )
                idx += 1
        return result

    @property
    def dataset_instances(self):
        instances = []
        for hdca in self._hdcas:
            instances.extend(hdca.dataset_instances)
        return instances

    @property
    def adapting(self):
        return self._hdcas

    def to_adapter_model(self):
        return AdaptedDataCollectionMergeListsFlattenedRequestInternal(
            src="CollectionAdapter",
            adapter_type="MergeListsFlattened",
            adapting=[DataRequestInternalHdca(src="hdca", id=hdca.id) for hdca in self._hdcas],
        )
```
1d. MergeListsNestedAdapter
Replaces Path C (multiple list HDCAs → list:list).
```python
class MergeListsNestedAdapter(CollectionAdapter):
    """Nest multiple list collections as sub-collections."""

    def __init__(self, hdcas: list["HistoryDatasetCollectionAssociation"], input_collection_type: str):
        self._hdcas = hdcas
        self._input_collection_type = input_collection_type

    @property
    def collection_type(self) -> str:
        return f"list:{self._input_collection_type}"

    @property
    def elements(self):
        return [
            TransientCollectionAdapterCollectionElement(str(i), hdca)
            for i, hdca in enumerate(self._hdcas)
        ]

    @property
    def dataset_instances(self):
        instances = []
        for hdca in self._hdcas:
            instances.extend(hdca.dataset_instances)
        return instances

    @property
    def adapting(self):
        return self._hdcas

    def to_adapter_model(self):
        return AdaptedDataCollectionMergeListsNestedRequestInternal(
            src="CollectionAdapter",
            adapter_type="MergeListsNested",
            input_collection_type=self._input_collection_type,
            adapting=[DataRequestInternalHdca(src="hdca", id=hdca.id) for hdca in self._hdcas],
        )
```
Step 2: Pydantic Serialization Models
File: lib/galaxy/tool_util_models/parameters.py
Internal models (DB-facing, int IDs):
```python
class AdaptedDataCollectionMergeDatasetsRequestInternal(AdaptedDataCollectionRequestBase):
    adapter_type: Literal["MergeDatasets"]
    adapting: list[DataRequestInternalHda]


class AdaptedDataCollectionMergeListsFlattenedRequestInternal(AdaptedDataCollectionRequestBase):
    adapter_type: Literal["MergeListsFlattened"]
    adapting: list[DataRequestInternalHdca]


class AdaptedDataCollectionMergeListsNestedRequestInternal(AdaptedDataCollectionRequestBase):
    adapter_type: Literal["MergeListsNested"]
    input_collection_type: str
    adapting: list[DataRequestInternalHdca]
```
External models (API-facing, string IDs):
```python
class AdaptedDataCollectionMergeDatasetsRequest(AdaptedDataCollectionRequestBase):
    adapter_type: Literal["MergeDatasets"]
    adapting: list[DataRequestHda]


class AdaptedDataCollectionMergeListsFlattenedRequest(AdaptedDataCollectionRequestBase):
    adapter_type: Literal["MergeListsFlattened"]
    adapting: list[DataRequestHdca]


class AdaptedDataCollectionMergeListsNestedRequest(AdaptedDataCollectionRequestBase):
    adapter_type: Literal["MergeListsNested"]
    input_collection_type: str
    adapting: list[DataRequestHdca]
```
Update discriminated unions:
Add new models to both AdaptedDataCollectionRequest and
AdaptedDataCollectionRequestInternal unions, discriminated on adapter_type.
Step 3: Recovery Support
File: lib/galaxy/model/dataset_collections/adapters.py
3a. Update recover_adapter()
Add branches for new adapter types. The wrapped_object parameter needs to
handle list-of-HDAs and list-of-HDCAs:
```python
def recover_adapter(wrapped_object, adapter_model):
    adapter_type = adapter_model.adapter_type
    if adapter_type == "PromoteCollectionElementToCollection":
        return PromoteCollectionElementToCollectionAdapter(wrapped_object)
    elif adapter_type == "PromoteDatasetToCollection":
        return PromoteDatasetToCollection(wrapped_object, adapter_model.collection_type)
    elif adapter_type == "PromoteDatasetsToCollection":
        return PromoteDatasetsToCollection(wrapped_object, adapter_model.collection_type)
    elif adapter_type == "MergeDatasets":
        return MergeDatasetsAdapter(wrapped_object)  # list[HDA]
    elif adapter_type == "MergeListsFlattened":
        return MergeListsFlattenedAdapter(wrapped_object)  # list[HDCA]
    elif adapter_type == "MergeListsNested":
        return MergeListsNestedAdapter(wrapped_object, adapter_model.input_collection_type)
    else:
        raise Exception(f"Unknown collection adapter encountered {adapter_type}")
```
3b. Update src_id_to_item() in tools/parameters/basic.py
The existing recovery in src_id_to_item() handles lists of
TransientCollectionAdapterDatasetInstanceElement (for PromoteDatasetsToCollection).
New merge adapters need recovery from lists of HDAs or HDCAs:
```python
if isinstance(adapting, list):
    items = []
    for item in adapting:
        item_dict = item.dict() if hasattr(item, "dict") else item.model_dump()
        if hasattr(item, "name"):
            # existing TransientElement path for PromoteDatasetsToCollection
            element = TransientCollectionAdapterDatasetInstanceElement(
                item.name,
                cast(HDA, src_id_to_item(sa_session, item_dict, security)),
            )
            items.append(element)
        else:
            # new path: fetch raw HDA or HDCA
            items.append(src_id_to_item(sa_session, item_dict, security))
    return recover_adapter(items, adapter_model)
```
Step 4: Job Input Recording
File: lib/galaxy/tools/actions/__init__.py
The existing CollectionAdapter recording logic (lines ~1039-1056) handles:
- adapting is DatasetCollectionElement → add_input_dataset_collection_element()
- adapting is HistoryDatasetAssociation → add_input_dataset()
- adapting is list → iterate, add_input_dataset() per element
For merge adapters, adapting is a list. Two sub-cases:
- MergeDatasets: adapting is list[TransientElement] → existing list branch works (each element has .element_identifier and .hda)
- MergeFlattened/MergeNested: adapting is list[HDCA] → need a new sub-branch:
```python
elif isinstance(adapting, list):
    adapter_model_dump = adapter_model.model_dump()
    for i, element in enumerate(adapting):
        if isinstance(element, TransientCollectionAdapterDatasetInstanceElement):
            input_key = f"{name}|__adapter_part__|{element.element_identifier}"
            job.add_input_dataset(input_key, dataset=element.hda)
        elif isinstance(element, model.HistoryDatasetCollectionAssociation):
            input_key = f"{name}|__adapter_part__|{i}"
            job.add_input_dataset_collection(
                input_key,
                element,
                adapter_json=adapter_model_dump if i == 0 else None,
            )
```
Store the adapter JSON on the first input association only; recovery reads it
from whichever association carries it.
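A minimal sketch of this record-then-recover round trip, using a dataclass as a stand-in for a job input association row (the names here are hypothetical, not Galaxy's actual models):

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class InputAssociation:
    """Hypothetical stand-in for a job input association row with an adapter JSON column."""
    input_key: str
    hdca_id: int
    adapter_json: Optional[str] = None


def record_merge_inputs(name, hdca_ids, adapter_model_dump):
    """Record one association per wrapped HDCA; store the adapter JSON only on the first."""
    rows = []
    for i, hdca_id in enumerate(hdca_ids):
        rows.append(InputAssociation(
            input_key=f"{name}|__adapter_part__|{i}",
            hdca_id=hdca_id,
            adapter_json=json.dumps(adapter_model_dump) if i == 0 else None,
        ))
    return rows


def recover_adapter_json(rows):
    """Recovery reads the JSON from whichever association carries it."""
    for row in rows:
        if row.adapter_json is not None:
            return json.loads(row.adapter_json)
    return None


rows = record_merge_inputs("input1", [10, 11, 12], {"adapter_type": "MergeListsFlattened"})
print(recover_adapter_json(rows))  # {'adapter_type': 'MergeListsFlattened'}
```

Keeping the JSON on a single association avoids duplicating the adapter payload across every wrapped input while still letting recovery find it with one scan.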
Step 5: Replace Creation Site
File: lib/galaxy/workflow/run.py
Replace replacement_for_input_connections() lines 511-557. Instead of building
a DatasetCollection + DatasetCollectionElement objects in memory and wrapping
in EphemeralCollection, create the appropriate adapter:
```python
# Was:
collection = model.DatasetCollection()
collection.collection_type = input_collection_type or "list"
# ... build DCE objects ...
return modules.EphemeralCollection(collection=collection, history=...)

# Becomes:
if input_collection_type is None:
    # Path A: merge datasets into list
    if merge_type == "merge_nested":
        raise NotImplementedError()
    return MergeDatasetsAdapter(inputs)
elif input_collection_type == "list":
    if merge_type == "merge_flattened":
        # Path B: flatten lists
        return MergeListsFlattenedAdapter(inputs)
    elif merge_type == "merge_nested":
        # Path C: nest lists
        return MergeListsNestedAdapter(inputs, input_collection_type)
    else:
        # default for lists = flatten (existing behavior)
        return MergeListsFlattenedAdapter(inputs)
else:
    raise NotImplementedError()
```
Note: inputs here are the replacement objects from
replacement_for_connection(). For Path A these are HDAs; for Paths B/C these
are HDCAs. Need to verify the actual types returned by
replacement_for_connection() — they may be HDAs or HDCAs depending on the
upstream step output type.
Step 6: Remove Ephemeral Consumption Points
Replace all getattr(x, "ephemeral", False) checks and persistent_object
references. These are already handled for CollectionAdapters via isinstance(x, CollectionAdapter) in most places.
6a. tools/parameters/basic.py:1932-1938 — to_json()
Current:
```python
if getattr(value, "ephemeral", False):
    value = value.persistent_object
    if value.id is None:
        app.model.context.add(value)
        app.model.context.flush()
```
Replace: Remove this block entirely. CollectionAdapters don't need to be
persisted as HDCAs — they're serialized as adapter JSON instead. The
to_json() method should handle CollectionAdapters via the existing adapter
serialization path. If CollectionAdapter doesn't already have a to_json()
handler here, add one that calls to_adapter_model().model_dump().
6b. tools/actions/__init__.py:1031-1032 — job input recording
Current:
```python
if getattr(dataset_collection, "ephemeral", False):
    dataset_collection = dataset_collection.persistent_object
```
Replace: Remove. CollectionAdapters already flow into the isinstance(x, CollectionAdapter) branch at line ~1039.
6c. managers/collections.py:272-280 — implicit input recording
Current:
```python
if getattr(input_collection, "ephemeral", False):
    input_collection = input_collection.persistent_object
```
Replace: Check isinstance(input_collection, CollectionAdapter). For merge
adapters that wrap HDCAs, extract the underlying HDCAs and record each. For
adapters that wrap HDAs, skip (no collection-level implicit input). Or simply
skip recording for all adapters (matching existing ephemeral behavior where
implicit_inputs returns []).
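A sketch of the isinstance-based dispatch described above, with stub classes standing in for Galaxy's types (nothing here is Galaxy's actual implementation):

```python
class CollectionAdapter:
    """Stub adapter base class."""


class HDCA:
    """Stub for HistoryDatasetCollectionAssociation."""
    def __init__(self, name):
        self.name = name


class MergeListsFlattenedAdapter(CollectionAdapter):
    def __init__(self, hdcas):
        self.adapting = hdcas


def implicit_input_collections(input_collection):
    """Return the HDCAs to record as implicit inputs; adapters that wrap
    non-collection objects contribute nothing."""
    if isinstance(input_collection, CollectionAdapter):
        adapting = getattr(input_collection, "adapting", [])
        return [item for item in adapting if isinstance(item, HDCA)]
    return [input_collection]


adapter = MergeListsFlattenedAdapter([HDCA("a"), HDCA("b")])
print(len(implicit_input_collections(adapter)))  # 2
```

Whether to record the underlying HDCAs or skip recording entirely (matching the existing ephemeral behavior) is the open design choice noted above.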
6d. managers/collections.py:444-446 — tag propagation
Current:
```python
if getattr(v, "ephemeral", False):
    v = v.persistent_object
for tag in v.auto_propagated_tags:
```
Replace: Add auto_propagated_tags property to CollectionAdapter base
class that aggregates tags from all adapted objects:
```python
@property
def auto_propagated_tags(self):
    tags = []
    for instance in self.dataset_instances:
        tags.extend(instance.auto_propagated_tags)
    return tags
```
6e. model/__init__.py:10240 — workflow output recording
Current:
```python
if getattr(output_object, "ephemeral", False):
    return  # Don't record ephemeral collections as step outputs
```
Replace:
```python
if isinstance(output_object, CollectionAdapter):
    return  # Don't record adapter collections as step outputs
```
6f. matching.py:23 — ephemeral detection
Current:
```python
self.uses_ephemeral_collections = self.uses_ephemeral_collections or not hasattr(hdca, "hid")
```
No change needed. CollectionAdapter subclasses also lack hid, so the
existing detection mechanism works. Rename the flag to
uses_non_persisted_collections for clarity (optional).
6g. matching.py:104-108 — implicit_inputs
No change needed. Flag-based logic still applies.
6h. execute.py:467-473 — on_text generation
No change needed. Flag-based logic still applies.
Step 7: Delete EphemeralCollection
File: lib/galaxy/workflow/modules.py
Delete the EphemeralCollection class (lines 3072-3097) and its import in
run.py.
Step 8: Update Type Hints & Imports
- Remove EphemeralCollection from any union types or parameter annotations
- Add CollectionAdapter imports where merge adapters are created (run.py)
- Update type hints in consumption points if they referenced EphemeralCollection
Testing Strategy
Conformance Tests (Primary)
The CWL conformance tests are the primary validation. No existing collection
adapters (PromoteDatasetToCollection, etc.) have unit tests — they're
validated through integration tests only. Follow the same pattern here.
These conformance tests exercise all three merge paths:
| Test ID | Merge Path | Status Check |
|---|---|---|
| wf_wc_scatter_multiple_merge | A (default) | Must pass |
| wf_wc_scatter_multiple_flattened | B (flatten) | Must pass |
| wf_wc_scatter_multiple_nested | C (nested) | Must pass |
| wf_scatter_twopar_oneinput_flattenedmerge | B (flatten with list inputs) | Must pass |
| scatter_multi_input_embedded_subworkflow | A (default, subworkflow) | Must pass |
| valuefrom_wf_step_multiple | multiple sources | Must pass |
| wf_multiplesources_multipletypes | mixed types | Must pass |
Development Order
- Implement adapter classes + Pydantic models + recovery (Steps 1-3)
- Implement job recording changes (Step 4)
- Replace creation site in run.py (Step 5)
- Update consumption points (Step 6)
- Delete EphemeralCollection (Step 7)
- Run conformance tests to validate
Files Modified
| File | Change |
|---|---|
| lib/galaxy/model/dataset_collections/adapters.py | Add 3 adapter classes + 1 helper + update recover_adapter() |
| lib/galaxy/tool_util_models/parameters.py | Add 6 Pydantic models (3 internal + 3 external) + update unions |
| lib/galaxy/workflow/run.py | Replace EphemeralCollection creation with adapter creation |
| lib/galaxy/tools/parameters/basic.py | Remove ephemeral check in to_json(), update src_id_to_item() recovery |
| lib/galaxy/tools/actions/__init__.py | Remove ephemeral check, extend list-adapting recording for HDCAs |
| lib/galaxy/managers/collections.py | Replace ephemeral checks with isinstance(CollectionAdapter) |
| lib/galaxy/model/__init__.py | Replace ephemeral check with isinstance(CollectionAdapter) |
| lib/galaxy/workflow/modules.py | Delete EphemeralCollection class |
No changes needed:
- matching.py — not hasattr(hid) detection still works
- execute.py — uses_ephemeral_collections flag logic still works
Risks & Considerations
- replacement_for_connection() return types — need to verify what types are actually returned. For datasets it may return HDA directly or something wrapped. For collections it may return HDCA or an EphemeralCollection from a prior merge. Need to handle the case where a merge adapter feeds into another merge.
- Default merge for lists — the current code's default path (no explicit merge_type) for list inputs falls through to the flatten branch via the elif input_collection_type == "list": block without checking merge_type explicitly. Need to preserve this behavior.
- DB migration — adapter JSON columns already exist on all three job input tables. No schema migration needed; new adapter_type values will just appear in the JSON.
- CollectionAdapter in parameter matching — the matching code in matching.py currently handles CollectionAdapters by duck typing (checking elements, collection_type). Merge adapters need to satisfy the same interface. The TransientCollectionAdapterCollectionElement helper is needed for nested merge to provide a proper child_collection.
Unresolved Questions
- The inputs passed to merge adapters from replacement_for_connection() — are they always raw HDA/HDCA objects, or could they be other wrapper types?
- Should merge adapters be exclusively workflow-internal, or could they also be useful for API requests (e.g., a user sends multiple HDCAs to be merged)?
- The to_json() path in basic.py — how exactly should a CollectionAdapter serialize? Existing adapters go through from_json()/to_json() differently than ephemeral collections. Need to verify the exact code path a merge adapter takes through parameter serialization.
- Should we rename the uses_ephemeral_collections flag in matching.py to something more generic (e.g. uses_adapter_collections)?
- Tag propagation for merge adapters wrapping HDCAs — should tags come from the HDCA level or the individual dataset level?