diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index e3f01dd85295..ed727d495cf8 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -25,6 +25,8 @@ import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.BucketGetOption; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; @@ -186,9 +188,19 @@ public Page listBlobs( } public SeekableByteChannel open(GcsPath path) throws IOException { + if (delegateV2 != null) { + return delegateV2.open(path); + } return delegate.open(path); } + public SeekableByteChannel openV2(GcsPath path, BlobSourceOption... options) throws IOException { + if (delegateV2 != null) { + return delegateV2.open(path, options); + } + throw new IOException("GcsUtil V2 not initialized."); + } + /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */ @Deprecated public WritableByteChannel create(GcsPath path, String type) throws IOException { @@ -254,9 +266,20 @@ public CreateOptions build() { } public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException { + if (delegateV2 != null) { + delegateV2.create(path, options.delegate); + } return delegate.create(path, options.delegate); } + public WritableByteChannel createV2( + GcsPath path, CreateOptions options, BlobWriteOption... writeOptions) throws IOException { + if (delegateV2 != null) { + return delegateV2.create(path, options.delegate, writeOptions); + } + throw new IOException("GcsUtil V2 not initialized."); + } + public void verifyBucketAccessible(GcsPath path) throws IOException { if (delegateV2 != null) { delegateV2.verifyBucketAccessible(path); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index b00b7ce0d728..9119dd79652e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -23,6 +23,8 @@ import com.google.api.gax.paging.Page; import com.google.auto.value.AutoValue; +import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; @@ -33,21 +35,29 @@ import com.google.cloud.storage.Storage.BlobField; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.BucketField; import com.google.cloud.storage.Storage.BucketGetOption; import com.google.cloud.storage.Storage.CopyRequest; import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageBatchResult; +import com.google.cloud.storage.StorageChannelUtils; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; @@ -70,6 +80,8 @@ public GcsUtilV2 create(PipelineOptions options) { private Storage storage; + private final @Nullable Integer uploadBufferSizeBytes; + /** Maximum number of items to retrieve per Objects.List request. */ private static final long MAX_LIST_BLOBS_PER_CALL = 1024; @@ -85,13 +97,14 @@ public GcsUtilV2 create(PipelineOptions options) { GcsUtilV2(PipelineOptions options) { String projectId = options.as(GcpOptions.class).getProject(); storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); + uploadBufferSizeBytes = options.as(GcsOptions.class).getGcsUploadBufferSizeBytes(); } @SuppressWarnings({ "nullness" // For Creating AccessDeniedException FileNotFoundException, and // FileAlreadyExistsException with null. }) - private IOException translateStorageException(GcsPath gcsPath, StorageException e) { + private static IOException translateStorageException(GcsPath gcsPath, StorageException e) { switch (e.getCode()) { case 403: return new AccessDeniedException(gcsPath.toString(), null, e.getMessage()); @@ -481,4 +494,152 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException { throw translateStorageException(bucketInfo.getName(), null, e); } } + + /** A bridge that allows a GCS ReadChannel to behave as a SeekableByteChannel. */ + private static class GcsSeekableByteChannel implements SeekableByteChannel { + private final ReadChannel reader; + private final long size; + private long position = 0; + + GcsSeekableByteChannel(ReadChannel reader, long size) { + this.reader = reader; + this.size = size; + this.position = 0; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + int count = StorageChannelUtils.blockingFillFrom(dst, reader); + if (count > 0) { + this.position += count; + } + return count; + } + + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + checkArgument(newPosition >= 0, "Position must be non-negative: %s", newPosition); + reader.seek(newPosition); + this.position = newPosition; + return this; + } + + @Override + public long position() throws IOException { + return this.position; + } + + @Override + public long size() throws IOException { + return size; + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new UnsupportedOperationException( + "GcsSeekableByteChannels are read-only and cannot be truncated."); + } + + @Override + public int write(ByteBuffer src) throws IOException { + throw new UnsupportedOperationException( + "GcsSeekableByteChannel are read-only and does not support writing."); + } + + @Override + public boolean isOpen() { + return reader.isOpen(); + } + + @Override + public void close() throws IOException { + if (isOpen()) { + reader.close(); + } + } + } + + public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions) + throws IOException { + Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE)); + ReadChannel reader = blob.getStorage().reader(blob.getBlobId(), sourceOptions); + // disable internal buffering, and make the channel non-blocking + reader.setChunkSize(0); + return new GcsSeekableByteChannel(reader, blob.getSize()); + } + + /** A bridge that allows a GCS WriteChannel to behave as a WritableByteChannel. */ + private static class GcsWritableByteChannel implements WritableByteChannel { + private final WriteChannel writer; + private final GcsPath gcsPath; + + GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) { + this.writer = writer; + this.gcsPath = gcsPath; + } + + @Override + public int write(ByteBuffer src) throws IOException { + try { + return writer.write(src); + } catch (StorageException e) { + throw translateStorageException(gcsPath, e); + } + } + + @Override + public boolean isOpen() { + return writer.isOpen(); + } + + @Override + public void close() throws IOException { + writer.close(); + } + } + + public WritableByteChannel create( + GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption... writeOptions) + throws IOException { + try { + // Define the metadata for the new object + BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(), path.getObject()); + String type = options.getContentType(); + if (type != null) { + builder.setContentType(type); + } + + BlobInfo blobInfo = builder.build(); + + List writeOptionList = new ArrayList<>(Arrays.asList(writeOptions)); + if (options.getExpectFileToNotExist()) { + writeOptionList.add(BlobWriteOption.doesNotExist()); + } else { + // We do not merge this check with the getExpectFileToNotExist() branch above + // because we don't want to always make the storage.get() RPC call. + Blob blob = storage.get(path.getBucket(), path.getObject()); + if (blob == null) { + writeOptionList.add(BlobWriteOption.doesNotExist()); + } else { + writeOptionList.add(BlobWriteOption.generationMatch(blob.getGeneration())); + } + } + // Open a WriteChannel from the storage service + WriteChannel writer = + storage.writer(blobInfo, writeOptionList.toArray(new BlobWriteOption[0])); + Integer uploadBufferSizeBytes = + options.getUploadBufferSizeBytes() != null + ? options.getUploadBufferSizeBytes() + : this.uploadBufferSizeBytes; + if (uploadBufferSizeBytes != null) { + writer.setChunkSize(uploadBufferSizeBytes); + } + + // Return the bridge wrapper + return new GcsWritableByteChannel(writer, path); + + } catch (StorageException e) { + throw translateStorageException(path, e); + } + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index 80ffd72924fa..5759bb10a654 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.gcp.util; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -28,15 +29,25 @@ import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.StorageChannelUtils; +import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; @@ -301,7 +312,8 @@ public void testCreateAndRemoveBucket() throws IOException { } } - private List createTestBucketHelper(String bucketName) throws IOException { + private List createTestBucketHelper(String bucketName, boolean copyData) + throws IOException { final List originPaths = Arrays.asList( GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"), @@ -316,16 +328,24 @@ private List createTestBucketHelper(String bucketName) throws IOExcepti if (experiment.equals("use_gcsutil_v2")) { gcsUtil.createBucket(BucketInfo.of(bucketName)); - gcsUtil.copyV2(originPaths, testPaths); + if (copyData) { + gcsUtil.copyV2(originPaths, testPaths); + } else { + return Collections.emptyList(); + } } else { GcsOptions gcsOptions = options.as(GcsOptions.class); gcsUtil.createBucket(gcsOptions.getProject(), new Bucket().setName(bucketName)); - final List originList = - originPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); - final List testList = - testPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); - gcsUtil.copy(originList, testList); + if (copyData) { + final List originList = + originPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List testList = + testPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + gcsUtil.copy(originList, testList); + } else { + return Collections.emptyList(); + } } return testPaths; @@ -355,7 +375,7 @@ public void testCopy() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List dstPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + ".bak")) @@ -423,7 +443,7 @@ public void testRemove() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List errPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) @@ -485,7 +505,7 @@ public void testRename() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List tmpPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(existingBucket, "tmp/" + o.getObject())) @@ -587,4 +607,80 @@ private void assertNotExists(GcsPath path) throws IOException { assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path)); } } + + String computeHash(ByteBuffer buffer) throws NoSuchAlgorithmException { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + digest.update(buffer); + byte[] hashBytes = digest.digest(); + + // Convert bytes to Hex String + StringBuilder sb = new StringBuilder(); + for (byte b : hashBytes) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } + + @Test + public void testRead() throws IOException, NoSuchAlgorithmException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); + final String expectedHash = "674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8"; + final long expectedSize = 157283L; + + try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) { + // Verify Size + assertEquals(expectedSize, channel.size()); + assertEquals(0, channel.position()); + + // Read content into ByteBuffer. + // Allocate a larger buffer to ensure we receive the EOF at the expected place. + ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize + 1024); + int bytesRead = StorageChannelUtils.blockingFillFrom(buffer, channel); + + // Verify total bytes read and position + assertEquals(expectedSize, bytesRead); + assertEquals(expectedSize, channel.position()); + + // Flip the buffer to prepare it for reading (sets limit to current position, position to 0) + buffer.flip(); + + // Verify hash + String actualHash = computeHash(buffer); + assertEquals("Content hash should match", expectedHash, actualHash); + } + } + + @Test + public void testWriteAndRead() throws IOException { + final String bucketName = "apache-beam-temp-bucket-12345"; + final GcsPath targetPath = + GcsPath.fromComponents(bucketName, "test-object-" + java.util.UUID.randomUUID() + ".txt"); + final byte[] content = "Hello, GCS!".getBytes(StandardCharsets.UTF_8); + + try { + createTestBucketHelper(bucketName, false); + + // Write content to a GCS file + CreateOptions options = CreateOptions.builder().setExpectFileToNotExist(true).build(); + try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) { + writer.write(ByteBuffer.wrap(content)); + } + + // Read content into a buffer + ByteArrayOutputStream readContent = new ByteArrayOutputStream(); + try (ReadableByteChannel reader = gcsUtil.open(targetPath)) { + ByteBuffer buffer = ByteBuffer.allocate(1024); + while (reader.read(buffer) != -1) { + buffer.flip(); + readContent.write(buffer.array(), 0, buffer.limit()); + buffer.clear(); + } + } + + // Verify content + assertArrayEquals(content, readContent.toByteArray()); + } finally { + tearDownTestBucketHelper(bucketName); + } + } }