Skip to content

Commit d9e5a2a

Browse files
committed
Set workflowId for upload
1 parent 68aabff commit d9e5a2a

15 files changed

Lines changed: 242 additions & 129 deletions

File tree

conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
3737
import org.conductoross.conductor.client.FileClient;
3838
import org.conductoross.conductor.sdk.file.FileHandler;
39+
import org.conductoross.conductor.sdk.file.FileUploadOptions;
3940
import org.conductoross.conductor.sdk.file.LocalFileHandler;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
@@ -381,7 +382,7 @@ private void executeTask(Worker worker, Task task) {
381382
worker.getIdentity());
382383
result = worker.execute(task);
383384
stopwatch.stop();
384-
uploadFilesToFileStorage(result);
385+
uploadFilesToFileStorage(result, task.getWorkflowInstanceId(), task.getTaskId());
385386
eventDispatcher.publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS)));
386387
// record execution end time in task
387388
task.getExecutionMetadata().setExecutionEndTime(System.currentTimeMillis());
@@ -468,7 +469,7 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
468469
}
469470

470471
@VisibleForTesting
471-
void uploadFilesToFileStorage(TaskResult result) {
472+
void uploadFilesToFileStorage(TaskResult result, String workflowId, String taskId) {
472473
if (fileClient == null || result.getOutputData() == null) {
473474
return;
474475
}
@@ -478,7 +479,10 @@ void uploadFilesToFileStorage(TaskResult result) {
478479
}
479480
if (fh.getFileHandleId() == null) {
480481
Path path = ((LocalFileHandler) fh).getPath();
481-
FileHandler uploaded = fileClient.upload(path, fh.getContentType());
482+
FileUploadOptions options = new FileUploadOptions()
483+
.setContentType(fh.getContentType())
484+
.setTaskId(taskId);
485+
FileHandler uploaded = fileClient.upload(workflowId, path, options);
482486
entry.setValue(uploaded.getFileHandleId());
483487
} else {
484488
entry.setValue(fh.getFileHandleId());

conductor-client/src/main/java/org/conductoross/conductor/client/FileClient.java

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.netflix.conductor.client.http.ConductorClient;
3131
import com.netflix.conductor.client.http.ConductorClientRequest;
3232
import com.netflix.conductor.client.http.ConductorClientRequest.Method;
33+
import com.netflix.conductor.sdk.workflow.executor.task.TaskContext;
3334

3435
import com.fasterxml.jackson.core.type.TypeReference;
3536

@@ -51,12 +52,18 @@
5152
*/
5253
public class FileClient implements FileUploader {
5354

54-
private static final TypeReference<FileUploadResponse> FILE_UPLOAD_RESPONSE_TYPE = new TypeReference<>() {};
55-
private static final TypeReference<FileUploadUrlResponse> FILE_UPLOAD_URL_RESPONSE_TYPE = new TypeReference<>() {};
56-
private static final TypeReference<FileUploadCompleteResponse> FILE_UPLOAD_COMPLETE_RESPONSE_TYPE = new TypeReference<>() {};
57-
private static final TypeReference<FileDownloadUrlResponse> FILE_DOWNLOAD_URL_RESPONSE_TYPE = new TypeReference<>() {};
58-
private static final TypeReference<FileHandle> FILE_HANDLE_TYPE = new TypeReference<>() {};
59-
private static final TypeReference<MultipartInitResponse> MULTIPART_INIT_RESPONSE_TYPE = new TypeReference<>() {};
55+
private static final TypeReference<FileUploadResponse> FILE_UPLOAD_RESPONSE_TYPE = new TypeReference<>() {
56+
};
57+
private static final TypeReference<FileUploadUrlResponse> FILE_UPLOAD_URL_RESPONSE_TYPE = new TypeReference<>() {
58+
};
59+
private static final TypeReference<FileUploadCompleteResponse> FILE_UPLOAD_COMPLETE_RESPONSE_TYPE = new TypeReference<>() {
60+
};
61+
private static final TypeReference<FileDownloadUrlResponse> FILE_DOWNLOAD_URL_RESPONSE_TYPE = new TypeReference<>() {
62+
};
63+
private static final TypeReference<FileHandle> FILE_HANDLE_TYPE = new TypeReference<>() {
64+
};
65+
private static final TypeReference<MultipartInitResponse> MULTIPART_INIT_RESPONSE_TYPE = new TypeReference<>() {
66+
};
6067

6168
private final ConductorClient client;
6269
private final FileClientProperties properties;
@@ -73,8 +80,8 @@ public FileClient(ConductorClient client) {
7380
/**
7481
* Full-args constructor.
7582
*
76-
* @param properties {@code null} to use defaults
77-
* @param fileStorageBackendsByStorageType {@code null} to use the built-in backends
83+
* @param properties {@code null} to use defaults
84+
* @param fileStorageBackendsByStorageType {@code null} to use the built-in backends
7885
*/
7986
public FileClient(ConductorClient client, FileClientProperties properties, Map<StorageType, FileStorageBackend> fileStorageBackendsByStorageType) {
8087
this.client = client;
@@ -84,25 +91,24 @@ public FileClient(ConductorClient client, FileClientProperties properties, Map<S
8491

8592
// --- FileUploader (developer-facing) ---
8693

87-
@Override
88-
public FileHandler upload(Path localFile) {
89-
return upload(localFile, "application/octet-stream");
90-
}
91-
9294
/**
93-
* Uploads a local file. Creates the server-side metadata record, selects the backend that
94-
* matches the server-reported {@link StorageType}, and uses multipart when the file size
95-
* exceeds {@link FileClientProperties#getMultipartThreshold()} AND the backend supports
96-
* multipart (see {@link FileStorageBackend#hasMultipartSupport()}).
95+
* Uploads a local file with the given {@link FileUploadOptions}.
96+
*
97+
* <p>{@code workflowId} is required; pass {@code null} only if you are prepared to see a
98+
* {@link FileStorageException}. When called from inside a worker and {@code options.taskId}
99+
* is null, {@code taskId} is auto-filled from the active {@link TaskContext}.
97100
*
98-
* @throws FileStorageException if no backend is registered for the server's storage type,
99-
* or if any step of the upload fails
101+
* @throws FileStorageException if {@code workflowId} is null, if no backend is registered for
102+
* the server's storage type, or if any step of the upload fails
100103
*/
101104
@Override
102-
public FileHandler upload(Path localFile, String contentType) {
105+
public FileHandler upload(String workflowId, Path localFile, FileUploadOptions options) {
106+
if (workflowId == null) {
107+
throw new FileStorageException("workflowId is required");
108+
}
103109
try {
104-
long fileSize = Files.size(localFile);
105-
FileUploadRequest request = FileHandlerConverter.toFileUploadRequest(localFile, contentType, fileSize);
110+
fillDefaults(options, localFile);
111+
FileUploadRequest request = FileHandlerConverter.toFileUploadRequest(workflowId, options);
106112

107113
FileUploadResponse response = createFileOnServer(request);
108114

@@ -112,7 +118,7 @@ public FileHandler upload(Path localFile, String contentType) {
112118
+ " but SDK only supports: " + fileStorageBackendsByStorageType.keySet());
113119
}
114120

115-
if (fileSize > properties.getMultipartThreshold() && storageBackend.hasMultipartSupport()) {
121+
if (options.getFileSize() > properties.getMultipartThreshold() && storageBackend.hasMultipartSupport()) {
116122
uploadMultipart(response.getFileHandleId(), response.getStorageType(), localFile);
117123
} else {
118124
storageBackend.upload(response.getUploadUrl(), localFile);
@@ -125,22 +131,52 @@ public FileHandler upload(Path localFile, String contentType) {
125131
}
126132
}
127133

134+
/**
135+
* Buffers the stream to a temp file, then delegates to {@link #upload(String, Path, FileUploadOptions)}.
136+
*/
128137
@Override
129-
public FileHandler upload(InputStream inputStream) {
130-
return upload(inputStream, "application/octet-stream");
131-
}
132-
133-
@Override
134-
public FileHandler upload(InputStream inputStream, String contentType) {
138+
public FileHandler upload(String workflowId, InputStream inputStream, FileUploadOptions options) {
135139
try {
136140
Path temp = Files.createTempFile("conductor-upload-", ".tmp");
137-
Files.copy(inputStream, temp, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
138-
return upload(temp, contentType);
141+
Files.copy(inputStream, temp, StandardCopyOption.REPLACE_EXISTING);
142+
return upload(workflowId, temp, options);
139143
} catch (IOException e) {
140144
throw new FileStorageException("Failed to write InputStream to temp file", e);
141145
}
142146
}
143147

148+
@Override
149+
public FileHandler upload(String workflowId, Path localFile) {
150+
return upload(workflowId, localFile, new FileUploadOptions());
151+
}
152+
153+
@Override
154+
public FileHandler upload(String workflowId, InputStream inputStream) {
155+
return upload(workflowId, inputStream, new FileUploadOptions());
156+
}
157+
158+
/**
159+
* Populates unset options from the local file and active {@link TaskContext}. Mutates
160+
* {@code options} in place — SDK-internal helper only.
161+
*/
162+
private static void fillDefaults(FileUploadOptions options, Path localFile) throws IOException {
163+
if (options.getFileName() == null) {
164+
options.setFileName(localFile.getFileName().toString());
165+
}
166+
if (options.getFileSize() == 0) {
167+
options.setFileSize(Files.size(localFile));
168+
}
169+
if (options.getContentType() == null) {
170+
options.setContentType("application/octet-stream");
171+
}
172+
if (options.getTaskId() == null) {
173+
TaskContext ctx = TaskContext.get();
174+
if (ctx != null) {
175+
options.setTaskId(ctx.getTaskId());
176+
}
177+
}
178+
}
179+
144180
// --- SDK-internal (public for cross-package access, not developer-facing) ---
145181

146182
/**
@@ -158,7 +194,9 @@ public void download(String workflowId, String fileHandleId, StorageType storage
158194
fileStorageBackendsByStorageType.get(storageType).download(urlResponse.getDownloadUrl(), destination);
159195
}
160196

161-
/** Fetches the {@link FileHandle} metadata via {@code GET /api/files/{fileId}}. */
197+
/**
198+
* Fetches the {@link FileHandle} metadata via {@code GET /api/files/{fileId}}.
199+
*/
162200
public FileHandle getMetadata(String fileHandleId) {
163201
ConductorClientRequest request = ConductorClientRequest.builder()
164202
.method(Method.GET)

conductor-client/src/main/java/org/conductoross/conductor/sdk/file/FileHandlerConverter.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.nio.file.Path;
1616

1717
import org.conductoross.conductor.client.FileClient;
18-
import org.conductoross.conductor.client.model.file.FileHandle;
1918
import org.conductoross.conductor.client.model.file.FileUploadRequest;
2019
import org.conductoross.conductor.client.model.file.FileUploadResponse;
2120

@@ -38,21 +37,14 @@ public static ManagedFileHandler toManagedFileHandler(FileUploadResponse respons
3837
return handler;
3938
}
4039

41-
/** Copies server-provided metadata onto an existing {@link ManagedFileHandler}. */
42-
public static void populateFromMetadata(ManagedFileHandler handler, FileHandle metadata) {
43-
handler.setFileName(metadata.getFileName());
44-
handler.setContentType(metadata.getContentType());
45-
handler.setFileSize(metadata.getFileSize());
46-
handler.setStorageType(metadata.getStorageType());
47-
}
48-
4940
/** Builds the {@link FileUploadRequest} payload for {@code POST /api/files}. */
50-
public static FileUploadRequest toFileUploadRequest(Path localFile, String contentType,
51-
long fileSize) {
41+
public static FileUploadRequest toFileUploadRequest(String workflowId, FileUploadOptions options) {
5242
FileUploadRequest request = new FileUploadRequest();
53-
request.setFileName(localFile.getFileName().toString());
54-
request.setContentType(contentType);
55-
request.setFileSize(fileSize);
43+
request.setFileName(options.getFileName());
44+
request.setContentType(options.getContentType());
45+
request.setFileSize(options.getFileSize());
46+
request.setWorkflowId(workflowId);
47+
request.setTaskId(options.getTaskId());
5648
return request;
5749
}
5850
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2026 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package org.conductoross.conductor.sdk.file;
14+
15+
/**
16+
* Optional metadata to attach to a file upload. {@code workflowId} is supplied as the explicit
17+
* first argument on {@link FileUploader#upload}; this type carries everything else.
18+
*
19+
* <p>When uploading inside a worker, {@code taskId} is populated automatically from the active
20+
* {@link com.netflix.conductor.sdk.workflow.executor.task.TaskContext} if not set here.
21+
* Providing it explicitly overrides the auto-detected value.
22+
*
23+
* <pre>{@code
24+
* FileUploadOptions options = new FileUploadOptions()
25+
* .setContentType("application/pdf")
26+
* .setTaskId("task-456");
27+
* FileHandler handler = fileUploader.upload(workflowId, path, options);
28+
* }</pre>
29+
*/
30+
public class FileUploadOptions {
31+
32+
private String taskId;
33+
private String fileName;
34+
private String contentType;
35+
private long fileSize;
36+
37+
public String getTaskId() {
38+
return taskId;
39+
}
40+
41+
public FileUploadOptions setTaskId(String taskId) {
42+
this.taskId = taskId;
43+
return this;
44+
}
45+
46+
public String getFileName() {
47+
return fileName;
48+
}
49+
50+
public FileUploadOptions setFileName(String fileName) {
51+
this.fileName = fileName;
52+
return this;
53+
}
54+
55+
public String getContentType() {
56+
return contentType;
57+
}
58+
59+
public FileUploadOptions setContentType(String contentType) {
60+
this.contentType = contentType;
61+
return this;
62+
}
63+
64+
public long getFileSize() {
65+
return fileSize;
66+
}
67+
68+
public FileUploadOptions setFileSize(long fileSize) {
69+
this.fileSize = fileSize;
70+
return this;
71+
}
72+
}

conductor-client/src/main/java/org/conductoross/conductor/sdk/file/FileUploader.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,20 @@
2424
* populated with the prefixed handle {@code conductor://file/<fileId>}. The {@link InputStream}
2525
* overloads stream the content to a temporary file first, then delegate to the {@link Path}
2626
* overload.
27+
*
28+
* <p>When called from within a worker, {@code workflowId} and {@code taskId} are injected
29+
* automatically from the active task context. Use {@link FileUploadOptions} to override them or
30+
* to supply additional metadata when uploading outside a worker.
2731
*/
2832
public interface FileUploader {
2933

30-
/** Upload from an {@link InputStream} with content type {@code application/octet-stream}. */
31-
FileHandler upload(InputStream inputStream);
34+
FileHandler upload(String workflowId, Path localFile);
3235

33-
/** Upload from an {@link InputStream} with an explicit content type. */
34-
FileHandler upload(InputStream inputStream, String contentType);
36+
FileHandler upload(String workflowId, InputStream inputStream);
3537

36-
/** Upload from a local file with content type {@code application/octet-stream}. */
37-
FileHandler upload(Path localFile);
38+
/** Upload from a local file with the given options. */
39+
FileHandler upload(String workflowId, Path localFile, FileUploadOptions options);
3840

39-
/** Upload from a local file with an explicit content type. */
40-
FileHandler upload(Path localFile, String contentType);
41+
/** Upload from an {@link InputStream} with the given options. */
42+
FileHandler upload(String workflowId, InputStream inputStream, FileUploadOptions options);
4143
}

conductor-client/src/main/java/org/conductoross/conductor/sdk/file/LocalFileHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* {@link FileHandler} wrapping a local file that has not been uploaded.
2424
* {@link #getFileHandleId()} returns {@code null}. Returned by
2525
* {@link FileHandler#fromLocalFile(Path)}; consumed by
26-
* {@link org.conductoross.conductor.client.FileClient#upload(Path)} or by
26+
* {@link FileClient#upload(String, Path)} or by
2727
* {@code TaskRunner}'s auto-upload interception on task completion.
2828
*/
2929
public class LocalFileHandler implements FileHandler {

0 commit comments

Comments
 (0)