Skip to content

Commit

Permalink
:fix: insert of deviceEvent was still causing shared lock, the only s…
Browse files Browse the repository at this point in the history
…olution in this case is to obtain a pessimistic lock on device before inserting deviceevent

Signed-off-by: dseurotech <[email protected]>
  • Loading branch information
dseurotech committed Jan 25, 2024
1 parent 476c8c3 commit 286265c
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import javax.persistence.Embedded;
import javax.persistence.EntityExistsException;
import javax.persistence.EntityManager;
import javax.persistence.LockModeType;
import javax.persistence.NonUniqueResultException;
import javax.persistence.PersistenceException;
import javax.persistence.TypedQuery;
Expand Down Expand Up @@ -127,9 +128,13 @@ public Optional<E> find(TxContext txContext, KapuaId scopeId, KapuaId entityId)
}

protected Optional<E> doFind(EntityManager em, KapuaId scopeId, KapuaId entityId) {
return doFind(em, scopeId, entityId, null);

Check warning on line 131 in commons/src/main/java/org/eclipse/kapua/commons/jpa/KapuaEntityJpaRepository.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/jpa/KapuaEntityJpaRepository.java#L131

Added line #L131 was not covered by tests
}

protected Optional<E> doFind(EntityManager em, KapuaId scopeId, KapuaId entityId, LockModeType lockModeType) {
final KapuaEid eId = KapuaEid.parseKapuaId(entityId);
// Checking existence
final Optional<E> entityToFind = Optional.ofNullable(em.find(concreteClass, eId));
final Optional<E> entityToFind = Optional.ofNullable(em.find(concreteClass, eId, lockModeType));

Check warning on line 137 in commons/src/main/java/org/eclipse/kapua/commons/jpa/KapuaEntityJpaRepository.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/jpa/KapuaEntityJpaRepository.java#L137

Added line #L137 was not covered by tests

return entityToFind
.filter(e -> scopeId == null || KapuaId.ANY.equals(scopeId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,17 @@ public <R> R execute(TxConsumer<R> transactionConsumer,
} catch (Exception ex) {
//In JPA, all exceptions (even caught ones) force the transaction in rollback-only mode
txContext.rollback();
final boolean canTryAgain = txContext.isRecoverableException(ex)
&& ++retry <= maxRetryAttemptsAllowed;
if (canTryAgain) {
logger.warn("Recoverable exception, retrying", ex);
continue;
final boolean isRecoverableException = txContext.isRecoverableException(ex);
if (!isRecoverableException) {
logger.error("Non-recoverable exception, failing", ex);
throw txContext.convertPersistenceException(ex);

Check warning on line 61 in service/api/src/main/java/org/eclipse/kapua/storage/TxManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

service/api/src/main/java/org/eclipse/kapua/storage/TxManagerImpl.java#L60-L61

Added lines #L60 - L61 were not covered by tests
}
logger.error("Non-recoverable exception or retry attempts exceeded, failing", ex);
throw txContext.convertPersistenceException(ex);
final boolean canTryAgain = ++retry <= maxRetryAttemptsAllowed;
if (!canTryAgain) {
logger.error("Retry attempts exceeded (%d/%d), failing", retry, maxRetryAttemptsAllowed, ex);
throw txContext.convertPersistenceException(ex);
}
logger.warn("Recoverable exception '{}':'{}', retrying", ex.getClass(), ex.getMessage());
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
import org.eclipse.kapua.storage.KapuaUpdatableEntityRepository;
import org.eclipse.kapua.storage.TxContext;

import java.util.Date;
import java.util.Optional;

public interface DeviceRepository extends
KapuaUpdatableEntityRepository<Device, DeviceListResult> {
Optional<Device> findByClientId(TxContext tx, KapuaId scopeId, String clientId) throws KapuaException;

void updateLastEvent(TxContext tx, KapuaId scopeId, KapuaId deviceId, KapuaId deviceEventId, Date receivedOn);
Optional<Device> findForUpdate(TxContext tx, KapuaId scopeId, KapuaId deviceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
*******************************************************************************/
package org.eclipse.kapua.service.device.registry.event.internal;

import org.eclipse.kapua.KapuaEntityNotFoundException;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.util.ArgumentValidator;
import org.eclipse.kapua.model.domain.Actions;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.model.query.KapuaQuery;
import org.eclipse.kapua.service.authorization.AuthorizationService;
import org.eclipse.kapua.service.authorization.permission.PermissionFactory;
import org.eclipse.kapua.service.device.registry.Device;
import org.eclipse.kapua.service.device.registry.DeviceDomains;
import org.eclipse.kapua.service.device.registry.DeviceRepository;
import org.eclipse.kapua.service.device.registry.event.DeviceEvent;
Expand Down Expand Up @@ -85,12 +87,12 @@ public DeviceEvent create(DeviceEventCreator deviceEventCreator) throws KapuaExc
authorizationService.checkPermission(permissionFactory.newPermission(DeviceDomains.DEVICE_EVENT_DOMAIN, Actions.write, deviceEventCreator.getScopeId()));
return txManager.execute(tx -> {
// Check that device exists
// final Device device = deviceRepository.find(tx, deviceEventCreator.getScopeId(), deviceEventCreator.getDeviceId())
// .orElseThrow(() -> new KapuaEntityNotFoundException(Device.TYPE, deviceEventCreator.getDeviceId()));
final Device device = deviceRepository.findForUpdate(tx, deviceEventCreator.getScopeId(), deviceEventCreator.getDeviceId())

Check warning on line 90 in service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/event/internal/DeviceEventServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/event/internal/DeviceEventServiceImpl.java#L90

Added line #L90 was not covered by tests
.orElseThrow(() -> new KapuaEntityNotFoundException(Device.TYPE, deviceEventCreator.getDeviceId()));

// Create the event
DeviceEvent newEvent = entityFactory.newEntity(deviceEventCreator.getScopeId());
newEvent.setDeviceId(deviceEventCreator.getDeviceId());
newEvent.setDeviceId(device.getId());
newEvent.setReceivedOn(deviceEventCreator.getReceivedOn());
newEvent.setSentOn(deviceEventCreator.getSentOn());
newEvent.setResource(deviceEventCreator.getResource());
Expand All @@ -100,7 +102,9 @@ public DeviceEvent create(DeviceEventCreator deviceEventCreator) throws KapuaExc
newEvent.setPosition(deviceEventCreator.getPosition());

final DeviceEvent created = repository.create(tx, newEvent);
deviceRepository.updateLastEvent(tx, deviceEventCreator.getScopeId(), deviceEventCreator.getDeviceId(), newEvent.getId(), newEvent.getReceivedOn());
device.setLastEventId(created.getId());

Check warning on line 105 in service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/event/internal/DeviceEventServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/event/internal/DeviceEventServiceImpl.java#L105

Added line #L105 was not covered by tests
//Do not call update explicitly, the transaction ending will automatically update the entity
// deviceRepository.update(tx, device, device);
return newEvent;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.eclipse.kapua.service.device.registry.DeviceRepository;
import org.eclipse.kapua.storage.TxContext;

import java.util.Date;
import java.util.Optional;

public class CachingDeviceRepository
Expand All @@ -47,8 +46,13 @@ public Optional<Device> findByClientId(TxContext tx, KapuaId scopeId, String cli
}

@Override
public void updateLastEvent(TxContext tx, KapuaId scopeId, KapuaId deviceId, KapuaId deviceEventId, Date receivedOn) {
wrapped.updateLastEvent(tx, scopeId, deviceId, deviceEventId, receivedOn);
public Optional<Device> findForUpdate(TxContext tx, KapuaId scopeId, KapuaId deviceId) {
/*
The correct approach in thi scenario is to leave to JPA the persistence of the updated entity at transaction's closure,
without calling explicitly update. Therefore if we don't clear the cache the next items would find an outdated optlock, and fail
*/
entityCache.remove(scopeId, deviceId);
final Optional<Device> found = wrapped.findForUpdate(tx, scopeId, deviceId);
return found;

Check warning on line 56 in service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/internal/CachingDeviceRepository.java

View check run for this annotation

Codecov / codecov/patch

service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/internal/CachingDeviceRepository.java#L54-L56

Added lines #L54 - L56 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,15 @@
import org.eclipse.kapua.commons.jpa.JpaAwareTxContext;
import org.eclipse.kapua.commons.jpa.KapuaJpaRepositoryConfiguration;
import org.eclipse.kapua.commons.jpa.KapuaUpdatableEntityJpaRepository;
import org.eclipse.kapua.commons.model.id.KapuaEid;
import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.device.registry.Device;
import org.eclipse.kapua.service.device.registry.DeviceAttributes;
import org.eclipse.kapua.service.device.registry.DeviceListResult;
import org.eclipse.kapua.service.device.registry.DeviceRepository;
import org.eclipse.kapua.service.device.registry.event.internal.DeviceEventImpl;
import org.eclipse.kapua.service.device.registry.event.internal.DeviceEventImpl_;
import org.eclipse.kapua.storage.TxContext;

import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaUpdate;
import javax.persistence.criteria.Join;
import javax.persistence.criteria.JoinType;
import javax.persistence.criteria.Root;
import java.util.Date;
import javax.persistence.LockModeType;
import java.util.Optional;

public class DeviceImplJpaRepository
Expand All @@ -53,25 +44,9 @@ public Optional<Device> findByClientId(TxContext tx, KapuaId scopeId, String cli
}

@Override
public void updateLastEvent(TxContext tx, KapuaId scopeId, KapuaId deviceId, KapuaId deviceEventId, Date receivedOn) {
public Optional<Device> findForUpdate(TxContext tx, KapuaId scopeId, KapuaId deviceId) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);
final CriteriaBuilder cb = em.getCriteriaBuilder();
final CriteriaUpdate<DeviceImpl> update = cb.createCriteriaUpdate(DeviceImpl.class);
final Root<DeviceImpl> root = update.from(DeviceImpl.class);
final Join<DeviceImpl, DeviceEventImpl> lastEventJoin = root.join(DeviceImpl_.lastEvent, JoinType.LEFT);
update.set(root.get(DeviceImpl_.lastEventId), KapuaEid.parseKapuaId(deviceEventId));
update.set(root.get(DeviceImpl_.modifiedBy), KapuaEid.parseKapuaId(KapuaSecurityUtils.getSession().getUserId()));
update.set(root.get(DeviceImpl_.modifiedOn), new Date());
update.where(
cb.and(
cb.equal(root.get(DeviceImpl_.scopeId), KapuaEid.parseKapuaId(scopeId)),
cb.equal(root.get(DeviceImpl_.id), KapuaEid.parseKapuaId(deviceId)),
cb.or(
cb.isNull(lastEventJoin.get(DeviceEventImpl_.receivedOn)),
cb.lessThan(lastEventJoin.get(DeviceEventImpl_.receivedOn), receivedOn)
)
)
);
em.createQuery(update).executeUpdate();
final Optional<Device> device = doFind(em, scopeId, deviceId, LockModeType.PESSIMISTIC_WRITE);
return device;

Check warning on line 50 in service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/internal/DeviceImplJpaRepository.java

View check run for this annotation

Codecov / codecov/patch

service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/internal/DeviceImplJpaRepository.java#L48-L50

Added lines #L48 - L50 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@

<properties>
<property name="javax.persistence.lock.timeout" value="1000"/>
<!-- <property name="eclipselink.logging.level.sql" value="FINE"/>
<property name="eclipselink.logging.parameters" value="true"/> -->
<!-- <property name="javax.persistence.lock.timeout" value="100000"/>-->
<!-- <property name="eclipselink.logging.level" value="FINE"/>-->
<!-- <property name="eclipselink.logging.level.sql" value="FINE"/>-->
<!-- <property name="eclipselink.logging.parameters" value="true"/>-->

<property name="eclipselink.logging.logger" value="org.eclipse.persistence.logging.slf4j.SLF4JLogger"/>
</properties>
Expand Down

0 comments on commit 286265c

Please sign in to comment.