Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conductor-client-spring/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ repositories {
dependencies {
api project(":conductor-client")
implementation 'org.springframework.boot:spring-boot-starter:3.5.13'

testImplementation 'org.springframework.boot:spring-boot-starter-test:3.3.13'
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.conductoross.conductor.client.FileClient;
import org.conductoross.conductor.client.FileClientProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered;
Expand Down Expand Up @@ -122,4 +125,18 @@ public WorkflowExecutor workflowExecutor(ConductorClient client, AnnotatedWorker
public WorkflowClient workflowClient(ConductorClient client) {
return new WorkflowClient(client);
}

@Bean
@ConfigurationProperties("conductor.file-client")
@ConditionalOnMissingBean
public FileClientProperties fileClientProperties() {
return new FileClientProperties();
}

@Bean
@ConditionalOnBean(ConductorClient.class)
@ConditionalOnMissingBean
public FileClient fileClient(ConductorClient client, FileClientProperties fileClientProperties) {
return new FileClient(client, fileClientProperties, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package io.orkes.conductor.client.spring;

import org.apache.commons.lang3.StringUtils;
import org.conductoross.conductor.client.FileClient;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -128,4 +129,10 @@ public SecretClient orkesSecretClient(OrkesClients clients) {
return clients.getSecretClient();
}

@Bean
@ConditionalOnBean(ApiClient.class)
@ConditionalOnMissingBean
public FileClient fileClient(ApiClient client) {
return new FileClient(client);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2026 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.spring;

import org.conductoross.conductor.client.FileClient;
import org.conductoross.conductor.client.FileClientProperties;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;

import static org.assertj.core.api.Assertions.assertThat;

class ConductorClientAutoConfigurationTest {

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(ConductorClientAutoConfiguration.class));

@Test
void zeroConfigYieldsWorkingFileClientBean() {
contextRunner
.withPropertyValues("conductor.client.base-path=http://localhost:8080/api")
.run(context -> {
assertThat(context).hasSingleBean(FileClient.class);
FileClientProperties properties = context.getBean(FileClientProperties.class);
assertThat(properties.getLocalCacheDirectory())
.startsWith(System.getProperty("java.io.tmpdir"));
});
}

@Test
void cacheDirectoryOverrideIsRespected() {
contextRunner
.withPropertyValues(
"conductor.client.base-path=http://localhost:8080/api",
"conductor.file-client.local-cache-directory=/tmp/custom-cache")
.run(context -> {
FileClientProperties properties = context.getBean(FileClientProperties.class);
assertThat(properties.getLocalCacheDirectory()).isEqualTo("/tmp/custom-cache");
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
Expand All @@ -33,6 +34,11 @@
import java.util.function.Function;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.conductoross.conductor.client.FileClient;
import org.conductoross.conductor.sdk.file.FileHandler;
import org.conductoross.conductor.sdk.file.FileUploadOptions;
import org.conductoross.conductor.sdk.file.LocalFileHandler;
import org.conductoross.conductor.sdk.file.WorkflowFileClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,6 +57,7 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;

Expand All @@ -73,6 +80,7 @@ class TaskRunner {
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;
private final LinkedBlockingQueue<Task> tasksTobeExecuted;
private final boolean enableUpdateV2;
private final FileClient fileClient;
private static final int LEASE_EXTEND_RETRY_COUNT = 3;
private static final double LEASE_EXTEND_DURATION_FACTOR = 0.8;
private final ScheduledExecutorService leaseExtendExecutorService;
Expand All @@ -87,9 +95,11 @@ class TaskRunner {
int taskPollTimeout,
List<PollFilter> pollFilters,
EventDispatcher<TaskRunnerEvent> eventDispatcher,
boolean useVirtualThreads) {
boolean useVirtualThreads,
FileClient fileClient) {
this.worker = worker;
this.taskClient = taskClient;
this.fileClient = fileClient;
this.updateRetryCount = updateRetryCount;
this.taskPollTimeout = taskPollTimeout;
this.pollingIntervalInMillis = worker.getPollingInterval();
Expand Down Expand Up @@ -346,6 +356,11 @@ private void executeTask(Worker worker, Task task) {
return;
}

// Set FileClient on task for file storage support
if (fileClient != null) {
task.setWorkflowFileClient(new WorkflowFileClient(fileClient, task.getWorkflowInstanceId()));
}

// Calculate inbound network latency
try {
if(task.getExecutionMetadata().getServerSendTime() != null ){
Expand All @@ -368,6 +383,7 @@ private void executeTask(Worker worker, Task task) {
worker.getIdentity());
result = worker.execute(task);
stopwatch.stop();
uploadFilesToFileStorage(result, task.getWorkflowInstanceId(), task.getTaskId());
eventDispatcher.publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS)));
// record execution end time in task
task.getExecutionMetadata().setExecutionEndTime(System.currentTimeMillis());
Expand Down Expand Up @@ -453,6 +469,26 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
}
}

@VisibleForTesting
void uploadFilesToFileStorage(TaskResult result, String workflowId, String taskId) {
if (fileClient == null || result.getOutputData() == null) {
return;
}
for (var entry : result.getOutputData().entrySet()) {
if (!(entry.getValue() instanceof FileHandler fh)) {
continue;
}
if (fh.getFileHandleId() == null) {
Path path = ((LocalFileHandler) fh).getPath();
FileUploadOptions options = new FileUploadOptions()
.setContentType(fh.getContentType())
.setTaskId(taskId);
FileHandler uploaded = fileClient.upload(workflowId, path, options);
entry.setValue(uploaded);
}
}
}

private Optional<String> upload(TaskResult result, String taskType) {
try {
return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.function.Consumer;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.conductoross.conductor.client.FileClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -54,6 +55,7 @@ public class TaskRunnerConfigurer {
private final List<PollFilter> pollFilters;
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;
private final boolean useVirtualThreads;
private final FileClient fileClient;

/**
* @see TaskRunnerConfigurer.Builder
Expand All @@ -75,6 +77,7 @@ private TaskRunnerConfigurer(TaskRunnerConfigurer.Builder builder) {
this.pollFilters = builder.pollFilters;
this.eventDispatcher = builder.eventDispatcher;
this.useVirtualThreads = builder.useVirtualThreads;
this.fileClient = builder.fileClient;
builder.workers.forEach(this.workers::add);
taskRunners = new LinkedList<>();
}
Expand Down Expand Up @@ -173,7 +176,8 @@ private void startWorker(Worker worker) {
taskPollTimeout,
pollFilters,
eventDispatcher,
useVirtualThreads);
useVirtualThreads,
fileClient);
// startWorker(worker) is executed by several threads.
// taskRunners.add(taskRunner) without synchronization could lead to a race condition and unpredictable behavior,
// including potential null values being inserted or corrupted state.
Expand Down Expand Up @@ -206,6 +210,7 @@ public static class Builder {
private final List<PollFilter> pollFilters = new LinkedList<>();
private final EventDispatcher<TaskRunnerEvent> eventDispatcher = new EventDispatcher<>();
private boolean useVirtualThreads;
private FileClient fileClient;

public Builder(TaskClient taskClient, Iterable<Worker> workers) {
Preconditions.checkNotNull(taskClient, "TaskClient cannot be null");
Expand Down Expand Up @@ -352,5 +357,10 @@ public Builder withUseVirtualThreads(boolean useVirtualThreads) {
this.useVirtualThreads = useVirtualThreads;
return this;
}

public Builder withFileClient(FileClient fileClient) {
this.fileClient = fileClient;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.conductoross.conductor.sdk.file.FileHandler;
import org.conductoross.conductor.sdk.file.FileStorageException;
import org.conductoross.conductor.sdk.file.FileUploader;
import org.conductoross.conductor.sdk.file.ManagedFileHandler;
import org.conductoross.conductor.sdk.file.WorkflowFileClient;

import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.tasks.TypedTask;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.*;

@Data
Expand Down Expand Up @@ -179,6 +185,27 @@ public boolean isRetriable() {

private long firstStartTime;

@JsonIgnore
private transient WorkflowFileClient workflowFileClient;

@JsonIgnore
public FileUploader getFileUploader() { return workflowFileClient; }

public void setWorkflowFileClient(WorkflowFileClient workflowFileClient) {
this.workflowFileClient = workflowFileClient;
}

public FileHandler getInputFileHandler(String key) {
Object value = getInputData().get(key);
String fileHandleId = FileHandler.extractFileHandleId(value);
if (FileHandler.isFileHandleId(fileHandleId)) {
return new ManagedFileHandler(fileHandleId, workflowFileClient);
}
throw new FileStorageException(
"Expected " + FileHandler.PREFIX
+ " reference for key '" + key + "', got: " + value);
}

public void setInputData(Map<String, Object> inputData) {
if (inputData == null) {
inputData = new HashMap<>();
Expand Down
Loading
Loading