Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
3c84b16
improve: filter only own updates for read-after-write-conistency
csviri Jun 7, 2026
9d9571a
wip
csviri Jun 8, 2026
0d7fe34
wip
csviri Jun 8, 2026
50fc62c
wip
csviri Jun 8, 2026
2b76ac2
Event filtering with recording
csviri Jun 8, 2026
de1fdb4
test fix
csviri Jun 8, 2026
3ff2f62
Simplified EventHandling
csviri Jun 9, 2026
42087b9
unit tests fix
csviri Jun 9, 2026
e206edf
small fix, test repeats
csviri Jun 9, 2026
caa75ab
improvements and releated unit tests
csviri Jun 9, 2026
28ee8f3
cleanup
csviri Jun 9, 2026
a2548d1
improve: filter only own updates for read-after-write-conistency with…
csviri Jun 9, 2026
29834e1
improvements on edge cases
csviri Jun 9, 2026
ae7e49e
Potential fix for pull request finding
csviri Jun 10, 2026
12c84e9
delete related improvements and unit tests
csviri Jun 10, 2026
0b60010
delete handling improvements and test improvements
csviri Jun 10, 2026
fa55237
wip
csviri Jun 10, 2026
c8bfdcb
tests
csviri Jun 10, 2026
f89a3d3
test fix
csviri Jun 10, 2026
53e8876
fix typo
csviri Jun 10, 2026
8c7d4db
Potential fix for pull request finding
csviri Jun 10, 2026
4955b0c
fixes
csviri Jun 10, 2026
7be9fd6
improve: filter only own updates for read-after-write-conistency with…
csviri Jun 9, 2026
0340319
test fixes
csviri Jun 10, 2026
e88b7f5
logging and improvements
csviri Jun 10, 2026
39b0ac3
test AI identified cases
csviri Jun 10, 2026
8b2d6be
fix: only filter own events
csviri Jun 11, 2026
2b84952
wip
csviri Jun 11, 2026
f529dab
improvements and test fixes
csviri Jun 11, 2026
a8a92ab
improvements
csviri Jun 11, 2026
9585d74
fix resource cache read
csviri Jun 11, 2026
6f0e857
support for re-list
csviri Jun 12, 2026
d2e406f
simple algorithm, refined tests
csviri Jun 12, 2026
f31825a
naming fix
csviri Jun 12, 2026
7cb0a41
small fixes
csviri Jun 12, 2026
fc3e984
cleanup
csviri Jun 12, 2026
e25d598
cleanup
csviri Jun 12, 2026
a3112be
additional tests
csviri Jun 12, 2026
ff2cc50
addiotinal tests and docs improvements
csviri Jun 13, 2026
a97f314
wip
csviri Jun 14, 2026
d3c528c
wip
csviri Jun 14, 2026
db033f9
prepare for re-list
csviri Jun 14, 2026
f19429d
wip
csviri Jun 14, 2026
880f888
wip
csviri Jun 14, 2026
76e8c83
Potential fix for pull request finding
csviri Jun 14, 2026
842bddd
wip
csviri Jun 14, 2026
f084821
fix filtering
csviri Jun 14, 2026
d3ed86a
logging
csviri Jun 15, 2026
d3fb737
revert trace log
csviri Jun 15, 2026
2e84d7e
addressed issues from code review
csviri Jun 15, 2026
9adbca4
wip
csviri Jun 15, 2026
831be5d
minor improvements
csviri Jun 15, 2026
3f494d4
rename back GenericResourceEvent to ExtendedResourceEvent
csviri Jun 16, 2026
5741aea
fix: typo
metacosm Jun 16, 2026
32eba12
documenting algorithm
csviri Jun 16, 2026
3bfa595
further optimizations
csviri Jun 16, 2026
4fa883b
fix: typo
metacosm Jun 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions docs/content/en/blog/news/read-after-write-consistency.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ From this point the idea of the algorithm is very simple:
2. When the informer propagates an event, check if its resource version is greater than or equal to
the one in the TRC. If yes, evict the resource from the TRC.
3. When the controller reads a resource from cache, it checks the TRC first, then falls back to the Informer's cache.

The actual filtering of events for our own writes is more nuanced than a simple
"evict on RV ≥ TRC version" rule — it is driven by a per-resource state machine
that tracks in-flight writes and the events received around them. See
[Filtering events for our own updates](#filtering-events-for-our-own-updates) below.


```mermaid
Expand Down Expand Up @@ -221,13 +226,38 @@ sequenceDiagram
When we update a resource, eventually the informer will propagate an event that would trigger a reconciliation.
However, this is mostly not desired. Since we already have the up-to-date resource at that point,
we would like to be notified only if the resource is changed after our change.
Therefore, in addition to caching the resource, we also filter out events that contain a resource
version older than or equal to our cached resource version.

Note that the implementation of this is relatively complex, since while performing the update we want to record all the
events received in the meantime and decide whether to propagate them further once the update request is complete.

However, this way we significantly reduce the number of reconciliations, making the whole process much more efficient.
The framework runs a per-resource *event filter window* around each in-flight
write: it records the resource version returned by our update, buffers any
related events that arrive in the meantime, and at the end of the window
decides what (if anything) to surface to the reconciler. The rules:

- **Pure own echo**: if the only events in the window are watch events whose

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would help to include the sequence diagram that is in the PR here as well… Actually, a sequence diagram for each of these cases would be nice but might be too much work.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, also that blog post I would like to keep short, so folks actually read it. The unit tests basically cover all the sequences, and there is >20 of them, what is quite a lot.

resource versions match our recorded own writes (and the action is `UPDATED`),
they are filtered out — the reconciler isn't bothered.
- **Foreign change in the window**: if a resource version arrived that was *not*
one of our own writes — e.g. a third party modified the resource between two
of our updates — the framework synthesizes a single `UPDATED` event covering
the whole window (`previousResource` = the resource just before the window,
`resource` = the latest known state). The reconciler is notified once, with a
faithful before/after picture, instead of receiving each underlying watch
event individually.
- **DELETE in the middle**: if the resource was deleted at some point during
the window, that DELETE participates in the synthesis. A trailing `DELETED`
is surfaced verbatim; a DELETE-then-recreate inside the window collapses to
an `UPDATED` from the deleted state to the recreated state.
- **Held foreign events**: a foreign event that arrives *before* the matching
own write echo is buffered until the write completes. This avoids
surfacing it as foreign only to immediately overwrite it with a synthesized
echo.
- **ReList**: events arriving while the informer is performing a relist are
tagged. Because a relist may have hidden events, the framework defaults to
surfacing such events to the reconciler rather than silently filtering
them — even when they would otherwise look like our own echoes.

This way we significantly reduce the number of reconciliations, making the whole
process much more efficient, while preserving the invariant that any
foreign change reaches the reconciler.

### The case for instant reschedule

Expand Down
17 changes: 17 additions & 0 deletions docs/content/en/docs/documentation/reconciler.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,23 @@ supports stronger guarantees, both for primary and secondary resources. If this
that resource again. This feature also makes sure that the reconciliation is not triggered from the event from our
writes.

The filter is implemented as a per-resource *event filter window* that opens
when an update starts and closes when it completes. Inside the window:
- Pure own echoes (watch events whose resource version matches one of our
recorded own writes) are dropped.
- Foreign events received during the window are merged with the surrounding
own writes into a single synthesized `UPDATED` event so the reconciler
gets a faithful before/after picture rather than each individual watch
event. These events are carefully crafted so they correspond to a real-life scenario,
and remain fully usable by filters.
- A `DELETED` arriving in the window is propagated; a delete-then-recreate
inside the window collapses into one synthesized `UPDATED` from the
deleted state to the recreated state.
- During an informer relist the filter degrades to "surface what we see":
events received while a relist is in progress are propagated even when
they would otherwise look like own echoes, since the relist may have
hidden events.


In order to benefit from these stronger guarantees, use [`ResourceOperations`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java)
from the context of the reconciliation:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.ExtendedResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
Expand Down Expand Up @@ -141,11 +141,22 @@ private void handleOnAddOrUpdate(
ResourceAction action, T oldCustomResource, T newCustomResource) {
var handling =
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
if (handling == EventHandling.NEW) {
handleEvent(action, newCustomResource, oldCustomResource, null);
} else if (log.isDebugEnabled()) {
log.debug("{} event propagation for action: {}", handling, action);
}
handling.ifPresentOrElse(
this::handleEvent,
() -> {
if (log.isDebugEnabled()) {
log.debug("Skipping/deferring event propagation for action: {}", action);
}
});
}

@SuppressWarnings("unchecked")
private void handleEvent(ExtendedResourceEvent r) {
handleEvent(
r.getAction(),
(T) r.getResource().orElseThrow(),
(T) r.getPreviousResource().orElse(null),
r.isLastStateUnknown());
}

@Override
Expand All @@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown)
resource,
ResourceAction.DELETED,
() -> {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up
// caches on delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
res.ifPresent(this::handleEvent);
});
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright Java Operator SDK Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.processing.event.ResourceID;

class EventFilterSupport {

private static final Logger log = LoggerFactory.getLogger(EventFilterSupport.class);

private final Map<ResourceID, EventFilterWindow> eventFilterWindows = new HashMap<>();
private boolean ongoingReList = false;

public synchronized void startEventFilteringModify(ResourceID resourceID) {
var existing = eventFilterWindows.get(resourceID);
var ed =
eventFilterWindows.computeIfAbsent(resourceID, id -> new EventFilterWindow(ongoingReList));
ed.increaseActiveUpdates();
log.debug(
"startEventFilteringModify: id={}, windowReused={}, ongoingReList={}",
resourceID,
existing != null,
ongoingReList);
}

public synchronized Optional<ExtendedResourceEvent> doneEventFilterModify(ResourceID resourceID) {
var ed = eventFilterWindows.get(resourceID);
if (ed == null) {
log.debug("doneEventFilterModify: no window for id={}", resourceID);
return Optional.empty();
}
ed.decreaseActiveUpdates();
log.debug("doneEventFilterModify: id={}", resourceID);
return check(ed, resourceID);
}

public synchronized Optional<ExtendedResourceEvent> processEvent(
ResourceID resourceId, ExtendedResourceEvent extendedResourceEvent) {
var ed = eventFilterWindows.get(resourceId);
if (ed != null) {
log.debug(
"processEvent: buffering event in window. id={}, action={}, rv={}",
resourceId,
extendedResourceEvent.getAction(),
extendedResourceEvent
.getResource()
.map(r -> r.getMetadata().getResourceVersion())
.orElse("?"));
ed.addRelatedEvent(extendedResourceEvent);
return check(ed, resourceId);
} else {
log.debug(
"processEvent: no active window, surfacing directly. id={}, action={}",
resourceId,
extendedResourceEvent.getAction());
return Optional.of(extendedResourceEvent);
}
}

private Optional<ExtendedResourceEvent> check(
EventFilterWindow eventFilterWindow, ResourceID resourceID) {
var res = eventFilterWindow.check();
if (eventFilterWindow.canBeRemoved()) {
log.debug("Removing empty event filter window. id={}", resourceID);
eventFilterWindows.remove(resourceID);
}
return res;
}

public synchronized void addToOwnResourceVersions(ResourceID resourceId, String resourceVersion) {
var window = eventFilterWindows.get(resourceId);
if (window != null) {
log.debug("Recording own resourceVersion. id={}, rv={}", resourceId, resourceVersion);
window.addToOwnUpdateVersions(resourceVersion);
} else {
log.debug(
"addToOwnResourceVersions: no active window for id={}, rv={} (skipped)",
resourceId,
resourceVersion);
}
}

public synchronized void handleGhostResourceRemoval(ResourceID resourceId) {
log.debug("Ghost resource removal: discarding event filter window. id={}", resourceId);
eventFilterWindows.remove(resourceId);
}

// for testing purposes
synchronized Map<ResourceID, EventFilterWindow> getEventFilterWindows() {
return eventFilterWindows;
}

public synchronized void setStartingReList() {
log.debug("ReList starting: tagging {} active window(s)", eventFilterWindows.size());
ongoingReList = true;
eventFilterWindows.values().forEach(EventFilterWindow::setReListStarted);
}

public synchronized void setRelistFinished() {
log.debug("ReList finished: clearing tag from {} active window(s)", eventFilterWindows.size());
ongoingReList = false;
eventFilterWindows.values().forEach(EventFilterWindow::setReListFinished);
}

public synchronized boolean isActiveUpdateFor(ResourceID resourceId) {
return eventFilterWindows.containsKey(resourceId);
}
}
Loading
Loading