diff --git a/golang/.env.dist b/golang/.env.dist index 5136987..4d3f696 100644 --- a/golang/.env.dist +++ b/golang/.env.dist @@ -1,5 +1,2 @@ KAFKA_URL=kafka:9092 -PULSAR_URL=pulsar://pulsar:6650 MYSQL_URL=root:root@(mariadb:3306)/reviews -MESSAGING_PROTOCOL=protobuf -EVENT_STREAM_TYPE=pulsar diff --git a/golang/docker-compose.yml b/golang/docker-compose.yml index bc1fc9c..9b2f04a 100644 --- a/golang/docker-compose.yml +++ b/golang/docker-compose.yml @@ -19,21 +19,21 @@ services: networks: ppro: - event-stream-golang-review-created-consumer: - container_name: pp_event_stream_golang_review_created_consumer - build: ./ - env_file: - - ./.env - - ./.env.dist - entrypoint: /bin/event-stream-golang consume-review-created - working_dir: /srv - volumes: - - ./:/srv:cached - deploy: - restart_policy: - condition: on-failure - networks: - ppro: +# event-stream-golang-review-created-consumer: +# container_name: pp_event_stream_golang_review_created_consumer +# build: ./ +# env_file: +# - ./.env +# - ./.env.dist +# entrypoint: /bin/event-stream-golang consume-review-created +# working_dir: /srv +# volumes: +# - ./:/srv:cached +# deploy: +# restart_policy: +# condition: on-failure +# networks: +# ppro: zookeeper: image: docker.io/bitnami/zookeeper:3.7 @@ -48,6 +48,7 @@ services: condition: on-failure networks: ppro: + kafka: image: docker.io/bitnami/kafka:3 ports: @@ -65,25 +66,6 @@ services: networks: ppro: - pulsar: - image: apachepulsar/pulsar:2.8.1 - ports: - - "8081:8081" - - "6650:6650" - environment: - - BOOKIE_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g" - command: > - /bin/bash -c - "bin/apply-config-from-env.py /conf/pulsar.conf - && exec bin/pulsar standalone --advertised-address pulsar" - volumes: - - ./conf:/conf:cached - networks: - ppro: - deploy: - restart_policy: - condition: on-failure - mariadb: image: mariadb:10.6.4 restart: on-failure diff --git a/golang/internal/cmd/consume_review_created_event.go b/golang/internal/cmd/consume_review_created_event.go index f7791a2..6fdcfae 100644 --- a/golang/internal/cmd/consume_review_created_event.go +++ b/golang/internal/cmd/consume_review_created_event.go @@ -1,8 +1,6 @@ package cmd import ( - "fmt" - "github.com/urfave/cli" ) @@ -15,7 +13,7 @@ func consumeReviewCreatedEvent(baseFlags []cli.Flag) cli.Command { } func consumeReviewCreated(c *cli.Context) error { - serviceLocator := newServiceLocatorFromCliContext(c) + /*serviceLocator := newServiceLocatorFromCliContext(c) migrate := Migrate{db: serviceLocator.MysqlDB()} migrate.Migrate() @@ -25,7 +23,7 @@ func consumeReviewCreated(c *cli.Context) error { err := consumer.Consume() if err != nil { fmt.Println(err) - } + }*/ return nil } diff --git a/golang/internal/cmd/main.go b/golang/internal/cmd/main.go index 8351130..58b945e 100644 --- a/golang/internal/cmd/main.go +++ b/golang/internal/cmd/main.go @@ -19,22 +19,10 @@ func GetApp(version string) *cli.App { Name: "event_stream-url", EnvVar: "KAFKA_URL", }, - cli.StringFlag{ - Name: "pulsar-url", - EnvVar: "PULSAR_URL", - }, cli.StringFlag{ Name: "mysql-url", EnvVar: "MYSQL_URL", }, - cli.StringFlag{ - Name: "messaging-protocol", - EnvVar: "MESSAGING_PROTOCOL", - }, - cli.StringFlag{ - Name: "event-stream-type", - EnvVar: "EVENT_STREAM_TYPE", - }, } app.Commands = []cli.Command{ @@ -48,9 +36,6 @@ func GetApp(version string) *cli.App { func newServiceLocatorFromCliContext(c *cli.Context) *serviceLocator { return newServiceLocator( c.String("kafka-url"), - c.String("pulsar-url"), c.String("mysql-url"), - c.String("event-stream-type"), - c.String("messaging-protocol"), ) } diff --git a/golang/internal/cmd/migrate_sql.go b/golang/internal/cmd/migrate_sql.go index d4ecb5f..766d117 100644 --- a/golang/internal/cmd/migrate_sql.go +++ b/golang/internal/cmd/migrate_sql.go @@ -22,21 +22,6 @@ func (m *Migrate) Migrate() { comment TEXT DEFAULT '', rating INT DEFAULT 0, PRIMARY KEY (uuid) -);`, - ) - - m.db.MustExec(`CREATE TABLE IF NOT EXISTS review_events_outbox ( - id BIGINT NOT NULL AUTO_INCREMENT, - uuid VARCHAR(256) NOT NULL, - aggregate_id VARCHAR(256) NOT NULL, - name VARCHAR(256) NOT NULL, - payload TEXT NOT NULL, - version VARCHAR(256) NOT NULL, - status INT NOT NULL, - message_counter_by_aggregate INT NOT NULL, - PRIMARY KEY (id), - UNIQUE (uuid), - UNIQUE (aggregate_id, message_counter_by_aggregate) );`, ) } diff --git a/golang/internal/cmd/service_locator.go b/golang/internal/cmd/service_locator.go index 88bccc8..87ea3f8 100644 --- a/golang/internal/cmd/service_locator.go +++ b/golang/internal/cmd/service_locator.go @@ -4,42 +4,27 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/event_stream/review_rating_incremented" + "github.com/ProntoPro/event-stream-golang/internal/pkg/domain" + "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/http/patch_review" "github.com/ProntoPro/event-stream-golang/internal/pkg/application/commands" "github.com/ProntoPro/event-stream-golang/internal/pkg/application/queries" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/event_stream/review_created" get_reviews2 "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/projectors/get_reviews" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/event_stream" "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/http/create_review" "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/http/get_reviews" "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/mysql" "github.com/ProntoPro/event-stream-golang/pkg/kafka" - "github.com/ProntoPro/event-stream-golang/pkg/pulsar" -) - -const ( - KafkaEventStream = "kafka" - PulsarEventStream = "pulsar" - JSONFormat = "json" - ProtobufFormat = "protobuf" ) func newServiceLocator( kafkaURL string, - pulsarURL string, mysqlURL string, - eventStream string, - format string, ) *serviceLocator { return &serviceLocator{ - kafkaURL: kafkaURL, - pulsarURL: pulsarURL, - mysqlURL: mysqlURL, - eventStreamKind: eventStream, - format: format, + kafkaURL: kafkaURL, + mysqlURL: mysqlURL, } } @@ -48,30 +33,12 @@ type serviceLocator struct { getReviewsQueryHandler *queries.GetReviewsQueryHandler incrementReviewRatingCommandHandler *commands.IncrementReviewRatingCommandHandler - createReviewHandler *create_review.CreateReviewHandler - getReviewsHandler *get_reviews.GetReviewsHandler - patchReviewHandler *patch_review.PatchReviewHandler - createReviewConsumer event_stream.Consumer - createReviewProducer event_stream.Producer - incrementReviewRatingConsumer event_stream.Consumer - incrementReviewRatingProducer event_stream.Producer - - reviewCreatedMarshaller review_created.ReviewCreatedEventMarshaller - reviewCreatedEventProducer *review_created.ReviewCreatedEventProducer - reviewCreatedEventConsumer *review_created.ReviewCreatedEventConsumer - reviewCreatedEventJSONMarshaller *review_created.ReviewCreatedEventJSONMarshaller - reviewCreatedEventProtobufMarshaller *review_created.ReviewCreatedEventProtobufMarshaller - - reviewRatingIncrementedMarshaller review_rating_incremented.ReviewRatingIncrementedEventMarshaller - reviewRatingIncrementedEventProducer *review_rating_incremented.ReviewRatingIncrementedEventProducer - reviewRatingIncrementedEventConsumer *review_rating_incremented.ReviewRatingIncrementedEventConsumer - reviewRatingIncrementedEventJSONMarshaller *review_rating_incremented.ReviewRatingIncrementedEventJSONMarshaller - reviewRatingIncrementedEventProtobufMarshaller *review_rating_incremented.ReviewRatingIncrementedEventProtobufMarshaller - - mysqlCommandsReviewRepository *mysql.CommandsReviewRepository - mysqlQueriesReviewRepository *mysql.QueriesReviewsRepository - mysqlIntegrationEventOutboxRepository *mysql.IntegrationReviewEventsOutboxRepository - mysqlTransactionManager *mysql.TransactionManager + createReviewHandler *create_review.CreateReviewHandler + getReviewsHandler *get_reviews.GetReviewsHandler + patchReviewHandler *patch_review.PatchReviewHandler + + mysqlCommandsReviewRepository *mysql.CommandsReviewRepository + mysqlQueriesReviewRepository *mysql.QueriesReviewsRepository reviewCreatedProjector *get_reviews2.ReviewCreatedProjector reviewRatingIncrementedProjector *get_reviews2.ReviewRatingIncrementedProjector @@ -79,22 +46,13 @@ type serviceLocator struct { reviewCreatedKafkaConsumer *kafka.Consumer reviewCreatedKafkaProducer *kafka.Producer - reviewCreatedPulsarConsumer *pulsar.Consumer - reviewCreatedPulsarProducer *pulsar.Producer - reviewRatingIncrementedKafkaConsumer *kafka.Consumer reviewRatingIncrementedKafkaProducer *kafka.Producer - reviewRatingIncrementedPulsarConsumer *pulsar.Consumer - reviewRatingIncrementedPulsarProducer *pulsar.Producer - mysqlDB *sqlx.DB kafkaURL string - pulsarURL string mysqlURL string - eventStreamKind string - format string } func (s *serviceLocator) GetReviewsProjector() *get_reviews2.ReviewCreatedProjector { @@ -117,9 +75,6 @@ func (s *serviceLocator) CreateReviewCommandHandler() *commands.CreateReviewComm if s.createReviewCommandHandler == nil { s.createReviewCommandHandler = commands.NewCreateReviewCommandHandler( s.CommandReviewRepository(), - s.MysqlTransactionManager(), - s.MysqlIntegrationEventOutboxRepository(), - s.ReviewCreatedEventBus(), ) } @@ -138,16 +93,13 @@ func (s *serviceLocator) IncrementReviewRatingCommandHandler() *commands.Increme if s.incrementReviewRatingCommandHandler == nil { s.incrementReviewRatingCommandHandler = commands.NewIncrementReviewRatingCommandHandler( s.CommandReviewRepository(), - s.MysqlTransactionManager(), - s.MysqlIntegrationEventOutboxRepository(), - s.ReviewCreatedEventBus(), ) } return s.incrementReviewRatingCommandHandler } -func (s *serviceLocator) CommandReviewRepository() commands.ReviewRepository { +func (s *serviceLocator) CommandReviewRepository() domain.ReviewRepository { return s.MysqlCommandReviewRepository() } @@ -155,14 +107,6 @@ func (s *serviceLocator) QueryReviewRepository() queries.ReviewRepository { return s.MysqlQueryReviewsRepository() } -func (s *serviceLocator) ReviewCreatedEventBus() commands.IntegrationEventBus { - return s.ReviewCreatedEventProducer() -} - -func (s *serviceLocator) ReviewRatingIncrementedEventBus() commands.IntegrationEventBus { - return s.ReviewRatingIncrementedEventProducer() -} - func (s *serviceLocator) CreateReviewHandler() *create_review.CreateReviewHandler { if s.createReviewHandler == nil { s.createReviewHandler = create_review.NewCreateReviewHandler(s.CreateReviewCommandHandler()) @@ -187,98 +131,6 @@ func (s *serviceLocator) PatchReviewHandler() *patch_review.PatchReviewHandler { return s.patchReviewHandler } -func (s *serviceLocator) CreateReviewConsumer() event_stream.Consumer { - if s.createReviewConsumer == nil { - switch s.eventStreamKind { - case KafkaEventStream: - s.createReviewConsumer = s.ReviewCreatedKafkaConsumer() - case PulsarEventStream: - s.createReviewConsumer = s.ReviewCreatedPulsarConsumer() - } - } - - return s.createReviewConsumer -} - -func (s *serviceLocator) CreateReviewProducer() event_stream.Producer { - if s.createReviewProducer == nil { - switch s.eventStreamKind { - case KafkaEventStream: - s.createReviewProducer = s.ReviewCreatedKafkaProducer() - case PulsarEventStream: - s.createReviewProducer = s.ReviewCreatedPulsarProducer() - } - } - - return s.createReviewProducer -} - -func (s *serviceLocator) IncrementReviewRatingConsumer() event_stream.Consumer { - if s.incrementReviewRatingConsumer == nil { - switch s.eventStreamKind { - case KafkaEventStream: - s.incrementReviewRatingConsumer = s.ReviewRatingIncrementedKafkaConsumer() - case PulsarEventStream: - s.incrementReviewRatingConsumer = s.ReviewRatingIncrementedPulsarConsumer() - } - } - - return s.incrementReviewRatingConsumer -} - -func (s *serviceLocator) IncrementReviewRatingProducer() event_stream.Producer { - if s.incrementReviewRatingProducer == nil { - switch s.eventStreamKind { - case KafkaEventStream: - s.incrementReviewRatingProducer = s.ReviewCreatedKafkaProducer() - case PulsarEventStream: - s.incrementReviewRatingProducer = s.ReviewCreatedPulsarProducer() - } - } - - return s.incrementReviewRatingProducer -} - -func (s *serviceLocator) ReviewCreatedMarshaller() review_created.ReviewCreatedEventMarshaller { - if s.reviewCreatedMarshaller == nil { - switch s.format { - case JSONFormat: - s.reviewCreatedMarshaller = s.ReviewCreatedEventJSONMarshaller() - case ProtobufFormat: - s.reviewCreatedMarshaller = s.ReviewCreatedEventProtobufMarshaller() - } - } - - return s.reviewCreatedMarshaller -} - -func (s *serviceLocator) ReviewRatingIncrementedMarshaller() review_rating_incremented.ReviewRatingIncrementedEventMarshaller { - if s.reviewRatingIncrementedMarshaller == nil { - switch s.format { - case JSONFormat: - s.reviewRatingIncrementedMarshaller = s.ReviewRatingIncrementedEventJSONMarshaller() - case ProtobufFormat: - s.reviewRatingIncrementedMarshaller = s.ReviewRatingIncrementedEventProtobufMarshaller() - } - } - - return s.reviewRatingIncrementedMarshaller -} - -func (s *serviceLocator) MysqlIntegrationEventOutboxRepository() *mysql.IntegrationReviewEventsOutboxRepository { - if s.mysqlIntegrationEventOutboxRepository == nil { - s.mysqlIntegrationEventOutboxRepository = mysql.NewIntegrationReviewEventsOutboxRepository(s.MysqlDB()) - } - return s.mysqlIntegrationEventOutboxRepository -} - -func (s *serviceLocator) MysqlTransactionManager() *mysql.TransactionManager { - if s.mysqlTransactionManager == nil { - s.mysqlTransactionManager = mysql.NewTransactionManager(s.MysqlDB()) - } - return s.mysqlTransactionManager -} - func (s *serviceLocator) MysqlCommandReviewRepository() *mysql.CommandsReviewRepository { if s.mysqlCommandsReviewRepository == nil { s.mysqlCommandsReviewRepository = mysql.NewReviewRepository(s.MysqlDB()) @@ -293,52 +145,6 @@ func (s *serviceLocator) MysqlQueryReviewsRepository() *mysql.QueriesReviewsRepo return s.mysqlQueriesReviewRepository } -func (s *serviceLocator) ReviewCreatedEventProducer() *review_created.ReviewCreatedEventProducer { - if s.reviewCreatedEventProducer == nil { - s.reviewCreatedEventProducer = review_created.NewReviewCreatedEventProducer( - s.CreateReviewProducer(), - s.ReviewCreatedMarshaller(), - ) - } - - return s.reviewCreatedEventProducer -} - -func (s *serviceLocator) ReviewRatingIncrementedEventProducer() *review_rating_incremented.ReviewRatingIncrementedEventProducer { - if s.reviewRatingIncrementedEventProducer == nil { - s.reviewRatingIncrementedEventProducer = review_rating_incremented.NewReviewRatingIncrementedEventProducer( - s.IncrementReviewRatingProducer(), - s.ReviewRatingIncrementedMarshaller(), - ) - } - - return s.reviewRatingIncrementedEventProducer -} - -func (s *serviceLocator) ReviewCreatedEventConsumer() *review_created.ReviewCreatedEventConsumer { - if s.reviewCreatedEventConsumer == nil { - s.reviewCreatedEventConsumer = review_created.NewReviewCreatedEventConsumer( - s.CreateReviewConsumer(), - s.ReviewCreatedMarshaller(), - s.GetReviewsProjector(), - ) - } - - return s.reviewCreatedEventConsumer -} - -func (s *serviceLocator) ReviewRatingIncrementedEventConsumer() *review_rating_incremented.ReviewRatingIncrementedEventConsumer { - if s.reviewRatingIncrementedEventConsumer == nil { - s.reviewRatingIncrementedEventConsumer = review_rating_incremented.NewReviewRatingIncrementedEventConsumer( - s.CreateReviewConsumer(), - s.ReviewRatingIncrementedMarshaller(), - s.ReviewRatingIncrementedProjector(), - ) - } - - return s.reviewRatingIncrementedEventConsumer -} - func (s *serviceLocator) ReviewCreatedKafkaConsumer() *kafka.Consumer { if s.reviewCreatedKafkaConsumer == nil { var err error @@ -361,60 +167,6 @@ func (s *serviceLocator) ReviewCreatedKafkaProducer() *kafka.Producer { return s.reviewCreatedKafkaProducer } -func (s *serviceLocator) ReviewCreatedPulsarConsumer() *pulsar.Consumer { - if s.reviewCreatedPulsarConsumer == nil { - var err error - s.reviewCreatedPulsarConsumer, err = pulsar.NewConsumer(s.PulsarURL(), "review_created") - if err != nil { - panic(err) - } - } - return s.reviewCreatedPulsarConsumer -} - -func (s *serviceLocator) ReviewCreatedPulsarProducer() *pulsar.Producer { - if s.reviewCreatedPulsarProducer == nil { - var err error - s.reviewCreatedPulsarProducer, err = pulsar.NewProducer(s.PulsarURL(), "review_created") - if err != nil { - panic(err) - } - } - return s.reviewCreatedPulsarProducer -} - -func (s *serviceLocator) ReviewCreatedEventJSONMarshaller() *review_created.ReviewCreatedEventJSONMarshaller { - if s.reviewCreatedEventJSONMarshaller == nil { - s.reviewCreatedEventJSONMarshaller = &review_created.ReviewCreatedEventJSONMarshaller{} - } - - return s.reviewCreatedEventJSONMarshaller -} - -func (s *serviceLocator) ReviewCreatedEventProtobufMarshaller() *review_created.ReviewCreatedEventProtobufMarshaller { - if s.reviewCreatedEventProtobufMarshaller == nil { - s.reviewCreatedEventProtobufMarshaller = &review_created.ReviewCreatedEventProtobufMarshaller{} - } - - return s.reviewCreatedEventProtobufMarshaller -} - -func (s *serviceLocator) ReviewRatingIncrementedEventJSONMarshaller() *review_rating_incremented.ReviewRatingIncrementedEventJSONMarshaller { - if s.reviewRatingIncrementedEventJSONMarshaller == nil { - s.reviewRatingIncrementedEventJSONMarshaller = &review_rating_incremented.ReviewRatingIncrementedEventJSONMarshaller{} - } - - return s.reviewRatingIncrementedEventJSONMarshaller -} - -func (s *serviceLocator) ReviewRatingIncrementedEventProtobufMarshaller() *review_rating_incremented.ReviewRatingIncrementedEventProtobufMarshaller { - if s.reviewRatingIncrementedEventProtobufMarshaller == nil { - s.reviewRatingIncrementedEventProtobufMarshaller = &review_rating_incremented.ReviewRatingIncrementedEventProtobufMarshaller{} - } - - return s.reviewRatingIncrementedEventProtobufMarshaller -} - func (s *serviceLocator) ReviewRatingIncrementedKafkaConsumer() *kafka.Consumer { if s.reviewRatingIncrementedKafkaConsumer == nil { var err error @@ -437,28 +189,6 @@ func (s *serviceLocator) ReviewRatingIncrementedKafkaProducer() *kafka.Producer return s.reviewRatingIncrementedKafkaProducer } -func (s *serviceLocator) ReviewRatingIncrementedPulsarConsumer() *pulsar.Consumer { - if s.reviewRatingIncrementedPulsarConsumer == nil { - var err error - s.reviewRatingIncrementedPulsarConsumer, err = pulsar.NewConsumer(s.PulsarURL(), "review_rating_incremented") - if err != nil { - panic(err) - } - } - return s.reviewRatingIncrementedPulsarConsumer -} - -func (s *serviceLocator) ReviewRatingIncrementedPulsarProducer() *pulsar.Producer { - if s.reviewRatingIncrementedPulsarProducer == nil { - var err error - s.reviewRatingIncrementedPulsarProducer, err = pulsar.NewProducer(s.PulsarURL(), "review_rating_incremented") - if err != nil { - panic(err) - } - } - return s.reviewRatingIncrementedPulsarProducer -} - func (s *serviceLocator) MysqlDB() *sqlx.DB { if s.mysqlDB == nil { var err error @@ -477,15 +207,3 @@ func (s *serviceLocator) MysqlURL() string { func (s *serviceLocator) KafkaURL() string { return s.kafkaURL } - -func (s *serviceLocator) PulsarURL() string { - return s.pulsarURL -} - -func (s *serviceLocator) EventStreamKind() string { - return s.eventStreamKind -} - -func (s *serviceLocator) Format() string { - return s.format -} diff --git a/golang/internal/pkg/application/commands/create_review.go b/golang/internal/pkg/application/commands/create_review.go index 2b1d51c..a5b8de2 100644 --- a/golang/internal/pkg/application/commands/create_review.go +++ b/golang/internal/pkg/application/commands/create_review.go @@ -1,36 +1,22 @@ package commands import ( - "github.com/google/uuid" - "github.com/sirupsen/logrus" - "github.com/ProntoPro/event-stream-golang/internal/pkg/domain" -) - -const ( - reviewCreatedEventVersion = "0.1.0" - reviewCreatedEventName = "review_created" + "github.com/google/uuid" ) func NewCreateReviewCommandHandler( reviewRepository ReviewRepository, - transactionManager TransactionManager, - eventOutboxRepository IntegrationEventOutboxRepository, - eventBus IntegrationEventBus, ) *CreateReviewCommandHandler { return &CreateReviewCommandHandler{ - reviewRepository: reviewRepository, - transactionManager: transactionManager, - eventOutboxRepository: eventOutboxRepository, - eventBus: eventBus, + reviewRepository: reviewRepository, } } type CreateReviewCommandHandler struct { - reviewRepository ReviewRepository - transactionManager TransactionManager - eventOutboxRepository IntegrationEventOutboxRepository - eventBus IntegrationEventBus + reviewRepository ReviewRepository + eventRepository EventRepository + transactionFactory TransactionFactory } type CreateReviewCommand struct { @@ -38,78 +24,61 @@ type CreateReviewCommand struct { Rating int32 } +type ReviewRepository interface { + SaveTransactional(transaction Transaction, review *domain.Review) error + FindByUUID(reviewUUID uuid.UUID) (*domain.Review, error) +} + type ReviewCreatedEvent struct { ReviewUUID string Comment string Rating int32 } +type EventRepository interface { + SaveTransactional(transaction Transaction, name string, payload interface{}) error +} + +type Transaction interface { + Begin() error + Commit() error +} + +type TransactionFactory interface { + Create() (Transaction, error) +} + func (h *CreateReviewCommandHandler) Execute(command CreateReviewCommand) error { - transaction, err := openTransaction(h.transactionManager) + review := domain.NewReview(command.Comment, command.Rating) + + transaction, err := h.transactionFactory.Create() if err != nil { return err } - defer transaction.Rollback() - - events, err := h.executeTransactionally(command, transaction) + err = transaction.Begin() if err != nil { return err } - return h.executePostTransaction(events) -} - -func (h *CreateReviewCommandHandler) executeTransactionally( - command CreateReviewCommand, - transaction Transaction, -) ([]IntegrationEvent, error) { - review := domain.NewReview(command.Comment, command.Rating) - - err := h.reviewRepository.Save(review, transaction) + err = h.reviewRepository.SaveTransactional(transaction, review) if err != nil { - return nil, err + return err } - event := IntegrationEvent{ - UUID: uuid.New().String(), - AggregateID: review.Uuid().String(), - Name: reviewCreatedEventName, - Payload: ReviewCreatedEvent{ + err = h.eventRepository.SaveTransactional( + transaction, + "review_created", + ReviewCreatedEvent{ ReviewUUID: review.Uuid().String(), Comment: review.Comment(), Rating: review.Rating(), }, - Status: ToBeDispatched, - Version: reviewCreatedEventVersion, - } - - err = h.eventOutboxRepository.Save(event, transaction) - if err != nil { - return nil, err - } + ) err = transaction.Commit() if err != nil { - return nil, err - } - - return []IntegrationEvent{event}, nil -} - -func (h *CreateReviewCommandHandler) executePostTransaction(events []IntegrationEvent) error { - for _, event := range events { - h.eventBus.DispatchEvent( - event, - ) - - event.Status = Dispatched - // error is not handle here since it's acceptable to have the event dispatched again (at least once). - // The event may be dispatched again by a dedicated job. - err := h.eventOutboxRepository.Save(event, nil) - if err != nil { - logrus.Error(err) - } + return err } return nil diff --git a/golang/internal/pkg/application/commands/event.go b/golang/internal/pkg/application/commands/event.go deleted file mode 100644 index e7ef877..0000000 --- a/golang/internal/pkg/application/commands/event.go +++ /dev/null @@ -1,25 +0,0 @@ -package commands - -type IntegrationEventStatus int - -const ( - ToBeDispatched IntegrationEventStatus = iota - Dispatched -) - -type IntegrationEvent struct { - UUID string - AggregateID string - Name string - Payload interface{} - Version string - Status IntegrationEventStatus -} - -type IntegrationEventOutboxRepository interface { - Save(event IntegrationEvent, transaction Transaction) error -} - -type IntegrationEventBus interface { - DispatchEvent(event IntegrationEvent) -} diff --git a/golang/internal/pkg/application/commands/increment_review_rating.go b/golang/internal/pkg/application/commands/increment_review_rating.go index 466aa39..aedd8ea 100644 --- a/golang/internal/pkg/application/commands/increment_review_rating.go +++ b/golang/internal/pkg/application/commands/increment_review_rating.go @@ -4,33 +4,20 @@ import ( "fmt" "github.com/google/uuid" - "github.com/sirupsen/logrus" -) -const ( - reviewRatingIncrementEventEventVersion = "0.1.0" - reviewRatingIncrementEventName = "review_rating_incremented" + "github.com/ProntoPro/event-stream-golang/internal/pkg/domain" ) func NewIncrementReviewRatingCommandHandler( - reviewRepository ReviewRepository, - transactionManager TransactionManager, - eventOutboxRepository IntegrationEventOutboxRepository, - eventBus IntegrationEventBus, + reviewRepository domain.ReviewRepository, ) *IncrementReviewRatingCommandHandler { return &IncrementReviewRatingCommandHandler{ - reviewRepository: reviewRepository, - transactionManager: transactionManager, - eventOutboxRepository: eventOutboxRepository, - eventBus: eventBus, + reviewRepository: reviewRepository, } } type IncrementReviewRatingCommandHandler struct { - reviewRepository ReviewRepository - transactionManager TransactionManager - eventOutboxRepository IntegrationEventOutboxRepository - eventBus IntegrationEventBus + reviewRepository domain.ReviewRepository } type IncrementReviewRatingCommand struct { @@ -42,79 +29,16 @@ type ReviewRatingIncrementedEvent struct { } func (h *IncrementReviewRatingCommandHandler) Execute(command IncrementReviewRatingCommand) error { - transaction, err := openTransaction(h.transactionManager) - if err != nil { - return err - } - - defer transaction.Rollback() - - events, err := h.executeTransactionally(command, transaction) - if err != nil { - return err - } - - return h.executePostTransaction(events) -} - -func (h *IncrementReviewRatingCommandHandler) executeTransactionally( - command IncrementReviewRatingCommand, - transaction Transaction, -) ([]IntegrationEvent, error) { review, err := h.reviewRepository.FindByUUID(command.ReviewUUID) if err != nil { - return nil, err + return err } if review == nil { - return nil, fmt.Errorf("review not found") + return fmt.Errorf("review not found") } review.IncrementRating() - err = h.reviewRepository.Save(review, transaction) - if err != nil { - return nil, err - } - - event := IntegrationEvent{ - UUID: uuid.New().String(), - AggregateID: review.Uuid().String(), - Name: reviewRatingIncrementEventName, - Payload: ReviewRatingIncrementedEvent{ - ReviewUUID: review.Uuid().String(), - }, - Status: ToBeDispatched, - Version: reviewRatingIncrementEventEventVersion, - } - - err = h.eventOutboxRepository.Save(event, transaction) - if err != nil { - return nil, err - } - - err = transaction.Commit() - if err != nil { - return nil, err - } - - return []IntegrationEvent{event}, nil -} - -func (h *IncrementReviewRatingCommandHandler) executePostTransaction(events []IntegrationEvent) error { - for _, event := range events { - h.eventBus.DispatchEvent( - event, - ) - - event.Status = Dispatched - // error is not handle here since it's acceptable to have the event dispatched again (at least once). - // The event may be dispatched again by a dedicated job. - err := h.eventOutboxRepository.Save(event, nil) - if err != nil { - logrus.Error(err) - } - } - - return nil + return h.reviewRepository.Save(review) } diff --git a/golang/internal/pkg/application/commands/review_repository.go b/golang/internal/pkg/application/commands/review_repository.go deleted file mode 100644 index bb2275a..0000000 --- a/golang/internal/pkg/application/commands/review_repository.go +++ /dev/null @@ -1,12 +0,0 @@ -package commands - -import ( - "github.com/google/uuid" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/domain" -) - -type ReviewRepository interface { - Save(review *domain.Review, transaction Transaction) error - FindByUUID(reviewUUID uuid.UUID) (*domain.Review, error) -} diff --git a/golang/internal/pkg/application/commands/transaction_manager.go b/golang/internal/pkg/application/commands/transaction_manager.go deleted file mode 100644 index 074289b..0000000 --- a/golang/internal/pkg/application/commands/transaction_manager.go +++ /dev/null @@ -1,25 +0,0 @@ -package commands - -type TransactionManager interface { - Create() (Transaction, error) -} - -type Transaction interface { - Begin() error - Commit() error - // Rollback must be idempotent - Rollback() -} - -func openTransaction(transactionManager TransactionManager) (Transaction, error) { - transaction, err := transactionManager.Create() - if err != nil { - return nil, err - } - - err = transaction.Begin() - if err != nil { - return nil, err - } - return transaction, nil -} diff --git a/golang/internal/pkg/infrastructure/event_stream/consumer.go b/golang/internal/pkg/infrastructure/event_stream/consumer.go deleted file mode 100644 index 878f1c8..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/consumer.go +++ /dev/null @@ -1,5 +0,0 @@ -package event_stream - -type Consumer interface { - ConsumeAll() (<-chan []byte, error) -} diff --git a/golang/internal/pkg/infrastructure/event_stream/producer.go b/golang/internal/pkg/infrastructure/event_stream/producer.go deleted file mode 100644 index 55af0a4..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/producer.go +++ /dev/null @@ -1,5 +0,0 @@ -package event_stream - -type Producer interface { - Dispatch(message []byte) error -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.go b/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.go deleted file mode 100644 index ae8a343..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.go +++ /dev/null @@ -1,11 +0,0 @@ -package review_created - -type ReviewCreatedEventMessage struct { - Review ReviewMessage `json:"review"` -} - -type ReviewMessage struct { - UUID string `json:"uuid"` - Comment string `json:"comment"` - Rating int32 `json:"rating"` -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.pb.go b/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.pb.go deleted file mode 100644 index 04fbf95..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.pb.go +++ /dev/null @@ -1,231 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.26.0 -// protoc v3.6.1 -// source: internal/pkg/infrastructure/event_stream/review_rating_incremented_event.proto - -package review_created - -import ( - reflect "reflect" - sync "sync" - - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type ReviewCreatedEvent struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Review *ReviewCreatedEvent_Review `protobuf:"bytes,1,opt,name=review,proto3" json:"review,omitempty"` -} - -func (x *ReviewCreatedEvent) Reset() { - *x = ReviewCreatedEvent{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_infrastructure_kafka_review_created_event_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ReviewCreatedEvent) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ReviewCreatedEvent) ProtoMessage() {} - -func (x *ReviewCreatedEvent) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_infrastructure_kafka_review_created_event_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ReviewCreatedEvent.ProtoReflect.Descriptor instead. -func (*ReviewCreatedEvent) Descriptor() ([]byte, []int) { - return file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescGZIP(), []int{0} -} - -func (x *ReviewCreatedEvent) GetReview() *ReviewCreatedEvent_Review { - if x != nil { - return x.Review - } - return nil -} - -type ReviewCreatedEvent_Review struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Uuid string `protobuf:"bytes,1,opt,name=uuid,proto3" json:"uuid,omitempty"` - Comment string `protobuf:"bytes,2,opt,name=comment,proto3" json:"comment,omitempty"` - Rating int32 `protobuf:"varint,3,opt,name=rating,proto3" json:"rating,omitempty"` -} - -func (x *ReviewCreatedEvent_Review) Reset() { - *x = ReviewCreatedEvent_Review{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_infrastructure_kafka_review_created_event_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ReviewCreatedEvent_Review) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ReviewCreatedEvent_Review) ProtoMessage() {} - -func (x *ReviewCreatedEvent_Review) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_infrastructure_kafka_review_created_event_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ReviewCreatedEvent_Review.ProtoReflect.Descriptor instead. -func (*ReviewCreatedEvent_Review) Descriptor() ([]byte, []int) { - return file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescGZIP(), []int{0, 0} -} - -func (x *ReviewCreatedEvent_Review) GetUuid() string { - if x != nil { - return x.Uuid - } - return "" -} - -func (x *ReviewCreatedEvent_Review) GetComment() string { - if x != nil { - return x.Comment - } - return "" -} - -func (x *ReviewCreatedEvent_Review) GetRating() int32 { - if x != nil { - return x.Rating - } - return 0 -} - -var File_internal_pkg_infrastructure_kafka_review_created_event_proto protoreflect.FileDescriptor - -var file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDesc = []byte{ - 0x0a, 0x3c, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x69, - 0x6e, 0x66, 0x72, 0x61, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x2f, 0x6b, 0x61, - 0x66, 0x6b, 0x61, 0x2f, 0x72, 0x65, 0x76, 0x69, 0x65, 0x77, 0x5f, 0x63, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x64, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x98, - 0x01, 0x0a, 0x12, 0x52, 0x65, 0x76, 0x69, 0x65, 0x77, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x32, 0x0a, 0x06, 0x72, 0x65, 0x76, 0x69, 0x65, 0x77, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x52, 0x65, 0x76, 0x69, 0x65, 0x77, 0x43, 0x72, - 0x65, 0x61, 0x74, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x76, 0x69, 0x65, - 0x77, 0x52, 0x06, 0x72, 0x65, 0x76, 0x69, 0x65, 0x77, 0x1a, 0x4e, 0x0a, 0x06, 0x52, 0x65, 0x76, - 0x69, 0x65, 0x77, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, - 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, - 0x74, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x61, 0x74, 0x69, 0x6e, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x06, 0x72, 0x61, 0x74, 0x69, 0x6e, 0x67, 0x42, 0x23, 0x5a, 0x21, 0x69, 0x6e, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x69, 0x6e, 0x66, 0x72, 0x61, 0x73, - 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x2f, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescOnce sync.Once - file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescData = file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDesc -) - -func file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescGZIP() []byte { - file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescOnce.Do(func() { - file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescData) - }) - return file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDescData -} - -var file_internal_pkg_infrastructure_kafka_review_created_event_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_internal_pkg_infrastructure_kafka_review_created_event_proto_goTypes = []interface{}{ - (*ReviewCreatedEvent)(nil), // 0: ReviewCreatedEvent - (*ReviewCreatedEvent_Review)(nil), // 1: ReviewCreatedEvent.Review -} -var file_internal_pkg_infrastructure_kafka_review_created_event_proto_depIdxs = []int32{ - 1, // 0: ReviewCreatedEvent.review:type_name -> ReviewCreatedEvent.Review - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_internal_pkg_infrastructure_kafka_review_created_event_proto_init() } -func file_internal_pkg_infrastructure_kafka_review_created_event_proto_init() { - if File_internal_pkg_infrastructure_kafka_review_created_event_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_internal_pkg_infrastructure_kafka_review_created_event_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReviewCreatedEvent); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_internal_pkg_infrastructure_kafka_review_created_event_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReviewCreatedEvent_Review); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_internal_pkg_infrastructure_kafka_review_created_event_proto_goTypes, - DependencyIndexes: file_internal_pkg_infrastructure_kafka_review_created_event_proto_depIdxs, - MessageInfos: file_internal_pkg_infrastructure_kafka_review_created_event_proto_msgTypes, - }.Build() - File_internal_pkg_infrastructure_kafka_review_created_event_proto = out.File - file_internal_pkg_infrastructure_kafka_review_created_event_proto_rawDesc = nil - file_internal_pkg_infrastructure_kafka_review_created_event_proto_goTypes = nil - file_internal_pkg_infrastructure_kafka_review_created_event_proto_depIdxs = nil -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.proto b/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.proto deleted file mode 100644 index 8bb2fcd..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; -option go_package = "internal/pkg/infrastructure/event_stream/review_created"; - -message ReviewCreatedEvent { - message Review { - string uuid = 1; - string comment = 2; - int32 rating = 3; - } - - Review review = 1; -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_consumer.go b/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_consumer.go deleted file mode 100644 index 7451065..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_consumer.go +++ /dev/null @@ -1,61 +0,0 @@ -package review_created - -import ( - "github.com/sirupsen/logrus" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/application/commands" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/event_stream" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/projectors/get_reviews" -) - -func NewReviewCreatedEventConsumer( - consumer event_stream.Consumer, - reviewCreatedEventMarshaller ReviewCreatedEventMarshaller, - projector *get_reviews.ReviewCreatedProjector, -) *ReviewCreatedEventConsumer { - return &ReviewCreatedEventConsumer{ - consumer: consumer, - reviewCreatedEventMarshaller: reviewCreatedEventMarshaller, - projector: projector, - } -} - -type ReviewCreatedEventConsumer struct { - consumer event_stream.Consumer - reviewCreatedEventMarshaller ReviewCreatedEventMarshaller - projector *get_reviews.ReviewCreatedProjector -} - -func (r *ReviewCreatedEventConsumer) Consume() error { - messages, err := r.consumer.ConsumeAll() - - if err != nil { - logrus.Error(err) - return err - } - - for message := range messages { - logrus.Infof("processing message %s", string(message)) - eventMessage, err := r.reviewCreatedEventMarshaller.Unmarshal(message) - if err != nil { - // todo handle error - logrus.Error(err) - } - - logrus.Infof("processing review created event %#v", eventMessage) - - err = r.projector.Project( - &commands.ReviewCreatedEvent{ - ReviewUUID: eventMessage.Review.UUID, - Comment: eventMessage.Review.Comment, - Rating: eventMessage.Review.Rating, - }, - ) - if err != nil { - // todo handle error - logrus.Error(err) - } - } - - return nil -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_marshaller.go b/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_marshaller.go deleted file mode 100644 index c0e93ae..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_marshaller.go +++ /dev/null @@ -1,65 +0,0 @@ -package review_created - -import ( - "encoding/json" - - "google.golang.org/protobuf/proto" -) - -type ReviewCreatedEventMarshaller interface { - Marshal(event *ReviewCreatedEventMessage) ([]byte, error) - Unmarshal([]byte) (*ReviewCreatedEventMessage, error) -} - -type ReviewCreatedEventJSONMarshaller struct { -} - -func (r *ReviewCreatedEventJSONMarshaller) Marshal(event *ReviewCreatedEventMessage) ([]byte, error) { - marshalledMessage, err := json.Marshal(event) - if err != nil { - return nil, err - } - - return marshalledMessage, nil -} - -func (r *ReviewCreatedEventJSONMarshaller) Unmarshal(bytes []byte) (*ReviewCreatedEventMessage, error) { - reviewCreatedEvent := &ReviewCreatedEventMessage{} - err := json.Unmarshal(bytes, reviewCreatedEvent) - if err != nil { - return nil, err - } - - return reviewCreatedEvent, nil -} - -type ReviewCreatedEventProtobufMarshaller struct { -} - -func (r *ReviewCreatedEventProtobufMarshaller) Marshal(event *ReviewCreatedEventMessage) ([]byte, error) { - protobufMessage := &ReviewCreatedEvent{ - Review: &ReviewCreatedEvent_Review{ - Uuid: event.Review.UUID, - Comment: event.Review.Comment, - Rating: event.Review.Rating, - }, - } - - return proto.Marshal(protobufMessage) -} - -func (r *ReviewCreatedEventProtobufMarshaller) Unmarshal(bytes []byte) (*ReviewCreatedEventMessage, error) { - reviewCreatedEvent := &ReviewCreatedEvent{} - err := proto.Unmarshal(bytes, reviewCreatedEvent) - if err != nil { - return nil, err - } - - return &ReviewCreatedEventMessage{ - Review: ReviewMessage{ - UUID: reviewCreatedEvent.Review.GetUuid(), - Comment: reviewCreatedEvent.Review.GetComment(), - Rating: reviewCreatedEvent.Review.GetRating(), - }, - }, nil -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_producer.go b/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_producer.go deleted file mode 100644 index 8c9ca5e..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_created/review_created_event_producer.go +++ /dev/null @@ -1,61 +0,0 @@ -package review_created - -import ( - "fmt" - - "github.com/sirupsen/logrus" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/application/commands" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/event_stream" -) - -func NewReviewCreatedEventProducer( - producer event_stream.Producer, - reviewCreatedEventMarshaller ReviewCreatedEventMarshaller, -) *ReviewCreatedEventProducer { - return &ReviewCreatedEventProducer{ - producer: producer, - reviewCreatedEventMarshaller: reviewCreatedEventMarshaller, - } -} - -type ReviewCreatedEventProducer struct { - producer event_stream.Producer - reviewCreatedEventMarshaller ReviewCreatedEventMarshaller -} - -func (r *ReviewCreatedEventProducer) DispatchEvent(event commands.IntegrationEvent) { - eventPayload, ok := event.Payload.(commands.ReviewCreatedEvent) - if !ok { - r.handleErrors(fmt.Errorf("unsupported event payload")) - } - - messageData, err := r.reviewCreatedEventMarshaller.Marshal( - &ReviewCreatedEventMessage{ - Review: ReviewMessage{ - UUID: eventPayload.ReviewUUID, - Comment: eventPayload.Comment, - Rating: eventPayload.Rating, - }, - }, - ) - - if err != nil { - r.handleErrors(err) - } - - // todo what happens if we fail before here... - err = r.producer.Dispatch( - messageData, - ) - // todo what happens if we fail here... - - if err != nil { - r.handleErrors(err) - } -} - -func (r ReviewCreatedEventProducer) handleErrors(err error) { - // todo handle error so to avoid losing events - logrus.Errorf("error sending review created message via event stream: %s\n", err.Error()) -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.go b/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.go deleted file mode 100644 index 042a228..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.go +++ /dev/null @@ -1,9 +0,0 @@ -package review_rating_incremented - -type ReviewRatingIncrementedEventMessage struct { - Review ReviewMessage `json:"review"` -} - -type ReviewMessage struct { - UUID string `json:"uuid"` -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.pb.go b/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.pb.go deleted file mode 100644 index a5627eb..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.pb.go +++ /dev/null @@ -1,220 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.27.1 -// protoc v3.6.1 -// source: internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.proto - -package review_rating_incremented - -import ( - reflect "reflect" - sync "sync" - - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type ReviewRatingIncrementedEvent struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Review *ReviewRatingIncrementedEvent_Review `protobuf:"bytes,1,opt,name=review,proto3" json:"review,omitempty"` -} - -func (x *ReviewRatingIncrementedEvent) Reset() { - *x = ReviewRatingIncrementedEvent{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ReviewRatingIncrementedEvent) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ReviewRatingIncrementedEvent) ProtoMessage() {} - -func (x *ReviewRatingIncrementedEvent) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ReviewRatingIncrementedEvent.ProtoReflect.Descriptor instead. -func (*ReviewRatingIncrementedEvent) Descriptor() ([]byte, []int) { - return file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescGZIP(), []int{0} -} - -func (x *ReviewRatingIncrementedEvent) GetReview() *ReviewRatingIncrementedEvent_Review { - if x != nil { - return x.Review - } - return nil -} - -type ReviewRatingIncrementedEvent_Review struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Uuid string `protobuf:"bytes,1,opt,name=uuid,proto3" json:"uuid,omitempty"` -} - -func (x *ReviewRatingIncrementedEvent_Review) Reset() { - *x = ReviewRatingIncrementedEvent_Review{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ReviewRatingIncrementedEvent_Review) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ReviewRatingIncrementedEvent_Review) ProtoMessage() {} - -func (x *ReviewRatingIncrementedEvent_Review) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ReviewRatingIncrementedEvent_Review.ProtoReflect.Descriptor instead. -func (*ReviewRatingIncrementedEvent_Review) Descriptor() ([]byte, []int) { - return file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescGZIP(), []int{0, 0} -} - -func (x *ReviewRatingIncrementedEvent_Review) GetUuid() string { - if x != nil { - return x.Uuid - } - return "" -} - -var File_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto protoreflect.FileDescriptor - -var file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDesc = []byte{ - 0x0a, 0x68, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x69, - 0x6e, 0x66, 0x72, 0x61, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x2f, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x72, 0x65, 0x76, 0x69, 0x65, - 0x77, 0x5f, 0x72, 0x61, 0x74, 0x69, 0x6e, 0x67, 0x5f, 0x69, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, - 0x6e, 0x74, 0x65, 0x64, 0x2f, 0x72, 0x65, 0x76, 0x69, 0x65, 0x77, 0x5f, 0x72, 0x61, 0x74, 0x69, - 0x6e, 0x67, 0x5f, 0x69, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x65, 0x64, 0x5f, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7a, 0x0a, 0x1c, 0x52, 0x65, - 0x76, 0x69, 0x65, 0x77, 0x52, 0x61, 0x74, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x63, 0x72, 0x65, 0x6d, - 0x65, 0x6e, 0x74, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3c, 0x0a, 0x06, 0x72, 0x65, - 0x76, 0x69, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x52, 0x65, 0x76, - 0x69, 0x65, 0x77, 0x52, 0x61, 0x74, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, - 0x6e, 0x74, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x76, 0x69, 0x65, 0x77, - 0x52, 0x06, 0x72, 0x65, 0x76, 0x69, 0x65, 0x77, 0x1a, 0x1c, 0x0a, 0x06, 0x52, 0x65, 0x76, 0x69, - 0x65, 0x77, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x42, 0x44, 0x5a, 0x42, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x69, 0x6e, 0x66, 0x72, 0x61, 0x73, 0x74, 0x72, 0x75, - 0x63, 0x74, 0x75, 0x72, 0x65, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x2f, 0x72, 0x65, 0x76, 0x69, 0x65, 0x77, 0x5f, 0x72, 0x61, 0x74, 0x69, 0x6e, 0x67, - 0x5f, 0x69, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescOnce sync.Once - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescData = file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDesc -) - -func file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescGZIP() []byte { - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescOnce.Do(func() { - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescData) - }) - return file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDescData -} - -var file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_goTypes = []interface{}{ - (*ReviewRatingIncrementedEvent)(nil), // 0: ReviewRatingIncrementedEvent - (*ReviewRatingIncrementedEvent_Review)(nil), // 1: ReviewRatingIncrementedEvent.Review -} -var file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_depIdxs = []int32{ - 1, // 0: ReviewRatingIncrementedEvent.review:type_name -> ReviewRatingIncrementedEvent.Review - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_init() -} -func file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_init() { - if File_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReviewRatingIncrementedEvent); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReviewRatingIncrementedEvent_Review); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_goTypes, - DependencyIndexes: file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_depIdxs, - MessageInfos: file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_msgTypes, - }.Build() - File_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto = out.File - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_rawDesc = nil - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_goTypes = nil - file_internal_pkg_infrastructure_event_stream_review_rating_incremented_review_rating_incremented_event_proto_depIdxs = nil -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.proto b/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.proto deleted file mode 100644 index 28babf3..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event.proto +++ /dev/null @@ -1,10 +0,0 @@ -syntax = "proto3"; -option go_package = "internal/pkg/infrastructure/event_stream/review_rating_incremented"; - -message ReviewRatingIncrementedEvent { - message Review { - string uuid = 1; - } - - Review review = 1; -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_consumer.go b/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_consumer.go deleted file mode 100644 index 06c5a42..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_consumer.go +++ /dev/null @@ -1,59 +0,0 @@ -package review_rating_incremented - -import ( - "github.com/sirupsen/logrus" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/application/commands" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/event_stream" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/projectors/get_reviews" -) - -func NewReviewRatingIncrementedEventConsumer( - consumer event_stream.Consumer, - reviewCreatedEventMarshaller ReviewRatingIncrementedEventMarshaller, - projector *get_reviews.ReviewRatingIncrementedProjector, -) *ReviewRatingIncrementedEventConsumer { - return &ReviewRatingIncrementedEventConsumer{ - consumer: consumer, - reviewCreatedEventMarshaller: reviewCreatedEventMarshaller, - projector: projector, - } -} - -type ReviewRatingIncrementedEventConsumer struct { - consumer event_stream.Consumer - reviewCreatedEventMarshaller ReviewRatingIncrementedEventMarshaller - projector *get_reviews.ReviewRatingIncrementedProjector -} - -func (r *ReviewRatingIncrementedEventConsumer) Consume() error { - messages, err := r.consumer.ConsumeAll() - - if err != nil { - logrus.Error(err) - return err - } - - for message := range messages { - logrus.Infof("processing message %s", string(message)) - eventMessage, err := r.reviewCreatedEventMarshaller.Unmarshal(message) - if err != nil { - // todo handle error - logrus.Error(err) - } - - logrus.Infof("processing review created event %#v", eventMessage) - - err = r.projector.Project( - &commands.ReviewRatingIncrementedEvent{ - ReviewUUID: eventMessage.Review.UUID, - }, - ) - if err != nil { - // todo handle error - logrus.Error(err) - } - } - - return nil -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_marshaller.go b/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_marshaller.go deleted file mode 100644 index 969a32e..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_marshaller.go +++ /dev/null @@ -1,61 +0,0 @@ -package review_rating_incremented - -import ( - "encoding/json" - - "google.golang.org/protobuf/proto" -) - -type ReviewRatingIncrementedEventMarshaller interface { - Marshal(event *ReviewRatingIncrementedEventMessage) ([]byte, error) - Unmarshal([]byte) (*ReviewRatingIncrementedEventMessage, error) -} - -type ReviewRatingIncrementedEventJSONMarshaller struct { -} - -func (r *ReviewRatingIncrementedEventJSONMarshaller) Marshal(event *ReviewRatingIncrementedEventMessage) ([]byte, error) { - marshalledMessage, err := json.Marshal(event) - if err != nil { - return nil, err - } - - return marshalledMessage, nil -} - -func (r *ReviewRatingIncrementedEventJSONMarshaller) Unmarshal(bytes []byte) (*ReviewRatingIncrementedEventMessage, error) { - reviewCreatedEvent := &ReviewRatingIncrementedEventMessage{} - err := json.Unmarshal(bytes, reviewCreatedEvent) - if err != nil { - return nil, err - } - - return reviewCreatedEvent, nil -} - -type ReviewRatingIncrementedEventProtobufMarshaller struct { -} - -func (r *ReviewRatingIncrementedEventProtobufMarshaller) Marshal(event *ReviewRatingIncrementedEventMessage) ([]byte, error) { - protobufMessage := &ReviewRatingIncrementedEvent{ - Review: &ReviewRatingIncrementedEvent_Review{ - Uuid: event.Review.UUID, - }, - } - - return proto.Marshal(protobufMessage) -} - -func (r *ReviewRatingIncrementedEventProtobufMarshaller) Unmarshal(bytes []byte) (*ReviewRatingIncrementedEventMessage, error) { - reviewCreatedEvent := &ReviewRatingIncrementedEvent{} - err := proto.Unmarshal(bytes, reviewCreatedEvent) - if err != nil { - return nil, err - } - - return &ReviewRatingIncrementedEventMessage{ - Review: ReviewMessage{ - UUID: reviewCreatedEvent.Review.GetUuid(), - }, - }, nil -} diff --git a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_producer.go b/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_producer.go deleted file mode 100644 index 78ecac8..0000000 --- a/golang/internal/pkg/infrastructure/event_stream/review_rating_incremented/review_rating_incremented_event_producer.go +++ /dev/null @@ -1,59 +0,0 @@ -package review_rating_incremented - -import ( - "fmt" - - "github.com/sirupsen/logrus" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/application/commands" - "github.com/ProntoPro/event-stream-golang/internal/pkg/infrastructure/event_stream" -) - -func NewReviewRatingIncrementedEventProducer( - producer event_stream.Producer, - reviewCreatedEventMarshaller ReviewRatingIncrementedEventMarshaller, -) *ReviewRatingIncrementedEventProducer { - return &ReviewRatingIncrementedEventProducer{ - producer: producer, - reviewCreatedEventMarshaller: reviewCreatedEventMarshaller, - } -} - -type ReviewRatingIncrementedEventProducer struct { - producer event_stream.Producer - reviewCreatedEventMarshaller ReviewRatingIncrementedEventMarshaller -} - -func (r *ReviewRatingIncrementedEventProducer) DispatchEvent(event commands.IntegrationEvent) { - eventPayload, ok := event.Payload.(commands.ReviewRatingIncrementedEvent) - if !ok { - r.handleErrors(fmt.Errorf("unsupported event payload")) - } - - messageData, err := r.reviewCreatedEventMarshaller.Marshal( - &ReviewRatingIncrementedEventMessage{ - Review: ReviewMessage{ - UUID: eventPayload.ReviewUUID, - }, - }, - ) - - if err != nil { - r.handleErrors(err) - } - - // todo what happens if we fail before here... - err = r.producer.Dispatch( - messageData, - ) - // todo what happens if we fail here... - - if err != nil { - r.handleErrors(err) - } -} - -func (r ReviewRatingIncrementedEventProducer) handleErrors(err error) { - // todo handle error so to avoid losing events - logrus.Errorf("error sending review created message via event stream: %s\n", err.Error()) -} diff --git a/golang/internal/pkg/infrastructure/in_memory/create_review_repository.go b/golang/internal/pkg/infrastructure/in_memory/create_review_repository.go deleted file mode 100644 index e118f10..0000000 --- a/golang/internal/pkg/infrastructure/in_memory/create_review_repository.go +++ /dev/null @@ -1,15 +0,0 @@ -package in_memory - -import ( - "github.com/ProntoPro/event-stream-golang/internal/pkg/domain" -) - -type CreateReviewRepository struct { - reviews []*domain.Review -} - -func (r *CreateReviewRepository) Add(review *domain.Review) error { - r.reviews = append(r.reviews, review) - - return nil -} diff --git a/golang/internal/pkg/infrastructure/in_memory/get_reviews_repository.go b/golang/internal/pkg/infrastructure/in_memory/get_reviews_repository.go deleted file mode 100644 index 3bf1a3a..0000000 --- a/golang/internal/pkg/infrastructure/in_memory/get_reviews_repository.go +++ /dev/null @@ -1,41 +0,0 @@ -package in_memory - -import ( - "math" - - "github.com/sirupsen/logrus" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/application/queries" -) - -type GetReviewsRepository struct { - reviews []queries.Review -} - -func (r *GetReviewsRepository) Find(query queries.GetReviewsQuery) ([]queries.Review, error) { - elementsAfterOffset := int64(len(r.reviews)) - query.Offset - - logrus.Infof( - "finding limit: %d offset: %d, len: %d, aft: %d", - query.Limit, - query.Offset, - len(r.reviews), - elementsAfterOffset, - ) - - if elementsAfterOffset <= 0 { - logrus.Infof("not enough elements") - - return []queries.Review{}, nil - } - - min := math.Min(float64(elementsAfterOffset), float64(query.Limit)) - - return r.reviews[query.Offset:int(min)], nil -} - -func (r *GetReviewsRepository) Add(review *queries.Review) error { - r.reviews = append(r.reviews, *review) - - return nil -} diff --git a/golang/internal/pkg/infrastructure/mysql/commands_review_repository.go b/golang/internal/pkg/infrastructure/mysql/commands_review_repository.go index 24c1b02..e0f6080 100644 --- a/golang/internal/pkg/infrastructure/mysql/commands_review_repository.go +++ b/golang/internal/pkg/infrastructure/mysql/commands_review_repository.go @@ -1,13 +1,8 @@ package mysql import ( - "database/sql" - "github.com/google/uuid" "github.com/jmoiron/sqlx" - "github.com/sirupsen/logrus" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/application/commands" "github.com/ProntoPro/event-stream-golang/internal/pkg/domain" ) @@ -54,21 +49,12 @@ func (r *CommandsReviewRepository) FindByUUID(reviewUUID uuid.UUID) (*domain.Rev return domain.CreateFromRepository(reviewUUID, reviewRow.Comment, reviewRow.Rating), nil } -func (r *CommandsReviewRepository) Save(review *domain.Review, transaction commands.Transaction) error { - tx, shouldCommit, err := getTransaction(transaction, r.db) +func (r *CommandsReviewRepository) Save(review *domain.Review) error { + tx, err := r.db.Begin() if err != nil { return err } - if shouldCommit { - defer func(tx *sql.Tx) { - err := tx.Rollback() - if err != nil { - logrus.Error(err) - } - }(tx) - } - _, err = tx.Exec( "INSERT INTO reviews_write (uuid, comment, rating) VALUES (?, ?, ?)", review.Uuid(), @@ -79,8 +65,5 @@ func (r *CommandsReviewRepository) Save(review *domain.Review, transaction comma return err } - if shouldCommit { - return tx.Commit() - } - return nil + return tx.Commit() } diff --git a/golang/internal/pkg/infrastructure/mysql/integration_event_outbox_repository.go b/golang/internal/pkg/infrastructure/mysql/integration_event_outbox_repository.go deleted file mode 100644 index cff3ae6..0000000 --- a/golang/internal/pkg/infrastructure/mysql/integration_event_outbox_repository.go +++ /dev/null @@ -1,121 +0,0 @@ -package mysql - -import ( - "database/sql" - "encoding/json" - "fmt" - - "github.com/jmoiron/sqlx" - "github.com/sirupsen/logrus" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/application/commands" -) - -type IntegrationReviewEventsOutboxRepository struct { - db *sqlx.DB -} - -func NewIntegrationReviewEventsOutboxRepository(db *sqlx.DB) *IntegrationReviewEventsOutboxRepository { - return &IntegrationReviewEventsOutboxRepository{db: db} -} - -type Payload struct { - UUID string `json:"uuid"` - Comment string `json:"comment"` - Rating int32 `json:"rating"` -} - -func (i *IntegrationReviewEventsOutboxRepository) Save( - event commands.IntegrationEvent, - transaction commands.Transaction, -) error { - eventPayload, ok := event.Payload.(commands.ReviewCreatedEvent) - if !ok { - return fmt.Errorf("unsupported event payload") - } - - payload, err := json.Marshal( - Payload{ - UUID: eventPayload.ReviewUUID, - Comment: eventPayload.Comment, - Rating: eventPayload.Rating, - }, - ) - if err != nil { - return err - } - - tx, shouldCommit, err := getTransaction(transaction, i.db) - if err != nil { - return err - } - - if shouldCommit { - defer func(tx *sql.Tx) { - err := tx.Rollback() - if err != nil { - logrus.Error(err) - } - }(tx) - } - - lastMessageCounterByAggregate, err := selectLastMessageCounterByAggregate(event.AggregateID, i.db) - if err != nil { - return err - } - - // this approach assumes that (aggregate_id, message_counter_by_aggregate) are unique and - // the Read Uncommitted isolation level is not used. - // In this way the counter will be perfectly consequential for each aggregate. - messageCounterByAggregate := lastMessageCounterByAggregate + 1 - - _, err = tx.Exec( - "INSERT INTO review_events_outbox "+ - "(uuid, aggregate_id, name, payload, version, status, message_counter_by_aggregate) "+ - "VALUES (?, ?, ?, ?, ?, ?, ?) "+ - "ON DUPLICATE KEY UPDATE status=?", - event.UUID, - event.AggregateID, - event.Name, - payload, - event.Version, - int(event.Status), - messageCounterByAggregate, - int(event.Status), - ) - if err != nil { - return err - } - - if shouldCommit { - return tx.Commit() - } - - return nil -} - -func selectLastMessageCounterByAggregate(aggregateId string, db *sqlx.DB) (int32, error) { - rows, err := db.Query( - `SELECT MAX(message_counter_by_aggregate) - FROM review_events_outbox - WHERE aggregate_id = ?`, - aggregateId, - ) - if err != nil { - return 0, err - } - - rows.Next() - var lastMessageCounterByAggregateId sql.NullInt32 - err = rows.Scan(&lastMessageCounterByAggregateId) - if err != nil { - return 0, err - } - - var result int32 - if lastMessageCounterByAggregateId.Valid { - result = lastMessageCounterByAggregateId.Int32 - } - - return result, err -} diff --git a/golang/internal/pkg/infrastructure/mysql/transaction_manager.go b/golang/internal/pkg/infrastructure/mysql/transaction_manager.go deleted file mode 100644 index 25314f7..0000000 --- a/golang/internal/pkg/infrastructure/mysql/transaction_manager.go +++ /dev/null @@ -1,78 +0,0 @@ -package mysql - -import ( - "database/sql" - "fmt" - - "github.com/jmoiron/sqlx" - - "github.com/ProntoPro/event-stream-golang/internal/pkg/application/commands" -) - -type TransactionManager struct { - db *sqlx.DB -} - -func NewTransactionManager(db *sqlx.DB) *TransactionManager { - return &TransactionManager{db: db} -} - -func (t *TransactionManager) Create() (commands.Transaction, error) { - return &Transaction{db: t.db}, nil -} - -type Transaction struct { - db *sqlx.DB - tx *sql.Tx -} - -func (t *Transaction) Tx() *sql.Tx { - return t.tx -} - -func (t *Transaction) Begin() error { - var err error - t.tx, err = t.db.Begin() - - return err -} - -func (t *Transaction) Commit() error { - if t.tx == nil { - return fmt.Errorf("transaction is nil. Begin transaction before commit") - } - - return t.tx.Commit() -} - -func (t *Transaction) Rollback() { - if t.tx == nil { - return - } - - _ = t.tx.Rollback() -} - -func getTransaction( - transaction commands.Transaction, - db *sqlx.DB, -) (*sql.Tx, bool, error) { - if shouldCreateNewTransaction(transaction) { - tx, err := db.Begin() - return tx, true, err - } - - t := transaction.(*Transaction) - - return t.Tx(), false, nil -} - -func shouldCreateNewTransaction(transaction commands.Transaction) bool { - if transaction == nil { - return true - } - - _, ok := transaction.(*Transaction) - - return !ok -}