feat: Adds a new RoutingFilter, FreshnessFilter that looks at offset lags #4
base: confluent-ha-routing
Conversation
* chore: drop ColumnRef

  This patch drops ColumnRef in favor of just using ColumnName. ColumnRef became pointless once we dropped qualifiers from the schema.
Made some cosmetic comments, but overall it looks good!
Very clever use of a factory to contain all the random options. :)
Resolved review comments (outdated) on:

* ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
* ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/FreshnessFilter.java
* ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
* ksql-streams/src/main/java/io/confluent/ksql/execution/streams/RoutingFilter.java
/**
 * These are options used for locating the host to retrieve data from.
 */
public interface RoutingOptions {
Could we consolidate other routing options, like whether we should route to standbys, into this itself?
This object is useful for query-time options more than server options. Maybe I can change the name?
Which other flags do you mean? It would require adding this to many call-site parameters if I were to do that for something like KSQL_QUERY_PULL_ENABLE_STANDBY_READS. (That has more to do with state stores than routing, per se.)
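For illustration only, here is a minimal sketch of what folding a couple of query-time flags into the interface could look like; the method names (`shouldRouteToStandbys`, `maxAllowedOffsetLag`) are hypothetical and not part of this patch:

```java
/**
 * Hypothetical sketch only: query-time options consulted when locating the
 * host to retrieve data from. The method names are illustrative, not the
 * actual API in this PR.
 */
public interface RoutingOptions {

  /** Whether standby hosts may serve this particular pull query. */
  boolean shouldRouteToStandbys();

  /** Maximum offset lag a host may report and still be considered fresh. */
  long maxAllowedOffsetLag();
}
```

The trade-off mentioned above still applies: every call site that builds a RoutingOptions would need to supply a value for any new flag.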
Resolved review comment (outdated) on ...-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java
@@ -127,7 +129,8 @@ KsLocator create(
      String stateStoreName,
      KafkaStreams kafkaStreams,
Too bad we can't get the applicationId from this kafkaStreams instance. Do you think we should push for that?
That would be nice. It makes sense for a KafkaStreams instance to have one. Does KafkaStreams currently have any notion of an id? Many setups probably don't have more than one or two applications at a time.
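Since the KafkaStreams object itself doesn't expose an application id, one workaround (a sketch only, not what this patch does) is to read it back out of the same properties the instance was built with; `StreamsConfig.APPLICATION_ID_CONFIG` is the standard Kafka Streams key for it:

```java
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;

// Hypothetical helper: the KafkaStreams object does not expose its application
// id, but the properties used to construct it must contain application.id.
public final class ApplicationIds {

  private ApplicationIds() {
  }

  public static String applicationId(final Map<String, Object> streamsProperties) {
    final Object id = streamsProperties.get(StreamsConfig.APPLICATION_ID_CONFIG);
    if (id == null) {
      throw new IllegalArgumentException(
          StreamsConfig.APPLICATION_ID_CONFIG + " is not set");
    }
    return id.toString();
  }
}
```

Passing the streams properties (or the id string itself) alongside the KafkaStreams instance avoids needing a new accessor on KafkaStreams.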
public static Optional<FreshnessFilter> create(
    final Optional<LagReportingAgent> lagReportingAgent,
    final RoutingOptions routingOptions,
    final List<KsqlHost> hosts,
IIRC, we already have a stream in KsLocator, and we turn it into a list and pass it in? We seem to be recreating a stream out of it anyway. So change hosts to Stream<KsqlHost>?
You can only process a stream once, so I have to stick with a list here. I removed the stream in KsLocator anyway and just produce a single list.
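To make the single-consumption point concrete, here is a standalone Java sketch (unrelated to the KSQL classes): a `java.util.stream.Stream` throws `IllegalStateException` on a second terminal operation, so anything that needs to look at the hosts more than once has to materialize them into a `List` first.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class StreamReuseDemo {

  public static void main(final String[] args) {
    final Stream<String> hosts = Stream.of("host-a:8088", "host-b:8088");

    // First terminal operation consumes the stream.
    System.out.println(hosts.count());

    try {
      // Second terminal operation fails: streams are single-use.
      hosts.collect(Collectors.toList());
    } catch (final IllegalStateException e) {
      // "stream has already been operated upon or closed"
      System.out.println(e.getMessage());
    }

    // Materializing into a List allows repeated iteration.
    final List<String> hostList = Stream.of("host-a:8088", "host-b:8088")
        .collect(Collectors.toList());
    System.out.println(hostList.size());  // first pass
    System.out.println(hostList.size());  // second pass works fine
  }
}
```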
Resolved review comment (outdated) on ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/FreshnessFilter.java
@AlanConfluent oh and just making sure, we will be adding some …
…confluentinc#4450)

KSQL currently lets you take a non-windowed stream and perform a windowed group by:

```sql
CREATE TABLE T as SELECT stuff FROM S WINDOW TUMBLING (SIZE 1 SECOND) group by something;
```

which is essentially grouping not just by `something`, but also implicitly by the window bounds. This might be more correctly written with a Tumbling table function:

```sql
CREATE TABLE T as SELECT stuff FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;
```

where the Tumbling table function returns one row for each row in `S`, with the addition of the `windowstart` and `windowend` columns. (Note: Hopping and session table functions are also possible, though in the case of the latter the table function would also emit retractions.)

In a correct SQL model, `windowstart` and `windowend` would therefore be available as fields within the selection, e.g.

```sql
CREATE TABLE T as SELECT windowstart, windowend, something, count() FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;
```

This change makes such awesomeness possible.
Yes, that's right. After filtering, ranking would occur. Though, as we mentioned, with active always first, regardless.
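A rough sketch of the filter-then-rank flow described above; the types and fields used here (`HostInfo`, `isActive`, `offsetLag`, the `Predicate`) are hypothetical stand-ins, not the classes in this PR.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public final class FilterThenRankSketch {

  /** Hypothetical host descriptor: active/standby flag plus reported lag. */
  public static final class HostInfo {
    final String name;
    final boolean isActive;
    final long offsetLag;

    HostInfo(final String name, final boolean isActive, final long offsetLag) {
      this.name = name;
      this.isActive = isActive;
      this.offsetLag = offsetLag;
    }
  }

  /**
   * Drop hosts that fail the freshness check, then rank the survivors:
   * the active host always sorts first, standbys are ordered by lowest lag.
   */
  public static List<HostInfo> filterThenRank(
      final List<HostInfo> hosts,
      final Predicate<HostInfo> freshnessFilter
  ) {
    return hosts.stream()
        .filter(freshnessFilter)
        .sorted(Comparator
            .comparing((HostInfo h) -> !h.isActive)  // active host first
            .thenComparingLong(h -> h.offsetLag))    // then by reported lag
        .collect(Collectors.toList());
  }
}
```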
* feat: remove WindowStart() and WindowEnd() UDAFs

  These two UDAFs were introduced to allow access to the start and end times of the window in a windowed source. `WINDOWSTART` and `WINDOWEND` are now accessible as columns to be used in the SELECT of a query (outside of UDAFs). This makes the two UDAFs redundant.

  BREAKING CHANGE: The `WindowStart()` and `WindowEnd()` UDAFs have been removed from KSQL. Use the `WindowStart` and `WindowEnd` system columns to access the window bounds within the SELECT expression instead.
* test: add test to prove confluentinc#596 is fixed

  fixes: confluentinc#596

  Functionality to join on an expression was added previously.
* feat: primitive key support

  ksqlDB now supports the following primitive key types: `INT`, `BIGINT`, `DOUBLE`, as well as the existing `STRING` type.

  The key type can be defined in the CREATE TABLE or CREATE STREAM statement by including a column definition for `ROWKEY` in the form `ROWKEY <primitive-key-type> KEY,`, for example:

  ```sql
  CREATE TABLE USERS (ROWKEY BIGINT KEY, NAME STRING, RATING DOUBLE) WITH (kafka_topic='users', VALUE_FORMAT='json');
  ```

  ksqlDB currently requires the name of the key column to be `ROWKEY`. Support for arbitrary key names is tracked by confluentinc#3536.

  ksqlDB currently requires keys to use the `KAFKA` format. Support for additional formats is tracked by https://github.com/confluentinc/ksql/projects/3.

  Schema inference currently only works with `STRING` keys. Support for additional key types is tracked by confluentinc#4462. (Schema inference is where ksqlDB infers the schema of CREATE TABLE and CREATE STREAM statements from the schema registered in the Schema Registry, as opposed to the user supplying the set of columns in the statement.)

  Apache Kafka Connect can be configured to output keys in the `KAFKA` format by using a Converter, e.g. `"key.converter": "org.apache.kafka.connect.converters.IntegerConverter"`. Details of which converter to use for which key type can be found in the `Connect Converter` column here: https://docs.confluent.io/current/ksql/docs/developer-guide/serialization.html#kafka

  @rmoff has written an introductory blog about primitive keys: https://rmoff.net/2020/02/07/primitive-keys-in-ksqldb/

  BREAKING CHANGE: existing queries that perform a PARTITION BY or GROUP BY on a single column of one of the above supported primitive key types will now set the key to the appropriate type, not a `STRING` as previously.
Description
What behavior do you want to change, why, how does your patch achieve the changes?
Testing done
Describe the testing strategy. Unit and integration tests are expected for any behavior changes.
Reviewer checklist