Skip to content

Commit

Permalink
Merge pull request #7366 from alrossi/feature/9.2/bulk-ha
Browse files Browse the repository at this point in the history
dcache-bulk: implement HA
  • Loading branch information
svemeyer authored Oct 4, 2023
2 parents 195fbb0 + ab16db9 commit 9367ede
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,19 @@ information. Requests will be load balanced over available instances of `topo`.
This service collects nightly statistics about available pools. One can have
multiple instances of `statistics` and each instance will collect the same
information.

### `bulk`

This service processes bulk requests for pinning/unpinning, staging/releasing,
file deletion and qos updating or manipulation. It does not directly affect
transfers, but is perhaps more critical than the other services in this group
because it supports the `REST` API.

Bulk is fully replicable. All instances *_must_* share the same database instance.
The configuration should be synchronized such that all instances are configured
the same way.

Request querying is load-balanced over the physical instances, but only the
leader instance is responsible for submitting, cancelling or clearing requests
(non-leader instances relay these operations to the leader), and only the
leader actually processes requests (i.e., runs the request container
servicing the request).
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FsPath;
import diskCacheV111.util.PnfsHandler;
import diskCacheV111.util.TimeoutCacheException;
import diskCacheV111.vehicles.Message;
import dmg.cells.nucleus.CellAddressCore;
import dmg.cells.nucleus.CellLifeCycleAware;
import dmg.cells.nucleus.CellMessageReceiver;
import dmg.cells.nucleus.Reply;
Expand All @@ -81,10 +83,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.dcache.auth.Subjects;
import org.dcache.auth.attributes.Restriction;
import org.dcache.auth.attributes.Restrictions;
import org.dcache.cells.CellStub;
import org.dcache.cells.HAServiceLeadershipManager;
import org.dcache.cells.MessageReply;
import org.dcache.namespace.FileAttribute;
import org.dcache.services.bulk.BulkRequest.Depth;
Expand All @@ -101,7 +105,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
/**
* Bulk service façade. Handles incoming messages. Handles restart reloading.
*/
public final class BulkService implements CellLifeCycleAware, CellMessageReceiver {
public final class BulkService implements CellLifeCycleAware, CellMessageReceiver,
LeaderLatchListener {

private static final Logger LOGGER = LoggerFactory.getLogger(BulkService.class);
private static final String TARGET_COUNT_ERROR_FORMAT = "The number of targets %s exceeds "
Expand All @@ -120,14 +125,18 @@ public final class BulkService implements CellLifeCycleAware, CellMessageReceive
private BulkTargetStore targetStore;
private BulkSubmissionHandler submissionHandler;
private BulkServiceStatistics statistics;
private HAServiceLeadershipManager leadershipManager;
private ExecutorService incomingExecutorService;
private CellStub namespace;
private CellStub endpoint;
private Depth allowedDepth;
private int maxRequestsPerUser;
private int maxFlatTargets;
private int maxShallowTargets;
private int maxRecursiveTargets;

private boolean initialized;

@Override
public void afterStart() {
/*
Expand All @@ -141,31 +150,51 @@ public void afterStart() {
* is necessary for processing request targets.
*/
waitForNamespace();
}

/**
 * {@link LeaderLatchListener} callback for acquiring leadership.
 * <p>
 * Logs the notification and hands {@code initialize()} to the incoming
 * executor, so the initialization itself does not run on the thread that
 * delivers the callback.
 */
@Override
public synchronized void isLeader() {
    LOGGER.info("isLeader called");
    incomingExecutorService.execute(this::initialize);
}

/**
 * {@link LeaderLatchListener} callback for losing (or not holding) leadership.
 * <p>
 * Logs the notification and hands {@code parkService()} to the incoming
 * executor. Any exception raised while submitting that task is logged
 * together with its root cause and is not propagated to the caller.
 */
@Override
public synchronized void notLeader() {
    LOGGER.info("notLeader called");
    try {
        incomingExecutorService.execute(this::parkService);
    } catch (Exception submitFailure) {
        String reason = submitFailure.getMessage();
        String rootCause = String.valueOf(Throwables.getRootCause(submitFailure));
        LOGGER.error("notLeader: {}, cause: {}.", reason, rootCause);
    }
}

public Reply messageArrived(BulkRequestMessage message) {
LOGGER.trace("received BulkRequestMessage {}", message);
MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
try {
BulkRequest request = message.getRequest();
Subject subject = message.getSubject();
Restriction restriction = message.getRestriction();
checkQuota(subject);
String uid = UUID.randomUUID().toString();
request.setUid(uid);
checkRestrictions(restriction, uid);
checkActivity(request);
checkDepthConstraints(request);
requestStore.store(subject, restriction, request);
statistics.incrementRequestsReceived(request.getActivity());
requestManager.signal();
message.setRequestUrl(request.getUrlPrefix() + "/" + request.getUid());
reply.reply(message);
if (!leadershipManager.hasLeadership()) {
relayToLeader(message, reply);
} else {
BulkRequest request = message.getRequest();
Subject subject = message.getSubject();
Restriction restriction = message.getRestriction();
checkQuota(subject);
String uid = UUID.randomUUID().toString();
request.setUid(uid);
checkRestrictions(restriction, uid);
checkActivity(request);
checkDepthConstraints(request);
requestStore.store(subject, restriction, request);
statistics.incrementRequestsReceived(request.getActivity());
requestManager.signal();
message.setRequestUrl(request.getUrlPrefix() + "/" + request.getUid());
reply.reply(message);
}
} catch (BulkServiceException e) {
LOGGER.error("messageArrived(BulkRequestMessage) {}: {}", message, e.toString());
LOGGER.error("messageArrived(BulkRequestMessage) {}: {}", message,
e.toString());
reply.fail(message, e);
} catch (Exception e) {
reply.fail(message, e);
Expand All @@ -178,7 +207,6 @@ public Reply messageArrived(BulkRequestMessage message) {

public Reply messageArrived(BulkRequestListMessage message) {
LOGGER.trace("received BulkRequestListMessage {}", message);

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
try {
Expand All @@ -203,7 +231,6 @@ public Reply messageArrived(BulkRequestListMessage message) {

public Reply messageArrived(BulkRequestStatusMessage message) {
LOGGER.trace("received BulkRequestStatusMessage {}", message);

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
Subject subject = message.getSubject();
Expand Down Expand Up @@ -234,26 +261,29 @@ public Reply messageArrived(BulkRequestStatusMessage message) {

public Reply messageArrived(BulkRequestCancelMessage message) {
LOGGER.trace("received BulkRequestCancelMessage {}", message);

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
try {
Subject subject = message.getSubject();
String uuid = message.getRequestUuid();
/*
* First check to see if the request corresponds to a stored one.
*/
requestStore.getKey(uuid);
checkRestrictions(message.getRestriction(), uuid);
matchActivity(message.getActivity(), uuid);
List<String> targetPaths = message.getTargetPaths();
if (targetPaths == null || targetPaths.isEmpty()) {
submissionHandler.cancelRequest(subject, uuid);
if (!leadershipManager.hasLeadership()) {
relayToLeader(message, reply);
} else {
validateTargets(uuid, subject, targetPaths);
submissionHandler.cancelTargets(subject, uuid, targetPaths);
Subject subject = message.getSubject();
String uuid = message.getRequestUuid();
/*
* First check to see if the request corresponds to a stored one.
*/
requestStore.getKey(uuid);
checkRestrictions(message.getRestriction(), uuid);
matchActivity(message.getActivity(), uuid);
List<String> targetPaths = message.getTargetPaths();
if (targetPaths == null || targetPaths.isEmpty()) {
submissionHandler.cancelRequest(subject, uuid);
} else {
validateTargets(uuid, subject, targetPaths);
submissionHandler.cancelTargets(subject, uuid, targetPaths);
}
reply.reply(message);
}
reply.reply(message);
} catch (BulkServiceException e) {
LOGGER.error("messageArrived(BulkRequestCancelMessage) {}: {}", message,
e.toString());
Expand All @@ -267,22 +297,29 @@ public Reply messageArrived(BulkRequestCancelMessage message) {
return reply;
}

/**
 * Ad-hoc entry point that parses the decimal string for 60000 as a
 * {@code long} and prints it to standard output.
 *
 * @param args command-line arguments; not used.
 */
public static void main(String[] args) {
    // Long.parseLong accepts only an optional sign followed by decimal digits.
    // A trailing 'L' type suffix is valid in Java source literals only; passing
    // "60000L" here raised NumberFormatException on every invocation.
    System.out.println(Long.parseLong("60000"));
}

public Reply messageArrived(BulkRequestClearMessage message) {
LOGGER.trace("received BulkRequestClearMessage {}", message);

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
try {
String uuid = message.getRequestUuid();
Subject subject = message.getSubject();
/*
* First check to see if the request corresponds to a stored one.
*/
requestStore.getKey(uuid);
checkRestrictions(message.getRestriction(), uuid);
matchActivity(message.getActivity(), uuid);
submissionHandler.clearRequest(subject, uuid, message.isCancelIfRunning());
reply.reply(message);
if (!leadershipManager.hasLeadership()) {
relayToLeader(message, reply);
} else {
String uuid = message.getRequestUuid();
Subject subject = message.getSubject();
/*
* First check to see if the request corresponds to a stored one.
*/
requestStore.getKey(uuid);
checkRestrictions(message.getRestriction(), uuid);
matchActivity(message.getActivity(), uuid);
submissionHandler.clearRequest(subject, uuid, message.isCancelIfRunning());
reply.reply(message);
}
} catch (BulkServiceException e) {
LOGGER.error("messageArrived(BulkRequestClearMessage) {}: {}", message,
e.toString());
Expand All @@ -298,7 +335,6 @@ public Reply messageArrived(BulkRequestClearMessage message) {

public Reply messageArrived(BulkArchivedRequestInfoMessage message) {
LOGGER.trace("received BulkArchivedRequestInfoMessage {}", message);

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
try {
Expand All @@ -322,7 +358,6 @@ public Reply messageArrived(BulkArchivedRequestInfoMessage message) {

public Reply messageArrived(BulkArchivedSummaryInfoMessage message) {
LOGGER.trace("received BulkArchivedSummaryInfoMessage {}", message);

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
try {
Expand Down Expand Up @@ -368,6 +403,16 @@ public synchronized void setAllowedDepth(Depth allowedDepth) {
this.allowedDepth = allowedDepth;
}

/**
 * Injects the cell stub used to forward messages to the current leader
 * instance (see {@code relayToLeader}, which sets the stub's destination
 * and sends the message through it).
 *
 * @param endpoint stub through which relayed messages are sent.
 */
@Required
public void setEndpointStub(CellStub endpoint) {
this.endpoint = endpoint;
}

/**
 * Injects the leadership manager that this service queries to find out
 * whether it currently holds leadership ({@code hasLeadership()}) and,
 * if not, where the leader is ({@code getLeaderAddress()}).
 *
 * @param leadershipManager source of leadership state and leader address.
 */
@Required
public void setLeadershipManager(HAServiceLeadershipManager leadershipManager) {
this.leadershipManager = leadershipManager;
}

@Required
public synchronized void setMaxFlatTargets(int maxFlatTargets) {
this.maxFlatTargets = maxFlatTargets;
Expand Down Expand Up @@ -435,6 +480,22 @@ private void checkActivity(BulkRequest request) throws BulkServiceException {
}
}

/**
 * Forwards a message to the current leader instance and completes the
 * local reply according to the outcome.
 * <p>
 * The leader's address is obtained from the leadership manager and set as
 * the destination of the endpoint stub; the message is then sent and the
 * response awaited. A response with return code 0 completes {@code reply}
 * with the message; any other return code is turned into an exception.
 * <p>
 * NOTE(review): the reply is completed with the original {@code message},
 * not with the response returned by the leader. If the leader populates
 * fields on its copy, confirm that they are visible to the caller.
 * <p>
 * NOTE(review): the destination is set on the shared {@code endpoint}
 * field on each call; confirm this is safe when several relays run
 * concurrently.
 *
 * @param message the message to forward.
 * @param reply   the pending reply to complete on success.
 * @param <M>     the message type.
 * @throws Exception the error object carried by a failed response if it is
 *                   itself an exception, otherwise a new exception whose
 *                   message is the string form of that error object; also
 *                   anything thrown while sending.
 */
private <M extends Message> void relayToLeader(M message, MessageReply<M> reply)
      throws Exception {
    String leaderAddress = leadershipManager.getLeaderAddress().toString();
    endpoint.setDestination(leaderAddress);
    Message response = endpoint.sendAndWait(message);

    if (response.getReturnCode() == 0) {
        reply.reply(message);
        return;
    }

    Object error = response.getErrorObject();
    throw error instanceof Exception
          ? (Exception) error
          : new Exception(String.valueOf(error));
}

private synchronized void checkDepthConstraints(BulkRequest request)
throws BulkPermissionDeniedException {
switch (request.getExpandDirectories()) {
Expand Down Expand Up @@ -515,7 +576,12 @@ private synchronized void checkTargetCount(BulkRequest request)
}
}

private void initialize() {
private synchronized void initialize() {
if (initialized) {
LOGGER.info("Service already initialized.");
return;
}

/*
* See store specifics for how reload is handled, but the minimal contract is
* that all incomplete requests be reset to the QUEUED state.
Expand All @@ -542,6 +608,25 @@ private void initialize() {
requestManager.signal();

LOGGER.info("Service startup completed.");

initialized = true;
}

/**
 * Parks the service by shutting down the request manager.
 * <p>
 * Does nothing except log if the service is not currently initialized.
 * Otherwise the request manager's {@code shutdown()} is invoked; a failure
 * there is logged with its root cause and does not prevent the service
 * from being marked as no longer initialized.
 */
private synchronized void parkService() {
    if (initialized) {
        LOGGER.info("Stopping the job manager.");
        try {
            requestManager.shutdown();
        } catch (Exception shutdownFailure) {
            String reason = shutdownFailure.getMessage();
            String rootCause = String.valueOf(Throwables.getRootCause(shutdownFailure));
            LOGGER.error("parkService error: {}, {}.", reason, rootCause);
        }

        initialized = false;
    } else {
        LOGGER.info("Service already parked.");
    }
}

private void matchActivity(String activity, String uuid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public final class BulkActivityFactory implements CellMessageSender, Environment
private QoSResponseReceiver qoSResponseReceiver;
private CellEndpoint endpoint;

private boolean initialized;

/**
* Generates an instance of the plugin-specific activity to be used by the request jobs.
*
Expand Down Expand Up @@ -157,13 +159,16 @@ public Map<String, BulkActivityProvider> getProviders() {
}

public void initialize() {
ServiceLoader<BulkActivityProvider> serviceLoader
= ServiceLoader.load(BulkActivityProvider.class);
for (BulkActivityProvider provider : serviceLoader) {
String activity = provider.getActivity();
provider.setMaxPermits(maxPermits.get(activity));
provider.configure(environment);
providers.put(provider.getActivity(), provider);
if (!initialized) {
ServiceLoader<BulkActivityProvider> serviceLoader
= ServiceLoader.load(BulkActivityProvider.class);
for (BulkActivityProvider provider : serviceLoader) {
String activity = provider.getActivity();
provider.setMaxPermits(maxPermits.get(activity));
provider.configure(environment);
providers.put(provider.getActivity(), provider);
}
initialized = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public interface BulkRequestManager extends SignalAware {
*/
void cancelTargets(String id, List<String> targetPaths);

/**
* Should wipe out any in-memory request state.
*
* @throws Exception if the implementation fails while shutting down; the
*                   specific failure types are implementation-dependent.
*/
void shutdown() throws Exception;

/**
* Implementation-specific.
*/
Expand Down
Loading

0 comments on commit 9367ede

Please sign in to comment.