12 changes: 12 additions & 0 deletions src/main/java/io/cdap/plugin/salesforce/SalesforceQueryUtil.java
@@ -83,6 +83,18 @@ public static String createSObjectIdQuery(String query) {
return SELECT + FIELD_ID + " " + fromStatement;
}

/**
* Creates a COUNT query from an existing SOQL query.
* Replaces the SELECT fields with COUNT() while preserving the FROM and WHERE clauses.
*
* @param query the original SOQL query
* @return a COUNT SOQL query string
*/
public static String createCountQuery(String query) {
Review comment:

How long does this query take when we use filters?

Can you add test cases with before/after timings at a high record count:

  1. A table with 30-40M records and a filter query.
  2. A table with 10M records and a filter query.

Basically, what we want to test is how the query cost scales with record count.
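A sketch of the timing harness this comment asks for might look like the following. The harness class and the pre-loaded 10M / 30-40M record tables are assumptions (the PR contains no such test); only PartnerConnection.query and the new createCountQuery are real APIs here.

import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.QueryResult;
import com.sforce.ws.ConnectionException;
import io.cdap.plugin.salesforce.SalesforceQueryUtil;

// Hypothetical harness, not part of this PR: measures COUNT() latency for a
// filtered query. Assumes 'connection' targets an org already loaded with the
// large tables; the harness only measures, it does not load data.
public class CountQueryTimingHarness {

  static void timeCountQuery(PartnerConnection connection, String soql) throws ConnectionException {
    String countQuery = SalesforceQueryUtil.createCountQuery(soql);
    long start = System.nanoTime();
    QueryResult result = connection.query(countQuery); // single round trip; COUNT() returns no rows
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("%s -> %d records in %d ms%n", countQuery, result.getSize(), elapsedMs);
  }
}

Running the same filter against both tables would show how COUNT() latency scales with record count.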

String fromStatement = SalesforceQueryParser.getFromStatement(query);
return SELECT + "COUNT() " + fromStatement;
}

/**
* Generates SObject query filter based on provided values.
*
src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java
@@ -48,6 +48,8 @@
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
@@ -71,6 +73,7 @@ public class SalesforceBatchSource extends
public static final String NAME = "Salesforce";

private final SalesforceSourceConfig config;
private static final Logger LOG = LoggerFactory.getLogger(SalesforceBatchSource.class);
private Schema schema;
private MapToRecordTransformer transformer;
private Set<String> jobIds = new HashSet<>();
@@ -139,7 +142,7 @@ public void prepareRun(BatchSourceContext context) {

authenticatorCredentials = config.getConnection().getAuthenticatorCredentials();
List<SalesforceSplit> querySplits =
getSplits(config, authenticatorCredentials, context.getLogicalStartTime(), oAuthInfo);
getSplits(config, authenticatorCredentials, context.getLogicalStartTime(), oAuthInfo, false);
querySplits.stream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId()));
context.setInput(Input.of(config.getReferenceNameOrNormalizedFQN(orgId, sObjectName),
new SalesforceInputFormatProvider(
@@ -148,10 +151,21 @@

public static List<SalesforceSplit> getSplits(
SalesforceSourceConfig config, AuthenticatorCredentials authenticatorCredentials,
long logicStartTime, OAuthInfo oAuthInfo) {
long logicStartTime, OAuthInfo oAuthInfo, boolean pkChunkCountCheck) {
String query = config.getQuery(logicStartTime, oAuthInfo);
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);

boolean enablePKChunk = config.getEnablePKChunk();

if (enablePKChunk && pkChunkCountCheck) {
enablePKChunk = SalesforceSplitUtil.hasRequiredCountForPkChunking(
query, authenticatorCredentials, SalesforceSourceConstants.PK_CHUNK_RECORD_COUNT_THRESHOLD);
if (!enablePKChunk) {
LOG.info("PK chunking skipped: record count is below threshold {} ",
SalesforceSourceConstants.PK_CHUNK_RECORD_COUNT_THRESHOLD);
}
}

if (enablePKChunk) {
String parent = config.getParent();
int chunkSize = config.getChunkSize();
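Note the design choice visible above: the count check is opt-in. prepareRun passes false, so regular pipeline runs keep the existing splitting behavior, and a caller that wants the automatic skip must pass true. A minimal sketch of such a caller; the setup variables are assumed to be prepared exactly as prepareRun prepares them:

// Sketch of an opt-in caller; config, authenticatorCredentials, logicalStartTime,
// and oAuthInfo are assumed to be set up as in prepareRun().
List<SalesforceSplit> splits = SalesforceBatchSource.getSplits(
    config, authenticatorCredentials, logicalStartTime, oAuthInfo,
    /* pkChunkCountCheck */ true);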
src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java
@@ -55,6 +55,7 @@ public class SalesforceSourceConstants {
public static final int MAX_PK_CHUNK_SIZE = 250000;
public static final int DEFAULT_PK_CHUNK_SIZE = 100000;
public static final int MIN_PK_CHUNK_SIZE = 1;
public static final long PK_CHUNK_RECORD_COUNT_THRESHOLD = 1_000_000;
// https://developer.salesforce.com/docs/atlas.en-us.252.0.api_asynch.meta/api_asynch/
// async_api_headers_enable_pk_chunking.htm
// **Always use lowercase names** to ensure consistency, especially if the sObject name is manually provided.
src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java
@@ -26,16 +26,21 @@
import com.sforce.async.JobInfo;
import com.sforce.async.JobStateEnum;
import com.sforce.async.OperationEnum;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.QueryResult;
import com.sforce.ws.ConnectionException;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import dev.failsafe.TimeoutExceededException;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceQueryUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.parser.SalesforceQueryParser;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -263,6 +268,32 @@ public static boolean isPkChunkingSupported(String sobjectName) {
return false;
}

/**
* Determines whether PK chunking should be enabled automatically based on the record count
* of the queried object.
*
* @param query the SOQL query
* @param credentials authenticator credentials for the SOAP API
* @param threshold the record count threshold at or above which PK chunking is enabled
* @return true if PK chunking should be auto-enabled, false otherwise; on unexpected errors
*         the check fails open and returns true, preserving the configured behavior
*/
public static boolean hasRequiredCountForPkChunking(
String query, AuthenticatorCredentials credentials, long threshold) {
try {
String sObjectName = SObjectDescriptor.fromQuery(query).getName();
String countQuery = SalesforceQueryUtil.createCountQuery(query);
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
QueryResult result = partnerConnection.query(countQuery);
int recordCount = result.getSize();
LOG.debug("PK Chunking validation: object '{}' has {} records, threshold is {}",
sObjectName, recordCount, threshold);
return recordCount >= threshold;
} catch (Exception e) {
LOG.warn("PK Chunking validation: unexpected error, falling back to default PK chunking", e);
return true;
}
}

// This is added only for UCS use case.
private static boolean isCustomObject(String sobjectName) {
return sobjectName.toLowerCase().endsWith("__c");
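The helper fails open: any exception while counting is logged as a warning and the method returns true, so a transient API failure degrades to the configured PK chunking behavior instead of silently disabling it. A minimal call, mirroring the new getSplits branch:

// Sketch: gate PK chunking on record count, as the new getSplits branch does.
// 'query' and 'credentials' are assumed valid for the target org.
boolean usePkChunking = SalesforceSplitUtil.hasRequiredCountForPkChunking(
    query, credentials, SalesforceSourceConstants.PK_CHUNK_RECORD_COUNT_THRESHOLD);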
src/test/java/io/cdap/plugin/salesforce/SalesforceQueryUtilTest.java
@@ -227,4 +227,24 @@ public void testCreateSObjectIdQuery() {

Assert.assertEquals("SELECT Id " + fromClause, sObjectIdQuery);
}

@Test
public void testCreateCountQuery_queryWithWhereClause_replacesFieldsWithCount() {
String query = "SELECT Id,Name,SomeField FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z";

String result = SalesforceQueryUtil.createCountQuery(query);

Assert.assertEquals(
"SELECT COUNT() FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z",
result);
}

@Test
public void testCreateCountQuery_queryWithoutWhereClause_replacesFieldsWithCount() {
String query = "SELECT Id, Name FROM Account";

String result = SalesforceQueryUtil.createCountQuery(query);

Assert.assertEquals("SELECT COUNT() FROM Account", result);
}
}
src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSourceETLTest.java
@@ -624,4 +624,114 @@ private void testPKChunk(ImmutableMap<String, String> properties) throws Exception
Assert.assertEquals("record3", results.get(2).get("Name"));
Assert.assertEquals("record4", results.get(3).get("Name"));
}

@Test
public void testGetSplits_pkChunkEnabledWithCustomChunkSize_returnsAllRecords() throws Exception {
ImmutableMap<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("enablePKChunk", "true")
.put("chunkSize", "2")
.build();

testPKChunk(properties);
}

@Test
public void testGetSplits_pkChunkEnabledWithDefaultChunkSize_returnsAllRecords() throws Exception {
ImmutableMap<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("enablePKChunk", "true")
.build();

testPKChunk(properties);
}

@Test
public void testGetSplits_pkChunkEnabledWithRecordsBelowThreshold_chunkingSkipped() throws Exception {
String sObjectName = createCustomObject("IT_PKChunkBelowThreshold", null);

List<SObject> sObjects = new ImmutableList.Builder<SObject>()
.add(new SObjectBuilder()
.setType(sObjectName)
.put("Name", "record1")
.build())
.add(new SObjectBuilder()
.setType(sObjectName)
.put("Name", "record2")
.build())
.build();

addSObjects(sObjects, true);

ImmutableMap.Builder<String, String> properties = getBaseProperties("SalesforceReaderPKChunkBelowThreshold");
properties.put("enablePKChunk", "true");
properties.put(SalesforceSourceConstants.PROPERTY_QUERY, "SELECT Name FROM " + sObjectName);

List<StructuredRecord> results = getPipelineResults(properties.build(), SalesforceBatchSource.NAME,
"SalesforceBatch");

results.sort(Comparator.comparing(record -> record.get("Name")));
Assert.assertEquals(2, results.size());
Assert.assertEquals("record1", results.get(0).get("Name"));
Assert.assertEquals("record2", results.get(1).get("Name"));
}

@Test
public void testGetSplits_pkChunkDisabledViaUI_chunkingNotApplied() throws Exception {
String sObjectName = createCustomObject("IT_PKChunkDisabled", null);

List<SObject> sObjects = new ImmutableList.Builder<SObject>()
.add(new SObjectBuilder()
.setType(sObjectName)
.put("Name", "record1")
.build())
.add(new SObjectBuilder()
.setType(sObjectName)
.put("Name", "record2")
.build())
.build();

addSObjects(sObjects, true);

ImmutableMap.Builder<String, String> properties = getBaseProperties("SalesforceReaderPKChunkDisabled");
properties.put("enablePKChunk", "false");
properties.put(SalesforceSourceConstants.PROPERTY_QUERY, "SELECT Name FROM " + sObjectName);

List<StructuredRecord> results = getPipelineResults(properties.build(), SalesforceBatchSource.NAME,
"SalesforceBatch");

results.sort(Comparator.comparing(record -> record.get("Name")));
Assert.assertEquals(2, results.size());
Assert.assertEquals("record1", results.get(0).get("Name"));
Assert.assertEquals("record2", results.get(1).get("Name"));
}

@Test
public void testGetSplits_pkChunkEnabledOnCustomObjectBelowThreshold_chunkingSkipped() throws Exception {
String customObjectName = createCustomObject("IT_CustomPKChunk", null);

List<SObject> sObjects = new ImmutableList.Builder<SObject>()
.add(new SObjectBuilder()
.setType(customObjectName)
.put("Name", "custom_record1")
.build())
.add(new SObjectBuilder()
.setType(customObjectName)
.put("Name", "custom_record2")
.build())
.build();

addSObjects(sObjects, true);

ImmutableMap.Builder<String, String> properties = getBaseProperties("SalesforceReaderCustomPKChunk");
properties.put("enablePKChunk", "true");
properties.put("chunkSize", "100000");
properties.put(SalesforceSourceConstants.PROPERTY_QUERY, "SELECT Name FROM " + customObjectName);

List<StructuredRecord> results = getPipelineResults(properties.build(), SalesforceBatchSource.NAME,
"SalesforceBatch");

results.sort(Comparator.comparing(record -> record.get("Name")));
Assert.assertEquals(2, results.size());
Assert.assertEquals("custom_record1", results.get(0).get("Name"));
Assert.assertEquals("custom_record2", results.get(1).get("Name"));
}
}