From 9e70407b5e6c8f0c9233c3421ec8baf5281b7c2c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 20 Mar 2026 15:37:23 -0400 Subject: [PATCH 1/5] Implement open method for gcsutilv2. Add an integration test. --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 11 +++ .../sdk/extensions/gcp/util/GcsUtilV2.java | 74 +++++++++++++++++++ .../gcp/util/GcsUtilParameterizedIT.java | 49 ++++++++++++ 3 files changed, 134 insertions(+) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index e3f01dd85295..f613c1f6c931 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -25,6 +25,7 @@ import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.Storage.BucketGetOption; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; @@ -186,9 +187,19 @@ public Page listBlobs( } public SeekableByteChannel open(GcsPath path) throws IOException { + if (delegateV2 != null) { + return delegateV2.open(path); + } return delegate.open(path); } + public SeekableByteChannel openV2(GcsPath path, BlobSourceOption... options) throws IOException { + if (delegateV2 != null) { + return delegateV2.open(path, options); + } + throw new IOException("GcsUtil V2 not initialized."); + } + /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */ @Deprecated public WritableByteChannel create(GcsPath path, String type) throws IOException { diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index b00b7ce0d728..b5b25a95479a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -23,6 +23,7 @@ import com.google.api.gax.paging.Page; import com.google.auto.value.AutoValue; +import com.google.cloud.ReadChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; @@ -33,6 +34,7 @@ import com.google.cloud.storage.Storage.BlobField; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.Storage.BucketField; import com.google.cloud.storage.Storage.BucketGetOption; import com.google.cloud.storage.Storage.CopyRequest; @@ -42,6 +44,8 @@ import com.google.cloud.storage.StorageOptions; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; @@ -481,4 +485,74 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException { throw translateStorageException(bucketInfo.getName(), null, e); } } + + /** A bridge that allows a GCS ReadChannel to behave as a SeekableByteChannel. */ + private static class GcsSeekableByteChannel implements SeekableByteChannel { + private final ReadChannel reader; + private final long size; + private long position = 0; + + GcsSeekableByteChannel(ReadChannel reader, long size) { + this.reader = reader; + this.size = size; + this.position = 0; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + int count = reader.read(dst); + if (count > 0) { + this.position += count; + } + return count; + } + + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + checkArgument(newPosition >= 0, "Position must be non-negative: %s", newPosition); + reader.seek(newPosition); + this.position = newPosition; + return this; + } + + @Override + public long position() throws IOException { + return this.position; + } + + @Override + public long size() throws IOException { + return size; + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new UnsupportedOperationException( + "GcsSeekableByteChannels are read-only and cannot be truncated."); + } + + @Override + public int write(ByteBuffer src) throws IOException { + throw new UnsupportedOperationException( + "GcsSeekableByteChannel are read-only and does not support writing."); + } + + @Override + public boolean isOpen() { + return reader.isOpen(); + } + + @Override + public void close() throws IOException { + if (isOpen()) { + reader.close(); + } + } + } + + public SeekableByteChannel open(GcsPath path, BlobSourceOption... options) throws IOException { + Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE)); + return new GcsSeekableByteChannel( + blob.getStorage().reader(blob.getBlobId(), options), blob.getSize()); + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index 80ffd72924fa..80fb494facf0 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -30,8 +30,12 @@ import com.google.cloud.storage.BucketInfo; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -587,4 +591,49 @@ private void assertNotExists(GcsPath path) throws IOException { assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path)); } } + + @Test + public void testOpenAndReadSeekableChannel() throws IOException, NoSuchAlgorithmException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); + final String expectedHash = "674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8"; + final long expectedSize = 157283L; + + try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) { + // Verify Size + assertEquals(expectedSize, channel.size()); + assertEquals(0, channel.position()); + + // Read content into ByteBuffer + ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize); + int bytesRead = 0; + while (buffer.hasRemaining()) { + int read = channel.read(buffer); + if (read == -1) { + break; + } + bytesRead += read; + } + + // Verify total bytes read and position + assertEquals(expectedSize, bytesRead); + assertEquals(expectedSize, channel.position()); + + // Flip the buffer to prepare it for reading (sets limit to current position, position to 0) + buffer.flip(); + + // Verify hash + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + digest.update(buffer); + byte[] hashBytes = digest.digest(); + + // Convert bytes to Hex String + StringBuilder sb = new StringBuilder(); + for (byte b : hashBytes) { + sb.append(String.format("%02x", b)); + } + String actualHash = sb.toString(); + + assertEquals("Content hash should match", expectedHash, actualHash); + } + } } From eb1be77eab3d11bb11176b8d0a305f49c2097173 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 20 Mar 2026 22:42:46 -0400 Subject: [PATCH 2/5] Implement create method and add an integration test. --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 12 +++ .../sdk/extensions/gcp/util/GcsUtilV2.java | 78 ++++++++++++++++++- .../gcp/util/GcsUtilParameterizedIT.java | 64 ++++++++++++--- 3 files changed, 141 insertions(+), 13 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index f613c1f6c931..03733463e5c4 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -26,6 +26,7 @@ import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.BucketGetOption; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; @@ -265,9 +266,20 @@ public CreateOptions build() { } public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException { + if (delegateV2 != null) { + delegateV2.create(path, options.delegate); + } return delegate.create(path, options.delegate); } + public WritableByteChannel create( + GcsPath path, CreateOptions options, BlobWriteOption... writeOptions) throws IOException { + if (delegateV2 != null) { + return delegateV2.create(path, options.delegate, writeOptions); + } + throw new IOException("GcsUtil V2 not initialized."); + } + public void verifyBucketAccessible(GcsPath path) throws IOException { if (delegateV2 != null) { delegateV2.verifyBucketAccessible(path); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index b5b25a95479a..0fd8fd0e3c60 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -24,6 +24,7 @@ import com.google.api.gax.paging.Page; import com.google.auto.value.AutoValue; import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; @@ -35,6 +36,7 @@ import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.BucketField; import com.google.cloud.storage.Storage.BucketGetOption; import com.google.cloud.storage.Storage.CopyRequest; @@ -46,12 +48,15 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; @@ -74,6 +79,8 @@ public GcsUtilV2 create(PipelineOptions options) { private Storage storage; + private final @Nullable Integer uploadBufferSizeBytes; + /** Maximum number of items to retrieve per Objects.List request. */ private static final long MAX_LIST_BLOBS_PER_CALL = 1024; @@ -89,6 +96,7 @@ public GcsUtilV2 create(PipelineOptions options) { GcsUtilV2(PipelineOptions options) { String projectId = options.as(GcpOptions.class).getProject(); storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); + uploadBufferSizeBytes = options.as(GcsOptions.class).getGcsUploadBufferSizeBytes(); } @SuppressWarnings({ @@ -550,9 +558,75 @@ public void close() throws IOException { } } - public SeekableByteChannel open(GcsPath path, BlobSourceOption... options) throws IOException { + public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions) + throws IOException { Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE)); return new GcsSeekableByteChannel( - blob.getStorage().reader(blob.getBlobId(), options), blob.getSize()); + blob.getStorage().reader(blob.getBlobId(), sourceOptions), blob.getSize()); + } + + /** A bridge that allows a GCS WriteChannel to behave as a WritableByteChannel. */ + private static class GcsWritableByteChannel implements WritableByteChannel { + private final WriteChannel writer; + + GcsWritableByteChannel(WriteChannel writer) { + this.writer = writer; + } + + @Override + public int write(ByteBuffer src) throws IOException { + try { + return writer.write(src); + } catch (StorageException e) { + // In a real implementation, you'd use your translateStorageException here + throw new IOException(e); + } + } + + @Override + public boolean isOpen() { + return writer.isOpen(); + } + + @Override + public void close() throws IOException { + writer.close(); + } + } + + public WritableByteChannel create( + GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption... writeOptions) + throws IOException { + try { + // Define the metadata for the new object + BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(), path.getObject()); + String type = options.getContentType(); + if (type != null) { + builder.setContentType(type); + } + + BlobInfo blobInfo = builder.build(); + + List writeOptionList = new ArrayList<>(Arrays.asList(writeOptions)); + if (options.getExpectFileToNotExist()) { + writeOptionList.add(BlobWriteOption.doesNotExist()); + } + // Open a WriteChannel from the storage service + WriteChannel writer = + storage.writer(blobInfo, writeOptionList.toArray(new BlobWriteOption[0])); + Integer uploadBufferSizeBytes = + options.getUploadBufferSizeBytes() != null + ? options.getUploadBufferSizeBytes() + : this.uploadBufferSizeBytes; + if (uploadBufferSizeBytes != null) { + writer.setChunkSize(uploadBufferSizeBytes); + } + + // Return the bridge wrapper + return new GcsWritableByteChannel(writer); + + } catch (StorageException e) { + throw translateStorageException(path, e); + } } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index 80fb494facf0..91cd7bc56bc1 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -31,7 +31,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; import java.security.MessageDigest; @@ -41,6 +44,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; @@ -305,7 +309,8 @@ public void testCreateAndRemoveBucket() throws IOException { } } - private List createTestBucketHelper(String bucketName) throws IOException { + private List createTestBucketHelper(String bucketName, boolean copyData) + throws IOException { final List originPaths = Arrays.asList( GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"), @@ -320,16 +325,24 @@ private List createTestBucketHelper(String bucketName) throws IOExcepti if (experiment.equals("use_gcsutil_v2")) { gcsUtil.createBucket(BucketInfo.of(bucketName)); - gcsUtil.copyV2(originPaths, testPaths); + if (copyData) { + gcsUtil.copyV2(originPaths, testPaths); + } else { + return Collections.emptyList(); + } } else { GcsOptions gcsOptions = options.as(GcsOptions.class); gcsUtil.createBucket(gcsOptions.getProject(), new Bucket().setName(bucketName)); - final List originList = - originPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); - final List testList = - testPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); - gcsUtil.copy(originList, testList); + if (copyData) { + final List originList = + originPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List testList = + testPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + gcsUtil.copy(originList, testList); + } else { + return Collections.emptyList(); + } } return testPaths; @@ -359,7 +372,7 @@ public void testCopy() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List dstPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + ".bak")) @@ -427,7 +440,7 @@ public void testRemove() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List errPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) @@ -489,7 +502,7 @@ public void testRename() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List tmpPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(existingBucket, "tmp/" + o.getObject())) @@ -593,7 +606,7 @@ private void assertNotExists(GcsPath path) throws IOException { } @Test - public void testOpenAndReadSeekableChannel() throws IOException, NoSuchAlgorithmException { + public void testRead() throws IOException, NoSuchAlgorithmException { final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); final String expectedHash = "674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8"; final long expectedSize = 157283L; @@ -636,4 +649,33 @@ public void testOpenAndReadSeekableChannel() throws IOException, NoSuchAlgorithm assertEquals("Content hash should match", expectedHash, actualHash); } } + + @Test + public void testWriteAndRead() throws IOException { + final String bucketName = "apache-beam-temp-bucket-12345"; + final GcsPath targetPath = GcsPath.fromComponents(bucketName, "test-object.txt"); + final String content = "Hello, GCS!"; + + try { + createTestBucketHelper(bucketName, false); + + CreateOptions options = CreateOptions.builder().setExpectFileToNotExist(true).build(); + try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) { + writer.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + } + + StringBuilder readContent = new StringBuilder(); + try (ReadableByteChannel reader = gcsUtil.open(targetPath)) { + ByteBuffer buffer = ByteBuffer.allocate(1024); + while (reader.read(buffer) != -1) { + buffer.flip(); + readContent.append(StandardCharsets.UTF_8.decode(buffer)); + buffer.clear(); + } + } + assertEquals(content, readContent.toString()); + } finally { + tearDownTestBucketHelper(bucketName); + } + } } From 67c42da789970eeb7cc17a7b6070b6cc8d84234a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 20 Mar 2026 22:58:58 -0400 Subject: [PATCH 3/5] Store the gcs path into GcsWritableByteChannel. --- .../beam/sdk/extensions/gcp/util/GcsUtilV2.java | 11 ++++++----- .../extensions/gcp/util/GcsUtilParameterizedIT.java | 4 ++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 0fd8fd0e3c60..5a422001c572 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -103,7 +103,7 @@ public GcsUtilV2 create(PipelineOptions options) { "nullness" // For Creating AccessDeniedException FileNotFoundException, and // FileAlreadyExistsException with null. }) - private IOException translateStorageException(GcsPath gcsPath, StorageException e) { + private static IOException translateStorageException(GcsPath gcsPath, StorageException e) { switch (e.getCode()) { case 403: return new AccessDeniedException(gcsPath.toString(), null, e.getMessage()); @@ -568,9 +568,11 @@ public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions) /** A bridge that allows a GCS WriteChannel to behave as a WritableByteChannel. */ private static class GcsWritableByteChannel implements WritableByteChannel { private final WriteChannel writer; + private final GcsPath gcsPath; - GcsWritableByteChannel(WriteChannel writer) { + GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) { this.writer = writer; + this.gcsPath = gcsPath; } @Override @@ -578,8 +580,7 @@ public int write(ByteBuffer src) throws IOException { try { return writer.write(src); } catch (StorageException e) { - // In a real implementation, you'd use your translateStorageException here - throw new IOException(e); + throw translateStorageException(gcsPath, e); } } @@ -623,7 +624,7 @@ public WritableByteChannel create( } // Return the bridge wrapper - return new GcsWritableByteChannel(writer); + return new GcsWritableByteChannel(writer, path); } catch (StorageException e) { throw translateStorageException(path, e); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index 91cd7bc56bc1..7816ce603001 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -659,11 +659,13 @@ public void testWriteAndRead() throws IOException { try { createTestBucketHelper(bucketName, false); + // Write content to a GCS file CreateOptions options = CreateOptions.builder().setExpectFileToNotExist(true).build(); try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) { writer.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); } + // Read content into a buffer StringBuilder readContent = new StringBuilder(); try (ReadableByteChannel reader = gcsUtil.open(targetPath)) { ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -673,6 +675,8 @@ public void testWriteAndRead() throws IOException { buffer.clear(); } } + + // Verify content assertEquals(content, readContent.toString()); } finally { tearDownTestBucketHelper(bucketName); From 2b1e93ac6005f9762d88c48bcfe2370edbc585e4 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 23 Mar 2026 08:57:34 -0400 Subject: [PATCH 4/5] Rename the new create method to createV2. --- .../java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 03733463e5c4..ed727d495cf8 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -272,7 +272,7 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO return delegate.create(path, options.delegate); } - public WritableByteChannel create( + public WritableByteChannel createV2( GcsPath path, CreateOptions options, BlobWriteOption... writeOptions) throws IOException { if (delegateV2 != null) { return delegateV2.create(path, options.delegate, writeOptions); From 5043eb04a783440a80536790b0161fc1e4f3ec73 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 5 May 2026 16:01:15 -0400 Subject: [PATCH 5/5] Revise according to reviewer comments. --- .../sdk/extensions/gcp/util/GcsUtilV2.java | 18 +++++- .../gcp/util/GcsUtilParameterizedIT.java | 55 ++++++++++--------- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 5a422001c572..9119dd79652e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -42,6 +42,7 @@ import com.google.cloud.storage.Storage.CopyRequest; import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageBatchResult; +import com.google.cloud.storage.StorageChannelUtils; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import java.io.FileNotFoundException; @@ -508,7 +509,7 @@ private static class GcsSeekableByteChannel implements SeekableByteChannel { @Override public int read(ByteBuffer dst) throws IOException { - int count = reader.read(dst); + int count = StorageChannelUtils.blockingFillFrom(dst, reader); if (count > 0) { this.position += count; } @@ -561,8 +562,10 @@ public void close() throws IOException { public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions) throws IOException { Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE)); - return new GcsSeekableByteChannel( - blob.getStorage().reader(blob.getBlobId(), sourceOptions), blob.getSize()); + ReadChannel reader = blob.getStorage().reader(blob.getBlobId(), sourceOptions); + // disable internal buffering, and make the channel non-blocking + reader.setChunkSize(0); + return new GcsSeekableByteChannel(reader, blob.getSize()); } /** A bridge that allows a GCS WriteChannel to behave as a WritableByteChannel. */ @@ -611,6 +614,15 @@ public WritableByteChannel create( List writeOptionList = new ArrayList<>(Arrays.asList(writeOptions)); if (options.getExpectFileToNotExist()) { writeOptionList.add(BlobWriteOption.doesNotExist()); + } else { + // We do not merge this check with the getExpectFileToNotExist() branch above + // because we don't want to always make the storage.get() RPC call. + Blob blob = storage.get(path.getBucket(), path.getObject()); + if (blob == null) { + writeOptionList.add(BlobWriteOption.doesNotExist()); + } else { + writeOptionList.add(BlobWriteOption.generationMatch(blob.getGeneration())); + } } // Open a WriteChannel from the storage service WriteChannel writer = diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index 7816ce603001..5759bb10a654 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.gcp.util; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -28,6 +29,8 @@ import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.StorageChannelUtils; +import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; @@ -605,6 +608,19 @@ private void assertNotExists(GcsPath path) throws IOException { } } + String computeHash(ByteBuffer buffer) throws NoSuchAlgorithmException { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + digest.update(buffer); + byte[] hashBytes = digest.digest(); + + // Convert bytes to Hex String + StringBuilder sb = new StringBuilder(); + for (byte b : hashBytes) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } + @Test public void testRead() throws IOException, NoSuchAlgorithmException { final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); @@ -616,16 +632,10 @@ public void testRead() throws IOException, NoSuchAlgorithmException { assertEquals(expectedSize, channel.size()); assertEquals(0, channel.position()); - // Read content into ByteBuffer - ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize); - int bytesRead = 0; - while (buffer.hasRemaining()) { - int read = channel.read(buffer); - if (read == -1) { - break; - } - bytesRead += read; - } + // Read content into ByteBuffer. + // Allocate a larger buffer to ensure we receive the EOF at the expected place. + ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize + 1024); + int bytesRead = StorageChannelUtils.blockingFillFrom(buffer, channel); // Verify total bytes read and position assertEquals(expectedSize, bytesRead); @@ -635,17 +645,7 @@ public void testRead() throws IOException, NoSuchAlgorithmException { buffer.flip(); // Verify hash - MessageDigest digest = MessageDigest.getInstance("SHA-256"); - digest.update(buffer); - byte[] hashBytes = digest.digest(); - - // Convert bytes to Hex String - StringBuilder sb = new StringBuilder(); - for (byte b : hashBytes) { - sb.append(String.format("%02x", b)); - } - String actualHash = sb.toString(); - + String actualHash = computeHash(buffer); assertEquals("Content hash should match", expectedHash, actualHash); } } @@ -653,8 +653,9 @@ public void testRead() throws IOException, NoSuchAlgorithmException { @Test public void testWriteAndRead() throws IOException { final String bucketName = "apache-beam-temp-bucket-12345"; - final GcsPath targetPath = GcsPath.fromComponents(bucketName, "test-object.txt"); - final String content = "Hello, GCS!"; + final GcsPath targetPath = + GcsPath.fromComponents(bucketName, "test-object-" + java.util.UUID.randomUUID() + ".txt"); + final byte[] content = "Hello, GCS!".getBytes(StandardCharsets.UTF_8); try { createTestBucketHelper(bucketName, false); @@ -662,22 +663,22 @@ public void testWriteAndRead() throws IOException { // Write content to a GCS file CreateOptions options = CreateOptions.builder().setExpectFileToNotExist(true).build(); try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) { - writer.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + writer.write(ByteBuffer.wrap(content)); } // Read content into a buffer - StringBuilder readContent = new StringBuilder(); + ByteArrayOutputStream readContent = new ByteArrayOutputStream(); try (ReadableByteChannel reader = gcsUtil.open(targetPath)) { ByteBuffer buffer = ByteBuffer.allocate(1024); while (reader.read(buffer) != -1) { buffer.flip(); - readContent.append(StandardCharsets.UTF_8.decode(buffer)); + readContent.write(buffer.array(), 0, buffer.limit()); buffer.clear(); } } // Verify content - assertEquals(content, readContent.toString()); + assertArrayEquals(content, readContent.toByteArray()); } finally { tearDownTestBucketHelper(bucketName); }