Skip to content

Commit

Permalink
NIFI-3785: Moved move logic to ServiceFacade. Fixed move api cluster …
Browse files Browse the repository at this point in the history
…replication. Fixed error handling and made error message be a banner instead of a toast.
  • Loading branch information
Freedom9339 committed Feb 28, 2025
1 parent 25a5658 commit 31a70f4
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.prometheus.client.CollectorRegistry;
import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.bundle.BundleCoordinate;
Expand Down Expand Up @@ -134,6 +135,7 @@
import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupOptionEntity;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
Expand Down Expand Up @@ -2205,6 +2207,32 @@ ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingC
*/
ControllerServiceEntity moveControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO, final String newProcessGroupID);

/**
* Gets all available process group move options
*
* @param controllerServiceId id
* @return A list of all process group move options
*/
List<ProcessGroupOptionEntity> getAllProcessGroupOptions(String controllerServiceId);

/**
* Generated a process group move option
*
* @param controllerServiceDTO The controller service DTO
* @param processGroup The authorizable process group
* @return A process group move option
*/
ProcessGroupOptionEntity generateProcessGroupOption(ControllerServiceDTO controllerServiceDTO, ProcessGroupAuthorizable processGroup);

/**
* Gets all conflicting components preventing a move operation
*
* @param controllerServiceDTO The controller service DTO
* @param processGroup The authorizable process group
* @return A list of all conflicting components
*/
List<String> getConflictingComponents(ControllerServiceDTO controllerServiceDTO, ProcessGroupAuthorizable processGroup);

/**
* Performs verification of the given Configuration for the Controller Service with the given ID
* @param controllerServiceId the id of the controller service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.ComponentAuthorizable;
import org.apache.nifi.authorization.Group;
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.User;
Expand Down Expand Up @@ -360,6 +362,7 @@
import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupOptionEntity;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
Expand Down Expand Up @@ -3002,6 +3005,87 @@ public ControllerServiceEntity moveControllerService(final Revision revision, fi
return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
}

@Override
public List<ProcessGroupOptionEntity> getAllProcessGroupOptions(String controllerServiceId) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
List<ProcessGroupOptionEntity> options = new ArrayList<>();
final ControllerServiceDTO controllerServiceDTO = getControllerService(controllerServiceId, true).getComponent();
ProcessGroupEntity currentProcessGroup = getProcessGroup(controllerServiceDTO.getParentGroupId());

if (currentProcessGroup.getComponent().getParentGroupId() != null) {
final ProcessGroupAuthorizable authorizableProcessGroupParent = authorizableLookup.getProcessGroup(currentProcessGroup.getComponent().getParentGroupId());
if (authorizableProcessGroupParent.getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user)
&& authorizableProcessGroupParent.getAuthorizable().isAuthorized(authorizer, RequestAction.WRITE, user)) {
ProcessGroupOptionEntity option = generateProcessGroupOption(controllerServiceDTO, authorizableProcessGroupParent);
option.setText(option.getText() + " (Parent)");
options.add(option);
}
}

currentProcessGroup.getComponent().getContents().getProcessGroups().forEach(processGroup -> {
final ProcessGroupAuthorizable authorizableProcessGroup = authorizableLookup.getProcessGroup(processGroup.getId());
if (authorizableProcessGroup.getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user)
&& authorizableProcessGroup.getAuthorizable().isAuthorized(authorizer, RequestAction.WRITE, user)) {
options.add(generateProcessGroupOption(controllerServiceDTO, authorizableProcessGroup));
}
});

return options;
}

@Override
public ProcessGroupOptionEntity generateProcessGroupOption(ControllerServiceDTO controllerServiceDTO, ProcessGroupAuthorizable processGroup) {
List<String> conflictingComponents = getConflictingComponents(controllerServiceDTO, processGroup);

ProcessGroupOptionEntity option = new ProcessGroupOptionEntity();
option.setText(processGroup.getProcessGroup().getName());
option.setValue(processGroup.getProcessGroup().getIdentifier());
option.setDisabled(false);

if (!conflictingComponents.isEmpty()) {
String errorMessage = "Cannot move to this process group because the following components would be out of scope: ";
errorMessage += String.join(" ", conflictingComponents);
option.setDescription(errorMessage);
option.setDisabled(true);
}

return option;
}

@Override
public List<String> getConflictingComponents(ControllerServiceDTO controllerServiceDTO, ProcessGroupAuthorizable processGroup) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
List<String> conflictingComponents = new ArrayList<>();
controllerServiceDTO.getReferencingComponents().forEach(referencingComponent -> {
if (processGroup.getProcessGroup().findProcessor(referencingComponent.getId()) == null
&& processGroup.getProcessGroup().findControllerService(referencingComponent.getId(), true, false) == null) {
final Authorizable componentAuthorizable = authorizableLookup.getControllerServiceReferencingComponent(controllerServiceDTO.getId(), referencingComponent.getId());
if (componentAuthorizable.isAuthorized(authorizer, RequestAction.READ, user)) {
conflictingComponents.add("[" + referencingComponent.getComponent().getName() + "]");
} else {
conflictingComponents.add("[" + referencingComponent.getId() + "]");
}
}
});

controllerServiceDTO.getProperties().forEach((key, value) -> {
try {
ControllerServiceEntity refControllerService = this.getControllerService(value, false);
if (refControllerService != null) {
if (processGroup.getProcessGroup().findControllerService(value, false, true) == null) {
ComponentAuthorizable componentAuthorizable = authorizableLookup.getControllerService(value);
if (componentAuthorizable.getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user)) {
conflictingComponents.add("[" + refControllerService.getComponent().getName() + "]");
} else {
conflictingComponents.add("[" + refControllerService.getId() + "]");
}
}
}
} catch (Exception ignored) { }
});
return conflictingComponents;
}

@Override
public List<ConfigVerificationResultDTO> performControllerServiceConfigVerification(final String controllerServiceId, final Map<String, String> properties, final Map<String, String> variables) {
return controllerServiceDAO.verifyConfiguration(controllerServiceId, properties, variables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.AuthorizeParameterReference;
import org.apache.nifi.authorization.Authorizer;
Expand Down Expand Up @@ -88,13 +87,13 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -694,7 +693,7 @@ public Response getProcessGroupOptions(
throw new IllegalArgumentException("Controller service id must be specified.");
}

List<ProcessGroupOptionEntity> options = new ArrayList<>();
AtomicReference<List<ProcessGroupOptionEntity>> options = new AtomicReference<>();
serviceFacade.authorizeAccess(lookup -> {
final NiFiUser user = NiFiUserUtils.getNiFiUser();

Expand All @@ -707,75 +706,11 @@ public Response getProcessGroupOptions(
authorized = authorized && authorizableProcessGroupCurrent.getAuthorizable().isAuthorized(authorizer, RequestAction.WRITE, user);

if (authorized) {
if (authorizableProcessGroupCurrent.getProcessGroup().getParent() != null) {
final ProcessGroupAuthorizable authorizableProcessGroupParent = lookup.getProcessGroup(authorizableProcessGroupCurrent.getProcessGroup().getParent().getIdentifier());
if (authorizableProcessGroupParent.getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user)
&& authorizableProcessGroupParent.getAuthorizable().isAuthorized(authorizer, RequestAction.WRITE, user)) {
options.add(generateProcessGroupOption(controllerServiceDTO, authorizableProcessGroupParent, lookup, user));
}
}

authorizableProcessGroupCurrent.getProcessGroup().getProcessGroups().forEach(processGroup -> {
final ProcessGroupAuthorizable authorizableProcessGroup = lookup.getProcessGroup(processGroup.getIdentifier());
if (authorizableProcessGroup.getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user)
&& authorizableProcessGroup.getAuthorizable().isAuthorized(authorizer, RequestAction.WRITE, user)) {
options.add(generateProcessGroupOption(controllerServiceDTO, authorizableProcessGroup, lookup, user));
}
});
}
});

return generateOkResponse(options).build();
}

private ProcessGroupOptionEntity generateProcessGroupOption(ControllerServiceDTO controllerServiceDTO, ProcessGroupAuthorizable processGroup, AuthorizableLookup lookup, NiFiUser user) {
List<String> conflictingComponents = getConflictingComponents(controllerServiceDTO, processGroup, lookup, user);

ProcessGroupOptionEntity option = new ProcessGroupOptionEntity();
option.setText(processGroup.getProcessGroup().getName());
option.setValue(processGroup.getProcessGroup().getIdentifier());
option.setDisabled(false);

if (!conflictingComponents.isEmpty()) {
String errorMessage = "Cannot move to this process group because the following components would be out of scope: ";
errorMessage += String.join(" ", conflictingComponents);
option.setDescription(errorMessage);
option.setDisabled(true);
}

return option;
}

private List<String> getConflictingComponents(ControllerServiceDTO controllerServiceDTO, ProcessGroupAuthorizable processGroup, AuthorizableLookup lookup, NiFiUser user) {
List<String> conflictingComponents = new ArrayList<>();
controllerServiceDTO.getReferencingComponents().forEach(referencingComponent -> {
if (processGroup.getProcessGroup().findProcessor(referencingComponent.getId()) == null
&& processGroup.getProcessGroup().findControllerService(referencingComponent.getId(), true, false) == null) {
final Authorizable componentAuthorizable = lookup.getControllerServiceReferencingComponent(controllerServiceDTO.getId(), referencingComponent.getId());
if (componentAuthorizable.isAuthorized(authorizer, RequestAction.READ, user)) {
conflictingComponents.add("[" + referencingComponent.getComponent().getName() + "]");
} else {
conflictingComponents.add("[Unauthorized Component]");
}
options.set(serviceFacade.getAllProcessGroupOptions(controllerServiceId));
}
});

controllerServiceDTO.getProperties().forEach((key, value) -> {
try {
ControllerServiceEntity refControllerService = serviceFacade.getControllerService(value, false);
if (refControllerService != null) {
if (processGroup.getProcessGroup().findControllerService(value, false, true) == null) {
ComponentAuthorizable componentAuthorizable = lookup.getControllerService(value);
if (componentAuthorizable.getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user)) {
conflictingComponents.add("[" + refControllerService.getComponent().getName() + "]");
} else {
conflictingComponents.add("[Unauthorized Component]");
}
}
}
} catch (Exception ignored) { }
});
return conflictingComponents;
return generateOkResponse(options.get()).build();
}

/**
Expand Down Expand Up @@ -829,6 +764,12 @@ public Response moveControllerServices(
throw new IllegalArgumentException("ParentGroupId must be specified.");
}

if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestControllerServiceEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestControllerServiceEntity.isDisconnectedNodeAcknowledged());
}

final Revision requestRevision = getRevision(requestControllerServiceEntity, id);
return withWriteLock(
serviceFacade,
Expand All @@ -850,7 +791,7 @@ public Response moveControllerServices(
authorizableProcessGroupOld.getAuthorizable().authorize(authorizer, RequestAction.WRITE, user);

// Verify all referencing and referenced components are still within scope
List<String> conflictingComponents = getConflictingComponents(requestControllerServiceDTO, authorizableProcessGroupNew, lookup, user);
List<String> conflictingComponents = serviceFacade.getConflictingComponents(requestControllerServiceDTO, authorizableProcessGroupNew);

if (!conflictingComponents.isEmpty()) {
String errorMessage = "Could not move controller service because the following components would be out of scope: ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,15 @@ export class ControllerServicesEffects {
}
});
}),
catchError((errorResponse: HttpErrorResponse) => {
this.dialog.closeAll();
return of(
ErrorActions.snackBarError({ error: this.errorHelper.getErrorString(errorResponse) })
);
tap({
error: (errorResponse: HttpErrorResponse) => {
this.dialog.closeAll();
this.store.dispatch(
ErrorActions.snackBarError({
error: this.errorHelper.getErrorString(errorResponse)
})
);
}
})
)
)
Expand All @@ -719,9 +723,17 @@ export class ControllerServicesEffects {
}
})
),
catchError((errorResponse: HttpErrorResponse) =>
of(ErrorActions.snackBarError({ error: this.errorHelper.getErrorString(errorResponse) }))
)
catchError((errorResponse: HttpErrorResponse) => {
if (this.errorHelper.showErrorInContext(errorResponse.status)) {
return of(
ControllerServicesActions.controllerServicesBannerApiError({
error: this.errorHelper.getErrorString(errorResponse)
})
);
} else {
return of(this.errorHelper.fullScreenError(errorResponse));
}
})
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import {
inlineCreateControllerServiceSuccess,
loadControllerServices,
loadControllerServicesSuccess,
resetControllerServicesState
resetControllerServicesState,
moveControllerService
} from './controller-services.actions';
import { produce } from 'immer';
import { ControllerServicesState } from './index';
Expand Down Expand Up @@ -76,10 +77,16 @@ export const controllerServicesReducer = createReducer(
...state,
saving: false
})),
on(createControllerService, configureControllerService, deleteControllerService, (state) => ({
...state,
saving: true
})),
on(
createControllerService,
configureControllerService,
deleteControllerService,
moveControllerService,
(state) => ({
...state,
saving: true
})
),
on(createControllerServiceSuccess, (state, { response }) => {
return produce(state, (draftState) => {
draftState.controllerServices.push(response.controllerService);
Expand Down
Loading

0 comments on commit 31a70f4

Please sign in to comment.