Skip to content

Commit

Permalink
feat(publishing-events): change Kafka event publishing keys for holdi…
Browse files Browse the repository at this point in the history
…ngs and items. (#1108)
  • Loading branch information
SvitlanaKovalova1 authored Nov 5, 2024
1 parent 0b0e44f commit 0dbe3be
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 40 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Features
* Modify endpoint for bulk instances upsert with publish events flag ([MODINVSTOR-1283](https://folio-org.atlassian.net/browse/MODINVSTOR-1283))
* Change Kafka event publishing keys for holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281))

### Bug fixes
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.vertx.core.Future.succeededFuture;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.rest.support.ResponseUtil.isCreateSuccessResponse;
Expand Down Expand Up @@ -133,20 +132,20 @@ protected List<Triple<String, E, E>> mapOldRecordsToNew(List<Pair<String, D>> ol
var oldRecordPair = idToOldRecordPairMap.get(getId(newRecordPair.getValue()));
return triple(newRecordPair.getKey(), convertDomainToEvent(oldRecordPair.getKey(), oldRecordPair.getValue()),
convertDomainToEvent(newRecordPair.getKey(), newRecordPair.getValue()));
}).collect(toList());
}).toList();
}

private Future<Void> publishRecordsCreated(Collection<D> records) {
return convertDomainsToEvents(records).compose(domainEventService::publishRecordsCreated);
}

private Future<List<Pair<String, E>>> convertDomainsToEvents(Collection<D> domains) {
protected Future<List<Pair<String, E>>> convertDomainsToEvents(Collection<D> domains) {
return getRecordIds(domains).map(pairs -> pairs.stream()
.map(pair -> pair(pair.getKey(), convertDomainToEvent(pair.getKey(), pair.getValue())))
.collect(toList()));
.toList());
}

private Future<List<Triple<String, E, E>>> convertDomainsToEvents(Collection<D> newRecords,
protected Future<List<Triple<String, E, E>>> convertDomainsToEvents(Collection<D> newRecords,
Collection<D> oldRecords) {

return getRecordIds(oldRecords).compose(oldRecordsInstanceIds -> getRecordIds(newRecords).map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected Future<List<Pair<String, HoldingsRecord>>> getRecordIds(
Collection<HoldingsRecord> holdingsRecords) {

return succeededFuture(holdingsRecords.stream()
.map(hr -> pair(hr.getInstanceId(), hr))
.map(hr -> pair(hr.getId(), hr))
.toList());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package org.folio.services.domainevent;

import static io.vertx.core.Future.succeededFuture;
import static java.util.stream.Collectors.toMap;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.InventoryKafkaTopic.ITEM;
import static org.folio.InventoryKafkaTopic.REINDEX_RECORDS;
import static org.folio.rest.support.ResponseUtil.isDeleteSuccessResponse;
import static org.folio.rest.tools.utils.TenantTool.tenantId;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
Expand All @@ -20,6 +24,7 @@
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.jaxrs.model.Item;
import org.folio.rest.jaxrs.model.PublishReindexRecords;
import org.folio.rest.support.CollectionUtil;

public class ItemDomainEventPublisher extends AbstractDomainEventPublisher<Item, ItemWithInstanceId> {
private static final Logger log = getLogger(ItemDomainEventPublisher.class);
Expand All @@ -42,7 +47,7 @@ public Future<Void> publishUpdated(Item newItem, Item oldItem, HoldingsRecord ne
ItemWithInstanceId oldItemWithId = new ItemWithInstanceId(oldItem, oldHoldings.getInstanceId());
ItemWithInstanceId newItemWithId = new ItemWithInstanceId(newItem, newHoldings.getInstanceId());

return domainEventService.publishRecordUpdated(newHoldings.getInstanceId(), oldItemWithId, newItemWithId);
return domainEventService.publishRecordUpdated(newItem.getId(), oldItemWithId, newItemWithId);
}

public Future<Void> publishUpdated(HoldingsRecord oldHoldings, HoldingsRecord newHoldings, List<Item> oldItems) {
Expand All @@ -67,9 +72,18 @@ public Future<Void> publishReindexItems(String key, List<Map<String, Object>> it
}

@Override
public void publishRemoved(String instanceId, String itemRaw) {
String instanceIdAndItemRaw = "{\"instanceId\":\"" + instanceId + "\"," + itemRaw.substring(1);
domainEventService.publishRecordRemoved(instanceId, instanceIdAndItemRaw);
public Handler<Response> publishRemoved(Item removedRecord) {
return response -> {
if (!isDeleteSuccessResponse(response)) {
log.warn("Item record removal failed, no event will be sent");
return;
}
getRecordIds(List.of(removedRecord))
.map(CollectionUtil::getFirst)
.map(Pair::getKey)
.compose(instanceId -> domainEventService.publishRecordRemoved(
removedRecord.getId(), convertDomainToEvent(instanceId, removedRecord)));
};
}

@Override
Expand All @@ -85,11 +99,33 @@ protected ItemWithInstanceId convertDomainToEvent(String instanceId, Item item)
return new ItemWithInstanceId(item, instanceId);
}

@Override
protected List<Triple<String, ItemWithInstanceId, ItemWithInstanceId>> mapOldRecordsToNew(
List<Pair<String, Item>> oldRecords, List<Pair<String, Item>> newRecords) {

var idToOldRecordPairMap = oldRecords.stream().collect(toMap(pair -> getId(pair.getValue()), pair -> pair));

return newRecords.stream().map(newRecordPair -> {
var oldRecordPair = idToOldRecordPairMap.get(getId(newRecordPair.getValue()));
return triple(newRecordPair.getValue().getId(), convertDomainToEvent(
oldRecordPair.getKey(), oldRecordPair.getValue()),
convertDomainToEvent(newRecordPair.getKey(), newRecordPair.getValue()));
}).toList();
}

@Override
protected String getId(Item item) {
return item.getId();
}

@Override
protected Future<List<Pair<String, ItemWithInstanceId>>> convertDomainsToEvents(Collection<Item> domains) {
return getRecordIds(domains)
.map(pairs -> pairs.stream()
.map(pair -> pair(pair.getValue().getId(), convertDomainToEvent(pair.getKey(), pair.getValue())))
.toList());
}

private List<Triple<String, ItemWithInstanceId, ItemWithInstanceId>> mapOldItemsToNew(
HoldingsRecord oldHoldings, HoldingsRecord newHoldings, Collection<Item> oldItems, Collection<Item> newItems) {

Expand Down
16 changes: 14 additions & 2 deletions src/main/java/org/folio/services/holding/HoldingsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import static org.folio.services.batch.BatchOperationContextFactory.buildBatchOperationContext;
import static org.folio.validator.HridValidators.refuseWhenHridChanged;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
Expand All @@ -29,6 +31,7 @@
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.persist.HoldingsRepository;
import org.folio.persist.InstanceRepository;
import org.folio.rest.jaxrs.model.HoldingsRecord;
Expand All @@ -51,6 +54,8 @@

public class HoldingsService {
private static final Logger log = getLogger(HoldingsService.class);
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper();

private final Context vertxContext;
private final Map<String, String> okapiHeaders;
private final PostgresClient postgresClient;
Expand Down Expand Up @@ -146,8 +151,15 @@ public Future<Response> deleteHoldings(String cql) {
// https://sonarcloud.io/organizations/folio-org/rules?open=java%3AS1602&rule_key=java%3AS1602
return holdingsRepository.delete(cql)
.onSuccess(rowSet -> vertxContext.runOnContext(runLater ->
rowSet.iterator().forEachRemaining(row ->
domainEventPublisher.publishRemoved(row.getString(0), row.getString(1))
rowSet.iterator().forEachRemaining(row -> {
try {
var holdingId = OBJECT_MAPPER.readTree(row.getString(1)).get("id").textValue();
domainEventPublisher.publishRemoved(holdingId, row.getString(1));
} catch (JsonProcessingException ex) {
log.error(String.format("deleteHoldings:: Failed to parse json : %s", ex.getMessage()), ex);
throw new IllegalArgumentException(ex.getCause());
}
}
)
))
.map(Response.noContent().build());
Expand Down
25 changes: 23 additions & 2 deletions src/main/java/org/folio/services/item/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static java.util.Objects.isNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.dbschema.ObjectMapperTool.readValue;
import static org.folio.rest.impl.HoldingsStorageApi.HOLDINGS_RECORD_TABLE;
import static org.folio.rest.impl.ItemStorageApi.ITEM_TABLE;
Expand All @@ -24,6 +25,8 @@
import static org.folio.validator.HridValidators.refuseWhenHridChanged;
import static org.folio.validator.NotesValidators.refuseLongNotes;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
Expand All @@ -42,6 +45,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.ws.rs.core.Response;
import org.apache.logging.log4j.Logger;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.okapi.common.XOkapiHeaders;
import org.folio.persist.HoldingsRepository;
import org.folio.persist.ItemRepository;
Expand All @@ -63,10 +68,16 @@
import org.folio.validator.NotesValidators;

public class ItemService {

private static final Logger log = getLogger(ItemService.class);
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper();
private static final Pattern KEY_ALREADY_EXISTS_PATTERN = Pattern.compile(
": Key \\(([^=]+)\\)=\\((.*)\\) already exists.$");
private static final Pattern KEY_NOT_PRESENT_PATTERN = Pattern.compile(
": Key \\(([^=]+)\\)=\\((.*)\\) is not present in table \"(.*)\".$");
private static final String INSTANCE_ID_WITH_ITEM_JSON = """
{"instanceId": "%s",%s
""";

private final HridManager hridManager;
private final ItemEffectiveValuesService effectiveValuesService;
Expand Down Expand Up @@ -207,8 +218,18 @@ public Future<Response> deleteItems(String cql) {
// https://sonarcloud.io/organizations/folio-org/rules?open=java%3AS1602&rule_key=java%3AS1602
return itemRepository.delete(cql)
.onSuccess(rowSet -> vertxContext.runOnContext(runLater ->
rowSet.iterator().forEachRemaining(row ->
domainEventService.publishRemoved(row.getString(0), row.getString(1))
rowSet.iterator().forEachRemaining(row -> {
try {
var instanceIdAndItemRaw = INSTANCE_ID_WITH_ITEM_JSON.formatted(
row.getString(0), row.getString(1).substring(1));
var itemId = OBJECT_MAPPER.readTree(row.getString(1)).get("id").textValue();

domainEventService.publishRemoved(itemId, instanceIdAndItemRaw);
} catch (JsonProcessingException ex) {
log.error(String.format("deleteItems:: Failed to parse json : %s", ex.getMessage()), ex);
throw new IllegalArgumentException(ex.getCause());
}
}
)
))
.map(Response.noContent().build());
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/org/folio/rest/api/HoldingsStorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2534,8 +2534,7 @@ public void canUsePutToCreateHoldingsWhenHridIsSupplied() {

// Make sure a create event published vs update event
holdingsMessageChecks.createdMessagePublished(holdingsFromGet);
holdingsMessageChecks.noHoldingsUpdatedMessagePublished(
instanceId.toString(), holdingsId.toString());
holdingsMessageChecks.noHoldingsUpdatedMessagePublished(holdingsId.toString());

log.info("Finished canUsePutToCreateAHoldingsWhenHRIDIsSupplied");
}
Expand Down
12 changes: 10 additions & 2 deletions src/test/java/org/folio/rest/support/kafka/FakeKafkaConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,19 @@ public Collection<EventMessage> getMessagesForReindexRecords(List<String> ids) {
.toList();
}

public Collection<EventMessage> getMessagesForHoldings(String instanceId, String holdingsId) {
public Collection<EventMessage> getMessagesForHoldings(String holdingsId) {
return collectedHoldingsMessages.messagesByGroupKey(instanceAndIdKey(holdingsId, holdingsId));
}

public Collection<EventMessage> getMessagesForDeleteAllHoldings(String instanceId, String holdingsId) {
return collectedHoldingsMessages.messagesByGroupKey(instanceAndIdKey(instanceId, holdingsId));
}

public Collection<EventMessage> getMessagesForItem(String instanceId, String itemId) {
public Collection<EventMessage> getMessagesForItem(String itemId) {
return collectedItemMessages.messagesByGroupKey(instanceAndIdKey(itemId, itemId));
}

public Collection<EventMessage> getMessagesForItemWithInstanceIdKey(String instanceId, String itemId) {
return collectedItemMessages.messagesByGroupKey(instanceAndIdKey(instanceId, itemId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,31 @@ private static String getId(JsonObject holdings) {
return holdings.getString("id");
}

private static String getInstanceId(JsonObject holdings) {
return holdings.getString("instanceId");
}

public void createdMessagePublished(JsonObject holdings) {
final var holdingsId = getId(holdings);
final var instanceId = getInstanceId(holdings);

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasCreateEventMessageFor(holdings));
}

public void createdMessagePublished(JsonObject holdings, String tenantIdExpected, String okapiUrlExpected) {
final var holdingsId = getId(holdings);
final var instanceId = getInstanceId(holdings);

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasCreateEventMessageFor(holdings, tenantIdExpected, okapiUrlExpected));
}

public void updatedMessagePublished(JsonObject oldHoldings,
JsonObject newHoldings) {

final var holdingsId = getId(newHoldings);
final var instanceId = getInstanceId(newHoldings);

oldHoldings.remove("holdingsItems");
oldHoldings.remove("bareHoldingsItems");
newHoldings.remove("holdingsItems");
newHoldings.remove("bareHoldingsItems");

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasUpdateEventMessageFor(oldHoldings, newHoldings));
}

Expand All @@ -65,36 +58,33 @@ public void updatedMessagePublished(JsonObject oldHoldings,
String okapiUrlExpected) {

final var holdingsId = getId(newHoldings);
final var instanceId = getInstanceId(newHoldings);

oldHoldings.remove("holdingsItems");
oldHoldings.remove("bareHoldingsItems");
newHoldings.remove("holdingsItems");
newHoldings.remove("bareHoldingsItems");

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasUpdateEventMessageFor(oldHoldings, newHoldings, okapiUrlExpected));
}

public void noHoldingsUpdatedMessagePublished(String instanceId,
String holdingsId) {
public void noHoldingsUpdatedMessagePublished(String holdingsId) {

awaitDuring(1, SECONDS)
.until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
.until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasNoUpdateEventMessage());
}

public void deletedMessagePublished(JsonObject holdings) {
final var holdingsId = getId(holdings);
final var instanceId = getInstanceId(holdings);

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasDeleteEventMessageFor(holdings));
}

public void allHoldingsDeletedMessagePublished() {
awaitAtMost()
.until(() -> kafkaConsumer.getMessagesForHoldings(NULL_ID, null),
.until(() -> kafkaConsumer.getMessagesForDeleteAllHoldings(NULL_ID, null),
eventMessageMatchers.hasDeleteAllEventMessage());
}
}
Loading

0 comments on commit 0dbe3be

Please sign in to comment.