Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drozdov Igor Part 2 right #56

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/java/db/SlowCompletableFutureDb.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.concurrent.*;

public class SlowCompletableFutureDb<T> implements DataStorage<String, T>, Closeable {

private volatile Map<String, T> values;
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
Expand Down
30 changes: 22 additions & 8 deletions src/main/java/part2/cache/CachingDataStorageImpl.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package part2.cache;

import db.DataStorage;
import db.SlowCompletableFutureDb;

import java.util.concurrent.*;

Expand Down Expand Up @@ -32,12 +31,27 @@ public CachingDataStorageImpl(DataStorage<String, T> db, int timeout, TimeUnit t

@Override
public OutdatableResult<T> getOutdatable(String key) {
// TODO implement
// TODO use ScheduledExecutorService to remove outdated result from cache - see SlowCompletableFutureDb implementation
// TODO complete OutdatableResult::outdated after removing outdated result from cache
// TODO don't use obtrudeException on result - just don't
// TODO use remove(Object key, Object value) to remove target value
// TODO Start timeout after receiving result in CompletableFuture, not after receiving CompletableFuture itself
throw new UnsupportedOperationException();
final OutdatableResult<T> newResult
= new OutdatableResult<>(new CompletableFuture<>(), new CompletableFuture<>());

final OutdatableResult<T> cashed =
cache.putIfAbsent(key, newResult);
if (cashed != null) {
return cashed;
}

db.get(key).whenComplete((res, ex) -> {
if (ex != null) {
newResult.getResult().completeExceptionally(ex);
} else {
newResult.getResult().complete(res);
}
scheduledExecutorService.schedule(() -> {
cache.remove(key, newResult);
newResult.getOutdated().complete(null);
}, timeout, timeoutUnits);
});

return newResult;
}
}
74 changes: 72 additions & 2 deletions src/main/java/part2/cache/TypedEmployeeCachedStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@

import data.typed.Employee;
import data.typed.Employer;
import data.typed.JobHistoryEntry;
import data.typed.Position;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;

public class TypedEmployeeCachedStorage implements CachingDataStorage<String, data.typed.Employee> {

private final CachingDataStorage<String, data.Employee> employeeStorage;
private final CachingDataStorage<String, Position> positionStorage;
private final CachingDataStorage<String, Employer> employerStorage;


public TypedEmployeeCachedStorage(CachingDataStorage<String, data.Employee> employeeStorage,
CachingDataStorage<String, Position> positionStorage,
CachingDataStorage<String, Employer> employerStorage) {
Expand All @@ -18,9 +28,69 @@ public TypedEmployeeCachedStorage(CachingDataStorage<String, data.Employee> empl
this.employerStorage = employerStorage;
}

private OutdatableResult<Employee> asyncToTyped(data.Employee e) {

final List<CompletableFuture<JobHistoryEntry>> jobHistoryFutures =
e.getJobHistory().stream()
.map(this::asyncToTyped)
.collect(toList());

final List<CompletableFuture> outdatedList = e.getJobHistory().stream()
.map(this::getOutDated)
.collect(toList());


return new OutdatableResult<>(
CompletableFuture.allOf(jobHistoryFutures.toArray(new CompletableFuture[0]))
.thenApplyAsync(x -> {
final List<JobHistoryEntry> jobHistory = jobHistoryFutures.stream()
.map(this::getOrNull)
.collect(toList());
return new data.typed.Employee(e.getPerson(), jobHistory);
})
.thenApply(Function.identity()),
CompletableFuture.anyOf(outdatedList.toArray(new CompletableFuture[0]))
.thenApply(x -> null)
);
}

private CompletableFuture<JobHistoryEntry> asyncToTyped(data.JobHistoryEntry j) {
return employerStorage.get(j.getEmployer())
.thenCombine(
positionStorage.get(j.getPosition()),
(e, p) -> new JobHistoryEntry(p, e, j.getDuration()));
}

private CompletableFuture getOutDated(data.JobHistoryEntry j) {
return CompletableFuture.anyOf(positionStorage.getOutdatable(j.getPosition()).getOutdated(),
employerStorage.getOutdatable(j.getEmployer()).getOutdated());
}

private <T> T getOrNull(Future<T> f) {
try {
return f.get();
} catch (InterruptedException | ExecutionException e1) {
e1.printStackTrace();
return null;
}
}

@Override
public OutdatableResult<Employee> getOutdatable(String key) {
// TODO note that you don't know timeouts for different storage. And timeouts can be different.
throw new UnsupportedOperationException();
final OutdatableResult<data.Employee> outdatable = employeeStorage.getOutdatable(key);

final CompletableFuture<OutdatableResult<Employee>> future = outdatable.getResult().thenApply(this::asyncToTyped);
final OutdatableResult<Employee> result = new OutdatableResult<>(new CompletableFuture<>(), new CompletableFuture<>());
future.whenComplete((res, ex) -> {
if (ex != null) {
result.getResult().completeExceptionally(ex);
// result.getOutdated().completeExceptionally(ex);
} else {
result.getResult().complete(getOrNull(res.getResult()));
outdatable.getOutdated().runAfterEither(res.getOutdated(), () -> result.getOutdated().complete(null));
}
});
return result;
}
}

3 changes: 1 addition & 2 deletions src/test/java/part2/cache/CachingDataStorageImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import data.typed.Employer;
import data.typed.Position;
import db.SlowCompletableFutureDb;
import part2.cache.CachingDataStorage.OutdatableResult;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import part2.cache.CachingDataStorage.OutdatableResult;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -83,5 +83,4 @@ public void expiration() throws InterruptedException, ExecutionException, Timeou

assertEquals(person2, result3.getResult().get().getPerson());
}

}
36 changes: 34 additions & 2 deletions src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package part2.cache;

import data.Employee;
import data.JobHistoryEntry;
import data.Person;
import data.typed.Employer;
import data.typed.Position;
import db.SlowCompletableFutureDb;
Expand All @@ -10,18 +12,22 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.stream.Collectors.toMap;
import static org.junit.Assert.assertEquals;

public class TypedEmployeeCachedStorageTest {
private static SlowCompletableFutureDb<Employee> employeeDb;
private static SlowCompletableFutureDb<Employer> employerDb;
private static SlowCompletableFutureDb<Position> positionDb;


@BeforeClass
public static void defore() {
final Map<String, Employer> employerMap =
Expand Down Expand Up @@ -49,7 +55,7 @@ public static void after() {
}

@Test
public void expiration() {
public void expiration() throws ExecutionException, InterruptedException {
final CachingDataStorageImpl<Employee> employeeCache =
new CachingDataStorageImpl<>(employeeDb, 1, TimeUnit.SECONDS);

Expand All @@ -59,9 +65,35 @@ public void expiration() {
final CachingDataStorageImpl<Position> positionCache =
new CachingDataStorageImpl<>(positionDb, 100, TimeUnit.MILLISECONDS);

Map<String, Employee> employeeTmp = new HashMap<>();

final Person person1 = new Person("John", "Doe", 30);
employeeTmp.put("a", new Employee(person1,
Collections.singletonList(new JobHistoryEntry(1, Position.BA.name(), Employer.EPAM.name()))));
employeeDb.setValues(employeeTmp);

final TypedEmployeeCachedStorage typedCache =
new TypedEmployeeCachedStorage(employeeCache, positionCache, employerCache);

// TODO check than cache gets outdated with the firs outdated inner cache
final CachingDataStorage.OutdatableResult<data.typed.Employee> aPerson = typedCache.getOutdatable("a");

assertEquals(aPerson.getResult().get().getPerson(), person1);
assertEquals(aPerson.getResult().get().getJobHistoryEntries(),
Collections.singletonList(new data.typed.JobHistoryEntry(Position.BA, Employer.EPAM, 1)));

Thread.sleep(50);

employeeTmp = new HashMap<>();
final Person person2 = new Person("Dagni", "Taggart", 30);
employeeTmp.put("a", new Employee(person2, Collections.emptyList()));
employeeDb.setValues(employeeTmp);

final CachingDataStorage.OutdatableResult<data.typed.Employee> aPerson2 = typedCache.getOutdatable("a");
assertEquals(aPerson2.getResult().get().getPerson(), person1);

Thread.sleep(100);
final CachingDataStorage.OutdatableResult<data.typed.Employee> aPerson3 = typedCache.getOutdatable("a");

assertEquals(aPerson3.getResult().get().getPerson(), person2);
}
}