From 722b58ff3299a7b53ac0e45cef56f79e36f801bb Mon Sep 17 00:00:00 2001 From: MortyMerr Date: Wed, 26 Jul 2017 01:39:16 +0300 Subject: [PATCH 1/6] cashsingdatastorage --- src/main/java/db/SlowCompletableFutureDb.java | 1 - .../part2/cache/CachingDataStorageImpl.java | 30 ++++++++++++++----- .../cache/CachingDataStorageImplTest.java | 3 +- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/main/java/db/SlowCompletableFutureDb.java b/src/main/java/db/SlowCompletableFutureDb.java index 4d2b373..b64c412 100755 --- a/src/main/java/db/SlowCompletableFutureDb.java +++ b/src/main/java/db/SlowCompletableFutureDb.java @@ -7,7 +7,6 @@ import java.util.concurrent.*; public class SlowCompletableFutureDb implements DataStorage, Closeable { - private volatile Map values; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); diff --git a/src/main/java/part2/cache/CachingDataStorageImpl.java b/src/main/java/part2/cache/CachingDataStorageImpl.java index a2ae460..566bf9e 100755 --- a/src/main/java/part2/cache/CachingDataStorageImpl.java +++ b/src/main/java/part2/cache/CachingDataStorageImpl.java @@ -1,7 +1,6 @@ package part2.cache; import db.DataStorage; -import db.SlowCompletableFutureDb; import java.util.concurrent.*; @@ -32,12 +31,27 @@ public CachingDataStorageImpl(DataStorage db, int timeout, TimeUnit t @Override public OutdatableResult getOutdatable(String key) { - // TODO implement - // TODO use ScheduledExecutorService to remove outdated result from cache - see SlowCompletableFutureDb implementation - // TODO complete OutdatableResult::outdated after removing outdated result from cache - // TODO don't use obtrudeException on result - just don't - // TODO use remove(Object key, Object value) to remove target value - // TODO Start timeout after receiving result in CompletableFuture, not after receiving CompletableFuture itself - throw new UnsupportedOperationException(); + final OutdatableResult newResult + = new OutdatableResult<>(new CompletableFuture<>(), new CompletableFuture<>()); + + final OutdatableResult cashed = + cache.putIfAbsent(key, newResult); + if (cashed != null) { + return cashed; + } + + db.get(key).whenComplete((res, ex) -> { + scheduledExecutorService.schedule(() -> { + cache.remove(key, newResult); + newResult.getOutdated().complete(null); + }, timeout, timeoutUnits); + if (ex != null) { + newResult.getResult().completeExceptionally(ex); + } else { + newResult.getResult().complete(res); + } + }); + + return newResult; } } diff --git a/src/test/java/part2/cache/CachingDataStorageImplTest.java b/src/test/java/part2/cache/CachingDataStorageImplTest.java index 041370d..bf32efb 100755 --- a/src/test/java/part2/cache/CachingDataStorageImplTest.java +++ b/src/test/java/part2/cache/CachingDataStorageImplTest.java @@ -57,7 +57,7 @@ public static void after() { @Test public void expiration() throws InterruptedException, ExecutionException, TimeoutException { final CachingDataStorageImpl employeeCache = - new CachingDataStorageImpl<>(employeeDb, 100, TimeUnit.MILLISECONDS); + new CachingDataStorageImpl<>(employeeDb, 10, TimeUnit.MILLISECONDS); Map values = new HashMap<>(); final Person person1 = new Person("John", "Galt", 66); @@ -83,5 +83,4 @@ public void expiration() throws InterruptedException, ExecutionException, Timeou assertEquals(person2, result3.getResult().get().getPerson()); } - } From 264906a747f614227bf3dc2e863a4757c6bdff90 Mon Sep 17 00:00:00 2001 From: MortyMerr Date: Wed, 26 Jul 2017 19:37:32 +0300 Subject: [PATCH 2/6] cashsingdatastorage --- .../cache/TypedEmployeeCachedStorage.java | 63 ++++++++++++++++++- .../cache/CachingDataStorageImplTest.java | 2 +- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java index e9049a5..1865a57 100755 --- a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java +++ b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java @@ -2,14 +2,24 @@ import data.typed.Employee; import data.typed.Employer; +import data.typed.JobHistoryEntry; import data.typed.Position; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.function.Function; + +import static java.util.stream.Collectors.toList; + public class TypedEmployeeCachedStorage implements CachingDataStorage { private final CachingDataStorage employeeStorage; private final CachingDataStorage positionStorage; private final CachingDataStorage employerStorage; + public TypedEmployeeCachedStorage(CachingDataStorage employeeStorage, CachingDataStorage positionStorage, CachingDataStorage employerStorage) { @@ -18,9 +28,58 @@ public TypedEmployeeCachedStorage(CachingDataStorage empl this.employerStorage = employerStorage; } + private OutdatableResult asyncToTyped(data.Employee e) { + + final List> jobHistoryFutures = + e.getJobHistory().stream() + .map(this::asyncToTyped) + .collect(toList()); + + final List outdatedList = e.getJobHistory().stream() + .map(this::getOutDated) + .collect(toList()); + + + return new OutdatableResult<>( + CompletableFuture.allOf(jobHistoryFutures.toArray(new CompletableFuture[0])) + .thenApplyAsync(x -> { + final List jobHistory = jobHistoryFutures.stream() + .map(this::getOrNull) + .collect(toList()); + return new data.typed.Employee(e.getPerson(), jobHistory); + }) + .thenApply(Function.identity()), + CompletableFuture.anyOf(outdatedList.toArray(new CompletableFuture[0])) + .thenApply(x -> null) + ); + } + + private CompletableFuture asyncToTyped(data.JobHistoryEntry j) { + return employerStorage.get(j.getEmployer()) + .thenCombine( + positionStorage.get(j.getPosition()), + (e, p) -> new JobHistoryEntry(p, e, j.getDuration())); + } + + private CompletableFuture getOutDated(data.JobHistoryEntry j) { + return CompletableFuture.anyOf(positionStorage.getOutdatable(j.getPosition()).getOutdated(), + employerStorage.getOutdatable(j.getEmployer()).getOutdated()); + } + + private T getOrNull(Future f) { + try { + return f.get(); + } catch (InterruptedException | ExecutionException e1) { + e1.printStackTrace(); + return null; + } + } + @Override public OutdatableResult getOutdatable(String key) { - // TODO note that you don't know timeouts for different storage. And timeouts can be different. - throw new UnsupportedOperationException(); + final OutdatableResult outdatable = employeeStorage.getOutdatable(key); + final CompletableFuture employeeResult = outdatable.getResult().thenCompose(e -> asyncToTyped(e).getResult()); + return new OutdatableResult<>(employeeResult, outdatable.getOutdated().applyToEither(outdatable.getOutdated(), v -> null)); } } + diff --git a/src/test/java/part2/cache/CachingDataStorageImplTest.java b/src/test/java/part2/cache/CachingDataStorageImplTest.java index bf32efb..52f7db6 100755 --- a/src/test/java/part2/cache/CachingDataStorageImplTest.java +++ b/src/test/java/part2/cache/CachingDataStorageImplTest.java @@ -57,7 +57,7 @@ public static void after() { @Test public void expiration() throws InterruptedException, ExecutionException, TimeoutException { final CachingDataStorageImpl employeeCache = - new CachingDataStorageImpl<>(employeeDb, 10, TimeUnit.MILLISECONDS); + new CachingDataStorageImpl<>(employeeDb, 100, TimeUnit.MILLISECONDS); Map values = new HashMap<>(); final Person person1 = new Person("John", "Galt", 66); From addd057721df372e0ef67deb431f69125f5e54a7 Mon Sep 17 00:00:00 2001 From: MortyMerr Date: Wed, 26 Jul 2017 20:28:11 +0300 Subject: [PATCH 3/6] cashsingdatastorage --- .../cache/TypedEmployeeCachedStorageTest.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java index fecb3b4..e66477a 100755 --- a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java +++ b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java @@ -1,6 +1,8 @@ package part2.cache; import data.Employee; +import data.JobHistoryEntry; +import data.Person; import data.typed.Employer; import data.typed.Position; import db.SlowCompletableFutureDb; @@ -10,18 +12,22 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; import static java.util.stream.Collectors.toMap; +import static org.junit.Assert.assertEquals; public class TypedEmployeeCachedStorageTest { private static SlowCompletableFutureDb employeeDb; private static SlowCompletableFutureDb employerDb; private static SlowCompletableFutureDb positionDb; + @BeforeClass public static void defore() { final Map employerMap = @@ -49,7 +55,7 @@ public static void after() { } @Test - public void expiration() { + public void expiration() throws ExecutionException, InterruptedException { final CachingDataStorageImpl employeeCache = new CachingDataStorageImpl<>(employeeDb, 1, TimeUnit.SECONDS); @@ -59,9 +65,33 @@ public void expiration() { final CachingDataStorageImpl positionCache = new CachingDataStorageImpl<>(positionDb, 100, TimeUnit.MILLISECONDS); + Map employeeTmp = new HashMap<>(); + + final Person person1 = new Person("John", "Doe", 30); + employeeTmp.put("a", new Employee(person1, + Collections.singletonList(new JobHistoryEntry(1, Position.BA.name(), Employer.EPAM.name())))); + employeeDb.setValues(employeeTmp); + final TypedEmployeeCachedStorage typedCache = new TypedEmployeeCachedStorage(employeeCache, positionCache, employerCache); - // TODO check than cache gets outdated with the firs outdated inner cache + final CachingDataStorage.OutdatableResult aPerson = typedCache.getOutdatable("a"); + + assertEquals(aPerson.getResult().get().getPerson(), person1); + assertEquals(aPerson.getResult().get().getJobHistoryEntries(), + Collections.singletonList(new data.typed.JobHistoryEntry(Position.BA, Employer.EPAM, 1))); + + Thread.sleep(500); + employeeTmp = new HashMap<>(); + final Person person2 = new Person("Dagni", "Taggart", 30); + employeeTmp.put("a", new Employee(person2, Collections.emptyList())); + employeeDb.setValues(employeeTmp); + + final CachingDataStorage.OutdatableResult aPerson2 = typedCache.getOutdatable("a"); + assertEquals(aPerson2.getResult().get().getPerson(), person1); + + Thread.sleep(500); + final CachingDataStorage.OutdatableResult aPerson3 = typedCache.getOutdatable("a"); + assertEquals(aPerson3.getResult().get().getPerson(), person2); } } From f5d4c9194fcd0a0611f9c0aaa79549753e472b4d Mon Sep 17 00:00:00 2001 From: MortyMerr Date: Wed, 26 Jul 2017 22:06:16 +0300 Subject: [PATCH 4/6] cashsingdatastorage --- .../java/part2/cache/TypedEmployeeCachedStorage.java | 4 +++- .../part2/cache/TypedEmployeeCachedStorageTest.java | 12 +++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java index 1865a57..519579a 100755 --- a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java +++ b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java @@ -79,7 +79,9 @@ private T getOrNull(Future f) { public OutdatableResult getOutdatable(String key) { final OutdatableResult outdatable = employeeStorage.getOutdatable(key); final CompletableFuture employeeResult = outdatable.getResult().thenCompose(e -> asyncToTyped(e).getResult()); - return new OutdatableResult<>(employeeResult, outdatable.getOutdated().applyToEither(outdatable.getOutdated(), v -> null)); + final CompletableFuture employeeOutdated = outdatable.getResult().thenCompose(e -> asyncToTyped(e).getOutdated()); + outdatable.getResult().thenCompose(e -> asyncToTyped(e).getOutdated()); + return new OutdatableResult<>(employeeResult, outdatable.getOutdated().applyToEither(employeeOutdated, Function.identity())); } } diff --git a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java index e66477a..15d0d52 100755 --- a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java +++ b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java @@ -77,21 +77,23 @@ public void expiration() throws ExecutionException, InterruptedException { final CachingDataStorage.OutdatableResult aPerson = typedCache.getOutdatable("a"); - assertEquals(aPerson.getResult().get().getPerson(), person1); + assertEquals(aPerson.getResult().get().getPerson(), employeeCache.get("a").get().getPerson()); assertEquals(aPerson.getResult().get().getJobHistoryEntries(), Collections.singletonList(new data.typed.JobHistoryEntry(Position.BA, Employer.EPAM, 1))); - Thread.sleep(500); + Thread.sleep(50); + employeeTmp = new HashMap<>(); final Person person2 = new Person("Dagni", "Taggart", 30); employeeTmp.put("a", new Employee(person2, Collections.emptyList())); employeeDb.setValues(employeeTmp); final CachingDataStorage.OutdatableResult aPerson2 = typedCache.getOutdatable("a"); - assertEquals(aPerson2.getResult().get().getPerson(), person1); + assertEquals(aPerson2.getResult().get().getPerson(), employeeCache.get("a").get().getPerson()); - Thread.sleep(500); + Thread.sleep(100); final CachingDataStorage.OutdatableResult aPerson3 = typedCache.getOutdatable("a"); - assertEquals(aPerson3.getResult().get().getPerson(), person2); + + assertEquals(aPerson3.getResult().get().getPerson(), employeeCache.get("a").get().getPerson()); } } From b42414a675bd0ea2f8bf529d33e67f11c4ba851a Mon Sep 17 00:00:00 2001 From: MortyMerr Date: Thu, 27 Jul 2017 12:04:56 +0300 Subject: [PATCH 5/6] cashsingdatastorage --- .../part2/cache/TypedEmployeeCachedStorage.java | 17 +++++++++++++---- .../cache/TypedEmployeeCachedStorageTest.java | 6 +++--- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java index 519579a..b452470 100755 --- a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java +++ b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java @@ -78,10 +78,19 @@ private T getOrNull(Future f) { @Override public OutdatableResult getOutdatable(String key) { final OutdatableResult outdatable = employeeStorage.getOutdatable(key); - final CompletableFuture employeeResult = outdatable.getResult().thenCompose(e -> asyncToTyped(e).getResult()); - final CompletableFuture employeeOutdated = outdatable.getResult().thenCompose(e -> asyncToTyped(e).getOutdated()); - outdatable.getResult().thenCompose(e -> asyncToTyped(e).getOutdated()); - return new OutdatableResult<>(employeeResult, outdatable.getOutdated().applyToEither(employeeOutdated, Function.identity())); + + final CompletableFuture> future = outdatable.getResult().thenApply(this::asyncToTyped); + final OutdatableResult result = new OutdatableResult<>(new CompletableFuture<>(), new CompletableFuture<>()); + future.whenComplete((res, ex) -> { + if (ex != null) { + result.getResult().completeExceptionally(ex); +// result.getOutdated().completeExceptionally(ex); + } else { + result.getResult().complete(getOrNull(res.getResult())); + outdatable.getOutdated().runAfterEither(res.getOutdated(), () -> result.getOutdated().complete(null)); + } + }); + return result; } } diff --git a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java index 15d0d52..a22762d 100755 --- a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java +++ b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java @@ -77,7 +77,7 @@ public void expiration() throws ExecutionException, InterruptedException { final CachingDataStorage.OutdatableResult aPerson = typedCache.getOutdatable("a"); - assertEquals(aPerson.getResult().get().getPerson(), employeeCache.get("a").get().getPerson()); + assertEquals(aPerson.getResult().get().getPerson(), person1); assertEquals(aPerson.getResult().get().getJobHistoryEntries(), Collections.singletonList(new data.typed.JobHistoryEntry(Position.BA, Employer.EPAM, 1))); @@ -89,11 +89,11 @@ public void expiration() throws ExecutionException, InterruptedException { employeeDb.setValues(employeeTmp); final CachingDataStorage.OutdatableResult aPerson2 = typedCache.getOutdatable("a"); - assertEquals(aPerson2.getResult().get().getPerson(), employeeCache.get("a").get().getPerson()); + assertEquals(aPerson2.getResult().get().getPerson(), person1); Thread.sleep(100); final CachingDataStorage.OutdatableResult aPerson3 = typedCache.getOutdatable("a"); - assertEquals(aPerson3.getResult().get().getPerson(), employeeCache.get("a").get().getPerson()); + assertEquals(aPerson3.getResult().get().getPerson(), person2); } } From 32e182e0ae2456787fafaefb0ac84281c408abc0 Mon Sep 17 00:00:00 2001 From: MortyMerr Date: Thu, 27 Jul 2017 12:10:14 +0300 Subject: [PATCH 6/6] cashsingdatastorage --- src/main/java/part2/cache/CachingDataStorageImpl.java | 8 ++++---- src/test/java/part2/cache/CachingDataStorageImplTest.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/part2/cache/CachingDataStorageImpl.java b/src/main/java/part2/cache/CachingDataStorageImpl.java index 566bf9e..6bc8902 100755 --- a/src/main/java/part2/cache/CachingDataStorageImpl.java +++ b/src/main/java/part2/cache/CachingDataStorageImpl.java @@ -41,15 +41,15 @@ public OutdatableResult getOutdatable(String key) { } db.get(key).whenComplete((res, ex) -> { - scheduledExecutorService.schedule(() -> { - cache.remove(key, newResult); - newResult.getOutdated().complete(null); - }, timeout, timeoutUnits); if (ex != null) { newResult.getResult().completeExceptionally(ex); } else { newResult.getResult().complete(res); } + scheduledExecutorService.schedule(() -> { + cache.remove(key, newResult); + newResult.getOutdated().complete(null); + }, timeout, timeoutUnits); }); return newResult; diff --git a/src/test/java/part2/cache/CachingDataStorageImplTest.java b/src/test/java/part2/cache/CachingDataStorageImplTest.java index 52f7db6..d0b3173 100755 --- a/src/test/java/part2/cache/CachingDataStorageImplTest.java +++ b/src/test/java/part2/cache/CachingDataStorageImplTest.java @@ -5,10 +5,10 @@ import data.typed.Employer; import data.typed.Position; import db.SlowCompletableFutureDb; -import part2.cache.CachingDataStorage.OutdatableResult; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import part2.cache.CachingDataStorage.OutdatableResult; import java.io.IOException; import java.util.Arrays;