From 7ed4990fa49f9967487949ea8eed1842875474d9 Mon Sep 17 00:00:00 2001 From: davidmcote Date: Tue, 5 Mar 2024 07:28:53 -0500 Subject: [PATCH] KeyValue atomic delete and purge methods. (#1092) --- src/main/java/io/nats/client/KeyValue.java | 20 +++++++ .../io/nats/client/impl/NatsKeyValue.java | 19 +++++++ .../io/nats/client/impl/KeyValueTests.java | 57 +++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/src/main/java/io/nats/client/KeyValue.java b/src/main/java/io/nats/client/KeyValue.java index 7868e1ac3..b12d7d263 100644 --- a/src/main/java/io/nats/client/KeyValue.java +++ b/src/main/java/io/nats/client/KeyValue.java @@ -138,6 +138,16 @@ public interface KeyValue { */ void delete(String key) throws IOException, JetStreamApiException; + /** + * Soft deletes the key by placing a delete marker iff the key exists and its last revision matches the expected + * @param key the key + * @param expectedRevision the expected last revision + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + void delete(String key, long expectedRevision) throws IOException, JetStreamApiException; + /** * Purge all values/history from the specific key * @param key the key @@ -147,6 +157,16 @@ public interface KeyValue { */ void purge(String key) throws IOException, JetStreamApiException; + /** + * Purge all values/history from the specific key iff the key exists and its last revision matches the expected + * @param key the key + * @param expectedRevision the expected last revision + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + void purge(String key, long expectedRevision) throws IOException, JetStreamApiException; + /** * Watch updates for a specific key. * @param key the key diff --git a/src/main/java/io/nats/client/impl/NatsKeyValue.java b/src/main/java/io/nats/client/impl/NatsKeyValue.java index 02caedfac..ddede13f2 100644 --- a/src/main/java/io/nats/client/impl/NatsKeyValue.java +++ b/src/main/java/io/nats/client/impl/NatsKeyValue.java @@ -203,6 +203,16 @@ public void delete(String key) throws IOException, JetStreamApiException { _write(key, null, getDeleteHeaders()); } + /** + * {@inheritDoc} + */ + @Override + public void delete(String key, long expectedRevision) throws IOException, JetStreamApiException { + validateNonWildcardKvKeyRequired(key); + Headers h = getDeleteHeaders().put(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision)); + _write(key, null, h).getSeqno(); + } + /** * {@inheritDoc} */ @@ -211,6 +221,15 @@ public void purge(String key) throws IOException, JetStreamApiException { _write(key, null, getPurgeHeaders()); } + /** + * {@inheritDoc} + */ + @Override + public void purge(String key, long expectedRevision) throws IOException, JetStreamApiException { + Headers h = getPurgeHeaders().put(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision)); + _write(key, null, h); + } + private PublishAck _write(String key, byte[] data, Headers h) throws IOException, JetStreamApiException { validateNonWildcardKvKeyRequired(key); return js.publish(NatsMessage.builder().subject(writeSubject(key)).data(data).headers(h).build()); diff --git a/src/test/java/io/nats/client/impl/KeyValueTests.java b/src/test/java/io/nats/client/impl/KeyValueTests.java index eed3d0d1a..c19047fe9 100644 --- a/src/test/java/io/nats/client/impl/KeyValueTests.java +++ b/src/test/java/io/nats/client/impl/KeyValueTests.java @@ -547,6 +547,63 @@ public void testHistoryDeletePurge() throws Exception { }); } + @Test + public void testAtomicDeleteAtomicPurge() throws Exception { + jsServer.run(nc -> { + KeyValueManagement kvm = nc.keyValueManagement(); + + // create bucket + String bucket = bucket(); + kvm.create(KeyValueConfiguration.builder() + .name(bucket) + .storageType(StorageType.Memory) + .maxHistoryPerKey(64) + .build()); + + KeyValue kv = nc.keyValue(bucket); + String key = key(); + kv.put(key, "a"); + kv.put(key, "b"); + kv.put(key, "c"); + assertEquals(3, kv.get(key).getRevision()); + + // Delete wrong revision rejected + assertThrows(JetStreamApiException.class, () -> kv.delete(key, 1)); + + // Correct revision writes tombstone and bumps revision + kv.delete(key, 3); + + assertHistory(Arrays.asList( + kv.get(key, 1L), + kv.get(key, 2L), + kv.get(key, 3L), + KeyValueOperation.DELETE), + kv.history(key)); + + // Wrong revision rejected again + assertThrows(JetStreamApiException.class, () -> kv.delete(key, 3)); + + // Delete is idempotent: two consecutive tombstones + kv.delete(key, 4); + + assertHistory(Arrays.asList( + kv.get(key, 1L), + kv.get(key, 2L), + kv.get(key, 3L), + KeyValueOperation.DELETE, + KeyValueOperation.DELETE), + kv.history(key)); + + // Purge wrong revision rejected + assertThrows(JetStreamApiException.class, () -> kv.purge(key, 1)); + + // Correct revision writes roll-up purge tombstone + kv.purge(key, 5); + + assertHistory(Arrays.asList(KeyValueOperation.PURGE), kv.history(key)); + }); + } + @Test public void testPurgeDeletes() throws Exception { jsServer.run(nc -> {