
[Python] Support for Event containing Row field can be json serializable #63

Merged
xintongsong merged 9 commits into apache:main from Kavishankarks:python-support-for-row-serialization
Aug 11, 2025

@Kavishankarks
Contributor

@Kavishankarks Kavishankarks commented Jul 15, 2025

Linked issue: #64

This change introduces enhanced JSON serialization support for Row objects within the Event class in the Flink Agents module. Specifically:

Adds a custom row_serializer() function to convert PyFlink Row instances into serializable dictionaries.

Implements model_dump_json() override and _serialize_with_row_support() to gracefully handle Row serialization across all events.

Adds utility methods to detect serialization issues exclusively caused by Row objects, preventing false-positive validation errors.

Includes test coverage for both valid and invalid serialization cases involving Row and non-serializable types.
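
As a rough illustration of the row_serializer() idea in the first bullet, a Row-to-dictionary hook could be plugged into the standard json module roughly as follows. This is a minimal sketch, not the code from this PR: FakeRow is a hypothetical stand-in for pyflink.common.Row so that the snippet runs without PyFlink, and the type/values output shape is an assumption.

```python
import json
from typing import Any, Dict


class FakeRow:
    """Hypothetical stand-in for pyflink.common.Row (positional values only)."""

    def __init__(self, *values: Any) -> None:
        self._values = list(values)


def row_serializer(obj: Any) -> Dict[str, Any]:
    """json.dumps `default` hook: called only for objects json cannot encode."""
    if isinstance(obj, FakeRow):
        return {"type": "Row", "values": obj._values}
    raise TypeError(f"Unable to serialize unknown type: {type(obj)}")


payload = {"input": FakeRow(1, "a")}
encoded = json.dumps(payload, default=row_serializer)
print(encoded)
```

Any other non-serializable type still raises TypeError, so the hook only widens support to Row-like objects.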

@Kavishankarks Kavishankarks changed the title [Python] Support Event contains Row field be json serializable [Python] Support for Event containing Row field can be json serializable Jul 15, 2025
@wenjin272
Collaborator

@Kavishankarks Thanks a lot for the contribution, I will take a look later. I also created the corresponding issue #64, which you can take.

@wenjin272 wenjin272 self-requested a review July 16, 2025 03:43
@wenjin272
Collaborator

Thanks for your contribution, @Kavishankarks. LGTM.
@xintongsong Please take a look at your convenience

@xintongsong
Contributor

It seems we always try the pydantic serialization first, and fall back to json serialization with a custom serializer if the first attempt fails.

This approach leads to a few issues.

  • Inefficiency, from trying to serialize the object twice.
  • Potential inconsistency. Adding one Row-typed field in a nested object will entirely change the serializer.
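
The cost of the try-then-fall-back pattern can be seen in a small sketch. This is not the PR's code: it uses the standard json module in place of pydantic, and FakeRow, row_default, and dumps_with_retry are hypothetical names introduced for illustration.

```python
import json
from typing import Any, Dict, Tuple


class FakeRow:
    """Hypothetical stand-in for pyflink.common.Row."""

    def __init__(self, *values: Any) -> None:
        self._values = list(values)


def row_default(obj: Any) -> Dict[str, Any]:
    """Encode FakeRow as a dict; reject every other unknown type."""
    if isinstance(obj, FakeRow):
        return {"type": "Row", "values": obj._values}
    raise TypeError(f"Unable to serialize unknown type: {type(obj)}")


def dumps_with_retry(payload: Any) -> Tuple[str, int]:
    """Try the plain encoder first, then retry with the Row-aware hook.

    Returns the JSON text and how many encoding passes were needed.
    """
    try:
        return json.dumps(payload), 1
    except TypeError:
        # The first pass is thrown away and the whole payload is encoded again.
        return json.dumps(payload, default=row_default), 2


_, plain_passes = dumps_with_retry({"input": [1, 2, 3]})
_, row_passes = dumps_with_retry({"input": FakeRow(1, 2, 3)})
print(plain_passes, row_passes)  # 1 2
```

Passing the hook on every call would keep serialization to a single pass and a single code path, whether or not a Row is present.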

I think the most elegant approach might be making the pyflink Row a pydantic base model and overriding model_dump_json() for it. However, even if we make the change in Flink now, it will only be available in future Flink versions. Moreover, we'd better not affect Flink for Flink Agents until the latter is stabilized. Alternatively, we may consider modifying Row in Flink Agents with some monkey patches, as a temporary solution.

WDYT? @Kavishankarks @wenjin272

@wenjin272
Collaborator

I think it makes sense to avoid these issues.

First, I tried the monkey patch, but it raised an exception:

TypeError: __bases__ assignment: 'BaseModel' deallocator differs from 'object'
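
An error of this kind can be reproduced without pydantic. The sketch below assumes the monkey patch was a direct __bases__ assignment, which the thread does not show; PlainRow and NewBase are hypothetical stand-ins for Row and BaseModel. CPython typically refuses to rebase a class whose only base is object.

```python
class PlainRow:
    """Hypothetical stand-in for a class that inherits directly from object."""


class NewBase:
    """Hypothetical stand-in for the base class being injected."""


try:
    # Swap the base class at runtime, as a monkey patch might attempt.
    PlainRow.__bases__ = (NewBase,)
    error = None
except TypeError as exc:
    error = str(exc)

print(error)
```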

Then I looked for a way to inject a custom serializer into BaseModel's model_dump_json(). Fortunately, model_dump_json() accepts exactly such a fallback parameter:

fallback: A function to call when an unknown value is encountered. If not provided,
                a [`PydanticSerializationError`][pydantic_core.PydanticSerializationError] error is raised.

So I modified the Event code like this:

from abc import ABC
from typing import Any, Dict
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, model_validator
from pydantic_core import PydanticSerializationError
from pyflink.common import Row
from typing_extensions import override


class Event(BaseModel, ABC, extra="allow"):
    """Base class for all event types in the system. Events allow extra properties,
    but these must be instances of BaseModel or be JSON serializable.

    Attributes:
    ----------
    id : UUID
        Unique identifier for the event, automatically generated using uuid4.
    """

    id: UUID = Field(default_factory=uuid4)

    @staticmethod
    def __serialize_unknown(field: Any) -> Dict[str, Any]:
        # Fallback for values pydantic cannot serialize; only Row is supported.
        if isinstance(field, Row):
            return {"type": "Row", "values": field._values}
        err_msg = f"Unable to serialize unknown type: {field.__class__}"
        raise PydanticSerializationError(err_msg)

    @override
    def model_dump_json(self, **kwargs: Any) -> str:
        """Override model_dump_json to handle Row objects."""
        # Forward caller-supplied options instead of silently dropping them.
        kwargs.setdefault("fallback", self.__serialize_unknown)
        return super().model_dump_json(**kwargs)

    @model_validator(mode="after")
    def validate_extra(self) -> "Event":
        """Ensure the fields set at init are serializable."""
        self.model_dump_json()
        return self

    def __setattr__(self, name: str, value: Any) -> None:
        super().__setattr__(name, value)
        # Ensure the added property can be serialized.
        self.model_dump_json()

And it works:

def test_inject_super_class_to_row() -> None:
    event = InputEvent(input=Row({"a": 1}))
    print(event.model_dump_json())

Output:

{"id":"037d5556-4a99-4d5f-a3f4-c7ec12773dc0","input":{"type":"Row","values":[{"a":1}]}}

WDYT? @Kavishankarks @xintongsong

@Kavishankarks
Contributor Author

I think this solution of using a monkey patch looks more suitable.

@Kavishankarks
Contributor Author

@wenjin272 shall I make the suggested changes?

@wenjin272
Collaborator

wenjin272 commented Jul 29, 2025

@Kavishankarks Of course you can.

@wenjin272
Collaborator

Hi, @Kavishankarks, do you have time recently to make revisions based on the comments?

@Kavishankarks
Contributor Author

Hi @wenjin272, I have made the changes. Please review at your convenience.

@wenjin272
Collaborator

LGTM.
@xintongsong Please take a look at your convenience

Contributor

@xintongsong xintongsong left a comment

LGTM

@xintongsong xintongsong merged commit 71b9414 into apache:main Aug 11, 2025
15 checks passed
@xintongsong xintongsong added this to the 0.1.0 milestone Sep 12, 2025
@Sxnan Sxnan added the priority/major, fixVersion/0.1.0, and doc-not-needed labels Nov 14, 2025