__vertx.subs has stale entries for downed nodes after ungraceful shutdowns #141

Open
dspangen opened this issue Jul 30, 2024 · 11 comments

@dspangen

This is a continuation of #94, as it's hitting us in production. Looking into it, the culprit seems to be in the handling of a node leave event: the subscriptions are cleared by the cluster member that gets to delete the node info from the cache. But that can take a little while, during which the member handling the removal (essentially the leader for this event) could crash or be shut down, leaving the subs map in a bad state.

I think the solution here is to add a status field of {STARTED, STOPPING} to IgniteNodeInfo and mark the entry for a departing node as stopping. Then whoever gets to update that entry gets the first chance to clear the subs. There would also be a background executor service that polls for stopping members, maybe synchronized by a semaphore keyed to the removed member.
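Roughly what I mean, as a sketch only (the enum and class/method names here are made up for illustration, not the actual vertx-ignite classes):

// Hypothetical sketch, not the actual IgniteNodeInfo class.
public enum NodeStatus { STARTED, STOPPING }

public class NodeInfoWithStatus {
    private final String nodeId;
    private volatile NodeStatus status = NodeStatus.STARTED;

    public NodeInfoWithStatus(String nodeId) {
        this.nodeId = nodeId;
    }

    // Instead of deleting the entry outright, the member handling the node
    // leave first marks it STOPPING; whoever wins that update gets the first
    // chance to clear the subs, and a background poller can later pick up
    // entries that are still STOPPING because their cleaner crashed.
    public void markStopping() {
        status = NodeStatus.STOPPING;
    }

    public NodeStatus status() {
        return status;
    }
}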

Does this seem like a reasonable solution to the problem?

Version

4.5.7

@dspangen dspangen added the bug label Jul 30, 2024
@zyclonite
Member

Is your node leaving because it crashed, or is it a graceful shutdown?

I'm not sure I could follow your idea fully.

I fear an update on the NodeInfo might just as well get lost when a node leaves without running a graceful shutdown.
Or was the idea to just update the local state on all cluster members and then double-check against the cluster map whether the node really left?
Can you come up with a test that covers this behaviour?

As a backup solution, any join or leave event could trigger fetching the member list to make sure they are still in sync.

In general I would like to make sure that the Vert.x side does not compensate for a missing feature or bug on the Ignite side.

@zyclonite zyclonite self-assigned this Jul 31, 2024
@dspangen
Author

For details: the service is hosted in Amazon ECS, and ECS can terminate a container quite ungracefully (and sometimes multiple containers at once, since we run 9 cluster members).

The solution is to ensure that __vertx.subs gets properly cleaned even if the node doing the cleaning crashes. The cleaning of __vertx.subs is triggered by successfully deleting from __vertx.nodeInfo, and since the two cache operations are not transactional (and IgniteCache::removeAll is explicitly not transactional), Ignite cannot guarantee that the changes to __vertx.subs will happen at some future point, given the cleanup is only invoked once.

Instead of the current "first deleter wins", __vertx.nodeInfo would also store the state of "removing" nodes, to make sure the cluster manager is always able to clean the subs map of all subscriptions for removed nodes. I don't have the exact details of that cleaning worked out, but presumably a node takes a lock for some period of time and executes IgniteCache::removeAll, giving up / refreshing the lock periodically until all entries are removed, at which point the entry in __vertx.nodeInfo could be removed as well.
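Roughly what that cleaning could look like, using Ignite's failover-safe reentrant lock; this is only a sketch under those assumptions (imports omitted, lock name made up), not a worked-out implementation:

// Sketch only. Whoever sees a node marked as "removing" tries to take a
// failover-safe distributed lock for that node id and cleans the subs map.
// If the cleaner dies mid-way, Ignite releases the lock and another member
// can retry later, because the nodeInfo entry is only removed at the end.
void cleanSubsFor(Ignite ignite, String removedNodeId) {
    IgniteLock lock = ignite.reentrantLock("vertx-subs-clean-" + removedNodeId, true, false, true);
    if (!lock.tryLock()) {
        return; // another member is already cleaning this node
    }
    try {
        IgniteCache<IgniteRegistrationInfo, Boolean> subs = ignite.cache("__vertx.subs");
        Set<IgniteRegistrationInfo> keys = subs
                .query(new ScanQuery<IgniteRegistrationInfo, Boolean>(
                        (k, v) -> k.registrationInfo().nodeId().equals(removedNodeId)))
                .getAll().stream()
                .map(Cache.Entry::getKey)
                .collect(Collectors.toSet());
        subs.removeAll(keys);
        // only now drop the nodeInfo entry, so the cleanup stays retryable until it is done
        ignite.cache("__vertx.nodeInfo").remove(removedNodeId);
    } finally {
        lock.unlock();
    }
}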

I can attempt to write a test, but the issue can be triggered in Kubernetes or other container management situations where 2+ nodes are killed at roughly the same time.

@zyclonite
Member

It might not be the full extent of what you thought of, but have a look at #142 and let me know if this goes in the right direction.

As the cache is configured as atomic, there is even a retry behind the scenes for the atomic remove methods, so this could be promising.

@zyclonite
Member

I extended the test in ClusteredEventBusTest so it no longer only starts 3 nodes and kills one:

  @Test
  public void testSubsRemovedForKilledNode() throws Exception {
    int start = 20;
    int stop = 18;
    int messages = 30;
    testSubsRemoved(start, stop, messages, latch -> {
      for(int i = 1; i<=stop; i++) {
        VertxInternal vi = (VertxInternal) vertices[i];
        Promise<Void> promise = vi.getOrCreateContext().promise();
        vi.getClusterManager().leave(promise);
        promise.future().onComplete(onSuccess(v -> {
          latch.countDown();
        }));
      }
    });
  }

  private void testSubsRemoved(int start, int stop, int messages, Consumer<CountDownLatch> action) throws Exception {
    if((start - stop) < 2) {
      fail("start count needs to be at least stop count +2");
    }
    startNodes(start);
    CountDownLatch regLatch = new CountDownLatch(stop);
    AtomicInteger cnt = new AtomicInteger();
    vertices[0].eventBus().consumer(ADDRESS1, msg -> {
      int c = cnt.getAndIncrement();
      assertEquals(msg.body(), "foo" + c);
      if (c == messages - 1) {
        testComplete();
      }
      if (c >= messages) {
        fail("too many messages");
      }
    }).completionHandler(onSuccess(v -> {
      for(int i = 1; i<=stop; i++) {
        vertices[i].eventBus().consumer(ADDRESS1, msg -> {
          fail("shouldn't get message");
        }).completionHandler(onSuccess(v2 -> {
          regLatch.countDown();
        }));
      }
    }));
    awaitLatch(regLatch);

    CountDownLatch closeLatch = new CountDownLatch(1);
    action.accept(closeLatch);
    awaitLatch(closeLatch);

    // Allow time for the kill to propagate
    Thread.sleep(2000);

    vertices[start - 1].runOnContext(v -> {
      // Now send some messages from node 2 - they should ALL go to node 0
      EventBus ebSender = vertices[start - 1].eventBus();
      for (int i = 0; i < messages; i++) {
        ebSender.send(ADDRESS1, "foo" + i);
      }
    });

    await();
  }

I get some Ignite exceptions, but it might be that the shutdowns are still too graceful...

@zyclonite
Member

@dspangen what do you think?

@dspangen
Author

I think this is a useful addition, but I'm not sure it will be everything that is needed.

For context, we've been using the following in prod for the last few weeks and are still getting some instability. (We use Vert.x in Quarkus, fwiw.) After a restart we may get a few straggler addresses that contain node info for departed nodes, triggering a Connecting to server <cluster uuid> failed error. It is much better with this bean running on each node, though.

@ApplicationScoped
public class IgniteVertxSubscriptionsPeriodicCleaner {
    private static final Logger LOGGER = Logger.getLogger(IgniteVertxSubscriptionsPeriodicCleaner.class);

    @ConfigProperty(name = "co.explo.vertx.ignite.periodic-subscriptions-cleaner-initial-delay")
    Duration cleanerInitialDelay;

    @ConfigProperty(name = "co.explo.vertx.ignite.periodic-subscriptions-cleaner-period")
    Duration cleanerPeriod;

    @ConfigProperty(name = "co.explo.vertx.ignite.periodic-subscriptions-cleaner-settle-period")
    Duration settlePeriod;

    final Random entropy = new Random();

    @Inject
    IgniteFactory igniteFactory;

    Ignite ignite;

    @Inject
    Vertx vertx;

    IgniteCache<String, IgniteNodeInfo> nodeInfo;
    IgniteCache<String, IgniteNodeRemovalStatus> nodeRemovalStatus;
    IgniteCache<IgniteRegistrationInfo, Boolean> subsMap;

    String currentMemberNodeId;

    @Priority(999)
    void startPeriodicCleaner(@Observes final StartupEvent e) {
        if (igniteFactory.isIgniteConfigured()) {
            ignite = igniteFactory.getIgniteInstance();
            currentMemberNodeId = ignite.cluster().localNode().id().toString();

            nodeInfo = ignite.getOrCreateCache("__vertx.nodeInfo");
            nodeRemovalStatus = ignite.getOrCreateCache("__vertx.nodeRemovalStatus");
            subsMap = ignite.getOrCreateCache("__vertx.subs");

            nodeInfo.query(new ContinuousQuery<String, IgniteNodeInfo>()
                    .setAutoUnsubscribe(true)
                    .setTimeInterval(100L)
                    .setPageSize(128)
                    .setLocalListener(this::listenOnNodeInfoChanges));

            vertx.setPeriodic(
                    entropy.nextLong(1000, cleanerPeriod.toMillis()),
                    cleanerPeriod.toMillis(),
                    ignored -> vertx.executeBlockingAndForget(() -> {
                        blockingFindUnremovedNode();
                        return null;
                    }));
        }
    }

    void blockingFindUnremovedNode() {
        final Instant minimumSettledStartedAt = Instant.now().minus(settlePeriod);
        final Set<String> currentClusterNodes =
                ignite.cluster().nodes().stream().map(n -> n.id().toString()).collect(Collectors.toSet());
        LOGGER.debugf("Attempting to find unremoved nodes");
        nodeRemovalStatus
                .query(new ScanQuery<String, IgniteNodeRemovalStatus>(
                        // Find entries where the processing node is no longer part of the cluster
                        // (i.e., crashed in the middle) and has settled (to ensure the node that started processing
                        // the removal is actually part of the cluster state cluster-wide)
                        (k, v) -> v.getCurrentStartedAt().isBefore(minimumSettledStartedAt)
                                && !currentClusterNodes.contains(v.getCurrentNodeId())))
                .getAll()
                .stream()
                .map(entry -> {
                    LOGGER.infof(
                            "Node removal for node %s by node %s orphaned; attempting to steal task",
                            entry.getKey(), entry.getValue().getCurrentNodeId());
                    // So we have an entry where the processing node is gone and this task is orphaned. Let's replace
                    // the entry and attempt to mark us as the new owner. If we succeed at marking ourselves as the
                    // owner we can proceed immediately to continuing to clean the subs map.
                    if (nodeRemovalStatus.replace(
                            entry.getKey(),
                            entry.getValue(),
                            new IgniteNodeRemovalStatus(Instant.now(), currentMemberNodeId))) {
                        blockingDeleteForNode(entry.getKey());
                        return true;
                    }
                    return false;
                })
                .findFirst() // only process a single un-removed node per loop
                .ifPresent(ignored -> {
                    // ignored
                });
    }

    void blockingDeleteForNode(final String removedNodeId) {
        LOGGER.infof("Node removal starting for node %s", removedNodeId);
        final TreeSet<IgniteRegistrationInfo> toRemove = subsMap
                .query(new ScanQuery<IgniteRegistrationInfo, Boolean>(
                        (k, v) -> k.registrationInfo().nodeId().equals(removedNodeId)))
                .getAll()
                .stream()
                .map(Cache.Entry::getKey)
                .collect(Collectors.toCollection(TreeSet::new));

        // Cannot use streams here as that would remove the ordering from the TreeSet
        for (final IgniteRegistrationInfo info : toRemove) {
            try {
                subsMap.remove(info, Boolean.TRUE);
            } catch (IllegalStateException | CacheException e) {
                LOGGER.warnf("Could not remove subscriber (for nodeId=%s): %s", removedNodeId, e.getMessage());
            }
        }

        nodeRemovalStatus.remove(removedNodeId);
        LOGGER.infof("Node removal complete for node %s", removedNodeId);
    }

    void listenOnNodeInfoChanges(final Iterable<CacheEntryEvent<? extends String, ? extends IgniteNodeInfo>> events) {
        vertx.executeBlockingAndForget(() -> {
            StreamSupport.stream(events.spliterator(), false)
                    .filter(e -> e.getEventType() == EventType.REMOVED)
                    .forEach(e -> blockingOnNodeRemove(e.getKey()));
            return null;
        });
    }

    void blockingOnNodeRemove(final String removedNodeId) {
        if (nodeRemovalStatus.putIfAbsent(
                removedNodeId, new IgniteNodeRemovalStatus(Instant.now(), currentMemberNodeId))) {
            LOGGER.infof("Node removal recorded for node %s", removedNodeId);
            vertx.setTimer(
                    cleanerInitialDelay.toMillis(),
                    ignored -> vertx.executeBlockingAndForget(() -> {
                        blockingDeleteForNode(removedNodeId);
                        return null;
                    }));
        }
    }
}

@zyclonite
Member

I am not sure if this is only valid for the Ignite cluster manager. I would go with that solution as a last resort and maybe have it as a toggleable option.

I would prefer to have something like an interface where such a recovery can be hooked in, similar to handling clustered event bus exceptions when a peer cannot be reached. Such an event could then trigger a consistency check to verify whether the node is still part of the cluster and/or still has subscriptions that were never cleared.
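Something along these lines, just to illustrate the shape (interface and method names are made up):

// sketch only, names are made up
public interface ClusterConsistencyHook {

    // invoked when the clustered event bus cannot reach a peer,
    // e.g. the "connecting to server <uuid> failed" case
    void onPeerUnreachable(String nodeId, Throwable cause);

    // gives the cluster manager a chance to re-check membership and to
    // clean up subs entries of nodes that already left the cluster
    void checkConsistency();
}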

@tsegismont what do you think about the above problem, or do you see something obvious in the ignite side that could go wrong?

@dspangen
Author

A recovery interface seems like a good idea... at least until the root cause is identified. Today it's not possible to respond to a subs map inconsistency programmatically in any way (besides forking the cluster manager) hence this heavy-handed approach.

@tsegismont
Contributor

@dspangen I'm not familiar with ECS. Is there any way to configure rolling updates and up/downscaling so that they do not confuse the data grid? In the vertx-hazelcast docs we have:

https://vertx.io/docs/vertx-hazelcast/java/#_rolling_updates
and
https://vertx.io/docs/vertx-hazelcast/java/#_cluster_administration

I know it's not the same data grid, but I think the concepts apply as well.

@dspangen
Author

Apologies in advance for a long explanation.

Looking into this further, I believe I found the root cause: a misunderstanding of the Ignite distributed cache contract. The IgniteClusterManager (and its SubsMapHelper) are written with the assumption that updates observed via a ContinuousQuery are immediately visible when performing a subsequent ScanQuery. In my reading, that isn't the case for REPLICATED caches in Ignite (which all the Vert.x internal state caches are marked as): it's quite possible that you read from a partition copy that hasn't received all updates.

Here's how it manifests:

  1. A node departs and Ignite observes it, raising an event.
  2. IgniteClusterManager observes the event locally on all nodes.
  3. A single cluster member wins the election to clean the subs map (by successfully removing the node info entry from __vertx.nodeInfo).
    • I originally thought this was the source of the inconsistencies (and what initiated this GH issue), since the elected master could crash immediately without cleaning the subs map. While that is an issue for sure (albeit a rare one), it isn't the main issue.
    • This is also the issue fixed by the IgniteVertxSubscriptionsPeriodicCleaner that I shared above.
  4. Cleaning of the subs map commences on the elected master, where all entries for the given node id are removed. This in turn triggers a delayed update of the local ConcurrentMap in SubsMapHelper.
    • This is the crux of the matter: reads of the cache aren't guaranteed to reflect the state of the world coming from the event stream.
    • This is exacerbated throughout the cluster because the SubsMapHelper (on every other node) relies on reading the updates that the node-removal master writes to the __vertx.subs cache and reflecting them locally, also querying the distributed cache and thus also potentially reading from a non-primary or backup copy.

I tested this hypothesis by forking the IgniteClusterManager and SubsMapHelper to explicitly mark nodes that are in the removal process in a local ConcurrentMap when a node loss event is observed. Entries in that map are maintained until the node removal process is complete, which is signified by the IgniteVertxSubscriptionsPeriodicCleaner bean (shared above) removing the entry from the __vertx.nodeRemovalStatus cache.
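Heavily simplified, the fork boils down to something like this (illustrative only, not the actual forked SubsMapHelper code; method names are made up):

// Illustrative sketch, not the actual forked SubsMapHelper.
// Nodes observed as leaving are tracked locally, so lookups can ignore their
// registrations even while the distributed __vertx.subs cache still returns
// stale entries from a replica.
private final ConcurrentMap<String, Instant> nodesBeingRemoved = new ConcurrentHashMap<>();

void onNodeLeft(String nodeId) {
    nodesBeingRemoved.put(nodeId, Instant.now());
}

// called once __vertx.nodeRemovalStatus is cleared for this node,
// i.e. the periodic cleaner finished removing its subs entries
void onNodeRemovalComplete(String nodeId) {
    nodesBeingRemoved.remove(nodeId);
}

List<RegistrationInfo> filterStale(List<RegistrationInfo> fromCache) {
    return fromCache.stream()
            .filter(r -> !nodesBeingRemoved.containsKey(r.nodeId()))
            .collect(Collectors.toList());
}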

Importantly, with this change we have observed no inconsistency in a week. Previously, after every update to the cluster (approx once a day) several registrations would stick around in perpetuity, failing approximately 0.5-1.0% of all requests to our application with the infamous connecting to server <guid> failed log message. This would force us to manually kill off containers until we got each node into a state without stale data in the local subs map. (See below for details of our application).

Given that this state is maintained across two unrelated objects, it will take some work to clean it all up into a PR for submission, but I am happy to do so if there is interest.

Aside: Explo

Explo provides embedded analytics and BI to companies looking to share data with their customers. We provide a SQL-first analytic engine on top of customer databases and data warehouses. We use Vert.x via Quarkus 3 to maintain our connection pooling infrastructure for our customer data warehouse connections. Given that we serve hundreds of customers and thousands of databases and data warehouses we needed a sharding solution to maintain pools of database connections through a cluster of nodes. We use Vert.x to manage that.

Our application is architected as follows:

  • An application node is part of an haGroup and has many shards assigned to it.
  • Each ha group has an HA coordinator that ensures that each shard coordinator is live (see below)
  • Each shard has a coordinator that maintains the pool of connections locally by starting / maintaining verticles for each connection to each data source on the shard.
  • Example:
    • 3 ha groups with 3 nodes each, 17 total shards
      • ha group 0 will have connections for shards 0, 1, 2, 3, 4, 5 maintained
  • Each application node also runs an http server, translating a request to an address on the clustered event bus for a datasource to perform a query.
    • This is where we would observe an event bus messaging failure.
  • We run this application on AWS ECS Fargate with a separate service per ha group to manage the split in configuration.
  • Application nodes discover their peers via their ALB

@zyclonite
Member

Thanks for the long explanation, that helps a lot in tracking down the potential problem.

The reason for using REPLICATED in combination with writeSynchronizationMode FULL_SYNC is that it should guarantee that the replicas (not partitions) are always updated on a successful write. So reads should be quick and consistent; only writes have the potential to take a performance hit.

The rest of the shared data caches are PARTITIONED with readFromBackup turned off, to stay consistent by default as well.
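For reference, the two profiles correspond roughly to this kind of cache configuration (simplified illustration, "someSharedMap" is just a placeholder, not the exact code in the cluster manager):

// simplified illustration of the two cache profiles described above
CacheConfiguration<Object, Object> subsCfg = new CacheConfiguration<>("__vertx.subs")
        .setCacheMode(CacheMode.REPLICATED)
        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

CacheConfiguration<Object, Object> sharedCfg = new CacheConfiguration<>("someSharedMap")
        .setCacheMode(CacheMode.PARTITIONED)
        .setBackups(1)
        .setReadFromBackup(false);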

Can you refer to the Ignite documentation where this is stated?

Were you able to test my PR draft to check whether it gets you a lower failure rate?
