Skip to content

feat(ml): add qdrant ingestion#38142

Open
MichaelGruschke wants to merge 4 commits intoapache:masterfrom
MichaelGruschke:qdrant
Open

feat(ml): add qdrant ingestion#38142
MichaelGruschke wants to merge 4 commits intoapache:masterfrom
MichaelGruschke:qdrant

Conversation

@MichaelGruschke
Copy link
Copy Markdown
Contributor

resolves #38141

This PR adds support for Qdrant vector database ingestion to Apache Beam's ML RAG pipeline.

Implementation details:

  1. QdrantConnectionParameters

    • Dataclass for Qdrant connection configuration
    • Supports location, url, host, path for connection
    • Configurable port, grpc_port, api_key, https, timeout
  2. QdrantWriteConfig

    • Extends VectorDatabaseWriteConfig
    • Supports both dense and sparse embeddings
    • Creates converter to models.PointStruct
    • Batched writes (default 1000 items/batch)
  3. QdrantWriteTransform

    • Private PTransform wrapping _QdrantWriteFn
    • Batches elements and upserts to Qdrant collection
    • Handles client lifecycle (setup/teardown)
  4. Dependency Changes

    • Added qdrant-client to ml_test extras_require

5.Tests

  • Integration tests (qdrant_it_test.py) for end-to-end ingestion verification

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances Apache Beam's machine learning capabilities by integrating Qdrant, a popular vector database. It provides a complete framework for ingesting embedded data into Qdrant collections, allowing for scalable and efficient management of vector embeddings within RAG pipelines. The changes include new configuration options, data transformation logic, and thorough testing to ensure reliability and correctness.

Highlights

  • Qdrant Integration: Added comprehensive support for Qdrant vector database ingestion within Apache Beam's ML RAG pipeline, enabling users to write EmbeddableItem objects to Qdrant collections.
  • Configuration Classes: Introduced QdrantConnectionParameters for defining connection details (location, URL, host, port, API key, etc.) and QdrantWriteConfig for specifying write behavior (collection name, batch size, embedding keys).
  • Data Transformation and Ingestion: Implemented _QdrantWriteTransform and _QdrantWriteFn to handle the conversion of EmbeddableItem objects into Qdrant's PointStruct format, supporting both dense and sparse embeddings, and performing batched upserts to the Qdrant collection.
  • Dependency Management: Updated setup.py to include qdrant-client as a new dependency, ensuring it's available for ML-related tests and installations.
  • Integration Tests: Provided robust integration tests (qdrant_it_test.py) to verify end-to-end Qdrant ingestion functionality, covering various scenarios like dense-only, sparse-only, hybrid embeddings, batching, and idempotent writes.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@MichaelGruschke MichaelGruschke force-pushed the qdrant branch 2 times, most recently from 3a266b5 to efe7cf4 Compare April 10, 2026 22:33
@github-actions
Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@MichaelGruschke
Copy link
Copy Markdown
Contributor Author

Qdrant integration tests in the respective Python ML test suites all pass.
Failing checks seems to be an existing issue unrelated to the changes

@MichaelGruschke
Copy link
Copy Markdown
Contributor Author

assign set of reviewers

@github-actions
Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @jrmccluskey for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @jrmccluskey for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @jrmccluskey

Comment thread CHANGES.md Outdated
* Updates minimum Go version to 1.26.1 ([#37897](https://github.com/apache/beam/issues/37897)).
* (Python) Added image embedding support in `apache_beam.ml.rag` package ([#37628](https://github.com/apache/beam/issues/37628)).
* (Python) Added support for Python version 3.14 ([#37247](https://github.com/apache/beam/issues/37247)).
* (Python) Added [Qdrant](https://qdrant.tech/) VectorDatabaseWriteConfig implementation ([#38141](https://github.com/apache/beam/issues/38141)).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to the 2.74.0 release notes

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use collections.abc types for Callable, use the built-in dict type for Dict hints

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for pointing that out, should be fixed now!



@dataclass
class QdrantConnectionParameters:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A docstring outlining each field and the mandatory information required to create a valid set of parameters will make this much more user friendly

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point! added docstring



@dataclass
class QdrantWriteConfig(VectorDatabaseWriteConfig):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar thought here, a docstring should be provided here since this is the entrypoint for users to drop the qdrant write into their pipelines

refactor: use local qdrant implementation for tests

chore: clean up imports

chore: add qdrant dependency to ml_test extra

chore: run precommit

chore: add comment to CHANGES.md

fix: guard against import error

fix: import
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 4, 2026

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@damccorm
Copy link
Copy Markdown
Contributor

damccorm commented May 4, 2026

R: @jrmccluskey @claudevdm

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 4, 2026

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@claudevdm
Copy link
Copy Markdown
Collaborator

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the Qdrant VectorDatabaseWriteConfig implementation for the Python SDK, allowing for data ingestion into Qdrant collections. The changes include new connection and write configuration classes, a dedicated Beam PTransform, and comprehensive integration tests. Review feedback highlights opportunities to improve the robustness of the implementation by moving batch initialization to start_bundle to avoid state leakage, adding validation for the batch_size parameter, and removing the override that disables client-server compatibility checks.

Comment on lines +159 to +162
def __init__(self, config: QdrantWriteConfig):
self.config = config
self._batch = []
self._client: "Optional[QdrantClient]" = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In Apache Beam, DoFn instances can be reused across bundles. To ensure that state is not leaked between bundles (especially in case of retries), it is a best practice to initialize bundle-specific state like self._batch in the start_bundle method rather than in __init__.

Suggested change
def __init__(self, config: QdrantWriteConfig):
self.config = config
self._batch = []
self._client: "Optional[QdrantClient]" = None
def __init__(self, config: QdrantWriteConfig):
self.config = config
self._client: "Optional[QdrantClient]" = None
def start_bundle(self):
self._batch = []

Comment on lines +112 to +114
def __post_init__(self):
if not self.collection_name:
raise ValueError("Collection name must be provided")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It is recommended to validate that batch_size is a positive integer in __post_init__ to prevent potential issues with empty or negative batch sizes during ingestion.

Suggested change
def __post_init__(self):
if not self.collection_name:
raise ValueError("Collection name must be provided")
def __post_init__(self):
if not self.collection_name:
raise ValueError("Collection name must be provided")
if self.batch_size <= 0:
raise ValueError("batch_size must be positive")

Comment on lines +183 to +184
check_compatibility=False,
**params.kwargs,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Disabling the compatibility check (check_compatibility=False) can lead to difficult-to-debug issues if there is a version mismatch between the client and the Qdrant server. Unless there is a specific reason to disable it, it is safer to leave it enabled (which is the default).

Suggested change
check_compatibility=False,
**params.kwargs,
**params.kwargs,

def process(self, element, *args, **kwargs):
self._batch.append(element)
if len(self._batch) >= self.config.batch_size:
self._flush()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a byte size limit for individual batches, similar to BigQuery streaming inserts

> self._max_insert_payload_size) or

return
if not self._client:
raise RuntimeError("Qdrant client is not initialized")
self._client.upsert(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any retriable errors that we should handle?



@dataclass
class QdrantConnectionParameters:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add classmethod factories to make it clearer which combinations of parameters are valid? Something like

@dataclass
class QdrantConnectionParameters:
    # ... existing fields unchanged ...

    @classmethod
    def for_cloud(
        cls,
        url: str,
        api_key: str,
        *,
        prefer_grpc: bool = False,
        timeout: Optional[int] = None,
        **kwargs: Any,
    ) -> "QdrantConnectionParameters":
        """Connect to Qdrant Cloud. Requires the cluster URL and an API key."""
        return cls(
            url=url,
            api_key=api_key,
            https=True,
            prefer_grpc=prefer_grpc,
            timeout=timeout,
            kwargs=kwargs,
        )

    @classmethod
    def for_host(
        cls,
        host: str,
        port: int = 6333,
        *,
        grpc_port: int = 6334,
        prefer_grpc: bool = False,
        https: bool = False,
        api_key: Optional[str] = None,
        timeout: Optional[int] = None,
        **kwargs: Any,
    ) -> "QdrantConnectionParameters":
        """Connect to a self-hosted Qdrant instance by host and port."""
        return cls(
            host=host, port=port, grpc_port=grpc_port,
            prefer_grpc=prefer_grpc, https=https,
            api_key=api_key, timeout=timeout, kwargs=kwargs,
        )

    @classmethod
    def for_url(
        cls,
        url: str,
        *,
        api_key: Optional[str] = None,
        prefer_grpc: bool = False,
        timeout: Optional[int] = None,
        **kwargs: Any,
    ) -> "QdrantConnectionParameters":
        """Connect using a full URL like 'https://my-qdrant.example.com:6333'."""
        return cls(url=url, api_key=api_key, prefer_grpc=prefer_grpc,
                   timeout=timeout, kwargs=kwargs)

    @classmethod
    def local(cls, path: str) -> "QdrantConnectionParameters":
        """Use an embedded Qdrant instance persisted to the given path."""
        return cls(path=path)

    @classmethod
    def in_memory(cls) -> "QdrantConnectionParameters":
        """Use an embedded in-memory Qdrant instance. Useful for tests."""
        return cls(location=":memory:")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature Request]: Add Qdrant vector database ingestion support

4 participants