Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for compression config per KDS stream. #5046

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.time.Duration;
Expand All @@ -38,4 +39,9 @@ public class KinesisStreamConfig {
public InitialPositionInStream getInitialPosition() {
return initialPosition.getPositionInStream();
}

@Getter
@JsonProperty("compression")
private CompressionOption compression = CompressionOption.NONE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

package org.opensearch.dataprepper.plugins.kinesis.source.converter;

import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
Expand All @@ -31,11 +32,12 @@ public KinesisRecordConverter(final InputCodec codec) {
this.codec = codec;
}

public List<Record<Event>> convert(List<KinesisClientRecord> kinesisClientRecords,
public List<Record<Event>> convert(final DecompressionEngine decompressionEngine,
List<KinesisClientRecord> kinesisClientRecords,
final String streamName) throws IOException {
List<Record<Event>> records = new ArrayList<>();
for (KinesisClientRecord kinesisClientRecord : kinesisClientRecords) {
processRecord(kinesisClientRecord, record -> {
processRecord(decompressionEngine, kinesisClientRecord, record -> {
records.add(record);
Event event = record.getData();
EventMetadata eventMetadata = event.getMetadata();
Expand All @@ -52,11 +54,14 @@ public List<Record<Event>> convert(List<KinesisClientRecord> kinesisClientRecord
return records;
}

private void processRecord(KinesisClientRecord record, Consumer<Record<Event>> eventConsumer) throws IOException {
private void processRecord(final DecompressionEngine decompressionEngine,
KinesisClientRecord record,
Consumer<Record<Event>> eventConsumer) throws IOException {
// Read bytebuffer
byte[] arr = new byte[record.data().remaining()];
record.data().get(arr);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr);
codec.parse(byteArrayInputStream, eventConsumer);

codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {

// Track the records for checkpoint purpose
kinesisCheckpointerTracker.addRecordForCheckpoint(extendedSequenceNumber, processRecordsInput.checkpointer());
List<Record<Event>> records = kinesisRecordConverter.convert(processRecordsInput.records(), streamIdentifier.streamName());
List<Record<Event>> records = kinesisRecordConverter.convert(
kinesisStreamConfig.getCompression().getDecompressionEngine(),
processRecordsInput.records(), streamIdentifier.streamName());

int eventCount = 0;
for (Record<Event> record: records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.opensearch.dataprepper.pipeline.parser.DataPrepperDurationDeserializer;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;

Expand Down Expand Up @@ -156,6 +157,7 @@ void testSourceConfigWithInitialPosition() {
assertTrue(kinesisStreamConfig.getName().contains("stream"));
assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.TRIM_HORIZON);
assertEquals(kinesisStreamConfig.getCheckPointInterval(), expectedCheckpointIntervals.get(kinesisStreamConfig.getName()));
assertEquals(kinesisStreamConfig.getCompression(), CompressionOption.GZIP);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
Expand All @@ -40,6 +43,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class KinesisRecordConverterTest {
private static final String streamId = "stream-1";
Expand All @@ -48,13 +52,15 @@ public class KinesisRecordConverterTest {
void testRecordConverter() throws IOException {
InputCodec codec = mock(InputCodec.class);
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec);
DecompressionEngine decompressionEngine = CompressionOption.NONE.getDecompressionEngine();
doNothing().when(codec).parse(any(InputStream.class), any(Consumer.class));

String sample_record_data = "sample record data";
KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
.data(ByteBuffer.wrap(sample_record_data.getBytes()))
.build();
kinesisRecordConverter.convert(List.of(kinesisClientRecord), streamId);

kinesisRecordConverter.convert(decompressionEngine, List.of(kinesisClientRecord), streamId);
verify(codec, times(1)).parse(any(InputStream.class), any(Consumer.class));
}

Expand Down Expand Up @@ -89,7 +95,11 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
.subSequenceNumber(subsequenceNumber)
.partitionKey(partitionKey)
.build();
List<Record<Event>> events = kinesisRecordConverter.convert(List.of(kinesisClientRecord), streamId);
DecompressionEngine decompressionEngine = mock(DecompressionEngine.class);
InputStream inputStream = new ByteArrayInputStream(writer.toString().getBytes());
when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(inputStream);

List<Record<Event>> events = kinesisRecordConverter.convert(decompressionEngine, List.of(kinesisClientRecord), streamId);

assertEquals(events.size(), numRecords);
events.forEach(eventRecord -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.converter.KinesisRecordConverter;
Expand All @@ -45,6 +47,8 @@
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -143,8 +147,11 @@ public class KinesisRecordProcessorTest {
@Mock
private KinesisCheckpointerTracker kinesisCheckpointerTracker;

@Mock
private DecompressionEngine decompressionEngine;

@BeforeEach
public void setup() {
public void setup() throws IOException {
MockitoAnnotations.initMocks(this);
pluginMetrics = mock(PluginMetrics.class);
pluginFactory = mock(PluginFactory.class);
Expand All @@ -156,6 +163,7 @@ public void setup() {
when(initializationInput.shardId()).thenReturn(shardId);
when(streamIdentifier.streamName()).thenReturn(streamId);
when(kinesisStreamConfig.getName()).thenReturn(streamId);
when(kinesisStreamConfig.getCompression()).thenReturn(CompressionOption.NONE);
PluginModel pluginModel = mock(PluginModel.class);
when(pluginModel.getPluginName()).thenReturn(codec_plugin_name);
when(pluginModel.getPluginSettings()).thenReturn(Collections.emptyMap());
Expand All @@ -172,6 +180,12 @@ public void setup() {
when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY,
streamIdentifier.streamName())).thenReturn(acknowledgementSetFailures);

InputStream inputStream = mock(InputStream.class);
when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(inputStream);
CompressionOption compressionOption = mock(CompressionOption.class);
when(kinesisStreamConfig.getCompression()).thenReturn(compressionOption);
when(compressionOption.getDecompressionEngine()).thenReturn(decompressionEngine);

recordProcessed = mock(Counter.class);
when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(recordProcessed);

Expand All @@ -193,7 +207,9 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied()
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
InputStream inputStream = mock(InputStream.class);
when(decompressionEngine.createInputStream(inputStream)).thenReturn(inputStream);
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
Expand Down Expand Up @@ -240,7 +256,7 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled()
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
Expand Down Expand Up @@ -291,7 +307,7 @@ void testProcessRecordsWithAcknowledgementsEnabled()
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
Expand Down Expand Up @@ -346,7 +362,7 @@ void testProcessRecordsWithNDJsonInputCodec()
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
Expand Down Expand Up @@ -389,7 +405,7 @@ void testProcessRecordsNoThrowException()
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
final Throwable exception = mock(RuntimeException.class);
doThrow(exception).when(bufferAccumulator).add(any(Record.class));

Expand All @@ -414,7 +430,7 @@ void testProcessRecordsBufferFlushNoThrowException()
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
final Throwable exception = mock(RuntimeException.class);
doThrow(exception).when(bufferAccumulator).flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ source:
- stream_name: "stream-1"
initial_position: "EARLIEST"
checkpoint_interval: "20s"
compression: "gzip"
- stream_name: "stream-2"
initial_position: "EARLIEST"
checkpoint_interval: "PT15M"
compression: "gzip"
- stream_name: "stream-3"
initial_position: "EARLIEST"
checkpoint_interval: "PT2H"
compression: "gzip"
codec:
ndjson:
aws:
Expand Down
Loading