2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * Modifications to the original nifi code for the ONAP project are made
18 * available under the Apache License, Version 2.0
20 package org.apache.nifi.web.dao.impl;
22 import org.apache.nifi.authorization.Authorizer;
23 import org.apache.nifi.authorization.RequestAction;
24 import org.apache.nifi.authorization.resource.Authorizable;
25 import org.apache.nifi.authorization.resource.DataAuthorizable;
26 import org.apache.nifi.authorization.user.NiFiUser;
27 import org.apache.nifi.authorization.user.NiFiUserUtils;
28 import org.apache.nifi.connectable.Connectable;
29 import org.apache.nifi.connectable.ConnectableType;
30 import org.apache.nifi.connectable.Connection;
31 import org.apache.nifi.controller.queue.LoadBalanceCompression;
32 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
33 import org.apache.nifi.connectable.Position;
34 import org.apache.nifi.controller.FlowController;
35 import org.apache.nifi.controller.exception.ValidationException;
36 import org.apache.nifi.controller.queue.DropFlowFileStatus;
37 import org.apache.nifi.controller.queue.FlowFileQueue;
38 import org.apache.nifi.controller.queue.ListFlowFileStatus;
39 import org.apache.nifi.controller.repository.ContentNotFoundException;
40 import org.apache.nifi.controller.repository.FlowFileRecord;
41 import org.apache.nifi.flowfile.FlowFilePrioritizer;
42 import org.apache.nifi.flowfile.attributes.CoreAttributes;
43 import org.apache.nifi.groups.ProcessGroup;
44 import org.apache.nifi.groups.RemoteProcessGroup;
45 import org.apache.nifi.processor.Relationship;
46 import org.apache.nifi.remote.RemoteGroupPort;
47 import org.apache.nifi.util.FormatUtils;
48 import org.apache.nifi.web.DownloadableContent;
49 import org.apache.nifi.web.ResourceNotFoundException;
50 import org.apache.nifi.web.api.dto.ConnectableDTO;
51 import org.apache.nifi.web.api.dto.ConnectionDTO;
52 import org.apache.nifi.web.api.dto.PositionDTO;
53 import org.apache.nifi.web.dao.ConnectionDAO;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 import javax.ws.rs.WebApplicationException;
58 import java.io.IOException;
59 import java.io.InputStream;
60 import java.util.ArrayList;
61 import java.util.Collection;
62 import java.util.Collections;
63 import java.util.HashSet;
64 import java.util.List;
67 import java.util.regex.Matcher;
69 public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO {
71 private static final Logger logger = LoggerFactory.getLogger(StandardConnectionDAO.class);
73 private FlowController flowController;
74 private Authorizer authorizer;
76 private Connection locateConnection(final String connectionId) {
77 final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
78 final Connection connection = rootGroup.findConnection(connectionId);
80 if (connection == null) {
81 throw new ResourceNotFoundException(String.format("Unable to find connection with id '%s'.", connectionId));
88 public boolean hasConnection(String id) {
89 final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
90 return rootGroup.findConnection(id) != null;
94 public Connection getConnection(final String id) {
95 return locateConnection(id);
99 public Set<Connection> getConnections(final String groupId) {
100 final ProcessGroup group = locateProcessGroup(flowController, groupId);
101 return group.getConnections();
105 public DropFlowFileStatus getFlowFileDropRequest(String connectionId, String dropRequestId) {
106 final Connection connection = locateConnection(connectionId);
107 final FlowFileQueue queue = connection.getFlowFileQueue();
109 final DropFlowFileStatus dropRequest = queue.getDropFlowFileStatus(dropRequestId);
110 if (dropRequest == null) {
111 throw new ResourceNotFoundException(String.format("Unable to find drop request with id '%s'.", dropRequestId));
118 public ListFlowFileStatus getFlowFileListingRequest(String connectionId, String listingRequestId) {
119 final Connection connection = locateConnection(connectionId);
120 final FlowFileQueue queue = connection.getFlowFileQueue();
122 final ListFlowFileStatus listRequest = queue.getListFlowFileStatus(listingRequestId);
123 if (listRequest == null) {
124 throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId));
131 public FlowFileRecord getFlowFile(String id, String flowFileUuid) {
133 final Connection connection = locateConnection(id);
134 final FlowFileQueue queue = connection.getFlowFileQueue();
135 final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid);
137 if (flowFile == null) {
138 throw new ResourceNotFoundException(String.format("The FlowFile with UUID %s is no longer in the active queue.", flowFileUuid));
141 // get the attributes and ensure appropriate access
142 final Map<String, String> attributes = flowFile.getAttributes();
143 final Authorizable dataAuthorizable = new DataAuthorizable(connection.getSourceAuthorizable());
144 dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes);
147 } catch (final IOException ioe) {
148 logger.error(String.format("Unable to get the flowfile (%s) at this time.", flowFileUuid), ioe);
149 throw new IllegalStateException("Unable to get the FlowFile at this time.");
154 * Configures the specified connection using the specified dto.
156 private void configureConnection(Connection connection, ConnectionDTO connectionDTO) {
157 // validate flow file comparators/prioritizers
158 List<FlowFilePrioritizer> newPrioritizers = null;
159 final List<String> prioritizers = connectionDTO.getPrioritizers();
160 if (isNotNull(prioritizers)) {
161 final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers);
162 newPrioritizers = new ArrayList<>();
163 for (final String className : newPrioritizersClasses) {
165 newPrioritizers.add(flowController.getFlowManager().createPrioritizer(className));
166 } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
167 throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e);
172 // update connection queue
173 if (isNotNull(connectionDTO.getFlowFileExpiration())) {
174 connection.getFlowFileQueue().setFlowFileExpiration(connectionDTO.getFlowFileExpiration());
176 if (isNotNull(connectionDTO.getBackPressureObjectThreshold())) {
177 connection.getFlowFileQueue().setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
179 if (isNotNull(connectionDTO.getBackPressureDataSizeThreshold())) {
180 connection.getFlowFileQueue().setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
182 if (isNotNull(newPrioritizers)) {
183 connection.getFlowFileQueue().setPriorities(newPrioritizers);
186 final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy();
187 final String loadBalancePartitionAttribute = connectionDTO.getLoadBalancePartitionAttribute();
188 if (isNotNull(loadBalanceStrategyName)) {
189 final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
190 connection.getFlowFileQueue().setLoadBalanceStrategy(loadBalanceStrategy, loadBalancePartitionAttribute);
193 final String loadBalanceCompressionName = connectionDTO.getLoadBalanceCompression();
194 if (isNotNull(loadBalanceCompressionName)) {
195 connection.getFlowFileQueue().setLoadBalanceCompression(LoadBalanceCompression.valueOf(loadBalanceCompressionName));
198 // update the connection state
199 if (isNotNull(connectionDTO.getBends())) {
200 final List<Position> bendPoints = new ArrayList<>();
201 for (final PositionDTO bend : connectionDTO.getBends()) {
203 bendPoints.add(new Position(bend.getX(), bend.getY()));
206 connection.setBendPoints(bendPoints);
208 if (isNotNull(connectionDTO.getName())) {
209 connection.setName(connectionDTO.getName());
211 if (isNotNull(connectionDTO.getLabelIndex())) {
212 connection.setLabelIndex(connectionDTO.getLabelIndex());
214 if (isNotNull(connectionDTO.getzIndex())) {
215 connection.setZIndex(connectionDTO.getzIndex());
220 * Validates the proposed processor configuration.
222 private List<String> validateProposedConfiguration(final String groupId, final ConnectionDTO connectionDTO) {
223 List<String> validationErrors = new ArrayList<>();
225 if (isNotNull(connectionDTO.getBackPressureObjectThreshold()) && connectionDTO.getBackPressureObjectThreshold() < 0) {
226 validationErrors.add("Max queue size must be a non-negative integer");
228 if (isNotNull(connectionDTO.getFlowFileExpiration())) {
229 Matcher expirationMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(connectionDTO.getFlowFileExpiration());
230 if (!expirationMatcher.matches()) {
231 validationErrors.add("Flow file expiration is not a valid time duration (ie 30 sec, 5 min)");
234 if (isNotNull(connectionDTO.getLabelIndex())) {
235 if (connectionDTO.getLabelIndex() < 0) {
236 validationErrors.add("The label index must be positive.");
240 // validation is required when connecting to a remote process group since each node in a
241 // cluster may or may not be authorized
242 final ConnectableDTO proposedDestination = connectionDTO.getDestination();
243 if (proposedDestination != null && ConnectableType.REMOTE_INPUT_PORT.name().equals(proposedDestination.getType())) {
244 // the group id must be specified
245 if (proposedDestination.getGroupId() == null) {
246 validationErrors.add("When the destination is a remote input port its group id is required.");
247 return validationErrors;
250 // attempt to location the proprosed destination
251 final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId);
252 final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(proposedDestination.getGroupId());
253 if (remoteProcessGroup == null) {
254 validationErrors.add("Unable to find the specified remote process group.");
255 return validationErrors;
258 // ensure the new destination was found
259 final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(proposedDestination.getId());
260 if (remoteInputPort == null) {
261 validationErrors.add("Unable to find the specified destination.");
262 return validationErrors;
266 return validationErrors;
270 public Connection createConnection(final String groupId, final ConnectionDTO connectionDTO) {
271 final ProcessGroup group = locateProcessGroup(flowController, groupId);
273 if (isNotNull(connectionDTO.getParentGroupId()) && !flowController.getFlowManager().areGroupsSame(connectionDTO.getParentGroupId(), groupId)) {
274 throw new IllegalStateException("Cannot specify a different Parent Group ID than the Group to which the Connection is being added");
277 // get the source and destination connectables
278 final ConnectableDTO sourceConnectableDTO = connectionDTO.getSource();
279 final ConnectableDTO destinationConnectableDTO = connectionDTO.getDestination();
281 // ensure both are specified
282 if (sourceConnectableDTO == null || destinationConnectableDTO == null) {
283 throw new IllegalArgumentException("Both source and destinations must be specified.");
286 // if the source/destination connectable's group id has not been set, its inferred to be the current group
287 if (sourceConnectableDTO.getGroupId() == null) {
288 sourceConnectableDTO.setGroupId(groupId);
290 if (destinationConnectableDTO.getGroupId() == null) {
291 destinationConnectableDTO.setGroupId(groupId);
294 // validate the proposed configuration
295 final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO);
297 // ensure there was no validation errors
298 if (!validationErrors.isEmpty()) {
299 throw new ValidationException(validationErrors);
303 final Connectable source;
304 if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceConnectableDTO.getType())) {
305 final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId);
306 final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceConnectableDTO.getGroupId());
307 source = remoteProcessGroup.getOutputPort(sourceConnectableDTO.getId());
309 final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceConnectableDTO.getGroupId());
310 source = sourceGroup.getConnectable(sourceConnectableDTO.getId());
313 // find the destination
314 final Connectable destination;
315 if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationConnectableDTO.getType())) {
316 final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId);
317 final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationConnectableDTO.getGroupId());
318 destination = remoteProcessGroup.getInputPort(destinationConnectableDTO.getId());
320 final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationConnectableDTO.getGroupId());
321 destination = destinationGroup.getConnectable(destinationConnectableDTO.getId());
324 // determine the relationships
325 final Set<String> relationships = new HashSet<>();
326 if (isNotNull(connectionDTO.getSelectedRelationships())) {
327 relationships.addAll(connectionDTO.getSelectedRelationships());
330 // create the connection
331 final Connection connection = flowController.createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
333 // configure the connection
334 configureConnection(connection, connectionDTO);
336 // add the connection to the group
337 group.addConnection(connection);
342 public DropFlowFileStatus createFlowFileDropRequest(String id, String dropRequestId) {
343 final Connection connection = locateConnection(id);
344 final FlowFileQueue queue = connection.getFlowFileQueue();
346 final NiFiUser user = NiFiUserUtils.getNiFiUser();
348 throw new WebApplicationException(new Throwable("Unable to access details for current user."));
351 return queue.dropFlowFiles(dropRequestId, user.getIdentity());
355 public ListFlowFileStatus createFlowFileListingRequest(String id, String listingRequestId) {
356 final Connection connection = locateConnection(id);
357 final FlowFileQueue queue = connection.getFlowFileQueue();
359 // ensure we can list
362 return queue.listFlowFiles(listingRequestId, 100);
366 public void verifyCreate(String groupId, ConnectionDTO connectionDTO) {
367 // validate the incoming request
368 final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO);
370 // ensure there was no validation errors
371 if (!validationErrors.isEmpty()) {
372 throw new ValidationException(validationErrors);
375 // Ensure that both the source and the destination for the connection exist.
376 // In the case that the source or destination is a port in a Remote Process Group,
377 // this is necessary because the ports can change in the background. It may still be
378 // possible for a port to disappear between the 'verify' stage and the creation stage,
379 // but this prevents the case where some nodes already know about the port while other
380 // nodes in the cluster do not. This is a more common case, as users may try to connect
381 // to the port as soon as the port is created.
382 final ConnectableDTO sourceDto = connectionDTO.getSource();
383 if (sourceDto == null || sourceDto.getId() == null) {
384 throw new IllegalArgumentException("Cannot create connection without specifying source");
387 final ConnectableDTO destinationDto = connectionDTO.getDestination();
388 if (destinationDto == null || destinationDto.getId() == null) {
389 throw new IllegalArgumentException("Cannot create connection without specifying destination");
392 if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDto.getType())) {
393 final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId);
395 final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceDto.getGroupId());
396 if (remoteProcessGroup == null) {
397 throw new IllegalArgumentException("Unable to find the specified remote process group.");
400 final RemoteGroupPort sourceConnectable = remoteProcessGroup.getOutputPort(sourceDto.getId());
401 if (sourceConnectable == null) {
402 throw new IllegalArgumentException("The specified source for the connection does not exist");
403 } else if (!sourceConnectable.getTargetExists()) {
404 throw new IllegalArgumentException("The specified remote output port does not exist.");
407 final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceDto.getGroupId());
408 final Connectable sourceConnectable = sourceGroup.getConnectable(sourceDto.getId());
409 if (sourceConnectable == null) {
410 throw new IllegalArgumentException("The specified source for the connection does not exist");
414 if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDto.getType())) {
415 final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId);
417 final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationDto.getGroupId());
418 if (remoteProcessGroup == null) {
419 throw new IllegalArgumentException("Unable to find the specified remote process group.");
422 final RemoteGroupPort destinationConnectable = remoteProcessGroup.getInputPort(destinationDto.getId());
423 if (destinationConnectable == null) {
424 throw new IllegalArgumentException("The specified destination for the connection does not exist");
425 } else if (!destinationConnectable.getTargetExists()) {
426 throw new IllegalArgumentException("The specified remote input port does not exist.");
429 final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationDto.getGroupId());
430 final Connectable destinationConnectable = destinationGroup.getConnectable(destinationDto.getId());
431 if (destinationConnectable == null) {
432 throw new IllegalArgumentException("The specified destination for the connection does not exist");
437 private void verifyList(final FlowFileQueue queue) {
438 queue.verifyCanList();
442 public void verifyList(String id) {
443 final Connection connection = locateConnection(id);
444 final FlowFileQueue queue = connection.getFlowFileQueue();
449 public void verifyUpdate(ConnectionDTO connectionDTO) {
450 verifyUpdate(locateConnection(connectionDTO.getId()), connectionDTO);
453 private void verifyUpdate(final Connection connection, final ConnectionDTO connectionDTO) {
454 // determine what the request is attempting
455 if (isAnyNotNull(connectionDTO.getBackPressureDataSizeThreshold(),
456 connectionDTO.getBackPressureObjectThreshold(),
457 connectionDTO.getDestination(),
458 connectionDTO.getFlowFileExpiration(),
459 connectionDTO.getName(),
460 connectionDTO.getPosition(),
461 connectionDTO.getPrioritizers(),
462 connectionDTO.getSelectedRelationships())) {
464 // validate the incoming request
465 final List<String> validationErrors = validateProposedConfiguration(connection.getProcessGroup().getIdentifier(), connectionDTO);
467 // ensure there was no validation errors
468 if (!validationErrors.isEmpty()) {
469 throw new ValidationException(validationErrors);
472 // If destination is changing, ensure that current destination is not running. This check is done here, rather than
473 // in the Connection object itself because the Connection object itself does not know which updates are to occur and
474 // we don't want to prevent updating things like the connection name or backpressure just because the destination is running
475 final Connectable destination = connection.getDestination();
476 if (destination != null && destination.isRunning() && destination.getConnectableType() != ConnectableType.FUNNEL && destination.getConnectableType() != ConnectableType.INPUT_PORT) {
477 throw new ValidationException(Collections.singletonList("Cannot change the destination of connection because the current destination is running"));
480 // verify that this connection supports modification
481 connection.verifyCanUpdate();
486 public Connection updateConnection(final ConnectionDTO connectionDTO) {
487 final Connection connection = locateConnection(connectionDTO.getId());
488 final ProcessGroup group = connection.getProcessGroup();
490 // ensure we can update
491 verifyUpdate(connection, connectionDTO);
493 final Collection<Relationship> newProcessorRelationships = new ArrayList<>();
494 Connectable newDestination = null;
496 // ensure that the source ID is correct, if specified.
497 final Connectable existingSource = connection.getSource();
498 if (isNotNull(connectionDTO.getSource()) && !existingSource.getIdentifier().equals(connectionDTO.getSource().getId())) {
499 throw new IllegalStateException("Connection with ID " + connectionDTO.getId() + " has conflicting Source ID");
502 // determine if the destination changed
503 final ConnectableDTO proposedDestination = connectionDTO.getDestination();
504 if (proposedDestination != null) {
505 final Connectable currentDestination = connection.getDestination();
507 // handle remote input port differently
508 if (ConnectableType.REMOTE_INPUT_PORT.name().equals(proposedDestination.getType())) {
509 // the group id must be specified
510 if (proposedDestination.getGroupId() == null) {
511 throw new IllegalArgumentException("When the destination is a remote input port its group id is required.");
514 // if the current destination is a remote input port
515 boolean isDifferentRemoteProcessGroup = false;
516 if (currentDestination.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
517 RemoteGroupPort remotePort = (RemoteGroupPort) currentDestination;
518 if (!proposedDestination.getGroupId().equals(remotePort.getRemoteProcessGroup().getIdentifier())) {
519 isDifferentRemoteProcessGroup = true;
523 // if the destination is changing or the previous destination was a different remote process group
524 if (!proposedDestination.getId().equals(currentDestination.getIdentifier()) || isDifferentRemoteProcessGroup) {
525 final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, group.getIdentifier());
526 final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(proposedDestination.getGroupId());
528 // ensure the remote process group was found
529 if (remoteProcessGroup == null) {
530 throw new IllegalArgumentException("Unable to find the specified remote process group.");
533 final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(proposedDestination.getId());
535 // ensure the new destination was found
536 if (remoteInputPort == null) {
537 throw new IllegalArgumentException("Unable to find the specified destination.");
540 // ensure the remote port actually exists
541 if (!remoteInputPort.getTargetExists()) {
542 throw new IllegalArgumentException("The specified remote input port does not exist.");
544 newDestination = remoteInputPort;
548 // if there is a different destination id
549 if (!proposedDestination.getId().equals(currentDestination.getIdentifier())) {
550 // if the destination connectable's group id has not been set, its inferred to be the current group
551 if (proposedDestination.getGroupId() == null) {
552 proposedDestination.setGroupId(group.getIdentifier());
555 final ProcessGroup destinationGroup = locateProcessGroup(flowController, proposedDestination.getGroupId());
556 newDestination = destinationGroup.getConnectable(proposedDestination.getId());
558 // ensure the new destination was found
559 if (newDestination == null) {
560 throw new IllegalArgumentException("Unable to find the specified destination.");
566 // determine any new relationships
567 final Set<String> relationships = connectionDTO.getSelectedRelationships();
568 if (isNotNull(relationships)) {
569 if (relationships.isEmpty()) {
570 throw new IllegalArgumentException("Cannot remove all relationships from Connection with ID " + connection.getIdentifier() + " -- remove the Connection instead");
572 if (existingSource == null) {
573 throw new IllegalArgumentException("Cannot specify new relationships without including the source.");
576 final Connectable destination = newDestination == null ? connection.getDestination() : newDestination;
578 for (final String relationship : relationships) {
579 int prevSize = newProcessorRelationships.size();
581 final Relationship processorRelationshipSource = existingSource.getRelationship(relationship);
583 if (processorRelationshipSource != null) {
584 newProcessorRelationships.add(processorRelationshipSource);
587 final Relationship processorRelationshipDest = destination.getRelationship(relationship);
589 if (processorRelationshipDest != null) {
590 newProcessorRelationships.add(processorRelationshipDest);
593 if (newProcessorRelationships.size() == prevSize) {
594 throw new IllegalArgumentException("Unable to locate " + relationship + " relationship.");
599 // configure the connection
600 configureConnection(connection, connectionDTO);
601 group.onComponentModified();
603 // update the relationships if necessary
604 if (!newProcessorRelationships.isEmpty()) {
605 connection.setRelationships(newProcessorRelationships);
608 // update the destination if necessary
609 if (isNotNull(newDestination)) {
610 connection.setDestination(newDestination);
617 public void verifyDelete(String id) {
618 final Connection connection = locateConnection(id);
619 connection.verifyCanDelete();
623 public void deleteConnection(final String id) {
624 final Connection connection = locateConnection(id);
625 connection.getProcessGroup().removeConnection(connection);
629 public DropFlowFileStatus deleteFlowFileDropRequest(String connectionId, String dropRequestId) {
630 final Connection connection = locateConnection(connectionId);
631 final FlowFileQueue queue = connection.getFlowFileQueue();
633 final DropFlowFileStatus dropFlowFileStatus = queue.cancelDropFlowFileRequest(dropRequestId);
634 if (dropFlowFileStatus == null) {
635 throw new ResourceNotFoundException(String.format("Unable to find drop request with id '%s'.", dropRequestId));
638 return dropFlowFileStatus;
642 public ListFlowFileStatus deleteFlowFileListingRequest(String connectionId, String listingRequestId) {
643 final Connection connection = locateConnection(connectionId);
644 final FlowFileQueue queue = connection.getFlowFileQueue();
646 final ListFlowFileStatus listFlowFileStatus = queue.cancelListFlowFileRequest(listingRequestId);
647 if (listFlowFileStatus == null) {
648 throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId));
651 return listFlowFileStatus;
655 public DownloadableContent getContent(String id, String flowFileUuid, String requestUri) {
657 final NiFiUser user = NiFiUserUtils.getNiFiUser();
659 final Connection connection = locateConnection(id);
660 final FlowFileQueue queue = connection.getFlowFileQueue();
661 final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid);
663 if (flowFile == null) {
664 throw new ResourceNotFoundException(String.format("The FlowFile with UUID %s is no longer in the active queue.", flowFileUuid));
667 // get the attributes and ensure appropriate access
668 final Map<String, String> attributes = flowFile.getAttributes();
669 final Authorizable dataAuthorizable = new DataAuthorizable(connection.getSourceAuthorizable());
670 dataAuthorizable.authorize(authorizer, RequestAction.READ, user, attributes);
672 // get the filename and fall back to the identifier (should never happen)
673 String filename = attributes.get(CoreAttributes.FILENAME.key());
674 if (filename == null) {
675 filename = flowFileUuid;
679 final String type = attributes.get(CoreAttributes.MIME_TYPE.key());
682 final InputStream content = flowController.getContent(flowFile, user.getIdentity(), requestUri);
683 return new DownloadableContent(filename, type, content);
684 } catch (final ContentNotFoundException cnfe) {
685 throw new ResourceNotFoundException("Unable to find the specified content.");
686 } catch (final IOException ioe) {
687 logger.error(String.format("Unable to get the content for flowfile (%s) at this time.", flowFileUuid), ioe);
688 throw new IllegalStateException("Unable to get the content at this time.");
693 public void setFlowController(final FlowController flowController) {
694 this.flowController = flowController;
697 public void setAuthorizer(Authorizer authorizer) {
698 this.authorizer = authorizer;