feat: Implement pull query routing to standbys if active is down #3
base: master
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java
Took a high-level pass. I feel we can improve the interfaces/abstractions more.
ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java
    final long interval,
    final long window) {
  long start = System.currentTimeMillis();
  while (System.currentTimeMillis() - start < window) {
Can all these methods that wait until a condition is met use a common helper like waitTill(() -> boolean, timeout), which keeps running a function that returns a boolean, either until it returns true or the timeout expires?
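A minimal sketch of what such a helper could look like. Only the name waitTill and the "boolean function plus timeout" shape come from the comment above; the class name WaitUtil, the poll-interval parameter, and the choice to return false on interruption are assumptions made for illustration, not code from this PR.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper class; not part of the PR under review.
final class WaitUtil {

  private WaitUtil() {
  }

  /**
   * Repeatedly evaluates the condition until it returns true or the
   * timeout expires. Returns true if the condition was met, false if
   * the timeout expired or the calling thread was interrupted.
   */
  static boolean waitTill(
      final BooleanSupplier condition,
      final long timeoutMs,
      final long pollIntervalMs
  ) {
    // nanoTime is monotonic, so the deadline is unaffected by clock changes.
    final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (final InterruptedException e) {
        // Restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

Each of the existing wait methods could then pass its own condition as a lambda, which sidesteps the differing signatures as long as the state being checked is reachable from the lambda.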
They all have different signatures and it is just 3 methods
.../main/java/io/confluent/ksql/execution/streams/materialization/ks/ActiveAndStandByNodes.java
.../main/java/io/confluent/ksql/execution/streams/materialization/ks/ActiveAndStandByNodes.java
  standByNodesMap.put(hostInfo, asNode(hostInfo));
}

LOG.debug("Active host {} , standby hosts {}", activeHost, standByHosts);
By marking these statuses, this interface/class is doing more than "locating", right?
Refactored so now it is locating and filtering. Whatever is returned from the locator is assumed to be a valid node (alive and less than max allowed lag)
...-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java
ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
.../main/java/io/confluent/ksql/execution/streams/materialization/ks/ActiveAndStandByNodes.java
...-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java
...-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyResponse.java
undo changes to log files
fixing tests
fixed tests
addressed vinoth's comments
ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
  this.lastStatusUpdateMs = lastStatusUpdateMs;
}

public void setHostAlive(final boolean hostAlive) {
Other threads (like during pull requests) are calling isHostAlive on this object after they call getHostsStatus, while your background thread is calling this method to update the value.
This technically isn't threadsafe, though it might work reasonably in practice. You should either add synchronization (volatile, synchronized, etc) or make this object immutable. I actually did the latter in my PR since I was reading the objects as well. If you're fine with that, I can make that change and you can ignore this for now.
I don't understand what the problem is. There is one thread that updates the value, and the value is a boolean. There is no conditional updating (like checking the value and then updating it). What data corruption can occur? The map is a ConcurrentMap, and new items are added by only one thread. Items are never removed. Only one thread updates values. There are multiple readers, but what corrupted value can the readers read? I am only updating the boolean (and the timestamp) of the object.
I suppose it's fine to have immutable objects since there won't be many in the collection. I am just trying to understand whether it is necessary or whether it is overkill.
Tracked in confluentinc#4394
There's one writer and multiple readers in different threads of the boolean. It's not that there will be some corrupted value in this scenario. It's a binary bit that presumably is just false or true. In java (and other languages), each thread has a cache of memory where local changes are made (just like cpu architectures), and they're only written back to main memory where other threads might read them when certain rules are followed (the java memory model). In short, if you don't use volatile or synchronized, you're not guaranteed to ever read the written value across threads.
https://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html
I think you could fix this simply by making these fields volatile. The immutability change would also work and just means that the ConcurrentMap is what does the job of synchronization.
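The two options discussed in this thread can be sketched as follows. Both classes are hypothetical stand-ins for the status object in the diff: the field names hostAlive and lastStatusUpdateMs come from the hunk above, while the class names and the withStatus method are assumptions for illustration.

```java
// Option 1 (assumed shape): mutable object with volatile fields.
// volatile guarantees that a value written by the heartbeat thread is
// visible to other threads that read the field afterwards.
final class VolatileHostStatus {
  private volatile boolean hostAlive;
  private volatile long lastStatusUpdateMs;

  VolatileHostStatus(final boolean hostAlive, final long lastStatusUpdateMs) {
    this.hostAlive = hostAlive;
    this.lastStatusUpdateMs = lastStatusUpdateMs;
  }

  boolean isHostAlive() {
    return hostAlive;
  }

  long getLastStatusUpdateMs() {
    return lastStatusUpdateMs;
  }

  void setHostAlive(final boolean hostAlive) {
    this.hostAlive = hostAlive;
  }

  void setLastStatusUpdateMs(final long lastStatusUpdateMs) {
    this.lastStatusUpdateMs = lastStatusUpdateMs;
  }
}

// Option 2 (assumed shape): immutable object. An update creates a new
// instance, and publishing it through a ConcurrentMap.put provides the
// cross-thread visibility.
final class ImmutableHostStatus {
  private final boolean hostAlive;
  private final long lastStatusUpdateMs;

  ImmutableHostStatus(final boolean hostAlive, final long lastStatusUpdateMs) {
    this.hostAlive = hostAlive;
    this.lastStatusUpdateMs = lastStatusUpdateMs;
  }

  boolean isHostAlive() {
    return hostAlive;
  }

  long getLastStatusUpdateMs() {
    return lastStatusUpdateMs;
  }

  // Returns a new status instead of mutating this one.
  ImmutableHostStatus withStatus(final boolean alive, final long updateMs) {
    return new ImmutableHostStatus(alive, updateMs);
  }
}
```

One difference worth noting: with two independent volatile fields, a reader can observe the new hostAlive value together with the old timestamp, whereas the immutable variant always exposes the two values as a consistent pair.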
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@Override
public boolean filter(final HostInfo hostInfo, final String storeName, final int partition) {
  if (heartbeatAgent.isPresent()) {
    final Map<HostInfo, HostStatus> hostStatus = heartbeatAgent.get().getHostsStatus();
You're making a copy of the map in getHostsStatus on every filter call. This is a small nit, but you might consider having the underlying map be an ImmutableMap so you can just return it, since filtering will theoretically happen at a much higher rate than updates.
If the map is not a ConcurrentMap anymore, but rather an ImmutableMap, I will have to synchronize every access on it or make it volatile and copy on every write, correct? If this is indeed better, this seems like a larger change better suited to a different PR. What do you think?
Tracked in issue: confluentinc#4393
All you have to do is use an AtomicReference and set it to an ImmutableMap, which is threadsafe. Every time you get updates on all of the statuses, you build a new ImmutableMap and set the atomic reference to it.
You don't have to make the change here since this is currently threadsafe. Just an optimization we can do.
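A rough sketch of that optimization, under stated assumptions: the class name HostStatusSnapshot and the publish method are hypothetical, host keys are plain strings and statuses plain booleans instead of HostInfo and HostStatus, and a JDK unmodifiable copy stands in for Guava's ImmutableMap so the example has no external dependency.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder; not the PR's actual heartbeat agent.
final class HostStatusSnapshot {

  // Readers always see a complete, unmodifiable snapshot.
  private final AtomicReference<Map<String, Boolean>> statuses =
      new AtomicReference<>(Collections.<String, Boolean>emptyMap());

  // Called by the updater thread after each round of status checks:
  // copy once on write, then swap the reference atomically.
  void publish(final Map<String, Boolean> latest) {
    statuses.set(Collections.unmodifiableMap(new HashMap<>(latest)));
  }

  // Called on every filter: no copy, just a reference read.
  Map<String, Boolean> getHostsStatus() {
    return statuses.get();
  }
}
```

The copy moves from the read path to the write path, which is the cheaper side if filtering happens far more often than status updates.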
Description
Previously, pull queries were handled only by the active node of a partition. If the active goes down or a rebalance happens, pull queries experience a period of unavailability. This PR allows standbys to handle pull queries until a new active is elected.
For this, it uses the liveness information provided by the heartbeat mechanism. If a node determines that the active is dead, it forwards the query to a standby that is alive. A query fails only if the active and all standbys are dead.
Additionally, there is no busy loop with a timeout when routing queries. Instead, queries fail fast, which leaves the retry logic to the client.
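The routing rule described above can be illustrated with a small sketch. It is not the PR's implementation: the class PullQueryRouter, the method chooseHost, the use of plain strings for hosts, and the liveness predicate standing in for the heartbeat information are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical router; names and types are illustrative only.
final class PullQueryRouter {

  private PullQueryRouter() {
  }

  /**
   * Prefers the active host; falls back to the first live standby.
   * Returns empty when no host is alive, so the caller can fail fast
   * instead of looping until a timeout.
   */
  static Optional<String> chooseHost(
      final String active,
      final List<String> standbys,
      final Predicate<String> isAlive
  ) {
    if (isAlive.test(active)) {
      return Optional.of(active);
    }
    for (final String standby : standbys) {
      if (isAlive.test(standby)) {
        return Optional.of(standby);
      }
    }
    return Optional.empty();
  }
}
```

An empty result corresponds to the case where the active and all standbys are dead; the caller would return an error immediately and the client decides whether to retry.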
Testing done
Functional test that ensures queries are routed to standby or active based on whether active is dead or alive.
In addition, manual tests will be performed both locally and on an EC2 cluster, as outlined in confluentinc#4360.
Reviewer checklist