Skip to content

Commit

Permalink
migration feature time stamp provider monotonic #643 (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
hg-ms authored Dec 6, 2023
1 parent be74d0d commit d7f0bb4
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

import static org.eclipse.serializer.util.X.notNull;

import java.util.Objects;
import java.util.stream.Stream;

import org.eclipse.serializer.collections.EqHashEnum;
import org.eclipse.serializer.collections.XSort;
import org.eclipse.serializer.persistence.types.Persistence;
Expand All @@ -25,6 +28,7 @@ public interface StorageChannelTaskInitialize extends StorageChannelTask
{
public StorageIdAnalysis idAnalysis();

public long latestTimestamp();


public final class Default
Expand All @@ -40,6 +44,7 @@ public final class Default

private Long consistentStoreTimestamp ;
private Long commonTaskHeadFileTimestamp;
private Long latestTimestamp ;

private long maxEntityObjectOid ;
private long maxEntityConstantOid;
Expand Down Expand Up @@ -200,6 +205,15 @@ protected final void succeed(final StorageChannel channel, final StorageInventor

this.updateIdAnalysis(idAnalysis);

//Some storage targets like SQL will create "files" only if there is some data written.
//The transactionsFileAnalysis may be null if a new storage has been created
//and the transactions log is empty.
this.latestTimestamp = Stream.of(result)
.filter( r -> Objects.nonNull(r.transactionsFileAnalysis()))
.mapToLong( r -> r.transactionsFileAnalysis().maxTimestamp())
.max()
.orElse(0L);

this.operationController.activate();
}

Expand All @@ -221,6 +235,12 @@ public synchronized StorageIdAnalysis idAnalysis()
);
}

@Override
public long latestTimestamp()
{
return this.latestTimestamp;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ protected StorageRootTypeIdProvider ensureRootTypeIdProvider()

protected StorageTimestampProvider ensureTimestampProvider()
{
return new StorageTimestampProvider.Default();
return new StorageTimestampProvider.MonotonicTime();
}

protected StorageObjectIdRangeEvaluator ensureObjectIdRangeEvaluator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ private StorageIdAnalysis startThreads(final StorageChannelTaskInitialize initia
}
initializingTask.waitOnCompletion();
}

this.timestampProvider.set(initializingTask.latestTimestamp());

return initializingTask.idAnalysis();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.eclipse.store.storage.types;

import java.util.concurrent.atomic.AtomicLong;

/*-
* #%L
* EclipseStore Storage
Expand All @@ -24,6 +26,15 @@ public interface StorageTimestampProvider
*/
public long currentNanoTimestamp();

/**
* Set the base value used to create the timestamps
* Implementations are allowed to ignore it.
*
* @param base base value for timestamp creation.
* @return base value for timestamp creation.
*/
public long set(long base);


public final class Default implements StorageTimestampProvider
{
Expand All @@ -45,5 +56,94 @@ public synchronized long currentNanoTimestamp()
return Storage.millisecondsToNanoseconds(this.lastTimeMillis = currentTimeMillis);
}


/**
* This implementation ignores the offset.
*
* @param offset ignored by this implementation.
* @return always zero.
*/
@Override
public long set(final long offset)
{
return 0;
}

}

/**
* Returns a timestamp based upon the current system time.
* If the current system time is less or equal to the last returned value
* the last returned timestamp plus 1 nanosecond will be returned to archive
* a strictly monotone behavior.
*/
public final class MonotonicTime implements StorageTimestampProvider
{
private long lastTimeNanos;

/**
* Returns a timestamp based upon the current system time.
* If the current system time is less or equal to the last returned value
* the last returned timestamp plus 1 nanosecond will be returned to archive
* a strictly monotone behavior.
*/
@Override
public synchronized long currentNanoTimestamp()
{
final long currentTimeNanos = Storage.millisecondsToNanoseconds(System.currentTimeMillis());

if(currentTimeNanos <= this.lastTimeNanos)
{
return ++this.lastTimeNanos;
}
return this.lastTimeNanos = currentTimeNanos;
}

/**
* Set to new base value only if the new value is larger then the current one.
*/
@Override
public synchronized long set(final long timeNs)
{
if(timeNs > this.lastTimeNanos)
{
this.lastTimeNanos = timeNs;
}
return this.lastTimeNanos;
}
}

/**
* An implementation of {@link StorageTimestampProvider} that provides an strictly monotonic increasing
* long value instead of a time value. This implementation does not rely on any time based value
* that might be affected by changes of the system clock.
*
*/
public final class MonotonicCounter implements StorageTimestampProvider
{
private final AtomicLong lastValue = new AtomicLong();

/**
* Provides an strictly monotonic increasing
* long value starting from the set base value {@link #set(long)} instead of a time based value.
*
* @return a strictly monotone increasing long value.
*/
@Override
public long currentNanoTimestamp()
{
return this.lastValue.incrementAndGet();
}

/**
* Set to new base value only if the new value is larger then the current one.
*/
@Override
public long set(final long base)
{
return this.lastValue.updateAndGet((v) -> (base > v) ? base : v);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public interface StorageTransactionsAnalysis

public long headFileLatestTimestamp();

public long maxTimestamp();

public StorageLiveTransactionsFile transactionsFile();

public XGettingTable<Long, ? extends StorageTransactionEntry> transactionsFileEntries();
Expand Down Expand Up @@ -465,6 +467,7 @@ public static XGettingSequence<String> header()
"Type" ,
"Timestamp" ,
"Time Delta (ms)" ,
"Timestamp(long)" ,
"Resulting Length" ,
"Length Change" ,
"Current Head File",
Expand Down Expand Up @@ -541,6 +544,7 @@ private void addCommonTimestampPart(final long address)
this.vs
.add(formateTimeStamp(new Date(Storage.millisecondsToSeconds(timestamp)))).tab()
.add(Storage.millisecondsToSeconds(timestamp - this.lastTimestamp)).tab()
.add(timestamp).tab()
;

this.lastTimestamp = timestamp;
Expand Down Expand Up @@ -670,6 +674,7 @@ public final class EntryAggregator implements EntryIterator
private long lastConsistentStoreTimestamp;
private long currentStoreLength ;
private long currentStoreTimestamp ;
private long maxTimeStamp;

private long currentFileNumber = -1;

Expand Down Expand Up @@ -729,6 +734,7 @@ private boolean handleEntryFileCreation(final long address, final long available
}

// timestamp is intentionally ignored as file creation happens AFTER a store has been issued.
this.updateMaxTimestamp(Logic.getEntryTimestamp(address));

// entry is consistent, register completed file and reset values for new file.
this.registerCurrentFile();
Expand Down Expand Up @@ -773,6 +779,7 @@ private boolean handleEntryStore(final long address, final long availableItemLen
}

final long timestamp = Logic.getEntryTimestamp(address);
this.updateMaxTimestamp(timestamp);
if(timestamp <= this.currentStoreTimestamp)
{
throw new StorageExceptionConsistency(
Expand Down Expand Up @@ -804,6 +811,8 @@ private boolean handleEntryTransfer(final long address, final long availableItem
);
}

this.updateMaxTimestamp(Logic.getEntryTimestamp(address));

/* lastConsistentStoreTimestamp is not updated to associate the new file length with the old timestamp
* i.e. when an inter-channel rollback has to occur, the transfer part is not rolled back, as it is
* channel-local
Expand Down Expand Up @@ -848,6 +857,7 @@ private boolean handleEntryFileTruncation(final long address, final long availab
);
}

this.updateMaxTimestamp(Logic.getEntryTimestamp(address));
this.lastConsistentStoreLength = this.currentStoreLength = newLength;

return true;
Expand All @@ -868,6 +878,8 @@ private boolean handleEntryFileDeletion(final long address, final long available
}
file.isDeleted = true;

this.updateMaxTimestamp(Logic.getEntryTimestamp(address));

return true;
}

Expand All @@ -882,9 +894,15 @@ final StorageTransactionsAnalysis yield(final StorageLiveTransactionsFile transa
this.lastConsistentStoreLength ,
this.lastConsistentStoreTimestamp,
this.currentStoreLength ,
this.currentStoreTimestamp
this.currentStoreTimestamp ,
this.maxTimeStamp
);
}

private void updateMaxTimestamp(final long timestamp)
{
this.maxTimeStamp = Math.max(this.maxTimeStamp, timestamp);
}

}

Expand All @@ -902,6 +920,7 @@ public final class Default implements StorageTransactionsAnalysis
private final long headFileLastConsistentStoreTimestamp;
private final long headFileLatestLength ;
private final long headFileLatestTimestamp ;
private final long maxTimestamp ;



Expand All @@ -915,7 +934,8 @@ public final class Default implements StorageTransactionsAnalysis
final long headFileLastConsistentStoreLength ,
final long headFileLastConsistentStoreTimestamp,
final long headFileLatestLength ,
final long headFileLatestTimestamp
final long headFileLatestTimestamp ,
final long maxTimestamp
)
{
super();
Expand All @@ -925,6 +945,7 @@ public final class Default implements StorageTransactionsAnalysis
this.headFileLastConsistentStoreTimestamp = headFileLastConsistentStoreTimestamp;
this.headFileLatestLength = headFileLatestLength ;
this.headFileLatestTimestamp = headFileLatestTimestamp ;
this.maxTimestamp = maxTimestamp ;
}


Expand Down Expand Up @@ -968,6 +989,12 @@ public final long headFileLatestTimestamp()
{
return this.headFileLatestTimestamp;
}

@Override
public final long maxTimestamp()
{
return this.maxTimestamp;
}

}

Expand Down

0 comments on commit d7f0bb4

Please sign in to comment.