From ab23645ffc20a6468e3f4119f51c60aaf8440eb2 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Fri, 17 Jan 2025 13:43:13 +0200 Subject: [PATCH 1/2] oap-mail: mongo persistence --- oap-mail/oap-mail-mongo/pom.xml | 31 ++++++++ .../mail/mongo/MailQueuePersistenceMongo.java | 63 ++++++++++++++++ .../main/java/oap/mail/mongo/MessageData.java | 13 ++++ .../main/resources/META-INF/oap-module.conf | 15 ++++ oap-mail/oap-mail-test/pom.xml | 10 +++ .../src/test/java/oap/mail/MailQueueTest.java | 14 +--- .../oap/mail/mongo/MailQueueMongoTest.java | 74 +++++++++++++++++++ .../src/main/java/oap/mail/Attachment.java | 12 ++- .../src/main/java/oap/mail/MailAddress.java | 7 +- .../src/main/java/oap/mail/MailQueue.java | 64 +++++----------- .../java/oap/mail/MailQueuePersistence.java | 7 ++ .../oap/mail/MailQueuePersistenceFile.java | 69 +++++++++++++++++ .../src/main/java/oap/mail/Message.java | 19 +++-- .../main/resources/META-INF/oap-module.conf | 19 ++++- oap-mail/pom.xml | 1 + .../main/resources/META-INF/oap-module.conf | 2 +- pom.xml | 2 +- 17 files changed, 353 insertions(+), 69 deletions(-) create mode 100644 oap-mail/oap-mail-mongo/pom.xml create mode 100644 oap-mail/oap-mail-mongo/src/main/java/oap/mail/mongo/MailQueuePersistenceMongo.java create mode 100644 oap-mail/oap-mail-mongo/src/main/java/oap/mail/mongo/MessageData.java create mode 100644 oap-mail/oap-mail-mongo/src/main/resources/META-INF/oap-module.conf create mode 100644 oap-mail/oap-mail-test/src/test/java/oap/mail/mongo/MailQueueMongoTest.java create mode 100644 oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistence.java create mode 100644 oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceFile.java diff --git a/oap-mail/oap-mail-mongo/pom.xml b/oap-mail/oap-mail-mongo/pom.xml new file mode 100644 index 0000000000..f1bdc77205 --- /dev/null +++ b/oap-mail/oap-mail-mongo/pom.xml @@ -0,0 +1,31 @@ + + + + oap-mail-parent + oap + ${oap.project.version} + + 4.0.0 + + oap-mail-mongo + + + + oap + oap-mail + ${oap.project.version} + + + oap + oap-storage-mongo + ${oap.project.version} + + + + org.projectlombok + lombok + + + \ No newline at end of file diff --git a/oap-mail/oap-mail-mongo/src/main/java/oap/mail/mongo/MailQueuePersistenceMongo.java b/oap-mail/oap-mail-mongo/src/main/java/oap/mail/mongo/MailQueuePersistenceMongo.java new file mode 100644 index 0000000000..72013a8ab5 --- /dev/null +++ b/oap-mail/oap-mail-mongo/src/main/java/oap/mail/mongo/MailQueuePersistenceMongo.java @@ -0,0 +1,63 @@ +package oap.mail.mongo; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; +import oap.mail.MailQueuePersistence; +import oap.mail.Message; +import oap.reflect.TypeRef; +import oap.storage.mongo.JsonCodec; +import oap.storage.mongo.MongoClient; +import oap.util.Cuid; +import org.bson.codecs.configuration.CodecRegistries; +import org.bson.codecs.configuration.CodecRegistry; + +import java.util.Iterator; + +public class MailQueuePersistenceMongo implements MailQueuePersistence { + private final MongoCollection collection; + + public MailQueuePersistenceMongo( MongoClient mongoClient, String collectionName ) { + CodecRegistry codecRegistry = CodecRegistries.fromRegistries( + CodecRegistries.fromCodecs( new JsonCodec<>( new TypeRef() {}, md -> md.id, id -> id ) ), + mongoClient.getCodecRegistry() + ); + this.collection = mongoClient + .getCollection( collectionName, MessageData.class ) + .withCodecRegistry( codecRegistry ); + } + + @Override + public void add( Message message ) { + collection.insertOne( new MessageData( Cuid.UNIQUE.next(), message ) ); + } + + @Override + public int size() { + return ( int ) collection.countDocuments(); + } + + @Override + public Iterator iterator() { + MongoCursor iterator = collection.find().iterator(); + return new Iterator<>() { + MessageData messageData; + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Message next() { + messageData = iterator.next(); + return messageData.message; + } + + @Override + public void remove() { + collection.deleteOne( Filters.eq( "_id", messageData.id ) ); + } + }; + } +} diff --git a/oap-mail/oap-mail-mongo/src/main/java/oap/mail/mongo/MessageData.java b/oap-mail/oap-mail-mongo/src/main/java/oap/mail/mongo/MessageData.java new file mode 100644 index 0000000000..44b9dc9547 --- /dev/null +++ b/oap-mail/oap-mail-mongo/src/main/java/oap/mail/mongo/MessageData.java @@ -0,0 +1,13 @@ +package oap.mail.mongo; + +import oap.mail.Message; + +public class MessageData { + public final Message message; + public String id; + + public MessageData( String id, Message message ) { + this.id = id; + this.message = message; + } +} diff --git a/oap-mail/oap-mail-mongo/src/main/resources/META-INF/oap-module.conf b/oap-mail/oap-mail-mongo/src/main/resources/META-INF/oap-module.conf new file mode 100644 index 0000000000..2b57764143 --- /dev/null +++ b/oap-mail/oap-mail-mongo/src/main/resources/META-INF/oap-module.conf @@ -0,0 +1,15 @@ +name = oap-mail-mongo + +dependsOn = [ + oap-storage-mongo +] + +services { + mail-queue-persistence-mongo { + implementation = oap.mail.mongo.MailQueuePersistenceMongo + parameters { + mongoClient = + collectionName = mails + } + } +} diff --git a/oap-mail/oap-mail-test/pom.xml b/oap-mail/oap-mail-test/pom.xml index 8acc8fa906..ddb03b77b0 100644 --- a/oap-mail/oap-mail-test/pom.xml +++ b/oap-mail/oap-mail-test/pom.xml @@ -42,12 +42,22 @@ oap-mail ${project.version} + + oap + oap-mail-mongo + ${project.version} + oap oap-stdlib-test ${oap.project.version} + + oap + oap-storage-mongo-test + ${project.version} + org.projectlombok diff --git a/oap-mail/oap-mail-test/src/test/java/oap/mail/MailQueueTest.java b/oap-mail/oap-mail-test/src/test/java/oap/mail/MailQueueTest.java index 32ab7afe46..81cf52c7c7 100644 --- a/oap-mail/oap-mail-test/src/test/java/oap/mail/MailQueueTest.java +++ b/oap-mail/oap-mail-test/src/test/java/oap/mail/MailQueueTest.java @@ -47,11 +47,11 @@ public MailQueueTest() { @Test public void persist() { - var location = testDirectoryFixture.testPath( "queue" ); + Path location = testDirectoryFixture.testPath( "queue" ); MailQueue queue = prepareQueue( location ); queue.processing( reject() ); assertThat( location.resolve( "mail.gz" ) ).exists(); - var queue2 = new MailQueue( location ); + MailQueue queue2 = new MailQueue( new MailQueuePersistenceFile( location ) ); assertMessages( queue2.messages() ) .hasSize( 2 ) .bySubject( "subj1", MessageAssertion::assertMessage ) @@ -59,18 +59,12 @@ public void persist() { queue2.processing( accept() ); assertMessages( queue2.messages() ).isEmpty(); - var queue3 = new MailQueue( location ); + MailQueue queue3 = new MailQueue( new MailQueuePersistenceFile( location ) ); assertMessages( queue3.messages() ).isEmpty(); } - @Test - public void persistWithNullLocation() { - MailQueue queue = prepareQueue( null ); - queue.processing( reject() ); - } - private MailQueue prepareQueue( Path location ) { - var queue = new MailQueue( location ); + MailQueue queue = new MailQueue( new MailQueuePersistenceFile( location ) ); queue.add( new Message( "subj1", "body", Lists.empty() ) ); queue.add( new Message( "subj2", "body", Lists.empty() ) ); Message message = new Message( "subj3", "body", Lists.empty() ); diff --git a/oap-mail/oap-mail-test/src/test/java/oap/mail/mongo/MailQueueMongoTest.java b/oap-mail/oap-mail-test/src/test/java/oap/mail/mongo/MailQueueMongoTest.java new file mode 100644 index 0000000000..232651b980 --- /dev/null +++ b/oap-mail/oap-mail-test/src/test/java/oap/mail/mongo/MailQueueMongoTest.java @@ -0,0 +1,74 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package oap.mail.mongo; + +import oap.mail.MailQueue; +import oap.mail.Message; +import oap.mail.test.MessageAssertion; +import oap.storage.mongo.MongoFixture; +import oap.testng.Fixtures; +import oap.util.Dates; +import oap.util.Lists; +import org.joda.time.DateTime; +import org.testng.annotations.Test; + +import static oap.mail.test.MessagesAssertion.assertMessages; +import static oap.util.function.Functions.empty.accept; +import static oap.util.function.Functions.empty.reject; +import static org.assertj.core.api.Assertions.assertThat; + +public class MailQueueMongoTest extends Fixtures { + private final MongoFixture mongoFixture; + + public MailQueueMongoTest() { + mongoFixture = fixture( new MongoFixture() ); + } + + @Test + public void persist() { + MailQueue queue = prepareQueue(); + queue.processing( reject() ); + assertThat( mongoFixture.client().getCollection( "mails" ).countDocuments() ).isEqualTo( 2 ); + MailQueue queue2 = new MailQueue( new MailQueuePersistenceMongo( mongoFixture.client(), "mails" ) ); + assertMessages( queue2.messages() ) + .hasSize( 2 ) + .bySubject( "subj1", MessageAssertion::assertMessage ) + .bySubject( "subj2", MessageAssertion::assertMessage ); + + queue2.processing( accept() ); + assertMessages( queue2.messages() ).isEmpty(); + MailQueue queue3 = new MailQueue( new MailQueuePersistenceMongo( mongoFixture.client(), "mails" ) ); + assertMessages( queue3.messages() ).isEmpty(); + } + + private MailQueue prepareQueue() { + MailQueue queue = new MailQueue( new MailQueuePersistenceMongo( mongoFixture.client(), "mails" ) ); + queue.add( new Message( "subj1", "body", Lists.empty() ) ); + queue.add( new Message( "subj2", "body", Lists.empty() ) ); + Message message = new Message( "subj3", "body", Lists.empty() ); + message.created = DateTime.now().minus( Dates.w( 3 ) ); + queue.add( message ); + return queue; + } +} diff --git a/oap-mail/oap-mail/src/main/java/oap/mail/Attachment.java b/oap-mail/oap-mail/src/main/java/oap/mail/Attachment.java index 442f294f90..f5b77e9fa9 100644 --- a/oap-mail/oap-mail/src/main/java/oap/mail/Attachment.java +++ b/oap-mail/oap-mail/src/main/java/oap/mail/Attachment.java @@ -28,10 +28,16 @@ import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; import lombok.ToString; +import java.io.Serial; +import java.io.Serializable; + import static com.google.common.base.Preconditions.checkArgument; @ToString -public class Attachment { +public class Attachment implements Serializable { + @Serial + private static final long serialVersionUID = -8962616411939449885L; + private final String contentId; private final String file; @JacksonXmlProperty( isAttribute = true ) @@ -46,8 +52,8 @@ public Attachment( String contentType, String content ) { @JsonCreator public Attachment( String contentType, String content, String contentId, String file, String name ) { checkArgument( file != null - || contentType.startsWith( "text/" ) - || contentType.startsWith( "image/" ), + || contentType.startsWith( "text/" ) + || contentType.startsWith( "image/" ), "contentType.startsWith( text/ ) || contentType.startsWith( image/ ) || file != null" ); this.contentType = contentType; this.content = content; diff --git a/oap-mail/oap-mail/src/main/java/oap/mail/MailAddress.java b/oap-mail/oap-mail/src/main/java/oap/mail/MailAddress.java index 9070133b64..013ca90a13 100644 --- a/oap-mail/oap-mail/src/main/java/oap/mail/MailAddress.java +++ b/oap-mail/oap-mail/src/main/java/oap/mail/MailAddress.java @@ -30,6 +30,8 @@ import org.apache.commons.lang3.ArrayUtils; import javax.mail.internet.InternetAddress; +import java.io.Serial; +import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.stream.Stream; @@ -38,7 +40,10 @@ @ToString @EqualsAndHashCode -public class MailAddress { +public class MailAddress implements Serializable { + @Serial + private static final long serialVersionUID = -7106481927777678346L; + public final String personal; public final String mail; diff --git a/oap-mail/oap-mail/src/main/java/oap/mail/MailQueue.java b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueue.java index d24289a89d..c25bb4a489 100644 --- a/oap-mail/oap-mail/src/main/java/oap/mail/MailQueue.java +++ b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueue.java @@ -24,34 +24,20 @@ package oap.mail; -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; -import oap.json.Binder; -import oap.reflect.TypeRef; import oap.util.Dates; -import oap.util.Lists; -import oap.util.Stream; import org.joda.time.DateTime; -import java.nio.file.Path; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Iterator; import java.util.function.Predicate; @Slf4j public class MailQueue { - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - private final Path location; + private final MailQueuePersistence mailQueuePersistence; public long brokenMessageTTL = Dates.w( 2 ); - public MailQueue( Path location ) { - if( location != null ) { - this.location = location.resolve( "mail.gz" ); - load(); - } else this.location = null; - - Metrics.gaugeCollectionSize( "mail_queue", Tags.empty(), queue ); + public MailQueue( MailQueuePersistence mailQueuePersistence ) { + this.mailQueuePersistence = mailQueuePersistence; } public MailQueue() { @@ -60,43 +46,29 @@ public MailQueue() { public void add( Message message ) { log.trace( "adding {}", message ); - queue.add( message ); + mailQueuePersistence.add( message ); } public void processing( Predicate processor ) { DateTime ttl = DateTime.now().minus( brokenMessageTTL ); - queue.removeIf( m -> { - if( processor.test( m ) ) return true; - if( m.created.isBefore( ttl ) ) { - log.debug( "removing expired message: {}", m ); - return true; - } - return false; - } ); - persist(); - } - - private void persist() { - if( location != null ) Binder.json.marshal( location, this.queue ); - } - - private void load() { - log.debug( "loading queue..." ); - queue.addAll( Binder.json.unmarshal( new TypeRef>() {}, location ).orElse( Lists.empty() ) ); - log.debug( "{} messages loaded", size() ); - } + Iterator iterator = mailQueuePersistence.iterator(); + while( iterator.hasNext() ) { + Message message = iterator.next(); - public List messages() { - return Stream.of( queue.stream() ).toList(); + if( processor.test( message ) ) { + iterator.remove(); + } else if( message.created.isBefore( ttl ) ) { + log.debug( "removing expired message: {}", message ); + iterator.remove(); + } + } } public int size() { - return queue.size(); + return mailQueuePersistence.size(); } - public void removeAll() { - log.trace( "removeAll" ); - queue.clear(); - persist(); + public Iterable messages() { + return mailQueuePersistence; } } diff --git a/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistence.java b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistence.java new file mode 100644 index 0000000000..18935d38b5 --- /dev/null +++ b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistence.java @@ -0,0 +1,7 @@ +package oap.mail; + +public interface MailQueuePersistence extends Iterable { + void add( Message message ); + + int size(); +} diff --git a/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceFile.java b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceFile.java new file mode 100644 index 0000000000..d6fe7b44af --- /dev/null +++ b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceFile.java @@ -0,0 +1,69 @@ +package oap.mail; + +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tags; +import lombok.extern.slf4j.Slf4j; +import oap.json.Binder; +import oap.reflect.TypeRef; +import oap.util.Lists; + +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Slf4j +public class MailQueuePersistenceFile implements MailQueuePersistence { + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final Path location; + + public MailQueuePersistenceFile( Path location ) { + this.location = location.resolve( "mail.gz" ); + load(); + + Metrics.gaugeCollectionSize( "mail_queue", Tags.empty(), queue ); + } + + private void load() { + log.debug( "loading queue..." ); + queue.addAll( Binder.json.unmarshal( new TypeRef>() {}, location ).orElse( Lists.empty() ) ); + log.debug( "{} messages loaded", size() ); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public void add( Message message ) { + queue.add( message ); + } + + private synchronized void persist() { + Binder.json.marshal( location, this.queue ); + } + + @Override + public Iterator iterator() { + Iterator iterator = queue.iterator(); + return new Iterator<>() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Message next() { + return iterator.next(); + } + + @Override + public void remove() { + iterator.remove(); + + persist(); + } + }; + } +} diff --git a/oap-mail/oap-mail/src/main/java/oap/mail/Message.java b/oap-mail/oap-mail/src/main/java/oap/mail/Message.java index 8d790d7842..b6687b4af1 100644 --- a/oap-mail/oap-mail/src/main/java/oap/mail/Message.java +++ b/oap-mail/oap-mail/src/main/java/oap/mail/Message.java @@ -27,18 +27,23 @@ import lombok.ToString; import org.joda.time.DateTime; +import java.io.Serial; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; @ToString( of = { "subject", "from", "to", "cc", "bcc", "created" } ) -public class Message { +public class Message implements Serializable { + @Serial + private static final long serialVersionUID = 207713767383114714L; + + public final ArrayList to = new ArrayList<>(); + public final ArrayList cc = new ArrayList<>(); + public final ArrayList bcc = new ArrayList<>(); public String subject; public String body; - public List attachments = new ArrayList<>(); + public ArrayList attachments = new ArrayList<>(); public MailAddress from; - public final List to = new ArrayList<>(); - public final List cc = new ArrayList<>(); - public final List bcc = new ArrayList<>(); public String contentType = "text/plain"; public DateTime created = new DateTime(); @@ -46,7 +51,9 @@ public class Message { public Message( String subject, String body, List attachments ) { this.body = body; this.subject = subject; - if( attachments != null ) this.attachments.addAll( attachments ); + if( attachments != null ) { + this.attachments.addAll( attachments ); + } } public Message() { diff --git a/oap-mail/oap-mail/src/main/resources/META-INF/oap-module.conf b/oap-mail/oap-mail/src/main/resources/META-INF/oap-module.conf index ab97f3c37c..d6b7e36e41 100644 --- a/oap-mail/oap-mail/src/main/resources/META-INF/oap-module.conf +++ b/oap-mail/oap-mail/src/main/resources/META-INF/oap-module.conf @@ -14,8 +14,25 @@ services { } } + mail-queue-persistence { + abstract = true + implementation = oap.mail.MailQueuePersistence + default = + } + + mail-queue-persistence-file { + implementation = oap.mail.MailQueuePersistenceFile + parameters { + location = + } + } + oap-mail-queue { - implementation: oap.mail.MailQueue + implementation = oap.mail.MailQueue + parameters { + mailQueuePersistence = + brokenMessageTTL = 2w + } } oap-mail-mailman { diff --git a/oap-mail/pom.xml b/oap-mail/pom.xml index 9e27157cd7..104d1e036a 100644 --- a/oap-mail/pom.xml +++ b/oap-mail/pom.xml @@ -14,6 +14,7 @@ oap-mail oap-mail-sendgrid + oap-mail-mongo oap-mail-test \ No newline at end of file diff --git a/oap-storage/oap-storage-mongo/src/main/resources/META-INF/oap-module.conf b/oap-storage/oap-storage-mongo/src/main/resources/META-INF/oap-module.conf index ed0f7bd4e8..82b61d4d6b 100644 --- a/oap-storage/oap-storage-mongo/src/main/resources/META-INF/oap-module.conf +++ b/oap-storage/oap-storage-mongo/src/main/resources/META-INF/oap-module.conf @@ -1,7 +1,7 @@ name = oap-storage-mongo services { mongo-client { - implementation: oap.storage.mongo.MongoClient + implementation = oap.storage.mongo.MongoClient parameters { connectionString = "mongodb://:/" // migrationPackage = "my,migrations" diff --git a/pom.xml b/pom.xml index d3bda884fc..663afb4a72 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ - 22.6.11 + 22.7.0 1.4.3 22.0.0 From 8c1b160515427af5b7c362baebaecc81b71bc4e5 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Fri, 17 Jan 2025 15:54:24 +0200 Subject: [PATCH 2/2] oap-mail: mongo persistence --- .../oap/mail/MailQueuePersistenceFile.java | 18 +---------- .../oap/mail/MailQueuePersistenceMemory.java | 32 +++++++++++++++++++ .../main/resources/META-INF/oap-module.conf | 5 +++ 3 files changed, 38 insertions(+), 17 deletions(-) create mode 100644 oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceMemory.java diff --git a/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceFile.java b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceFile.java index d6fe7b44af..47cc66dbe1 100644 --- a/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceFile.java +++ b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceFile.java @@ -1,7 +1,5 @@ package oap.mail; -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; import oap.json.Binder; import oap.reflect.TypeRef; @@ -10,18 +8,14 @@ import java.nio.file.Path; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; @Slf4j -public class MailQueuePersistenceFile implements MailQueuePersistence { - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); +public class MailQueuePersistenceFile extends MailQueuePersistenceMemory { private final Path location; public MailQueuePersistenceFile( Path location ) { this.location = location.resolve( "mail.gz" ); load(); - - Metrics.gaugeCollectionSize( "mail_queue", Tags.empty(), queue ); } private void load() { @@ -30,16 +24,6 @@ private void load() { log.debug( "{} messages loaded", size() ); } - @Override - public int size() { - return queue.size(); - } - - @Override - public void add( Message message ) { - queue.add( message ); - } - private synchronized void persist() { Binder.json.marshal( location, this.queue ); } diff --git a/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceMemory.java b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceMemory.java new file mode 100644 index 0000000000..84ebc0a982 --- /dev/null +++ b/oap-mail/oap-mail/src/main/java/oap/mail/MailQueuePersistenceMemory.java @@ -0,0 +1,32 @@ +package oap.mail; + +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tags; +import lombok.extern.slf4j.Slf4j; + +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Slf4j +public class MailQueuePersistenceMemory implements MailQueuePersistence { + protected final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + public MailQueuePersistenceMemory() { + Metrics.gaugeCollectionSize( "mail_queue", Tags.empty(), queue ); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public void add( Message message ) { + queue.add( message ); + } + + @Override + public Iterator iterator() { + return queue.iterator(); + } +} diff --git a/oap-mail/oap-mail/src/main/resources/META-INF/oap-module.conf b/oap-mail/oap-mail/src/main/resources/META-INF/oap-module.conf index d6b7e36e41..b925843321 100644 --- a/oap-mail/oap-mail/src/main/resources/META-INF/oap-module.conf +++ b/oap-mail/oap-mail/src/main/resources/META-INF/oap-module.conf @@ -27,6 +27,11 @@ services { } } + mail-queue-persistence-memory { + enabled = false + implementation = oap.mail.MailQueuePersistenceMemory + } + oap-mail-queue { implementation = oap.mail.MailQueue parameters {