-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Checkpointing Leader Scheduler State #5352
Changes from 14 commits
bebc885
b4aa574
784106e
f825a27
bc2449f
2589f1f
d21bd0e
b2abddf
86ca70e
c665e93
4cd341c
9893f15
15d0fdf
3d7e2f0
e2fd7be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,20 +7,25 @@ | |
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import javax.inject.Named; | ||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler.DEFAULT_EXTEND_LEASE_MINUTES; | ||
|
||
@Named | ||
public class Crawler { | ||
private static final Logger log = LoggerFactory.getLogger(Crawler.class); | ||
|
@@ -35,11 +40,15 @@ public Crawler(CrawlerClient client) { | |
this.crawlingTimer = pluginMetrics.timer("crawlingTime"); | ||
} | ||
|
||
public Instant crawl(Instant lastPollTime, | ||
public Instant crawl(LeaderPartition leaderPartition, | ||
EnhancedSourceCoordinator coordinator, int batchSize) { | ||
long startTime = System.currentTimeMillis(); | ||
Instant lastLeaderSavedInstant = Instant.now(); | ||
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); | ||
Instant lastPollTime = leaderProgressState.getLastPollTime(); | ||
client.setLastPollTime(lastPollTime); | ||
Iterator<ItemInfo> itemInfoIterator = client.listItems(); | ||
Instant latestModifiedTime = Instant.from(lastPollTime); | ||
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime); | ||
do { | ||
final List<ItemInfo> itemInfoList = new ArrayList<>(); | ||
|
@@ -51,20 +60,40 @@ public Instant crawl(Instant lastPollTime, | |
continue; | ||
} | ||
itemInfoList.add(nextItem); | ||
if (nextItem.getLastModifiedAt().isAfter(latestModifiedTime)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happens if an item that has already been added to the list gets modified and then we add a different item to the list that got modified later? Won't we end up skipping the update from the first item? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Max is not impacting the JQL query. The max is only for intermediate checkpointing. |
||
latestModifiedTime = nextItem.getLastModifiedAt(); | ||
} | ||
} | ||
createPartition(itemInfoList, coordinator); | ||
// intermediate updates to master partition state is required here | ||
|
||
// Check point leader progress state at every minute interval. | ||
Instant currentTimeInstance = Instant.now(); | ||
if (Duration.between(lastLeaderSavedInstant, currentTimeInstance).toMinutes() >= 1) { | ||
// intermediate updates to master partition state | ||
updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just make sure that this won't block creation of any partitions by filtering them out based on the timestamp. You can be a little lenient if needed by making the "lastPollTime" from before the time that you actually start the crawling, since source coordination will dedupe. If there are no issues with this, then it is ideal since that means fewer calls to source coordination. You can start crawling from lastPollTime - someShortAmountOfTime if there's a chance of any race conditions. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, at the end of the loop, I am checkpointing with the "lastPollTime" before the crawl start. lastPollTime - someShortAmountOfTime looks like a good idea for the next crawling. I will see the impact of this change in the JQL and play with it to see if we can go with that choice. |
||
lastLeaderSavedInstant = currentTimeInstance; | ||
} | ||
|
||
} while (itemInfoIterator.hasNext()); | ||
Instant startTimeInstant = Instant.ofEpochMilli(startTime); | ||
updateLeaderProgressState(leaderPartition, startTimeInstant, coordinator); | ||
long crawlTimeMillis = System.currentTimeMillis() - startTime; | ||
log.debug("Crawling completed in {} ms", crawlTimeMillis); | ||
crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS); | ||
return Instant.ofEpochMilli(startTime); | ||
return startTimeInstant; | ||
} | ||
|
||
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) { | ||
client.executePartition(state, buffer, acknowledgementSet); | ||
} | ||
|
||
private void updateLeaderProgressState(LeaderPartition leaderPartition, Instant updatedPollTime, EnhancedSourceCoordinator coordinator) { | ||
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); | ||
leaderProgressState.setLastPollTime(updatedPollTime); | ||
leaderPartition.setLeaderProgressState(leaderProgressState); | ||
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); | ||
} | ||
|
||
private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) { | ||
if (itemInfoList.isEmpty()) { | ||
return; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,9 @@ | |
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo; | ||
|
@@ -20,48 +22,50 @@ | |
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertNotEquals; | ||
import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.never; | ||
import static org.mockito.Mockito.reset; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
import static org.mockito.internal.verification.VerificationModeFactory.times; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
public class CrawlerTest { | ||
private static final int DEFAULT_BATCH_SIZE = 50; | ||
Instant lastPollTime = Instant.ofEpochMilli(0); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good to make this private. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Made this private now. |
||
@Mock | ||
private AcknowledgementSet acknowledgementSet; | ||
|
||
@Mock | ||
private EnhancedSourceCoordinator coordinator; | ||
|
||
@Mock | ||
private Buffer<Record<Event>> buffer; | ||
|
||
@Mock | ||
private CrawlerClient client; | ||
|
||
@Mock | ||
private SaasWorkerProgressState state; | ||
|
||
@Mock | ||
private LeaderPartition leaderPartition; | ||
private Crawler crawler; | ||
|
||
private static final int DEFAULT_BATCH_SIZE = 50; | ||
|
||
@BeforeEach | ||
public void setup() { | ||
crawler = new Crawler(client); | ||
when(leaderPartition.getProgressState()).thenReturn(Optional.of(new LeaderProgressState(lastPollTime))); | ||
} | ||
|
||
@Test | ||
public void crawlerConstructionTest() { | ||
reset(leaderPartition); | ||
assertNotNull(crawler); | ||
} | ||
|
||
@Test | ||
public void executePartitionTest() { | ||
reset(leaderPartition); | ||
crawler.executePartition(state, buffer, acknowledgementSet); | ||
verify(client).executePartition(state, buffer, acknowledgementSet); | ||
} | ||
|
@@ -70,45 +74,43 @@ public void executePartitionTest() { | |
void testCrawlWithEmptyList() { | ||
Instant lastPollTime = Instant.ofEpochMilli(0); | ||
when(client.listItems()).thenReturn(Collections.emptyIterator()); | ||
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); | ||
when(leaderPartition.getProgressState()).thenReturn(Optional.of(new LeaderProgressState(lastPollTime))); | ||
crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE); | ||
verify(coordinator, never()).createPartition(any(SaasSourcePartition.class)); | ||
} | ||
|
||
@Test | ||
void testCrawlWithNonEmptyList(){ | ||
Instant lastPollTime = Instant.ofEpochMilli(0); | ||
void testCrawlWithNonEmptyList() { | ||
List<ItemInfo> itemInfoList = new ArrayList<>(); | ||
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) { | ||
itemInfoList.add(new TestItemInfo("itemId")); | ||
} | ||
when(client.listItems()).thenReturn(itemInfoList.iterator()); | ||
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); | ||
crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE); | ||
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); | ||
|
||
} | ||
|
||
@Test | ||
void testCrawlWithMultiplePartitions(){ | ||
Instant lastPollTime = Instant.ofEpochMilli(0); | ||
void testCrawlWithMultiplePartitions() { | ||
List<ItemInfo> itemInfoList = new ArrayList<>(); | ||
for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) { | ||
itemInfoList.add(new TestItemInfo("testId")); | ||
} | ||
when(client.listItems()).thenReturn(itemInfoList.iterator()); | ||
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); | ||
crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE); | ||
verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class)); | ||
} | ||
|
||
@Test | ||
void testBatchSize() { | ||
Instant lastPollTime = Instant.ofEpochMilli(0); | ||
List<ItemInfo> itemInfoList = new ArrayList<>(); | ||
int maxItemsPerPage = 50; | ||
for (int i = 0; i < maxItemsPerPage; i++) { | ||
itemInfoList.add(new TestItemInfo("testId")); | ||
} | ||
when(client.listItems()).thenReturn(itemInfoList.iterator()); | ||
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); | ||
crawler.crawl(leaderPartition, coordinator, maxItemsPerPage); | ||
int expectedNumberOfInvocations = 1; | ||
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class)); | ||
|
||
|
@@ -117,33 +119,31 @@ void testBatchSize() { | |
for (int i = 0; i < maxItemsPerPage; i++) { | ||
itemInfoList2.add(new TestItemInfo("testId")); | ||
} | ||
when(client.listItems()).thenReturn(itemInfoList.iterator()); | ||
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2); | ||
when(client.listItems()).thenReturn(itemInfoList2.iterator()); | ||
crawler.crawl(leaderPartition, coordinator, maxItemsPerPage2); | ||
expectedNumberOfInvocations += 2; | ||
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class)); | ||
} | ||
|
||
@Test | ||
void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException { | ||
Instant lastPollTime = Instant.ofEpochMilli(0); | ||
void testCrawlWithNullItemsInList() { | ||
List<ItemInfo> itemInfoList = new ArrayList<>(); | ||
itemInfoList.add(null); | ||
for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) { | ||
itemInfoList.add(new TestItemInfo("testId")); | ||
} | ||
when(client.listItems()).thenReturn(itemInfoList.iterator()); | ||
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); | ||
crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE); | ||
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); | ||
} | ||
|
||
@Test | ||
void testUpdatingPollTimeNullMetaData() { | ||
Instant lastPollTime = Instant.ofEpochMilli(0); | ||
List<ItemInfo> itemInfoList = new ArrayList<>(); | ||
ItemInfo testItem = createTestItemInfo("1"); | ||
itemInfoList.add(testItem); | ||
when(client.listItems()).thenReturn(itemInfoList.iterator()); | ||
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); | ||
Instant updatedPollTime = crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE); | ||
assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime); | ||
} | ||
|
||
|
@@ -154,7 +154,7 @@ void testUpdatedPollTimeNiCreatedLarger() { | |
ItemInfo testItem = createTestItemInfo("1"); | ||
itemInfoList.add(testItem); | ||
when(client.listItems()).thenReturn(itemInfoList.iterator()); | ||
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); | ||
Instant updatedPollTime = crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE); | ||
assertNotEquals(lastPollTime, updatedPollTime); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
Constants.CREATED
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed it