Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement pull query routing to standbys if active is down #3

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

vpapavas
Copy link
Owner

@vpapavas vpapavas commented Jan 23, 2020

Description

Previously, pull queries were handled only by the active node of a partition. If the active goes down or rebalance happens, pull queries experience a period of unavailability. This PR allows standbys to handle pull queries until a new active is elected.

For this, it uses the liveness information provided by the heartbeat mechanism. If a node determines that the active is dead, it will forward the query to a standby that is alive. A query fails only if both the active and all standbys are dead.

Additionally, there is no busy loop with timeout when routing queries. Instead queries fail fast. This allows the client to implement the logic of retries.

Testing done

Functional test that ensures queries are routed to standby or active based on whether active is dead or alive.

In addition to this, manual tests will be performed both locally and on EC2 cluster as outlined here confluentinc#4360

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

Copy link

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a high level pass.. I feel we can improve the interfaces/abstractions more

final long interval,
final long window) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < window) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can all these methods that wait till a condition is met use a common helper like waitTill(() -> boolean, timeout), which keeps running a function that returns a boolean, either until it returns true or the timeout expires?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They all have different signatures and it is just 3 methods

standByNodesMap.put(hostInfo, asNode(hostInfo));
}

LOG.debug("Active host {} , standby hosts {}", activeHost, standByHosts);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by marking these statuses.. this interface/class is doing more than "locating", right?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored so now it is locating and filtering. Whatever is returned from the locator is assumed to be a valid node (alive and less than max allowed lag)

this.lastStatusUpdateMs = lastStatusUpdateMs;
}

public void setHostAlive(final boolean hostAlive) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other threads (like during pull requests) are calling isHostAlive on this object after they call getHostsStatus, while your background thread is calling this method to update the value.

This technically isn't threadsafe, though it might work reasonably in practice. You should either add synchronization (volatile, synchronized, etc) or make this object immutable. I actually did the latter in my PR since I was reading the objects as well. If you're fine with that, I can make that change and you can ignore this for now.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what the problem is. There is one thread that updates the value and the value is a boolean. There is no conditional updating (like checking the value and then update). What is the data corruption that can occur? The map is a ConcurrentMap, new items are added only by one thread. Items are never removed. Only one thread updates values. There are multiple readers. But what corrupted value can the readers read? I am only updating the boolean (and the timestamp) of the object.
I suppose it's fine to have immutable objects since there won't be many in the collection. I am just trying to understand whether it is necessary or if it is an overkill

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in confluentinc#4394

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's one writer and multiple readers in different threads of the boolean. It's not that there will be some corrupted value in this scenario. It's a binary bit that presumably is just false or true. In java (and other languages), each thread has a cache of memory where local changes are made (just like cpu architectures), and they're only written back to main memory where other threads might read them when certain rules are followed (the java memory model). In short, if you don't use volatile or synchronized, you're not guaranteed to ever read the written value across threads.

https://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html

I think you could fix this simply by making these fields volatile. The immutability change would also work and just means that the ConcurrentMap is what does the job of synchronization.

@Override
public boolean filter(final HostInfo hostInfo, final String storeName, final int partition) {
if (heartbeatAgent.isPresent()) {
final Map<HostInfo, HostStatus> hostStatus = heartbeatAgent.get().getHostsStatus();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're making a copy of the map in getHostsStatus at every filtering that's happening. This is a small nit, but you might consider having the underlying map be an ImmutableMap so you can just return it since this will be theoretically happening at a much higher rate than updates.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the map is not a ConcurrentMap anymore, but rather an ImmutableMap, I will have to synchronize every access on it or make it volatile and copy on every write, correct? If this is indeed better, this seems like a larger change better suitable for a different PR? What do you think?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in issue: confluentinc#4393

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All you have to do is use an AtomicReference and set it to an ImmutableMap, which is threadsafe. Every time you get updates on all of the statuses, you build a new ImmutableMap and set the atomic reference to it.

You don't have to make the change here since this is currently threadsafe. Just an optimization we can do.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants