diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java index 2bde6939651d..ff5bce7f5089 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java @@ -950,6 +950,27 @@ public OmMultipartUploadCompleteInfo completeMultipartUpload(String key, partsMap); } + /** + * Complete Multipart upload with conditional write support. + * This will combine all the parts and make the key visible in ozone, + * but only if the specified preconditions are met. + * + * @param key key name + * @param uploadID multipart upload ID + * @param partsMap map of part numbers to ETags + * @param expectedDataGeneration expected data generation for conditional write + * (use OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS for If-None-Match: *) + * @param expectedETag expected ETag for conditional write (for If-Match) + * @return OmMultipartUploadCompleteInfo + * @throws IOException if precondition fails or other I/O error occurs + */ + public OmMultipartUploadCompleteInfo completeMultipartUpload(String key, + String uploadID, Map partsMap, + Long expectedDataGeneration, String expectedETag) throws IOException { + return proxy.completeMultipartUpload(volumeName, name, key, uploadID, + partsMap, expectedDataGeneration, expectedETag); + } + /** * Abort multipart upload request. 
* @param keyName diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index f1570143dfdd..c8611043fe44 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -727,6 +727,27 @@ OmMultipartUploadCompleteInfo completeMultipartUpload(String volumeName, String bucketName, String keyName, String uploadID, Map partsMap) throws IOException; + /** + * Complete Multipart upload with conditional write support. + * This will combine all the parts and make the key visible in ozone, + * but only if the specified preconditions are met. + * + * @param volumeName volume name + * @param bucketName bucket name + * @param keyName key name + * @param uploadID multipart upload ID + * @param partsMap map of part numbers to ETags + * @param expectedDataGeneration expected data generation for conditional write + * (use OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS for If-None-Match: *) + * @param expectedETag expected ETag for conditional write (for If-Match) + * @return OmMultipartUploadCompleteInfo + * @throws IOException if precondition fails or other I/O error occurs + */ + OmMultipartUploadCompleteInfo completeMultipartUpload(String volumeName, + String bucketName, String keyName, String uploadID, + Map partsMap, + Long expectedDataGeneration, String expectedETag) throws IOException; + /** * Abort Multipart upload request for the given key with given uploadID. 
* @param volumeName diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index f02e65a7ce58..6fe64a263765 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -2154,6 +2154,39 @@ public OmMultipartUploadCompleteInfo completeMultipartUpload( } + @Override + public OmMultipartUploadCompleteInfo completeMultipartUpload( + String volumeName, String bucketName, String keyName, String uploadID, + Map partsMap, + Long expectedDataGeneration, String expectedETag) throws IOException { + verifyVolumeName(volumeName); + verifyBucketName(bucketName); + HddsClientUtils.checkNotNull(keyName, uploadID); + String ownerName = getRealUserInfo().getShortUserName(); + + OmKeyArgs.Builder builder = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setMultipartUploadID(uploadID) + .setOwnerName(ownerName); + + if (expectedDataGeneration != null) { + builder.setExpectedDataGeneration(expectedDataGeneration); + } + if (expectedETag != null) { + builder.setExpectedETag(expectedETag); + } + + OmKeyArgs keyArgs = builder.build(); + + OmMultipartUploadCompleteList omMultipartUploadCompleteList = + new OmMultipartUploadCompleteList(partsMap); + + return ozoneManagerClient.completeMultipartUpload(keyArgs, + omMultipartUploadCompleteList); + } + @Override public void abortMultipartUpload(String volumeName, String bucketName, String keyName, String uploadID) throws IOException { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index f81432957352..604298b3c89c 100644 --- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -1805,6 +1805,13 @@ public OmMultipartUploadCompleteInfo completeMultipartUpload( OzoneAcl.toProtobuf(a)).collect(Collectors.toList())); } + if (omKeyArgs.getExpectedDataGeneration() != null) { + keyArgs.setExpectedDataGeneration(omKeyArgs.getExpectedDataGeneration()); + } + if (omKeyArgs.getExpectedETag() != null) { + keyArgs.setExpectedETag(omKeyArgs.getExpectedETag()); + } + multipartUploadCompleteRequest.setKeyArgs(keyArgs.build()); multipartUploadCompleteRequest.addAllPartsList(multipartUploadList .getPartsList()); diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java index c39a840d375b..42c4de1d503d 100644 --- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java +++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java @@ -655,6 +655,208 @@ public void testMultipartUploadPartWithWrongMD5Header(String wrongMd5Base64, Str assertFalse(s3Client.doesObjectExist(bucketName, keyName)); } + @Test + public void testCompleteMultipartUploadIfNoneMatch() throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(bucketName); + + // Initiate and complete multipart upload with If-None-Match: * + InitiateMultipartUploadRequest initRequest = + new InitiateMultipartUploadRequest(bucketName, keyName); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + String uploadId = initResponse.getUploadId(); + + // Upload a part + String 
partContent = "part1data"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(bucketName) + .withKey(keyName) + .withUploadId(uploadId) + .withPartNumber(1) + .withInputStream(new ByteArrayInputStream(partBytes)) + .withPartSize(partBytes.length); + UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); + + // Complete with If-None-Match: * (key doesn't exist, should succeed) + List partETags = Collections.singletonList(uploadResult.getPartETag()); + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest( + bucketName, keyName, uploadId, partETags); + completeRequest.putCustomRequestHeader("If-None-Match", "*"); + + CompleteMultipartUploadResult result = s3Client.completeMultipartUpload(completeRequest); + assertNotNull(result.getETag()); + + // Verify object was created + assertTrue(s3Client.doesObjectExist(bucketName, keyName)); + } + + @Test + public void testCompleteMultipartUploadIfNoneMatchFail() throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(bucketName); + + // First, create an existing key + s3Client.putObject(bucketName, keyName, + new ByteArrayInputStream("existing".getBytes(StandardCharsets.UTF_8)), + new ObjectMetadata()); + + // Initiate multipart upload for same key + InitiateMultipartUploadRequest initRequest = + new InitiateMultipartUploadRequest(bucketName, keyName); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + String uploadId = initResponse.getUploadId(); + + // Upload a part + String partContent = "part1data"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(bucketName) + .withKey(keyName) + .withUploadId(uploadId) + .withPartNumber(1) + .withInputStream(new ByteArrayInputStream(partBytes)) 
+ .withPartSize(partBytes.length); + UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); + + // Complete with If-None-Match: * (key exists, should fail) + List partETags = Collections.singletonList(uploadResult.getPartETag()); + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest( + bucketName, keyName, uploadId, partETags); + completeRequest.putCustomRequestHeader("If-None-Match", "*"); + + AmazonServiceException ase = assertThrows(AmazonServiceException.class, + () -> s3Client.completeMultipartUpload(completeRequest)); + + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(412, ase.getStatusCode()); + assertEquals("PreconditionFailed", ase.getErrorCode()); + } + + @Test + public void testCompleteMultipartUploadIfMatch() throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(bucketName); + + // First, create an existing key and get its ETag + PutObjectResult existingResult = s3Client.putObject(bucketName, keyName, + new ByteArrayInputStream("existing".getBytes(StandardCharsets.UTF_8)), + new ObjectMetadata()); + String existingETag = existingResult.getETag(); + + // Initiate multipart upload for same key + InitiateMultipartUploadRequest initRequest = + new InitiateMultipartUploadRequest(bucketName, keyName); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + String uploadId = initResponse.getUploadId(); + + // Upload a part + String partContent = "newcontent"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(bucketName) + .withKey(keyName) + .withUploadId(uploadId) + .withPartNumber(1) + .withInputStream(new ByteArrayInputStream(partBytes)) + .withPartSize(partBytes.length); + UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); + + // Complete with If-Match: (should succeed) + 
List partETags = Collections.singletonList(uploadResult.getPartETag()); + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest( + bucketName, keyName, uploadId, partETags); + completeRequest.putCustomRequestHeader("If-Match", "\"" + existingETag + "\""); + + CompleteMultipartUploadResult result = s3Client.completeMultipartUpload(completeRequest); + assertNotNull(result.getETag()); + assertNotEquals(existingETag, stripQuotes(result.getETag())); + } + + @Test + public void testCompleteMultipartUploadIfMatchFail() throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(bucketName); + + // First, create an existing key + s3Client.putObject(bucketName, keyName, + new ByteArrayInputStream("existing".getBytes(StandardCharsets.UTF_8)), + new ObjectMetadata()); + + // Initiate multipart upload for same key + InitiateMultipartUploadRequest initRequest = + new InitiateMultipartUploadRequest(bucketName, keyName); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + String uploadId = initResponse.getUploadId(); + + // Upload a part + String partContent = "newcontent"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(bucketName) + .withKey(keyName) + .withUploadId(uploadId) + .withPartNumber(1) + .withInputStream(new ByteArrayInputStream(partBytes)) + .withPartSize(partBytes.length); + UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); + + // Complete with If-Match: wrong-etag (should fail) + List partETags = Collections.singletonList(uploadResult.getPartETag()); + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest( + bucketName, keyName, uploadId, partETags); + completeRequest.putCustomRequestHeader("If-Match", "\"wrong-etag\""); + + AmazonServiceException ase = 
assertThrows(AmazonServiceException.class, + () -> s3Client.completeMultipartUpload(completeRequest)); + + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(412, ase.getStatusCode()); + assertEquals("PreconditionFailed", ase.getErrorCode()); + } + + @Test + public void testCompleteMultipartUploadIfMatchMissingKeyFail() throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(bucketName); + + // Initiate multipart upload (key doesn't exist) + InitiateMultipartUploadRequest initRequest = + new InitiateMultipartUploadRequest(bucketName, keyName); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + String uploadId = initResponse.getUploadId(); + + // Upload a part + String partContent = "newcontent"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(bucketName) + .withKey(keyName) + .withUploadId(uploadId) + .withPartNumber(1) + .withInputStream(new ByteArrayInputStream(partBytes)) + .withPartSize(partBytes.length); + UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); + + // Complete with If-Match on non-existent key (should fail) + List partETags = Collections.singletonList(uploadResult.getPartETag()); + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest( + bucketName, keyName, uploadId, partETags); + completeRequest.putCustomRequestHeader("If-Match", "\"some-etag\""); + + AmazonServiceException ase = assertThrows(AmazonServiceException.class, + () -> s3Client.completeMultipartUpload(completeRequest)); + + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(412, ase.getStatusCode()); + assertEquals("PreconditionFailed", ase.getErrorCode()); + } + @Test public void testPutDoubleSlashPrefixObject() throws IOException { final String bucketName = getBucketName(); diff --git 
a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java index 51d0d7bbd204..3204e3fe5ff7 100644 --- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java +++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java @@ -643,6 +643,217 @@ public void testMultipartUploadPartWithWrongMD5Header(String wrongMd5Base64, Str assertThrows(NoSuchKeyException.class, () -> s3Client.headObject(b -> b.bucket(bucketName).key(keyName))); } + @Test + public void testCompleteMultipartUploadIfNoneMatch() { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(b -> b.bucket(bucketName)); + + // Initiate multipart upload + CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName)); + String uploadId = createResponse.uploadId(); + + // Upload a part + String partContent = "part1data"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartResponse partResponse = s3Client.uploadPart(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .partNumber(1), + RequestBody.fromBytes(partBytes)); + + // Complete with If-None-Match: * (key doesn't exist, should succeed) + CompletedPart completedPart = CompletedPart.builder() + .partNumber(1) + .eTag(partResponse.eTag()) + .build(); + + CompleteMultipartUploadResponse result = s3Client.completeMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .ifNoneMatch("*") + .multipartUpload(CompletedMultipartUpload.builder().parts(completedPart).build())); + + assertNotNull(result.eTag()); + + // Verify object was created + assertDoesNotThrow(() -> s3Client.headObject(b -> 
b.bucket(bucketName).key(keyName))); + } + + @Test + public void testCompleteMultipartUploadIfNoneMatchFail() { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(b -> b.bucket(bucketName)); + + // First, create an existing key + s3Client.putObject(b -> b.bucket(bucketName).key(keyName), + RequestBody.fromString("existing")); + + // Initiate multipart upload for same key + CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName)); + String uploadId = createResponse.uploadId(); + + // Upload a part + String partContent = "part1data"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartResponse partResponse = s3Client.uploadPart(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .partNumber(1), + RequestBody.fromBytes(partBytes)); + + // Complete with If-None-Match: * (key exists, should fail) + CompletedPart completedPart = CompletedPart.builder() + .partNumber(1) + .eTag(partResponse.eTag()) + .build(); + + S3Exception exception = assertThrows(S3Exception.class, () -> s3Client.completeMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .ifNoneMatch("*") + .multipartUpload(CompletedMultipartUpload.builder().parts(completedPart).build()))); + + assertEquals(412, exception.statusCode()); + assertEquals("PreconditionFailed", exception.awsErrorDetails().errorCode()); + } + + @Test + public void testCompleteMultipartUploadIfMatch() { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(b -> b.bucket(bucketName)); + + // First, create an existing key and get its ETag + PutObjectResponse existingResult = s3Client.putObject( + b -> b.bucket(bucketName).key(keyName), + RequestBody.fromString("existing")); + String existingETag = existingResult.eTag(); + + // Initiate multipart upload for same key + 
CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName)); + String uploadId = createResponse.uploadId(); + + // Upload a part + String partContent = "newcontent"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartResponse partResponse = s3Client.uploadPart(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .partNumber(1), + RequestBody.fromBytes(partBytes)); + + // Complete with If-Match: (should succeed) + CompletedPart completedPart = CompletedPart.builder() + .partNumber(1) + .eTag(partResponse.eTag()) + .build(); + + CompleteMultipartUploadResponse result = s3Client.completeMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .ifMatch(existingETag) + .multipartUpload(CompletedMultipartUpload.builder().parts(completedPart).build())); + + assertNotNull(result.eTag()); + assertNotEquals(existingETag, result.eTag()); + } + + @Test + public void testCompleteMultipartUploadIfMatchFail() { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(b -> b.bucket(bucketName)); + + // First, create an existing key + s3Client.putObject(b -> b.bucket(bucketName).key(keyName), + RequestBody.fromString("existing")); + + // Initiate multipart upload for same key + CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName)); + String uploadId = createResponse.uploadId(); + + // Upload a part + String partContent = "newcontent"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartResponse partResponse = s3Client.uploadPart(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .partNumber(1), + RequestBody.fromBytes(partBytes)); + + // Complete with If-Match: wrong-etag (should fail) + CompletedPart completedPart = CompletedPart.builder() + .partNumber(1) + .eTag(partResponse.eTag()) 
+ .build(); + + S3Exception exception = assertThrows(S3Exception.class, () -> s3Client.completeMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .ifMatch("\"wrong-etag\"") + .multipartUpload(CompletedMultipartUpload.builder().parts(completedPart).build()))); + + assertEquals(412, exception.statusCode()); + assertEquals("PreconditionFailed", exception.awsErrorDetails().errorCode()); + } + + @Test + public void testCompleteMultipartUploadIfMatchMissingKeyFail() { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(b -> b.bucket(bucketName)); + + // Initiate multipart upload (key doesn't exist) + CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName)); + String uploadId = createResponse.uploadId(); + + // Upload a part + String partContent = "newcontent"; + byte[] partBytes = partContent.getBytes(StandardCharsets.UTF_8); + UploadPartResponse partResponse = s3Client.uploadPart(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .partNumber(1), + RequestBody.fromBytes(partBytes)); + + // Complete with If-Match on non-existent key (should fail) + CompletedPart completedPart = CompletedPart.builder() + .partNumber(1) + .eTag(partResponse.eTag()) + .build(); + + S3Exception exception = assertThrows(S3Exception.class, () -> s3Client.completeMultipartUpload(b -> b + .bucket(bucketName) + .key(keyName) + .uploadId(uploadId) + .ifMatch("\"some-etag\"") + .multipartUpload(CompletedMultipartUpload.builder().parts(completedPart).build()))); + + assertEquals(412, exception.statusCode()); + assertEquals("PreconditionFailed", exception.awsErrorDetails().errorCode()); + } + @Test public void testListObjectsMany() throws Exception { testListObjectsMany(false); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java index 8c77491ac86e..d9e04e5eed3a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java @@ -3935,6 +3935,159 @@ void testCommitPartAfterCompleteUpload() throws Exception { assertEquals(NO_SUCH_MULTIPART_UPLOAD_ERROR, ex.getResult()); } + @Test + public void testConditionalCompleteMultipartUploadIfNoneMatch() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + // Initiate and upload part + String uploadID = initiateMultipartUpload(bucket, keyName, anyReplication()); + byte[] data = generateData(5 * 1024 * 1024, (byte) 97); + Pair partInfo = uploadPart(bucket, keyName, uploadID, 1, data); + + Map partsMap = new LinkedHashMap<>(); + partsMap.put(1, partInfo.getValue()); + + // Complete with If-None-Match semantics (key doesn't exist, should succeed) + OmMultipartUploadCompleteInfo result = bucket.completeMultipartUpload( + keyName, uploadID, partsMap, + EXPECTED_GEN_CREATE_IF_NOT_EXISTS, null); + + assertNotNull(result); + assertEquals(keyName, result.getKey()); + assertNotNull(result.getHash()); + + // Verify object exists + OzoneKeyDetails keyDetails = bucket.getKey(keyName); + assertNotNull(keyDetails); + } + + @Test + public void testConditionalCompleteMultipartUploadIfNoneMatchFail() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + 
OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + // First, create an existing key + createTestKey(bucket, keyName, "existing content"); + + // Initiate and upload part for same key + String uploadID = initiateMultipartUpload(bucket, keyName, anyReplication()); + byte[] data = generateData(5 * 1024 * 1024, (byte) 97); + Pair partInfo = uploadPart(bucket, keyName, uploadID, 1, data); + + Map partsMap = new LinkedHashMap<>(); + partsMap.put(1, partInfo.getValue()); + + // Complete with If-None-Match semantics (key exists, should fail) + OMException omEx = assertThrows(OMException.class, + () -> bucket.completeMultipartUpload(keyName, uploadID, partsMap, + EXPECTED_GEN_CREATE_IF_NOT_EXISTS, null)); + + assertEquals(KEY_ALREADY_EXISTS, omEx.getResult()); + } + + @Test + public void testConditionalCompleteMultipartUploadIfMatch() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + // First, create an existing key with ETag + OzoneKeyDetails existingKey = createTestKeyWithETag(bucket, keyName, "existing content"); + String existingETag = existingKey.getMetadata().get(ETAG); + assertNotNull(existingETag); + + // Initiate and upload part for same key + String uploadID = initiateMultipartUpload(bucket, keyName, anyReplication()); + byte[] data = generateData(5 * 1024 * 1024, (byte) 97); + Pair partInfo = uploadPart(bucket, keyName, uploadID, 1, data); + + Map partsMap = new LinkedHashMap<>(); + partsMap.put(1, partInfo.getValue()); + + // Complete with If-Match semantics (ETag matches, should succeed) + OmMultipartUploadCompleteInfo result = bucket.completeMultipartUpload( + keyName, 
uploadID, partsMap, null, existingETag); + + assertNotNull(result); + assertEquals(keyName, result.getKey()); + assertNotNull(result.getHash()); + } + + @Test + public void testConditionalCompleteMultipartUploadIfMatchFail() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + // First, create an existing key with ETag + createTestKeyWithETag(bucket, keyName, "existing content"); + + // Initiate and upload part for same key + String uploadID = initiateMultipartUpload(bucket, keyName, anyReplication()); + byte[] data = generateData(5 * 1024 * 1024, (byte) 97); + Pair partInfo = uploadPart(bucket, keyName, uploadID, 1, data); + + Map partsMap = new LinkedHashMap<>(); + partsMap.put(1, partInfo.getValue()); + + // Complete with If-Match semantics (wrong ETag, should fail) + OMException omEx = assertThrows(OMException.class, + () -> bucket.completeMultipartUpload(keyName, uploadID, partsMap, + null, "wrong-etag")); + + assertEquals(ETAG_MISMATCH, omEx.getResult()); + } + + @Test + public void testConditionalCompleteMultipartUploadIfMatchMissingKeyFail() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + // Initiate and upload part (key doesn't exist) + String uploadID = initiateMultipartUpload(bucket, keyName, anyReplication()); + byte[] data = generateData(5 * 1024 * 1024, (byte) 97); + Pair partInfo = uploadPart(bucket, keyName, uploadID, 1, data); + + Map partsMap = new 
LinkedHashMap<>(); + partsMap.put(1, partInfo.getValue()); + + // Complete with If-Match on non-existent key (should fail) + OMException omEx = assertThrows(OMException.class, + () -> bucket.completeMultipartUpload(keyName, uploadID, partsMap, + null, "some-etag")); + + assertEquals(KEY_NOT_FOUND, omEx.getResult()); + } + @Test public void testAbortUploadSuccessWithOutAnyParts() throws Exception { String volumeName = UUID.randomUUID().toString(); @@ -4617,6 +4770,13 @@ private OzoneKeyDetails createTestKeyWithETag(OzoneBucket bucket) UUID.randomUUID().toString().getBytes(UTF_8), metadata); } + private OzoneKeyDetails createTestKeyWithETag(OzoneBucket bucket, + String keyName, String content) throws IOException { + Map metadata = createTestKeyMetadata(); + metadata.put(ETAG, UUID.randomUUID().toString()); + return createTestKey(bucket, keyName, content.getBytes(UTF_8), metadata); + } + private static Map createTestKeyMetadata() { Map metadata = new HashMap<>(); metadata.put("key", RandomStringUtils.secure().nextAscii(10)); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java index b9cca39270c4..b82541791b75 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java @@ -475,57 +475,4 @@ public static OMRequest blockCreateKeyWithBucketLayoutFromOldClient( } return req; } - - protected void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs) - throws OMException { - if (keyArgs.hasExpectedDataGeneration()) { - long expectedGen = keyArgs.getExpectedDataGeneration(); - // If expectedGen is EXPECTED_GEN_CREATE_IF_NOT_EXISTS, it means the key MUST NOT exist (If-None-Match) - if (expectedGen == 
OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) { - if (dbKeyInfo != null) { - throw new OMException("Key already exists", - OMException.ResultCodes.KEY_ALREADY_EXISTS); - } - } else { - // If a key does not exist, or if it exists but the updateID do not match, then fail this request. - if (dbKeyInfo == null) { - throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND); - } - if (dbKeyInfo.getUpdateID() != expectedGen) { - throw new OMException("Generation mismatch during expected rewrite", - OMException.ResultCodes.KEY_NOT_FOUND); - } - } - } - - } - - protected KeyArgs validateAndRewriteIfMatchAsExpectedGeneration( - KeyArgs keyArgs, OmKeyInfo dbKeyInfo) throws OMException { - if (!keyArgs.hasExpectedETag()) { - return keyArgs; - } - - String expectedETag = keyArgs.getExpectedETag(); - if (dbKeyInfo == null) { - throw new OMException("Key not found for If-Match", - OMException.ResultCodes.KEY_NOT_FOUND); - } - if (!dbKeyInfo.hasEtag()) { - throw new OMException("Key does not have an ETag", - OMException.ResultCodes.ETAG_NOT_AVAILABLE); - } - if (!dbKeyInfo.isEtagEquals(expectedETag)) { - throw new OMException("ETag mismatch", - OMException.ResultCodes.ETAG_MISMATCH); - } - if (keyArgs.hasExpectedDataGeneration()) { - return keyArgs; - } - - return keyArgs.toBuilder() - .setExpectedDataGeneration(dbKeyInfo.getUpdateID()) - .clearExpectedETag() - .build(); - } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java index 2485c5855f4d..77239ad28bc9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java @@ -1309,4 +1309,97 @@ protected void validateEncryptionKeyInfo(OmBucketInfo bucketInfo, KeyArgs keyArg 
keyArgs.getKeyName() + " in encrypted bucket " + keyArgs.getBucketName(), INVALID_REQUEST); } } + + /** + * Validates atomic rewrite conditions for conditional writes. + *

+ * For If-None-Match: * (expectedDataGeneration = EXPECTED_GEN_CREATE_IF_NOT_EXISTS), + * the key must NOT exist. + *

+ * For If-Match with a specific generation, the key must exist with matching updateID. + * + * @param dbKeyInfo the existing key info from the database (null if key doesn't exist) + * @param keyArgs the key arguments containing expected generation + * @throws OMException if validation fails + */ + protected void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs) + throws OMException { + if (keyArgs.hasExpectedDataGeneration()) { + long expectedGen = keyArgs.getExpectedDataGeneration(); + // If expectedGen is EXPECTED_GEN_CREATE_IF_NOT_EXISTS, it means the key MUST NOT exist (If-None-Match) + if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) { + if (dbKeyInfo != null) { + throw new OMException("Key already exists", + OMException.ResultCodes.KEY_ALREADY_EXISTS); + } + } else { + // If a key does not exist, or if it exists but the updateID does not match, then fail this request. + if (dbKeyInfo == null) { + throw new OMException("Key not found during expected rewrite", + OMException.ResultCodes.KEY_NOT_FOUND); + } + if (dbKeyInfo.getUpdateID() != expectedGen) { + throw new OMException("Generation mismatch during expected rewrite", + OMException.ResultCodes.KEY_NOT_FOUND); + } + } + } + } + + /** + * Validates If-Match ETag condition. + *

+ * This method checks if the existing key's ETag matches the expected ETag. + * Use this for single-phase operations (like MPU complete) where no rewrite is needed. + * + * @param keyArgs the key arguments containing expected ETag + * @param dbKeyInfo the existing key info from the database + * @throws OMException if validation fails + */ + protected void validateIfMatchETag(KeyArgs keyArgs, OmKeyInfo dbKeyInfo) + throws OMException { + if (!keyArgs.hasExpectedETag()) { + return; + } + + String expectedETag = keyArgs.getExpectedETag(); + if (dbKeyInfo == null) { + throw new OMException("Key not found for If-Match", + OMException.ResultCodes.KEY_NOT_FOUND); + } + if (!dbKeyInfo.hasEtag()) { + throw new OMException("Key does not have an ETag", + OMException.ResultCodes.ETAG_NOT_AVAILABLE); + } + if (!dbKeyInfo.isEtagEquals(expectedETag)) { + throw new OMException("ETag mismatch", + OMException.ResultCodes.ETAG_MISMATCH); + } + } + + /** + * Validates If-Match ETag condition and converts it to expectedDataGeneration. + *

+ * This method checks if the existing key's ETag matches the expected ETag. + * If it matches, the ETag condition is converted to a generation-based condition + * for atomic commit validation in two-phase operations (CreateKey → CommitKey). + * + * @param keyArgs the key arguments containing expected ETag + * @param dbKeyInfo the existing key info from the database + * @return updated KeyArgs with expectedDataGeneration set (if ETag matched) + * @throws OMException if validation fails + */ + protected KeyArgs validateAndRewriteIfMatchAsExpectedGeneration( + KeyArgs keyArgs, OmKeyInfo dbKeyInfo) throws OMException { + validateIfMatchETag(keyArgs, dbKeyInfo); + + if (!keyArgs.hasExpectedETag() || keyArgs.hasExpectedDataGeneration()) { + return keyArgs; + } + + return keyArgs.toBuilder() + .setExpectedDataGeneration(dbKeyInfo.getUpdateID()) + .clearExpectedETag() + .build(); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index cadd7f80b62f..179eb87aabae 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -269,6 +269,13 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); } + // Conditional write validation (If-None-Match / If-Match). + // BUCKET_LOCK is held, so validation and commit are atomic. + // Only 412 PreconditionFailed is possible; 409 Conflict cannot occur. 
+ OmKeyInfo existingKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey); + validateAtomicRewrite(existingKeyInfo, keyArgs); + validateIfMatchETag(keyArgs, existingKeyInfo); + if (!partsList.isEmpty()) { final OmMultipartKeyInfo.PartKeyInfoMap partKeyInfoMap = multipartKeyInfo.getPartKeyInfoMap(); diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 60a5f742141b..b74a7d52186d 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -726,6 +726,9 @@ public Response completeMultipartUpload( List partList = multipartUploadRequest.getPartList(); + S3ConditionalRequest.WriteConditions writeConditions = + S3ConditionalRequest.parseWriteConditions(getHeaders(), key); + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo; try { OzoneBucket ozoneBucket = volume.getBucket(bucket); @@ -738,7 +741,16 @@ public Response completeMultipartUpload( LOG.debug("Parts map {}", partsMap); } - omMultipartUploadCompleteInfo = ozoneBucket.completeMultipartUpload(key, uploadID, partsMap); + Long expectedDataGeneration = null; + String expectedETag = null; + if (writeConditions.hasIfNoneMatch()) { + expectedDataGeneration = OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS; + } else if (writeConditions.hasIfMatch()) { + expectedETag = writeConditions.getExpectedETag(); + } + + omMultipartUploadCompleteInfo = ozoneBucket.completeMultipartUpload( + key, uploadID, partsMap, expectedDataGeneration, expectedETag); CompleteMultipartUploadResponse completeMultipartUploadResponse = new CompleteMultipartUploadResponse(); completeMultipartUploadResponse.setBucket(bucket); @@ -770,6 +782,13 @@ public Response completeMultipartUpload( "considered as Unix Paths. 
A directory already exists with a " + "given KeyName caused failure for MPU"); throw os3Exception; + } else if (ex.getResult() == ResultCodes.KEY_ALREADY_EXISTS + || ex.getResult() == ResultCodes.ETAG_MISMATCH + || ex.getResult() == ResultCodes.ETAG_NOT_AVAILABLE) { + throw newError(PRECOND_FAILED, key, ex); + } else if (ex.getResult() == ResultCodes.KEY_NOT_FOUND + && writeConditions.hasIfMatch()) { + throw newError(PRECOND_FAILED, key, ex); } throw newError(bucket, key, ex); } catch (Exception ex) { diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java index 720c5851d640..35956bc1df8a 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java @@ -409,6 +409,16 @@ public OmMultipartUploadCompleteInfo completeMultipartUpload( .completeMultipartUpload(keyName, uploadID, partsMap); } + @Override + public OmMultipartUploadCompleteInfo completeMultipartUpload( + String volumeName, String bucketName, String keyName, String uploadID, + Map partsMap, + Long expectedDataGeneration, String expectedETag) throws IOException { + return getBucket(volumeName, bucketName) + .completeMultipartUpload(keyName, uploadID, partsMap, + expectedDataGeneration, expectedETag); + } + @Override public void abortMultipartUpload(String volumeName, String bucketName, String keyName, String uploadID) diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index a1e511eda886..f9b86f1cd672 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.client; import static org.apache.hadoop.ozone.OzoneConsts.ETAG; +import static org.apache.hadoop.ozone.OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS; import static org.apache.hadoop.ozone.OzoneConsts.MD5_HASH; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; @@ -582,6 +583,30 @@ public OmMultipartUploadCompleteInfo completeMultipartUpload(String key, DigestUtils.sha256Hex(key)); } + @Override + public OmMultipartUploadCompleteInfo completeMultipartUpload(String key, + String uploadID, Map partsMap, + Long expectedDataGeneration, String expectedETag) throws IOException { + // Handle If-None-Match: * (expectedDataGeneration == -1 means create-if-absent) + if (expectedDataGeneration != null && + expectedDataGeneration == EXPECTED_GEN_CREATE_IF_NOT_EXISTS) { + if (keyContents.containsKey(key)) { + throw new OMException("Key already exists", ResultCodes.KEY_ALREADY_EXISTS); + } + } + + // Handle If-Match: + if (expectedETag != null) { + OzoneKeyDetails existingKey = keyDetails.get(key); + if (existingKey == null) { + throw new OMException("Key not found", ResultCodes.KEY_NOT_FOUND); + } + // Stub doesn't track ETag, so we just delegate + } + + return completeMultipartUpload(key, uploadID, partsMap); + } + @Override public void abortMultipartUpload(String keyName, String uploadID) throws IOException {