
Checkpointing Leader Scheduler State #5352

Merged · 15 commits · Jan 28, 2025
Changes from 14 commits
@@ -76,12 +76,28 @@ public Map<String, Object> getKeyAttributes() {

@Override
public Instant getLastModifiedAt() {
- long updatedAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.UPDATED, "0"));
- long createdAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.CREATED, "0"));
+ long updatedAtMillis = getMetadataField(Constants.UPDATED);
+ long createdAtMillis = getMetadataField(CREATED);
Member commented:
nit: Constants.CREATED

Collaborator (author) replied:
addressed it

return createdAtMillis > updatedAtMillis ?
Instant.ofEpochMilli(createdAtMillis) : Instant.ofEpochMilli(updatedAtMillis);
}

+ private Long getMetadataField(String fieldName) {
+     Object value = this.metadata.get(fieldName);
+     if (value == null) {
+         return 0L;
+     } else if (value instanceof Long) {
+         return (Long) value;
+     } else if (value instanceof String) {
+         try {
+             return Long.parseLong((String) value);
+         } catch (Exception e) {
+             return 0L;
+         }
+     }
+     return 0L;
+ }

public static class JiraItemInfoBuilder {
private Map<String, Object> metadata;
private Instant eventTime;
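A note on the hunk above: the old implementation assumed every metadata value was a numeric `String`, so a value that arrived as a `Long` would throw a `ClassCastException`, and a non-numeric `String` would throw a `NumberFormatException`. Below is a minimal standalone sketch (hypothetical class and metadata map, not the plugin's code) of the fallback behavior the new `getMetadataField` introduces:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the fallback behavior added above: Long values
// pass through, numeric strings are parsed, and anything else (null,
// non-numeric strings, unexpected types) falls back to 0L.
public class MetadataFieldSketch {
    private static Long getMetadataField(Map<String, Object> metadata, String fieldName) {
        Object value = metadata.get(fieldName);
        if (value == null) {
            return 0L;
        } else if (value instanceof Long) {
            return (Long) value;
        } else if (value instanceof String) {
            try {
                return Long.parseLong((String) value);
            } catch (Exception e) {
                return 0L;
            }
        }
        return 0L;
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("updated", 1706400000000L);   // Long -> returned as-is
        metadata.put("created", "1706300000000");  // numeric String -> parsed
        metadata.put("broken", "not-a-number");    // unparsable -> 0L
        System.out.println(getMetadataField(metadata, "updated")); // 1706400000000
        System.out.println(getMetadataField(metadata, "created")); // 1706300000000
        System.out.println(getMetadataField(metadata, "broken"));  // 0
        System.out.println(getMetadataField(metadata, "missing")); // 0
    }
}
```

Since every unreadable field collapses to 0L, `getLastModifiedAt` still safely returns the later of the two timestamps.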
@@ -7,20 +7,25 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+ import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
+ import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
+ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

+ import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler.DEFAULT_EXTEND_LEASE_MINUTES;

@Named
public class Crawler {
private static final Logger log = LoggerFactory.getLogger(Crawler.class);
@@ -35,11 +40,15 @@ public Crawler(CrawlerClient client) {
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
}

- public Instant crawl(Instant lastPollTime,
+ public Instant crawl(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator, int batchSize) {
long startTime = System.currentTimeMillis();
+ Instant lastLeaderSavedInstant = Instant.now();
+ LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
+ Instant lastPollTime = leaderProgressState.getLastPollTime();
client.setLastPollTime(lastPollTime);
Iterator<ItemInfo> itemInfoIterator = client.listItems();
Instant latestModifiedTime = Instant.from(lastPollTime);
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
do {
final List<ItemInfo> itemInfoList = new ArrayList<>();
@@ -51,20 +60,40 @@ public Instant crawl(Instant lastPollTime,
continue;
}
itemInfoList.add(nextItem);
+ if (nextItem.getLastModifiedAt().isAfter(latestModifiedTime)) {
Member commented:
What happens if an item that has already been added to the list gets modified, and then we add a different item to the list that got modified later? Won't we end up skipping the update from the first item?

Collaborator (author) replied:
The max is not impacting the JQL query; it is only used for intermediate checkpointing.

+     latestModifiedTime = nextItem.getLastModifiedAt();
+ }
}
createPartition(itemInfoList, coordinator);
- // intermediate updates to master partition state is required here

+ // Check point leader progress state at every minute interval.
+ Instant currentTimeInstance = Instant.now();
+ if (Duration.between(lastLeaderSavedInstant, currentTimeInstance).toMinutes() >= 1) {
+     // intermediate updates to master partition state
+     updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator);
Member commented:
Just make sure that this won't block creation of any partitions by filtering them out based on the timestamp. You can be a little lenient if needed by making the "lastPollTime" earlier than the time you actually start the crawling, since source coordination will dedupe. If there are no issues with this, then it is ideal, since that means fewer calls to source coordination. You can start crawling from lastPollTime - someShortAmountOfTime if there's a chance of any race conditions.

Collaborator (author) replied:
Yes, at the end of the loop I am checkpointing with the "lastPollTime" from before the crawl start. lastPollTime - someShortAmountOfTime looks like a good idea for the next crawl; I will check the impact of this change on the JQL and experiment to see whether we can go with that choice.

+     lastLeaderSavedInstant = currentTimeInstance;
+ }

} while (itemInfoIterator.hasNext());
+ Instant startTimeInstant = Instant.ofEpochMilli(startTime);
+ updateLeaderProgressState(leaderPartition, startTimeInstant, coordinator);
long crawlTimeMillis = System.currentTimeMillis() - startTime;
log.debug("Crawling completed in {} ms", crawlTimeMillis);
crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
- return Instant.ofEpochMilli(startTime);
+ return startTimeInstant;
}

public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
client.executePartition(state, buffer, acknowledgementSet);
}

+ private void updateLeaderProgressState(LeaderPartition leaderPartition, Instant updatedPollTime, EnhancedSourceCoordinator coordinator) {
+     LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
+     leaderProgressState.setLastPollTime(updatedPollTime);
+     leaderPartition.setLeaderProgressState(leaderProgressState);
+     coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
+ }

private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
if (itemInfoList.isEmpty()) {
return;
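Tying this file's changes together: `crawl` now reads `lastPollTime` out of the leader partition itself, checkpoints intermediate progress at most once per minute while iterating, and checkpoints the crawl start time once the iterator is drained. Below is a compact sketch of that cadence, assuming a hypothetical `saveCheckpoint` callback in place of `updateLeaderProgressState`; the backdating buffer is the reviewer's `lastPollTime - someShortAmountOfTime` suggestion, which this PR discusses but does not implement:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.function.Consumer;

public class CheckpointCadenceSketch {
    private static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(1);
    private static final Duration LENIENCY = Duration.ofSeconds(30); // hypothetical buffer

    // items yields each item's last-modified time, mirroring ItemInfo.getLastModifiedAt().
    static Instant crawl(Iterator<Instant> items, Consumer<Instant> saveCheckpoint) {
        Instant startTime = Instant.now();
        Instant lastSaved = Instant.now();
        Instant latestModified = Instant.EPOCH;
        while (items.hasNext()) {
            Instant modifiedAt = items.next();
            if (modifiedAt.isAfter(latestModified)) {
                latestModified = modifiedAt; // track the max modification time seen so far
            }
            // Intermediate checkpoint, throttled to roughly one save per minute.
            Instant now = Instant.now();
            if (Duration.between(lastSaved, now).compareTo(CHECKPOINT_INTERVAL) >= 0) {
                saveCheckpoint.accept(latestModified);
                lastSaved = now;
            }
        }
        // Final checkpoint: the crawl start time (backdated slightly, per the
        // review suggestion) so the next crawl re-covers anything modified
        // while this crawl was running; source coordination dedupes overlaps.
        Instant nextPollTime = startTime.minus(LENIENCY);
        saveCheckpoint.accept(nextPollTime);
        return nextPollTime;
    }
}
```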
@@ -6,23 +6,19 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
- import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

public class LeaderScheduler implements Runnable {

- private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);

/**
* Default duration to extend the timeout of lease
*/
- private static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3);

+ public static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3);
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
/**
* Default interval to run lease check and shard discovery
*/
@@ -64,14 +60,8 @@ public void run() {
// Once owned, run Normal LEADER node process.
// May want to quit this scheduler if we don't want to monitor future changes
if (leaderPartition != null) {
- LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
- Instant lastPollTime = leaderProgressState.getLastPollTime();
-
- //Start crawling and create child partitions
- Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize);
- leaderProgressState.setLastPollTime(updatedPollTime);
- leaderPartition.setLeaderProgressState(leaderProgressState);
- coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
+ //Start crawling, create child partitions and also continue to update leader partition state
+ crawler.crawl(leaderPartition, coordinator, batchSize);
}

} catch (Exception e) {
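Worth noting why `DEFAULT_EXTEND_LEASE_MINUTES` became public here: `Crawler` now statically imports it, so every intermediate checkpoint also extends the leader lease, instead of the lease only being renewed after `crawl` returned. A sketch of the assumed coordinator semantics (my reading of `saveProgressStateForPartition`, not its actual implementation):

```java
import java.time.Duration;
import java.time.Instant;

// Assumed semantics: saving progress state for an owned partition also pushes
// its ownership timeout forward by the supplied duration, so periodic
// checkpointing inside crawl() implicitly renews the leader lease.
class LeaseRenewalSketch {
    private Instant ownershipTimeout = Instant.now().plus(Duration.ofMinutes(3));
    private Object progressState;

    void saveProgressStateForPartition(Object state, Duration extendBy) {
        this.progressState = state;                            // persist the state
        this.ownershipTimeout = Instant.now().plus(extendBy);  // renew the lease
    }

    boolean leaseExpired() {
        return Instant.now().isAfter(ownershipTimeout);
    }
}
```

Under that reading, a crawl that runs longer than the lease duration no longer risks losing leadership mid-crawl, since each one-minute checkpoint resets the timeout.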
@@ -10,7 +10,9 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+ import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
+ import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
@@ -20,48 +22,50 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+ import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
+ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;

@ExtendWith(MockitoExtension.class)
public class CrawlerTest {
+ private static final int DEFAULT_BATCH_SIZE = 50;
+ Instant lastPollTime = Instant.ofEpochMilli(0);
Member commented:
Good to make this private

Collaborator (author) replied:
Made this private now

@Mock
private AcknowledgementSet acknowledgementSet;

@Mock
private EnhancedSourceCoordinator coordinator;

@Mock
private Buffer<Record<Event>> buffer;

@Mock
private CrawlerClient client;

@Mock
private SaasWorkerProgressState state;

+ @Mock
+ private LeaderPartition leaderPartition;
private Crawler crawler;

- private static final int DEFAULT_BATCH_SIZE = 50;

@BeforeEach
public void setup() {
crawler = new Crawler(client);
+ when(leaderPartition.getProgressState()).thenReturn(Optional.of(new LeaderProgressState(lastPollTime)));
}

@Test
public void crawlerConstructionTest() {
+ reset(leaderPartition);
assertNotNull(crawler);
}

@Test
public void executePartitionTest() {
+ reset(leaderPartition);
crawler.executePartition(state, buffer, acknowledgementSet);
verify(client).executePartition(state, buffer, acknowledgementSet);
}
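A note on the `reset(leaderPartition)` calls added above: `MockitoExtension` runs with strict stubs by default, so the shared `when(leaderPartition.getProgressState())` stub from `setup()` would fail any test that never reaches it with an `UnnecessaryStubbingException`; resetting the mock discards the unused stubbing. A minimal sketch of the pattern with a hypothetical `Source` type (my reading of why the resets were added; the PR does not state a reason):

```java
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class) // strict stubs by default
class ResetPatternSketch {
    interface Source { String state(); }

    @Mock
    private Source source;

    @BeforeEach
    void setup() {
        when(source.state()).thenReturn("ready"); // shared stub most tests rely on
    }

    @Test
    void testThatNeverReadsState() {
        reset(source); // drop the unused stub so strict stubbing does not fail this test
        // ... assertions that never touch source.state()
    }
}
```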
@@ -70,45 +74,43 @@ public void executePartitionTest() {
void testCrawlWithEmptyList() {
Instant lastPollTime = Instant.ofEpochMilli(0);
when(client.listItems()).thenReturn(Collections.emptyIterator());
- crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+ when(leaderPartition.getProgressState()).thenReturn(Optional.of(new LeaderProgressState(lastPollTime)));
+ crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, never()).createPartition(any(SaasSourcePartition.class));
}

@Test
- void testCrawlWithNonEmptyList(){
- Instant lastPollTime = Instant.ofEpochMilli(0);
+ void testCrawlWithNonEmptyList() {
List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
itemInfoList.add(new TestItemInfo("itemId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
- crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+ crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));

}

@Test
- void testCrawlWithMultiplePartitions(){
- Instant lastPollTime = Instant.ofEpochMilli(0);
+ void testCrawlWithMultiplePartitions() {
List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
- crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+ crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testBatchSize() {
- Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = 50;
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
- crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
+ crawler.crawl(leaderPartition, coordinator, maxItemsPerPage);
int expectedNumberOfInvocations = 1;
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));

@@ -117,33 +119,31 @@ void testBatchSize() {
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList2.add(new TestItemInfo("testId"));
}
- when(client.listItems()).thenReturn(itemInfoList.iterator());
- crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2);
+ when(client.listItems()).thenReturn(itemInfoList2.iterator());
+ crawler.crawl(leaderPartition, coordinator, maxItemsPerPage2);
expectedNumberOfInvocations += 2;
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
}

@Test
- void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException {
- Instant lastPollTime = Instant.ofEpochMilli(0);
+ void testCrawlWithNullItemsInList() {
List<ItemInfo> itemInfoList = new ArrayList<>();
itemInfoList.add(null);
for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
- crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+ crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testUpdatingPollTimeNullMetaData() {
- Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
- Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+ Instant updatedPollTime = crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime);
}

@@ -154,7 +154,7 @@ void testUpdatedPollTimeNiCreatedLarger() {
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
- Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+ Instant updatedPollTime = crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
assertNotEquals(lastPollTime, updatedPollTime);
}

@@ -31,15 +31,14 @@
@ExtendWith(MockitoExtension.class)
public class LeaderSchedulerTest {

+ private final int batchSize = 50;
@Mock
private EnhancedSourceCoordinator coordinator;
@Mock
private CrawlerSourcePlugin saasSourcePlugin;
@Mock
private Crawler crawler;

- private final int batchSize = 50;

@Test
void testUnableToAcquireLeaderPartition() throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
@@ -69,8 +68,8 @@ void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException {
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
- verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize);
- verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
+ verify(crawler, times(1)).crawl(leaderPartition, coordinator, batchSize);
+ verify(coordinator, times(1)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

}

@@ -100,7 +99,7 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(false);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
- when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
+ when(crawler.crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
.thenReturn(Optional.of(leaderPartition))
.thenThrow(RuntimeException.class);
Expand All @@ -113,6 +112,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
- verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt());
+ verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class), anyInt());
}
}