feat: Add heartbeat mechanism for failure discovery #2

Draft
wants to merge 142 commits into
base: master
Changes from 1 commit
Commits
e193a2a
fix: UncaughtExceptionHandler not being set for Persistent Queries (#…
stevenpyzhang Dec 9, 2019
d7ce660
docs: intent for klip-16: Introduce K$ Dynamic Views (#4056)
blueedgenick Dec 9, 2019
5553283
chore: issue regular LIST TOPICS from healthcheck, not extended (#4061)
vcrfxia Dec 9, 2019
e0db728
docs: update example output from SHOW TOPICS (KSQL-2981) (#4089)
JimGalasyn Dec 9, 2019
525bb82
Merge branch '5.4.x'
JimGalasyn Dec 9, 2019
4c6caa6
docs: fix docker image names + remove monitoring interceptor docs (#4…
vcrfxia Dec 10, 2019
9a2bdec
chore: update KSQL_PULL_QUERIES_ENABLE_CONFIG name (MINOR) (#4059)
vcrfxia Dec 10, 2019
60e20ef
fix: Explicitly disallow table functions with table sources, fixes #4…
purplefox Dec 10, 2019
494b638
test: have QTT support checking full schema of sources (#4083)
big-andy-coates Dec 10, 2019
dfd44cd
docs: fix munged code blocks in Clickstream tutorial (DOCS-3084) (#4102)
JimGalasyn Dec 10, 2019
fe2ca2d
chore: update the status of approved KLIPs
derekjn Dec 10, 2019
331e304
docs: note CREATE CONNECTOR works only in interactive mode (DOCS-3036…
JimGalasyn Dec 10, 2019
8e36086
Merge branch '5.4.x'
JimGalasyn Dec 10, 2019
d68c636
docs: note CREATE CONNECTOR works only in interactive mode (md docs) …
JimGalasyn Dec 10, 2019
8dbfbb7
chore: primitive keys for simple queries (#4096)
big-andy-coates Dec 10, 2019
4dd76ac
test: serialize/deserialize plans from qtt (#4080)
rodesai Dec 10, 2019
6c80941
fix: properly set key when partition by ROWKEY and join on non-ROWKEY…
agavra Dec 10, 2019
7addf88
chore: partition-by primitive key support (#4098)
big-andy-coates Dec 10, 2019
707c5ba
Add milestone 0.7.0 to all new bugs.
apurvam Dec 10, 2019
e6605e4
Changing the default milestone to the milestone id
apurvam Dec 10, 2019
61fc0bb
reverting the milestone field
apurvam Dec 10, 2019
0a29eac
refactor: nicer spec for aggregation steps (#4063)
rodesai Dec 11, 2019
48a3b7e
docs: fix markdown docs build (DOCS-3091) (#4112)
JimGalasyn Dec 11, 2019
5a4e551
chore: split ARRAYCONTAINS into JSON_ARRAY_CONTAINS and ARRAY_CONTAIN…
agavra Dec 11, 2019
0d679c9
chore: remove RUN SCRIPT from the server implementation (#4116)
agavra Dec 11, 2019
37f8d89
chore: the KsqlJsonDeserializer no longer guarantees ordering (#4115)
agavra Dec 11, 2019
c6c00b1
fix: NPE when starting StandaloneExecutor (#4119)
stevenpyzhang Dec 12, 2019
87b03e3
chore: group-by primitive key support (#4108)
big-andy-coates Dec 12, 2019
584c4a7
chore: reserve KLIP numbers
big-andy-coates Dec 12, 2019
d95f420
chore: reserve KLIP numbers
big-andy-coates Dec 12, 2019
da96259
chore: Update KafkaStreams.close to take a duration parameter (#4110)
bbejeck Dec 12, 2019
54bc2be
chore: reserve klip number and update some links
big-andy-coates Dec 12, 2019
3eecd40
Merge branch 'master' of github.com:confluentinc/ksql
big-andy-coates Dec 12, 2019
858e4dc
feat: support writing plans to command topic (#4106)
rodesai Dec 13, 2019
e3a7279
Set Confluent to 5.3.2, Kafka to 5.3.2.
ConfluentJenkins Dec 13, 2019
b23dae9
feat: implemention of KLIP-13 (#4099)
alex-dukhno Dec 13, 2019
ebac104
feat: show properties now includes embedded connect properties and sc…
alex-dukhno Dec 13, 2019
3187a4c
docs: update push and pull query topics with feedback (DOCS-3092) (#4…
JimGalasyn Dec 13, 2019
a34c04a
docs: fix a typo in queries overview topic (DOCS-3105) (#4133)
JimGalasyn Dec 13, 2019
4a6141a
docs: add functions index to markdown docs (DOCS-3049) (#4135)
JimGalasyn Dec 13, 2019
d5974e1
chore: fix join left right names (#4136)
rodesai Dec 16, 2019
04de30e
fix: untracked file after cloning on Windows (#4122)
albertosantini Dec 16, 2019
1146aa5
feat: add source statement to SourceDescription (#4134)
stevenpyzhang Dec 16, 2019
2d5e680
feat: add COUNT_DISTINCT and allow generics in UDAFs (#4150)
agavra Dec 16, 2019
d595985
Adds support for using primitive types in joins. (#4132)
big-andy-coates Dec 17, 2019
cedf47e
feat: add config to make error messages configurable (#4121)
stevenpyzhang Dec 17, 2019
2f41aac
docs: update codeowners file for docs-md directory (DOCS-3120) (#4161)
JimGalasyn Dec 17, 2019
0ac8747
fix: show topics doesn't display topics with different casing (#4159)
stevenpyzhang Dec 17, 2019
e59a6fe
test: do not mock config store in StandaloneExecutor integration test…
vcrfxia Dec 17, 2019
6d769ad
Merge branch '5.4.x'
vcrfxia Dec 17, 2019
75b539e
fix: decimals in structs should display as numeric (#4165)
agavra Dec 18, 2019
e92d2f3
test: disambiguate some Schema Registry methods (#4166)
rayokota Dec 18, 2019
56ac607
chore: simplify group by schema resolving code (#4154)
big-andy-coates Dec 18, 2019
6c6695c
chore: enforce WITH KEY column type matches ROWKEY type (#4147)
big-andy-coates Dec 19, 2019
6e558da
feat: add support for inline struct creation (#4120)
agavra Dec 19, 2019
91c421a
fix: change query id generation to work with planned commands (#4149)
rodesai Dec 19, 2019
68f4c55
Merge branch '5.3.2-post' into 5.3.x
ConfluentJenkins Dec 19, 2019
cd50546
Merge branch '5.3.x' into 5.4.x
ConfluentJenkins Dec 19, 2019
c32cb64
Merge branch '5.4.x'
ConfluentJenkins Dec 19, 2019
0ac71cf
fix: pull queries should work across nodes (#4169)
big-andy-coates Dec 20, 2019
c239990
docs: add basic docker-compose file and instructions (#4175)
big-andy-coates Dec 20, 2019
cbd3bab
fix: immutability in some more classes (MINOR) (#4179)
agavra Dec 20, 2019
a5a1620
Bump Confluent to 5.3.3-SNAPSHOT, Kafka to 5.3.3-SNAPSHOT
ConfluentJenkins Dec 20, 2019
9b89a21
Merge branch '5.3.x' into 5.4.x
ConfluentJenkins Dec 20, 2019
6530677
Merge branch '5.4.x'
ConfluentJenkins Dec 20, 2019
acb656b
chore: pull query support for primitive keys (#4178)
big-andy-coates Dec 21, 2019
53cbf10
test: qtt generate/validate execution plans (#4176)
rodesai Dec 23, 2019
a50a665
fix: reintroduce FetchFieldFromStruct as a public UDF (#4185)
agavra Dec 23, 2019
7bb81c5
Merge branch '5.4.x'
agavra Dec 23, 2019
240c3dc
chore: use UsePartitionTimeOnInvalidTimestamp instead of UsePreviousT…
stevenpyzhang Dec 29, 2019
8005542
chore: re-enable final param and var checkstyle rule
big-andy-coates Dec 30, 2019
e41d8ee
test: enhance test framework to support primitive keys (#4201)
big-andy-coates Jan 2, 2020
b038f88
docs: unpin mkdocs-macros-plugin version (DOCS-3100) (#4208)
JimGalasyn Jan 2, 2020
9b48f4e
docs: add security note linking to processing log settings (DOCS-3076…
JimGalasyn Jan 2, 2020
5f84e77
Merge branch '5.4.x'
JimGalasyn Jan 2, 2020
869f7ac
docs: add note to ksqlDB processing log settings for security (#4213)
JimGalasyn Jan 3, 2020
cec0efe
docs: add section for ksql.streams.state.dir to md docs (#4214)
JimGalasyn Jan 3, 2020
607b539
docs: add section for ksql.streams.state.dir (DOCS-3153) (#4210)
JimGalasyn Jan 3, 2020
72f9efb
Merge branch '5.4.x'
JimGalasyn Jan 3, 2020
80191ac
chore: drop square brackets from string representation of logical sch…
big-andy-coates Jan 6, 2020
444fef0
chore: fix TestDataProvider's to have correct key type (#4206)
big-andy-coates Jan 6, 2020
eb0fe40
docs: remove basic auth section from server-config (DOCS-3140) (#4209)
JimGalasyn Jan 6, 2020
fc0decf
Merge branch '5.4.x'
JimGalasyn Jan 6, 2020
c62cc96
refactor: Use FunctionName not String in FunctionRegistry API (#4225)
purplefox Jan 6, 2020
0965afa
chore: add back expected topology tests (#4207)
rodesai Jan 6, 2020
e0a7de4
test: add equals testers for plan classes (#4189)
rodesai Jan 6, 2020
34a8795
Disable building docker images (#4230)
elismaga Jan 6, 2020
e17d1e1
chore: fix result schema on table GROUP BY for primitive keys (MINOR)…
big-andy-coates Jan 7, 2020
69dd7ec
Merge branch '5.4.x'
big-andy-coates Jan 7, 2020
da57002
refactor: inject a KsqlSecurityContext to REST requests (#4184)
spena Jan 7, 2020
096b78f
fix: CLI commands may be terminated with semicolon+whitespace (MINOR)…
vcrfxia Jan 7, 2020
665f207
feat: add test topology rewriter (#4204)
rodesai Jan 7, 2020
58ed39c
feat: add a KSQL cache for Kafka authorization validator (#4186)
spena Jan 7, 2020
5ac46f4
test: fix build issue caused by new register method on SR client (#4247)
big-andy-coates Jan 8, 2020
1213d93
test: Fix Schema Registry tests after new SR API changes (#4243)
rayokota Jan 8, 2020
555c573
chore: ensure sensible error on use of unsupported primitive key type…
big-andy-coates Jan 9, 2020
508e5a5
Disable building docker images (#4230)
elismaga Jan 6, 2020
9479fd6
fix: don't load current qtt test case from legacy loader (#4245)
rodesai Jan 9, 2020
de906c3
chore: remove upstream docker registry as not public
big-andy-coates Jan 9, 2020
f991752
perf: Improves pull query performance by making the default schema se…
AlanConfluent Jan 9, 2020
5ee1e9e
feat: enable Kafla ACL authorization checks for Pull Queries (#4187)
spena Jan 10, 2020
1281ab2
fix: better error message on self-join (#4248)
big-andy-coates Jan 10, 2020
5cc718b
fix: include path of field that causes JSON deserialization error (#4…
big-andy-coates Jan 10, 2020
e032ea9
feat: allow environment variables to configure embedded connect (#4260)
agavra Jan 10, 2020
41cbbea
chore: remove unused import (#4273)
albertosantini Jan 10, 2020
0b135aa
Set Confluent to 5.4.0, Kafka to 5.4.0.
ConfluentJenkins Jan 11, 2020
8326151
docs: klip-15 New client and API (#4069)
purplefox Jan 11, 2020
4e32da6
fix: add logging during restore (#4270)
big-andy-coates Jan 13, 2020
7a83bbf
feat: ask for password if -p is not provided (#4153)
spena Jan 13, 2020
307bf4d
fix: report clearer error message when AVG used with DELIMITED (#4295)
big-andy-coates Jan 13, 2020
4dcab06
build: Do not run twist lock scan and other docker image operations s…
elismaga Jan 13, 2020
3bde631
Merge branch '5.4.x'
agavra Jan 13, 2020
81f96dc
chore: migrate to use ParsedSchema for SchemaRegistryClient (#4259)
agavra Jan 13, 2020
cfeafe4
docs: add embedded Connect tutorial (#4212)
derekjn Jan 13, 2020
af8498e
docs: KLIP 12 - Implement High-Availability for Pull queries (#4022)
vpapavas Jan 13, 2020
6b5ce0c
fix: deadlock when closing transient push query (#4297)
big-andy-coates Jan 14, 2020
5911faf
fix: log4j properties files (#4293)
big-andy-coates Jan 14, 2020
50b4c1c
perf: Avoids logging INFO for rest-util requests, since it hurts pull…
AlanConfluent Jan 14, 2020
0b6da0b
fix: fix NPE in CLI if not username supplied (#4312)
big-andy-coates Jan 14, 2020
0bd4997
test: rQTT should fail if expecting more responses than statements (#…
big-andy-coates Jan 14, 2020
fcfe2b9
docs: refactor installation docs around Docker (DOCS-3009) (#4235)
JimGalasyn Jan 14, 2020
ac8fb63
fix: deadlock when closing transient push query (#4297)
big-andy-coates Jan 14, 2020
22aaaa7
Revert "fix: deadlock when closing transient push query (#4297)"
big-andy-coates Jan 14, 2020
c7fb07f
docs: add docs for COUNT_DISTINCT (#4300)
agavra Jan 14, 2020
dbf83f3
Merge branch '5.4.x'
rodesai Jan 14, 2020
0c812c8
Merge branch '5.4.0-post' into 5.4.x
ConfluentJenkins Jan 14, 2020
1f6eabf
Merge branch '5.4.x'
ConfluentJenkins Jan 14, 2020
2d0bfe8
feat: expression support in JOINs (#4278)
agavra Jan 15, 2020
51f8f7c
Bump Confluent to 5.4.1-SNAPSHOT, Kafka to 5.4.1-SNAPSHOT
ConfluentJenkins Jan 15, 2020
ea2d9c5
Merge branch '5.4.x'
ConfluentJenkins Jan 15, 2020
cfe6821
test: push queries work with non-windowed primitive keys (#4310)
big-andy-coates Jan 15, 2020
a0ca688
test: ensure test cases for group by and partition by primitive key (…
big-andy-coates Jan 15, 2020
5554913
Add note re. logs folder for server output (#4318)
rmoff Jan 15, 2020
67ee038
docs: fix broken link on dev guide index page (DOCS-3243) (#4327)
JimGalasyn Jan 16, 2020
7640b43
fix: pin the jetty client version (#4324)
rodesai Jan 16, 2020
934011c
test: added more table related test cases (#4329)
big-andy-coates Jan 16, 2020
0a74151
fix: add ksql-test-runner deps to ksql package lib (#4272)
spena Jan 16, 2020
ca9368a
fix: report window type and query status better from API (#4313)
big-andy-coates Jan 16, 2020
4ff9e6e
chore: change RUNNING to WARNING when all connector tasks fail (#4323)
agavra Jan 16, 2020
3946f73
chore: clean up LogicalSchemaWithMetaAndKeyFields (#4188)
rodesai Jan 16, 2020
5ee7904
initial implementation of heartbeat and cluster stats=us
vpapavas Dec 9, 2019
43dcdbe
fix typo in comment
vpapavas Jan 16, 2020
refactor: nicer spec for aggregation steps (confluentinc#4063)
* refactor: nicer spec for aggregation steps

This patch cleans up the aggregation step to specify a list of
non-aggregate column references instead of a count of non-aggregate
columns.
rodesai authored Dec 11, 2019
commit 0a29eace19227af3c8a6fed400211a3e72ceb23c
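
The change described in the commit message can be illustrated with a small standalone sketch. This is not the PR's code: the class and method names (`NonAggColumnsSketch`, `copyByCount`, `copyByIndexes`) are hypothetical, and a plain `List<Object>` stands in for `GenericRow`. It contrasts copying the first N columns of a row with copying the columns selected by an explicit index list, which is roughly what the aggregators in the diff below do once they receive `nonAggColumnIndexes`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the ksql code base.
final class NonAggColumnsSketch {

  // Count-based spec: the non-aggregate columns are assumed to be
  // the first `count` columns of the input row.
  static List<Object> copyByCount(List<Object> row, int count) {
    return new ArrayList<>(row.subList(0, count));
  }

  // Index-based spec: each entry names the position of a non-aggregate
  // column in the input row, so the columns need not form a prefix.
  static List<Object> copyByIndexes(List<Object> row, List<Integer> nonAggColumnIndexes) {
    List<Object> out = new ArrayList<>(nonAggColumnIndexes.size());
    for (int idxInRow : nonAggColumnIndexes) {
      out.add(row.get(idxInRow));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Object> row = Arrays.asList("key", 10, "region");
    System.out.println(copyByCount(row, 2));                      // [key, 10]
    System.out.println(copyByIndexes(row, Arrays.asList(0, 2)));  // [key, region]
  }
}
```

With a count, the non-aggregate columns have to sit at the front of the row. With an index list, they can be picked from any position, and passing `[0, 1]` gives the same result as the old count of 2.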
Original file line number Diff line number Diff line change
@@ -228,8 +228,12 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {

final QueryContext.Stacker aggregationContext = contextStacker.push(AGGREGATION_OP_NAME);

+ final List<ColumnRef> requiredColumnRefs = requiredColumns.stream()
+ .map(e -> (ColumnReferenceExp) internalSchema.resolveToInternal(e))
+ .map(ColumnReferenceExp::getReference)
+ .collect(Collectors.toList());
SchemaKTable<?> aggregated = schemaKGroupedStream.aggregate(
- requiredColumns.size(),
+ requiredColumnRefs,
functionsWithInternalIdentifiers,
windowExpression,
valueFormat,
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.parser.tree.WindowExpression;
+ import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatInfo;
@@ -72,7 +73,7 @@ public ExecutionStep<KGroupedStreamHolder> getSourceStep() {

@SuppressWarnings("unchecked")
public SchemaKTable<?> aggregate(
- final int nonFuncColumnCount,
+ final List<ColumnRef> nonAggregateColumns,
final List<FunctionCall> aggregations,
final Optional<WindowExpression> windowExpression,
final ValueFormat valueFormat,
@@ -87,7 +88,7 @@ public SchemaKTable<?> aggregate(
contextStacker,
sourceStep,
Formats.of(keyFormat, valueFormat, SerdeOption.none()),
- nonFuncColumnCount,
+ nonAggregateColumns,
aggregations,
windowExpression.get().getKsqlWindowExpression()
);
@@ -97,7 +98,7 @@ public SchemaKTable<?> aggregate(
contextStacker,
sourceStep,
Formats.of(keyFormat, valueFormat, SerdeOption.none()),
- nonFuncColumnCount,
+ nonAggregateColumns,
aggregations
);
}
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.parser.tree.WindowExpression;
+ import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeOption;
@@ -71,7 +72,7 @@ public ExecutionStep<KGroupedTableHolder> getSourceTableStep() {
@SuppressWarnings("unchecked")
@Override
public SchemaKTable<Struct> aggregate(
- final int nonFuncColumnCount,
+ final List<ColumnRef> nonAggregateColumns,
final List<FunctionCall> aggregations,
final Optional<WindowExpression> windowExpression,
final ValueFormat valueFormat,
@@ -98,7 +99,7 @@ public SchemaKTable<Struct> aggregate(
contextStacker,
sourceTableStep,
Formats.of(keyFormat, valueFormat, SerdeOption.none()),
- nonFuncColumnCount,
+ nonAggregateColumns,
aggregations
);

Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ public class KudafUndoAggregatorTest {
public void init() {
final List<TableAggregationFunction<?, ?, ?>> functions =
ImmutableList.of((TableAggregationFunction)SUM_INFO);
- aggregator = new KudafUndoAggregator(2, functions);
+ aggregator = new KudafUndoAggregator(ImmutableList.of(0, 1), functions);
}

@Test
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.KsqlConfig;
+ import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
@@ -72,6 +73,9 @@ public class SchemaKGroupedStreamTest {
private static final KsqlWindowExpression KSQL_WINDOW_EXP = new SessionWindowExpression(
100, TimeUnit.SECONDS
);
+ private static final List<ColumnRef> NON_AGGREGATE_COLUMNS = ImmutableList.of(
+ ColumnRef.withoutSource(ColumnName.of("IN0"))
+ );

@Mock
private KeyField keyField;
@@ -115,7 +119,7 @@ public void setUp() {
public void shouldReturnKTableWithOutputSchema() {
// When:
final SchemaKTable result = schemaGroupedStream.aggregate(
- 1,
+ NON_AGGREGATE_COLUMNS,
ImmutableList.of(AGG),
Optional.empty(),
valueFormat,
@@ -130,7 +134,7 @@ public void shouldReturnKTableWithOutputSchema() {
public void shouldBuildStepForAggregate() {
// When:
final SchemaKTable result = schemaGroupedStream.aggregate(
- 1,
+ NON_AGGREGATE_COLUMNS,
ImmutableList.of(AGG),
Optional.empty(),
valueFormat,
@@ -145,7 +149,7 @@ public void shouldBuildStepForAggregate() {
queryContext,
schemaGroupedStream.getSourceStep(),
Formats.of(keyFormat, valueFormat, SerdeOption.none()),
- 1,
+ NON_AGGREGATE_COLUMNS,
ImmutableList.of(AGG)
)
)
@@ -156,7 +160,7 @@ public void shouldBuildStepForAggregate() {
public void shouldBuildStepForWindowedAggregate() {
// When:
final SchemaKTable result = schemaGroupedStream.aggregate(
- 1,
+ NON_AGGREGATE_COLUMNS,
ImmutableList.of(AGG),
Optional.of(windowExp),
valueFormat,
@@ -175,7 +179,7 @@ public void shouldBuildStepForWindowedAggregate() {
queryContext,
schemaGroupedStream.getSourceStep(),
Formats.of(expected, valueFormat, SerdeOption.none()),
- 1,
+ NON_AGGREGATE_COLUMNS,
ImmutableList.of(AGG),
KSQL_WINDOW_EXP
)
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
+ import java.util.List;
import java.util.Optional;
import org.junit.Rule;
import org.junit.Test;
@@ -63,6 +64,9 @@ public class SchemaKGroupedTableTest {
.valueColumn(ColumnName.of("KSQL_AGG_VARIABLE_0"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of("KSQL_AGG_VARIABLE_1"), SqlTypes.BIGINT)
.build();
+ private static final List<ColumnRef> NON_AGG_COLUMNS = ImmutableList.of(
+ ColumnRef.withoutSource(ColumnName.of("IN0"))
+ );
private static final FunctionCall MIN = udaf("MIN");
private static final FunctionCall MAX = udaf("MAX");
private static final FunctionCall SUM = udaf("SUM");
@@ -91,7 +95,7 @@ public void shouldFailWindowedTableAggregation() {

// When:
groupedTable.aggregate(
- 1,
+ NON_AGG_COLUMNS,
ImmutableList.of(SUM, COUNT),
Optional.of(windowExp),
valueFormat,
@@ -111,7 +115,7 @@ public void shouldFailUnsupportedAggregateFunction() {

// When:
kGroupedTable.aggregate(
- 1,
+ NON_AGG_COLUMNS,
ImmutableList.of(MIN, MAX),
Optional.empty(),
valueFormat,
@@ -136,7 +140,7 @@ public void shouldBuildStepForAggregate() {
final SchemaKGroupedTable kGroupedTable = buildSchemaKGroupedTable();

final SchemaKTable result = kGroupedTable.aggregate(
- 1,
+ NON_AGG_COLUMNS,
ImmutableList.of(SUM, COUNT),
Optional.empty(),
valueFormat,
@@ -151,7 +155,7 @@ public void shouldBuildStepForAggregate() {
queryContext,
kGroupedTable.getSourceTableStep(),
Formats.of(keyFormat, valueFormat, SerdeOption.none()),
- 1,
+ NON_AGG_COLUMNS,
ImmutableList.of(SUM, COUNT)
)
)
@@ -165,7 +169,7 @@ public void shouldReturnKTableWithOutputSchema() {

// When:
final SchemaKTable result = groupedTable.aggregate(
- 1,
+ NON_AGG_COLUMNS,
ImmutableList.of(SUM, COUNT),
Optional.empty(),
valueFormat,
Original file line number Diff line number Diff line change
@@ -30,14 +30,17 @@

public class KudafAggregator<K> implements UdafAggregator<K> {

- private final int initialUdafIndex;
+ private final List<Integer> nonAggColumnIndexes;
private final List<KsqlAggregateFunction<?, ?, ?>> aggregateFunctions;
private final int columnCount;

- public KudafAggregator(int initialUdafIndex, List<KsqlAggregateFunction<?, ?, ?>> functions) {
- this.initialUdafIndex = initialUdafIndex;
+ public KudafAggregator(
+ List<Integer> nonAggColumnIndexes,
+ List<KsqlAggregateFunction<?, ?, ?>> functions) {
+ this.nonAggColumnIndexes =
+ ImmutableList.copyOf(requireNonNull(nonAggColumnIndexes, "nonAggColumnIndexes"));
this.aggregateFunctions = ImmutableList.copyOf(requireNonNull(functions, "functions"));
- this.columnCount = initialUdafIndex + aggregateFunctions.size();
+ this.columnCount = nonAggColumnIndexes.size() + aggregateFunctions.size();

if (aggregateFunctions.isEmpty()) {
throw new IllegalArgumentException("Aggregator needs aggregate functions");
@@ -47,8 +50,10 @@ public KudafAggregator(int initialUdafIndex, List<KsqlAggregateFunction<?, ?, ?>
@Override
public GenericRow apply(K k, GenericRow rowValue, GenericRow aggRowValue) {
// copy over group-by and aggregate parameter columns into the output row
+ int initialUdafIndex = nonAggColumnIndexes.size();
for (int idx = 0; idx < initialUdafIndex; idx++) {
- aggRowValue.getColumns().set(idx, rowValue.getColumns().get(idx));
+ int idxInRow = nonAggColumnIndexes.get(idx);
+ aggRowValue.getColumns().set(idx, rowValue.getColumns().get(idxInRow));
}

// compute the aggregation and write it into the output row. Its assumed that
@@ -75,11 +80,13 @@ public Merger<Struct, GenericRow> getMerger() {
return (key, aggRowOne, aggRowTwo) -> {
List<Object> columns = new ArrayList<>(columnCount);

+ int initialUdafIndex = nonAggColumnIndexes.size();
for (int idx = 0; idx < initialUdafIndex; idx++) {
- if (aggRowOne.getColumns().get(idx) == null) {
- columns.add(idx, aggRowTwo.getColumns().get(idx));
+ int idxInRow = nonAggColumnIndexes.get(idx);
+ if (aggRowOne.getColumns().get(idxInRow) == null) {
+ columns.add(idx, aggRowTwo.getColumns().get(idxInRow));
} else {
- columns.add(idx, aggRowOne.getColumns().get(idx));
+ columns.add(idx, aggRowOne.getColumns().get(idxInRow));
}
}

@@ -99,7 +106,7 @@ public Merger<Struct, GenericRow> getMerger() {
private KsqlAggregateFunction<Object, Object, Object> aggregateFunctionForColumn(
final int columnIndex
) {
- return (KsqlAggregateFunction) aggregateFunctions.get(columnIndex - initialUdafIndex);
+ return (KsqlAggregateFunction) aggregateFunctions.get(columnIndex - nonAggColumnIndexes.size());
}

private final class ResultTransformer implements KsqlTransformer<K, GenericRow> {
@@ -116,11 +123,11 @@ public GenericRow transform(

final List<Object> columns = new ArrayList<>(columnCount);

- for (int idx = 0; idx < initialUdafIndex; idx++) {
- columns.add(idx, value.getColumns().get(idx));
+ for (int idx = 0; idx < nonAggColumnIndexes.size(); idx++) {
+ columns.add(idx, value.getColumns().get(nonAggColumnIndexes.get(idx)));
}

- for (int idx = initialUdafIndex; idx < columnCount; idx++) {
+ for (int idx = nonAggColumnIndexes.size(); idx < columnCount; idx++) {
final KsqlAggregateFunction<Object, Object, Object> function =
aggregateFunctionForColumn(idx);

Original file line number Diff line number Diff line change
@@ -25,23 +25,25 @@

public class KudafUndoAggregator implements Aggregator<Struct, GenericRow, GenericRow> {

- private final int initialUdafIndex;
+ private final List<Integer> nonAggColumnIndexes;
private final List<TableAggregationFunction<?, ?, ?>> aggregateFunctions;

public KudafUndoAggregator(
- int initialUdafIndex, List<TableAggregationFunction<?, ?, ?>> aggregateFunctions
+ List<Integer> nonAggColumnIndexes,
+ List<TableAggregationFunction<?, ?, ?>> aggregateFunctions
) {
Objects.requireNonNull(aggregateFunctions, "aggregateFunctions");
this.aggregateFunctions = ImmutableList.copyOf(aggregateFunctions);
- this.initialUdafIndex = initialUdafIndex;
+ this.nonAggColumnIndexes = ImmutableList.copyOf(nonAggColumnIndexes);
}

@SuppressWarnings("unchecked")
@Override
public GenericRow apply(Struct k, GenericRow rowValue, GenericRow aggRowValue) {
int idx = 0;
- for (; idx < initialUdafIndex; idx++) {
- aggRowValue.getColumns().set(idx, rowValue.getColumns().get(idx));
+ for (; idx < nonAggColumnIndexes.size(); idx++) {
+ final int idxInRow = nonAggColumnIndexes.get(idx);
+ aggRowValue.getColumns().set(idx, rowValue.getColumns().get(idxInRow));
}

for (TableAggregationFunction function : aggregateFunctions) {
@@ -54,8 +56,8 @@ public GenericRow apply(Struct k, GenericRow rowValue, GenericRow aggRowValue) {
return aggRowValue;
}

- public int getInitialUdafIndex() {
- return initialUdafIndex;
+ public List<Integer> getNonAggColumnIndexes() {
+ return nonAggColumnIndexes;
}

public List<TableAggregationFunction<?, ?, ?>> getAggregateFunctions() {