diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceQueryUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceQueryUtil.java index d47237a3..8afc8d7e 100644 --- a/src/main/java/io/cdap/plugin/salesforce/SalesforceQueryUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceQueryUtil.java @@ -83,6 +83,18 @@ public static String createSObjectIdQuery(String query) { return SELECT + FIELD_ID + " " + fromStatement; } + /** + * Creates a COUNT query from an existing SOQL query. + * Replaces the SELECT fields with COUNT() while preserving the FROM and WHERE clauses. + * + * @param query the original SOQL query + * @return a COUNT SOQL query string + */ + public static String createCountQuery(String query) { + String fromStatement = SalesforceQueryParser.getFromStatement(query); + return SELECT + "COUNT() " + fromStatement; + } + /** * Generates SObject query filter based on provided values. * diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java index 423c91ba..6d617dc1 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java @@ -48,6 +48,8 @@ import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashSet; @@ -71,6 +73,7 @@ public class SalesforceBatchSource extends public static final String NAME = "Salesforce"; private final SalesforceSourceConfig config; + private static final Logger LOG = LoggerFactory.getLogger(SalesforceBatchSource.class); private Schema schema; private 
MapToRecordTransformer transformer; private Set jobIds = new HashSet<>(); @@ -139,7 +142,7 @@ public void prepareRun(BatchSourceContext context) { authenticatorCredentials = config.getConnection().getAuthenticatorCredentials(); List querySplits = - getSplits(config, authenticatorCredentials, context.getLogicalStartTime(), oAuthInfo); + getSplits(config, authenticatorCredentials, context.getLogicalStartTime(), oAuthInfo, false); querySplits.stream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId())); context.setInput(Input.of(config.getReferenceNameOrNormalizedFQN(orgId, sObjectName), new SalesforceInputFormatProvider( @@ -148,10 +151,21 @@ public void prepareRun(BatchSourceContext context) { public static List getSplits( SalesforceSourceConfig config, AuthenticatorCredentials authenticatorCredentials, - long logicStartTime, OAuthInfo oAuthInfo) { + long logicStartTime, OAuthInfo oAuthInfo, boolean pkChunkCountCheck) { String query = config.getQuery(logicStartTime, oAuthInfo); BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials); + boolean enablePKChunk = config.getEnablePKChunk(); + + if (enablePKChunk && pkChunkCountCheck) { + enablePKChunk = SalesforceSplitUtil.hasRequiredCountForPkChunking( + query, authenticatorCredentials, SalesforceSourceConstants.PK_CHUNK_RECORD_COUNT_THRESHOLD); + if (!enablePKChunk) { + LOG.info("PK chunking skipped: record count is below threshold {} ", + SalesforceSourceConstants.PK_CHUNK_RECORD_COUNT_THRESHOLD); + } + } + if (enablePKChunk) { String parent = config.getParent(); int chunkSize = config.getChunkSize(); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java index dff1130b..66386a05 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java +++ 
b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java @@ -55,6 +55,7 @@ public class SalesforceSourceConstants { public static final int MAX_PK_CHUNK_SIZE = 250000; public static final int DEFAULT_PK_CHUNK_SIZE = 100000; public static final int MIN_PK_CHUNK_SIZE = 1; + public static final long PK_CHUNK_RECORD_COUNT_THRESHOLD = 1_000_000; // https://developer.salesforce.com/docs/atlas.en-us.252.0.api_asynch.meta/api_asynch/ // async_api_headers_enable_pk_chunking.htm // **Always use lowercase names** to ensure consistency, especially if the sObject name is manually provided. diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java index 467af177..134bef27 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java @@ -26,6 +26,9 @@ import com.sforce.async.JobInfo; import com.sforce.async.JobStateEnum; import com.sforce.async.OperationEnum; +import com.sforce.soap.partner.PartnerConnection; +import com.sforce.soap.partner.QueryResult; +import com.sforce.ws.ConnectionException; import dev.failsafe.FailsafeException; import dev.failsafe.RetryPolicy; import dev.failsafe.TimeoutExceededException; @@ -33,9 +36,11 @@ import io.cdap.plugin.salesforce.InvalidConfigException; import io.cdap.plugin.salesforce.SObjectDescriptor; import io.cdap.plugin.salesforce.SalesforceBulkUtil; +import io.cdap.plugin.salesforce.SalesforceConnectionUtil; import io.cdap.plugin.salesforce.SalesforceQueryUtil; import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; +import io.cdap.plugin.salesforce.parser.SalesforceQueryParser; import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceSplit; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,6 +268,32 @@ public static boolean isPkChunkingSupported(String sobjectName) { return false; } + /** + * Determines whether PK chunking should remain enabled, based on the record count returned by a + * COUNT() query built from the original SOQL query. + * + * @param query the SOQL query + * @param credentials authenticator credentials for SOAP API + * @param threshold the record count threshold at or above which PK chunking stays enabled + * @return true if the record count is at or above the threshold (or if the count check fails, + * in which case PK chunking is kept as a safe default), false otherwise + */ + public static boolean hasRequiredCountForPkChunking( + String query, AuthenticatorCredentials credentials, long threshold) { + try { + String sObjectName = SObjectDescriptor.fromQuery(query).getName(); + String countQuery = SalesforceQueryUtil.createCountQuery(query); + PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials); + QueryResult result = partnerConnection.query(countQuery); + int recordCount = result.getSize(); + LOG.debug("PK Chunking validation: object '{}' has {} records, threshold is {}", + sObjectName, recordCount, threshold); + return recordCount >= threshold; + } catch (Exception e) { + LOG.warn("PK Chunking validation: unexpected error, falling back to default PK chunking", e); + return true; + } + } + // This is added only for UCS use case. 
private static boolean isCustomObject(String sobjectName) { return sobjectName.toLowerCase().endsWith("__c"); diff --git a/src/test/java/io/cdap/plugin/salesforce/SalesforceQueryUtilTest.java b/src/test/java/io/cdap/plugin/salesforce/SalesforceQueryUtilTest.java index 7ae75bb1..cb3b8320 100644 --- a/src/test/java/io/cdap/plugin/salesforce/SalesforceQueryUtilTest.java +++ b/src/test/java/io/cdap/plugin/salesforce/SalesforceQueryUtilTest.java @@ -227,4 +227,24 @@ public void testCreateSObjectIdQuery() { Assert.assertEquals("SELECT Id " + fromClause, sObjectIdQuery); } + + @Test + public void testCreateCountQuery_queryWithWhereClause_replacesFieldsWithCount() { + String query = "SELECT Id,Name,SomeField FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z"; + + String result = SalesforceQueryUtil.createCountQuery(query); + + Assert.assertEquals( + "SELECT COUNT() FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z", + result); + } + + @Test + public void testCreateCountQuery_queryWithoutWhereClause_replacesFieldsWithCount() { + String query = "SELECT Id, Name FROM Account"; + + String result = SalesforceQueryUtil.createCountQuery(query); + + Assert.assertEquals("SELECT COUNT() FROM Account", result); + } } diff --git a/src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSourceETLTest.java b/src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSourceETLTest.java index b4398463..e2ab4400 100644 --- a/src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSourceETLTest.java +++ b/src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSourceETLTest.java @@ -624,4 +624,114 @@ private void testPKChunk(ImmutableMap properties) throws Excepti Assert.assertEquals("record3", results.get(2).get("Name")); Assert.assertEquals("record4", results.get(3).get("Name")); } + + @Test + public void testGetSplits_pkChunkEnabledWithCustomChunkSize_returnsAllRecords() throws Exception { + ImmutableMap properties = new ImmutableMap.Builder() + 
.put("enablePKChunk", "true") + .put("chunkSize", "2") + .build(); + + testPKChunk(properties); + } + + @Test + public void testGetSplits_pkChunkEnabledWithDefaultChunkSize_returnsAllRecords() throws Exception { + ImmutableMap properties = new ImmutableMap.Builder() + .put("enablePKChunk", "true") + .build(); + + testPKChunk(properties); + } + + @Test + public void testGetSplits_pkChunkEnabledWithRecordsBelowThreshold_chunkingSkipped() throws Exception { + String sObjectName = createCustomObject("IT_PKChunkBelowThreshold", null); + + List sObjects = new ImmutableList.Builder() + .add(new SObjectBuilder() + .setType(sObjectName) + .put("Name", "record1") + .build()) + .add(new SObjectBuilder() + .setType(sObjectName) + .put("Name", "record2") + .build()) + .build(); + + addSObjects(sObjects, true); + + ImmutableMap.Builder properties = getBaseProperties("SalesforceReaderPKChunkBelowThreshold"); + properties.put("enablePKChunk", "true"); + properties.put(SalesforceSourceConstants.PROPERTY_QUERY, "SELECT Name FROM " + sObjectName); + + List results = getPipelineResults(properties.build(), SalesforceBatchSource.NAME, + "SalesforceBatch"); + + results.sort(Comparator.comparing(record -> record.get("Name"))); + Assert.assertEquals(2, results.size()); + Assert.assertEquals("record1", results.get(0).get("Name")); + Assert.assertEquals("record2", results.get(1).get("Name")); + } + + @Test + public void testGetSplits_pkChunkDisabledViaUI_chunkingNotApplied() throws Exception { + String sObjectName = createCustomObject("IT_PKChunkDisabled", null); + + List sObjects = new ImmutableList.Builder() + .add(new SObjectBuilder() + .setType(sObjectName) + .put("Name", "record1") + .build()) + .add(new SObjectBuilder() + .setType(sObjectName) + .put("Name", "record2") + .build()) + .build(); + + addSObjects(sObjects, true); + + ImmutableMap.Builder properties = getBaseProperties("SalesforceReaderPKChunkDisabled"); + properties.put("enablePKChunk", "false"); + 
properties.put(SalesforceSourceConstants.PROPERTY_QUERY, "SELECT Name FROM " + sObjectName); + + List results = getPipelineResults(properties.build(), SalesforceBatchSource.NAME, + "SalesforceBatch"); + + results.sort(Comparator.comparing(record -> record.get("Name"))); + Assert.assertEquals(2, results.size()); + Assert.assertEquals("record1", results.get(0).get("Name")); + Assert.assertEquals("record2", results.get(1).get("Name")); + } + + @Test + public void testGetSplits_pkChunkEnabledOnCustomObjectBelowThreshold_chunkingSkipped() throws Exception { + String customObjectName = createCustomObject("IT_CustomPKChunk", null); + + List sObjects = new ImmutableList.Builder() + .add(new SObjectBuilder() + .setType(customObjectName) + .put("Name", "custom_record1") + .build()) + .add(new SObjectBuilder() + .setType(customObjectName) + .put("Name", "custom_record2") + .build()) + .build(); + + addSObjects(sObjects, true); + + ImmutableMap.Builder properties = getBaseProperties("SalesforceReaderCustomPKChunk"); + properties.put("enablePKChunk", "true"); + properties.put("chunkSize", "100000"); + properties.put(SalesforceSourceConstants.PROPERTY_QUERY, "SELECT Name FROM " + customObjectName); + + List results = getPipelineResults(properties.build(), SalesforceBatchSource.NAME, + "SalesforceBatch"); + + results.sort(Comparator.comparing(record -> record.get("Name"))); + Assert.assertEquals(2, results.size()); + Assert.assertEquals("custom_record1", results.get(0).get("Name")); + Assert.assertEquals("custom_record2", results.get(1).get("Name")); + } }