Skip to content

Commit

Permalink
KeyValue atomic delete and purge methods. (#1092)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmcote authored Mar 5, 2024
1 parent b4a51ed commit 7ed4990
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/main/java/io/nats/client/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ public interface KeyValue {
*/
void delete(String key) throws IOException, JetStreamApiException;

/**
* Soft deletes the key by placing a delete marker iff the key exists and its last revision matches the expected
* @param key the key
* @param expectedRevision the expected last revision
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void delete(String key, long expectedRevision) throws IOException, JetStreamApiException;

/**
* Purge all values/history from the specific key
* @param key the key
Expand All @@ -147,6 +157,16 @@ public interface KeyValue {
*/
void purge(String key) throws IOException, JetStreamApiException;

/**
* Purge all values/history from the specific key iff the key exists and its last revision matches the expected
* @param key the key
* @param expectedRevision the expected last revision
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void purge(String key, long expectedRevision) throws IOException, JetStreamApiException;

/**
* Watch updates for a specific key.
* @param key the key
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/nats/client/impl/NatsKeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ public void delete(String key) throws IOException, JetStreamApiException {
_write(key, null, getDeleteHeaders());
}

/**
* {@inheritDoc}
*/
@Override
public void delete(String key, long expectedRevision) throws IOException, JetStreamApiException {
validateNonWildcardKvKeyRequired(key);
Headers h = getDeleteHeaders().put(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision));
_write(key, null, h).getSeqno();
}

/**
* {@inheritDoc}
*/
Expand All @@ -211,6 +221,15 @@ public void purge(String key) throws IOException, JetStreamApiException {
_write(key, null, getPurgeHeaders());
}

/**
* {@inheritDoc}
*/
@Override
public void purge(String key, long expectedRevision) throws IOException, JetStreamApiException {
Headers h = getPurgeHeaders().put(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision));
_write(key, null, h);
}

private PublishAck _write(String key, byte[] data, Headers h) throws IOException, JetStreamApiException {
validateNonWildcardKvKeyRequired(key);
return js.publish(NatsMessage.builder().subject(writeSubject(key)).data(data).headers(h).build());
Expand Down
57 changes: 57 additions & 0 deletions src/test/java/io/nats/client/impl/KeyValueTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,63 @@ public void testHistoryDeletePurge() throws Exception {
});
}

@Test
public void testAtomicDeleteAtomicPurge() throws Exception {
jsServer.run(nc -> {
KeyValueManagement kvm = nc.keyValueManagement();

// create bucket
String bucket = bucket();
kvm.create(KeyValueConfiguration.builder()
.name(bucket)
.storageType(StorageType.Memory)
.maxHistoryPerKey(64)
.build());

KeyValue kv = nc.keyValue(bucket);
String key = key();
kv.put(key, "a");
kv.put(key, "b");
kv.put(key, "c");
assertEquals(3, kv.get(key).getRevision());

// Delete wrong revision rejected
assertThrows(JetStreamApiException.class, () -> kv.delete(key, 1));

// Correct revision writes tombstone and bumps revision
kv.delete(key, 3);

assertHistory(Arrays.asList(
kv.get(key, 1L),
kv.get(key, 2L),
kv.get(key, 3L),
KeyValueOperation.DELETE),
kv.history(key));

// Wrong revision rejected again
assertThrows(JetStreamApiException.class, () -> kv.delete(key, 3));

// Delete is idempotent: two consecutive tombstones
kv.delete(key, 4);

assertHistory(Arrays.asList(
kv.get(key, 1L),
kv.get(key, 2L),
kv.get(key, 3L),
KeyValueOperation.DELETE,
KeyValueOperation.DELETE),
kv.history(key));

// Purge wrong revision rejected
assertThrows(JetStreamApiException.class, () -> kv.purge(key, 1));

// Correct revision writes roll-up purge tombstone
kv.purge(key, 5);

assertHistory(Arrays.asList(KeyValueOperation.PURGE), kv.history(key));
});
}

@Test
public void testPurgeDeletes() throws Exception {
jsServer.run(nc -> {
Expand Down

0 comments on commit 7ed4990

Please sign in to comment.