Skip to content

Commit

Permalink
CON-673: Fix a race condition when initializing an Engine (#419)
Browse files Browse the repository at this point in the history
* CON-673: Fix a race condition when initializing an Engine

Ensure that multiple client connections to a non-default engine don't
cause the engine to be initialized more than once.

* small changes

* formatting
  • Loading branch information
jtnelson committed Aug 22, 2020
1 parent 99bc305 commit 0cb6a3a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Fixed a bug where sorting on a navigation key that isn't fetched (e.g. using a navigation key in a `find` operation or not specifying the navigation key as an operation key in a `get` or `select` operation), causes the results set to be returned in the incorrect order.
* Upgraded CCL version to `2.6.3` in order to fix a parsing bug that occurred when creating a `Criteria` containing a String or String-like value with a whitespace or equal sign (e.g. `=`) character.
* Fixed a bug that made it possible to store circular links (e.g. a link from a record to itself) when atomically adding or setting data in multiple records at once.
* Fixed a race condition that occurred when multiple client connections logged into a non-default environment at the same time. The proper concurrency controls weren't in place, so the simultaneous connection attempts, in many cases, caused the Engine for that environment to be initialized multiple times. This did not cause any data duplication issues (because only one of the duplicate Engines would be recognized at any given time), but it could cause an `OutOfMemoryException` if that corresponding environment had a lot of metadata to be loaded into memory during intialization.

#### Version 0.10.4 (December 15, 2019)
* Added support for using the `LIKE`, `NOT_LIKE` and `LINKS_TO` operators in the `TObject#is` methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;
Expand All @@ -53,6 +54,7 @@
import com.cinchapi.ccl.util.NaturalLanguage;
import com.cinchapi.common.base.AnyStrings;
import com.cinchapi.common.base.Array;
import com.cinchapi.common.base.CheckedExceptions;
import com.cinchapi.common.reflect.Reflection;
import com.cinchapi.concourse.Constants;
import com.cinchapi.concourse.Timestamp;
Expand Down Expand Up @@ -358,6 +360,12 @@ private static SortableTable<Set<TObject>> emptySortableResultDatasetWithCapacit
// prefs file in a
// future release.

/**
* Tracks the number of {@link Engine engines} that have been intialized via
* {@link #getEngineUnsafe(String)}.
*/
protected final AtomicInteger numEnginesInitialized = new AtomicInteger(0); // CON-673

/**
* The base location where the indexed buffer pages are stored.
*/
Expand Down Expand Up @@ -6621,6 +6629,7 @@ public void stop() {
for (Engine engine : engines.values()) {
engine.stop();
}
numEnginesInitialized.set(0);
System.out.println("The Concourse server has stopped");
}
}
Expand Down Expand Up @@ -6989,6 +6998,20 @@ protected UserService users() {
return users;
}

/**
* {@link #start() Start} the server as a daemon.
*/
void spawn() {
new Thread(() -> {
try {
start();
}
catch (TTransportException e) {
throw CheckedExceptions.wrapAsRuntimeException(e);
}
}).start();
}

/**
* Return the {@link Engine} that is associated with the
* {@link Default#ENVIRONMENT}.
Expand All @@ -7005,14 +7028,14 @@ private Engine getEngine() {
* exist, create a new one and add it to the collection.
*/
private Engine getEngineUnsafe(String env) {
Engine engine = engines.get(env);
if(engine == null) {
engine = new Engine(bufferStore + File.separator + env,
dbStore + File.separator + env, env);
return engines.computeIfAbsent(env, $ -> {
String buffer = bufferStore + File.separator + env;
String db = dbStore + File.separator + env;
Engine engine = new Engine(buffer, db, env);
engine.start();
engines.put(env, engine);
}
return engine;
numEnginesInitialized.incrementAndGet();
return engine;
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@
*/
package com.cinchapi.concourse.server;

import java.util.concurrent.CountDownLatch;

import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.junit.Assert;
import org.junit.Test;

import com.cinchapi.common.io.ByteBuffers;
import com.cinchapi.concourse.Concourse;
import com.cinchapi.concourse.test.ConcourseBaseTest;
import com.cinchapi.concourse.thrift.AccessToken;
import com.cinchapi.concourse.util.Environments;
import com.cinchapi.concourse.util.Networking;
import com.cinchapi.concourse.util.TestData;

/**
* Unit tests for {@link com.cinchapi.concourse.server.ConcourseServer}.
Expand Down Expand Up @@ -88,4 +96,43 @@ public void testFindEnvKeepsUnderScore() {
Assert.assertEquals("_test_environment_", Environments.sanitize(env));
}

@Test
public void testGetEngineRaceCondition()
throws TException, InterruptedException { // CON-673
int port = Networking.getOpenPort();
String env = "test";
String buffer = TestData.getTemporaryTestDir();
String db = TestData.getTemporaryTestDir();
server = ConcourseServer.create(port, buffer, db);
server.spawn();
try {
AccessToken token = server.login(
ByteBuffers.fromUtf8String("admin"),
ByteBuffers.fromUtf8String("admin"), env);
for (int i = 0; i < 10000; ++i) {
server.addKeyValue(TestData.getSimpleString(),
TestData.getTObject(), token, null, env);
}
server.stop();
server = ConcourseServer.create(port, buffer, db);
server.spawn();
int threads = 20;
CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; ++i) {
Thread t = new Thread(() -> {
Concourse client = Concourse.at().port(port)
.environment(env).connect();
client.exit();
latch.countDown();
});
t.start();
}
latch.await();
Assert.assertEquals(2, server.numEnginesInitialized.get());
}
finally {
server.stop();
}
}

}

0 comments on commit 0cb6a3a

Please sign in to comment.