Add DCAE MOD design tool project
[dcaegen2/platform.git] / mod / designtool / designtool-web / src / main / java / org / apache / nifi / web / dao / impl / StandardConnectionDAO.java
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * Modifications to the original nifi code for the ONAP project are made
18  * available under the Apache License, Version 2.0
19  */
20 package org.apache.nifi.web.dao.impl;
21
22 import org.apache.nifi.authorization.Authorizer;
23 import org.apache.nifi.authorization.RequestAction;
24 import org.apache.nifi.authorization.resource.Authorizable;
25 import org.apache.nifi.authorization.resource.DataAuthorizable;
26 import org.apache.nifi.authorization.user.NiFiUser;
27 import org.apache.nifi.authorization.user.NiFiUserUtils;
28 import org.apache.nifi.connectable.Connectable;
29 import org.apache.nifi.connectable.ConnectableType;
30 import org.apache.nifi.connectable.Connection;
31 import org.apache.nifi.controller.queue.LoadBalanceCompression;
32 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
33 import org.apache.nifi.connectable.Position;
34 import org.apache.nifi.controller.FlowController;
35 import org.apache.nifi.controller.exception.ValidationException;
36 import org.apache.nifi.controller.queue.DropFlowFileStatus;
37 import org.apache.nifi.controller.queue.FlowFileQueue;
38 import org.apache.nifi.controller.queue.ListFlowFileStatus;
39 import org.apache.nifi.controller.repository.ContentNotFoundException;
40 import org.apache.nifi.controller.repository.FlowFileRecord;
41 import org.apache.nifi.flowfile.FlowFilePrioritizer;
42 import org.apache.nifi.flowfile.attributes.CoreAttributes;
43 import org.apache.nifi.groups.ProcessGroup;
44 import org.apache.nifi.groups.RemoteProcessGroup;
45 import org.apache.nifi.processor.Relationship;
46 import org.apache.nifi.remote.RemoteGroupPort;
47 import org.apache.nifi.util.FormatUtils;
48 import org.apache.nifi.web.DownloadableContent;
49 import org.apache.nifi.web.ResourceNotFoundException;
50 import org.apache.nifi.web.api.dto.ConnectableDTO;
51 import org.apache.nifi.web.api.dto.ConnectionDTO;
52 import org.apache.nifi.web.api.dto.PositionDTO;
53 import org.apache.nifi.web.dao.ConnectionDAO;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import javax.ws.rs.WebApplicationException;
58 import java.io.IOException;
59 import java.io.InputStream;
60 import java.util.ArrayList;
61 import java.util.Collection;
62 import java.util.Collections;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Set;
67 import java.util.regex.Matcher;
68
69 public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO {
70
71     private static final Logger logger = LoggerFactory.getLogger(StandardConnectionDAO.class);
72
73     private FlowController flowController;
74     private Authorizer authorizer;
75
76     private Connection locateConnection(final String connectionId) {
77         final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
78         final Connection connection = rootGroup.findConnection(connectionId);
79
80         if (connection == null) {
81             throw new ResourceNotFoundException(String.format("Unable to find connection with id '%s'.", connectionId));
82         } else {
83             return connection;
84         }
85     }
86
87     @Override
88     public boolean hasConnection(String id) {
89         final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
90         return rootGroup.findConnection(id) != null;
91     }
92
93     @Override
94     public Connection getConnection(final String id) {
95         return locateConnection(id);
96     }
97
98     @Override
99     public Set<Connection> getConnections(final String groupId) {
100         final ProcessGroup group = locateProcessGroup(flowController, groupId);
101         return group.getConnections();
102     }
103
104     @Override
105     public DropFlowFileStatus getFlowFileDropRequest(String connectionId, String dropRequestId) {
106         final Connection connection = locateConnection(connectionId);
107         final FlowFileQueue queue = connection.getFlowFileQueue();
108
109         final DropFlowFileStatus dropRequest = queue.getDropFlowFileStatus(dropRequestId);
110         if (dropRequest == null) {
111             throw new ResourceNotFoundException(String.format("Unable to find drop request with id '%s'.", dropRequestId));
112         }
113
114         return dropRequest;
115     }
116
117     @Override
118     public ListFlowFileStatus getFlowFileListingRequest(String connectionId, String listingRequestId) {
119         final Connection connection = locateConnection(connectionId);
120         final FlowFileQueue queue = connection.getFlowFileQueue();
121
122         final ListFlowFileStatus listRequest = queue.getListFlowFileStatus(listingRequestId);
123         if (listRequest == null) {
124             throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId));
125         }
126
127         return listRequest;
128     }
129
130     @Override
131     public FlowFileRecord getFlowFile(String id, String flowFileUuid) {
132         try {
133             final Connection connection = locateConnection(id);
134             final FlowFileQueue queue = connection.getFlowFileQueue();
135             final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid);
136
137             if (flowFile == null) {
138                 throw new ResourceNotFoundException(String.format("The FlowFile with UUID %s is no longer in the active queue.", flowFileUuid));
139             }
140
141             // get the attributes and ensure appropriate access
142             final Map<String, String> attributes = flowFile.getAttributes();
143             final Authorizable dataAuthorizable = new DataAuthorizable(connection.getSourceAuthorizable());
144             dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes);
145
146             return flowFile;
147         } catch (final IOException ioe) {
148             logger.error(String.format("Unable to get the flowfile (%s) at this time.", flowFileUuid), ioe);
149             throw new IllegalStateException("Unable to get the FlowFile at this time.");
150         }
151     }
152
153     /**
154      * Configures the specified connection using the specified dto.
155      */
156     private void configureConnection(Connection connection, ConnectionDTO connectionDTO) {
157         // validate flow file comparators/prioritizers
158         List<FlowFilePrioritizer> newPrioritizers = null;
159         final List<String> prioritizers = connectionDTO.getPrioritizers();
160         if (isNotNull(prioritizers)) {
161             final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers);
162             newPrioritizers = new ArrayList<>();
163             for (final String className : newPrioritizersClasses) {
164                 try {
165                     newPrioritizers.add(flowController.getFlowManager().createPrioritizer(className));
166                 } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
167                     throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e);
168                 }
169             }
170         }
171
172         // update connection queue
173         if (isNotNull(connectionDTO.getFlowFileExpiration())) {
174             connection.getFlowFileQueue().setFlowFileExpiration(connectionDTO.getFlowFileExpiration());
175         }
176         if (isNotNull(connectionDTO.getBackPressureObjectThreshold())) {
177             connection.getFlowFileQueue().setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
178         }
179         if (isNotNull(connectionDTO.getBackPressureDataSizeThreshold())) {
180             connection.getFlowFileQueue().setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
181         }
182         if (isNotNull(newPrioritizers)) {
183             connection.getFlowFileQueue().setPriorities(newPrioritizers);
184         }
185
186         final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy();
187         final String loadBalancePartitionAttribute = connectionDTO.getLoadBalancePartitionAttribute();
188         if (isNotNull(loadBalanceStrategyName)) {
189             final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
190             connection.getFlowFileQueue().setLoadBalanceStrategy(loadBalanceStrategy, loadBalancePartitionAttribute);
191         }
192
193         final String loadBalanceCompressionName = connectionDTO.getLoadBalanceCompression();
194         if (isNotNull(loadBalanceCompressionName)) {
195             connection.getFlowFileQueue().setLoadBalanceCompression(LoadBalanceCompression.valueOf(loadBalanceCompressionName));
196         }
197
198         // update the connection state
199         if (isNotNull(connectionDTO.getBends())) {
200             final List<Position> bendPoints = new ArrayList<>();
201             for (final PositionDTO bend : connectionDTO.getBends()) {
202                 if (bend != null) {
203                     bendPoints.add(new Position(bend.getX(), bend.getY()));
204                 }
205             }
206             connection.setBendPoints(bendPoints);
207         }
208         if (isNotNull(connectionDTO.getName())) {
209             connection.setName(connectionDTO.getName());
210         }
211         if (isNotNull(connectionDTO.getLabelIndex())) {
212             connection.setLabelIndex(connectionDTO.getLabelIndex());
213         }
214         if (isNotNull(connectionDTO.getzIndex())) {
215             connection.setZIndex(connectionDTO.getzIndex());
216         }
217     }
218
219     /**
220      * Validates the proposed processor configuration.
221      */
222     private List<String> validateProposedConfiguration(final String groupId, final ConnectionDTO connectionDTO) {
223         List<String> validationErrors = new ArrayList<>();
224
225         if (isNotNull(connectionDTO.getBackPressureObjectThreshold()) && connectionDTO.getBackPressureObjectThreshold() < 0) {
226             validationErrors.add("Max queue size must be a non-negative integer");
227         }
228         if (isNotNull(connectionDTO.getFlowFileExpiration())) {
229             Matcher expirationMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(connectionDTO.getFlowFileExpiration());
230             if (!expirationMatcher.matches()) {
231                 validationErrors.add("Flow file expiration is not a valid time duration (ie 30 sec, 5 min)");
232             }
233         }
234         if (isNotNull(connectionDTO.getLabelIndex())) {
235             if (connectionDTO.getLabelIndex() < 0) {
236                 validationErrors.add("The label index must be positive.");
237             }
238         }
239
240         // validation is required when connecting to a remote process group since each node in a
241         // cluster may or may not be authorized
242         final ConnectableDTO proposedDestination = connectionDTO.getDestination();
243         if (proposedDestination != null && ConnectableType.REMOTE_INPUT_PORT.name().equals(proposedDestination.getType())) {
244             // the group id must be specified
245             if (proposedDestination.getGroupId() == null) {
246                 validationErrors.add("When the destination is a remote input port its group id is required.");
247                 return validationErrors;
248             }
249
250             // attempt to location the proprosed destination
251             final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId);
252             final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(proposedDestination.getGroupId());
253             if (remoteProcessGroup == null) {
254                 validationErrors.add("Unable to find the specified remote process group.");
255                 return validationErrors;
256             }
257
258             // ensure the new destination was found
259             final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(proposedDestination.getId());
260             if (remoteInputPort == null) {
261                 validationErrors.add("Unable to find the specified destination.");
262                 return validationErrors;
263             }
264         }
265
266         return validationErrors;
267     }
268
269     @Override
270     public Connection createConnection(final String groupId, final ConnectionDTO connectionDTO) {
271         final ProcessGroup group = locateProcessGroup(flowController, groupId);
272
273         if (isNotNull(connectionDTO.getParentGroupId()) && !flowController.getFlowManager().areGroupsSame(connectionDTO.getParentGroupId(), groupId)) {
274             throw new IllegalStateException("Cannot specify a different Parent Group ID than the Group to which the Connection is being added");
275         }
276
277         // get the source and destination connectables
278         final ConnectableDTO sourceConnectableDTO = connectionDTO.getSource();
279         final ConnectableDTO destinationConnectableDTO = connectionDTO.getDestination();
280
281         // ensure both are specified
282         if (sourceConnectableDTO == null || destinationConnectableDTO == null) {
283             throw new IllegalArgumentException("Both source and destinations must be specified.");
284         }
285
286         // if the source/destination connectable's group id has not been set, its inferred to be the current group
287         if (sourceConnectableDTO.getGroupId() == null) {
288             sourceConnectableDTO.setGroupId(groupId);
289         }
290         if (destinationConnectableDTO.getGroupId() == null) {
291             destinationConnectableDTO.setGroupId(groupId);
292         }
293
294         // validate the proposed configuration
295         final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO);
296
297         // ensure there was no validation errors
298         if (!validationErrors.isEmpty()) {
299             throw new ValidationException(validationErrors);
300         }
301
302         // find the source
303         final Connectable source;
304         if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceConnectableDTO.getType())) {
305             final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId);
306             final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceConnectableDTO.getGroupId());
307             source = remoteProcessGroup.getOutputPort(sourceConnectableDTO.getId());
308         } else {
309             final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceConnectableDTO.getGroupId());
310             source = sourceGroup.getConnectable(sourceConnectableDTO.getId());
311         }
312
313         // find the destination
314         final Connectable destination;
315         if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationConnectableDTO.getType())) {
316             final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId);
317             final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationConnectableDTO.getGroupId());
318             destination = remoteProcessGroup.getInputPort(destinationConnectableDTO.getId());
319         } else {
320             final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationConnectableDTO.getGroupId());
321             destination = destinationGroup.getConnectable(destinationConnectableDTO.getId());
322         }
323
324         // determine the relationships
325         final Set<String> relationships = new HashSet<>();
326         if (isNotNull(connectionDTO.getSelectedRelationships())) {
327             relationships.addAll(connectionDTO.getSelectedRelationships());
328         }
329
330         // create the connection
331         final Connection connection = flowController.createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
332
333         // configure the connection
334         configureConnection(connection, connectionDTO);
335
336         // add the connection to the group
337         group.addConnection(connection);
338         return connection;
339     }
340
341     @Override
342     public DropFlowFileStatus createFlowFileDropRequest(String id, String dropRequestId) {
343         final Connection connection = locateConnection(id);
344         final FlowFileQueue queue = connection.getFlowFileQueue();
345
346         final NiFiUser user = NiFiUserUtils.getNiFiUser();
347         if (user == null) {
348             throw new WebApplicationException(new Throwable("Unable to access details for current user."));
349         }
350
351         return queue.dropFlowFiles(dropRequestId, user.getIdentity());
352     }
353
354     @Override
355     public ListFlowFileStatus createFlowFileListingRequest(String id, String listingRequestId) {
356         final Connection connection = locateConnection(id);
357         final FlowFileQueue queue = connection.getFlowFileQueue();
358
359         // ensure we can list
360         verifyList(queue);
361
362         return queue.listFlowFiles(listingRequestId, 100);
363     }
364
365     @Override
366     public void verifyCreate(String groupId, ConnectionDTO connectionDTO) {
367         // validate the incoming request
368         final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO);
369
370         // ensure there was no validation errors
371         if (!validationErrors.isEmpty()) {
372             throw new ValidationException(validationErrors);
373         }
374
375         // Ensure that both the source and the destination for the connection exist.
376         // In the case that the source or destination is a port in a Remote Process Group,
377         // this is necessary because the ports can change in the background. It may still be
378         // possible for a port to disappear between the 'verify' stage and the creation stage,
379         // but this prevents the case where some nodes already know about the port while other
380         // nodes in the cluster do not. This is a more common case, as users may try to connect
381         // to the port as soon as the port is created.
382         final ConnectableDTO sourceDto = connectionDTO.getSource();
383         if (sourceDto == null || sourceDto.getId() == null) {
384             throw new IllegalArgumentException("Cannot create connection without specifying source");
385         }
386
387         final ConnectableDTO destinationDto = connectionDTO.getDestination();
388         if (destinationDto == null || destinationDto.getId() == null) {
389             throw new IllegalArgumentException("Cannot create connection without specifying destination");
390         }
391
392         if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDto.getType())) {
393             final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId);
394
395             final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceDto.getGroupId());
396             if (remoteProcessGroup == null) {
397                 throw new IllegalArgumentException("Unable to find the specified remote process group.");
398             }
399
400             final RemoteGroupPort sourceConnectable = remoteProcessGroup.getOutputPort(sourceDto.getId());
401             if (sourceConnectable == null) {
402                 throw new IllegalArgumentException("The specified source for the connection does not exist");
403             } else if (!sourceConnectable.getTargetExists()) {
404                 throw new IllegalArgumentException("The specified remote output port does not exist.");
405             }
406         } else {
407             final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceDto.getGroupId());
408             final Connectable sourceConnectable = sourceGroup.getConnectable(sourceDto.getId());
409             if (sourceConnectable == null) {
410                 throw new IllegalArgumentException("The specified source for the connection does not exist");
411             }
412         }
413
414         if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDto.getType())) {
415             final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId);
416
417             final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationDto.getGroupId());
418             if (remoteProcessGroup == null) {
419                 throw new IllegalArgumentException("Unable to find the specified remote process group.");
420             }
421
422             final RemoteGroupPort destinationConnectable = remoteProcessGroup.getInputPort(destinationDto.getId());
423             if (destinationConnectable == null) {
424                 throw new IllegalArgumentException("The specified destination for the connection does not exist");
425             } else if (!destinationConnectable.getTargetExists()) {
426                 throw new IllegalArgumentException("The specified remote input port does not exist.");
427             }
428         } else {
429             final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationDto.getGroupId());
430             final Connectable destinationConnectable = destinationGroup.getConnectable(destinationDto.getId());
431             if (destinationConnectable == null) {
432                 throw new IllegalArgumentException("The specified destination for the connection does not exist");
433             }
434         }
435     }
436
437     private void verifyList(final FlowFileQueue queue) {
438         queue.verifyCanList();
439     }
440
441     @Override
442     public void verifyList(String id) {
443         final Connection connection = locateConnection(id);
444         final FlowFileQueue queue = connection.getFlowFileQueue();
445         verifyList(queue);
446     }
447
448     @Override
449     public void verifyUpdate(ConnectionDTO connectionDTO) {
450         verifyUpdate(locateConnection(connectionDTO.getId()), connectionDTO);
451     }
452
453     private void verifyUpdate(final Connection connection, final ConnectionDTO connectionDTO) {
454         // determine what the request is attempting
455         if (isAnyNotNull(connectionDTO.getBackPressureDataSizeThreshold(),
456                 connectionDTO.getBackPressureObjectThreshold(),
457                 connectionDTO.getDestination(),
458                 connectionDTO.getFlowFileExpiration(),
459                 connectionDTO.getName(),
460                 connectionDTO.getPosition(),
461                 connectionDTO.getPrioritizers(),
462                 connectionDTO.getSelectedRelationships())) {
463
464             // validate the incoming request
465             final List<String> validationErrors = validateProposedConfiguration(connection.getProcessGroup().getIdentifier(), connectionDTO);
466
467             // ensure there was no validation errors
468             if (!validationErrors.isEmpty()) {
469                 throw new ValidationException(validationErrors);
470             }
471
472             // If destination is changing, ensure that current destination is not running. This check is done here, rather than
473             // in the Connection object itself because the Connection object itself does not know which updates are to occur and
474             // we don't want to prevent updating things like the connection name or backpressure just because the destination is running
475             final Connectable destination = connection.getDestination();
476             if (destination != null && destination.isRunning() && destination.getConnectableType() != ConnectableType.FUNNEL && destination.getConnectableType() != ConnectableType.INPUT_PORT) {
477                 throw new ValidationException(Collections.singletonList("Cannot change the destination of connection because the current destination is running"));
478             }
479
480             // verify that this connection supports modification
481             connection.verifyCanUpdate();
482         }
483     }
484
485     @Override
486     public Connection updateConnection(final ConnectionDTO connectionDTO) {
487         final Connection connection = locateConnection(connectionDTO.getId());
488         final ProcessGroup group = connection.getProcessGroup();
489
490         // ensure we can update
491         verifyUpdate(connection, connectionDTO);
492
493         final Collection<Relationship> newProcessorRelationships = new ArrayList<>();
494         Connectable newDestination = null;
495
496         // ensure that the source ID is correct, if specified.
497         final Connectable existingSource = connection.getSource();
498         if (isNotNull(connectionDTO.getSource()) && !existingSource.getIdentifier().equals(connectionDTO.getSource().getId())) {
499             throw new IllegalStateException("Connection with ID " + connectionDTO.getId() + " has conflicting Source ID");
500         }
501
502         // determine if the destination changed
503         final ConnectableDTO proposedDestination = connectionDTO.getDestination();
504         if (proposedDestination != null) {
505             final Connectable currentDestination = connection.getDestination();
506
507             // handle remote input port differently
508             if (ConnectableType.REMOTE_INPUT_PORT.name().equals(proposedDestination.getType())) {
509                 // the group id must be specified
510                 if (proposedDestination.getGroupId() == null) {
511                     throw new IllegalArgumentException("When the destination is a remote input port its group id is required.");
512                 }
513
514                 // if the current destination is a remote input port
515                 boolean isDifferentRemoteProcessGroup = false;
516                 if (currentDestination.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
517                     RemoteGroupPort remotePort = (RemoteGroupPort) currentDestination;
518                     if (!proposedDestination.getGroupId().equals(remotePort.getRemoteProcessGroup().getIdentifier())) {
519                         isDifferentRemoteProcessGroup = true;
520                     }
521                 }
522
523                 // if the destination is changing or the previous destination was a different remote process group
524                 if (!proposedDestination.getId().equals(currentDestination.getIdentifier()) || isDifferentRemoteProcessGroup) {
525                     final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, group.getIdentifier());
526                     final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(proposedDestination.getGroupId());
527
528                     // ensure the remote process group was found
529                     if (remoteProcessGroup == null) {
530                         throw new IllegalArgumentException("Unable to find the specified remote process group.");
531                     }
532
533                     final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(proposedDestination.getId());
534
535                     // ensure the new destination was found
536                     if (remoteInputPort == null) {
537                         throw new IllegalArgumentException("Unable to find the specified destination.");
538                     }
539
540                     // ensure the remote port actually exists
541                     if (!remoteInputPort.getTargetExists()) {
542                         throw new IllegalArgumentException("The specified remote input port does not exist.");
543                     } else {
544                         newDestination = remoteInputPort;
545                     }
546                 }
547             } else {
548                 // if there is a different destination id
549                 if (!proposedDestination.getId().equals(currentDestination.getIdentifier())) {
550                     // if the destination connectable's group id has not been set, its inferred to be the current group
551                     if (proposedDestination.getGroupId() == null) {
552                         proposedDestination.setGroupId(group.getIdentifier());
553                     }
554
555                     final ProcessGroup destinationGroup = locateProcessGroup(flowController, proposedDestination.getGroupId());
556                     newDestination = destinationGroup.getConnectable(proposedDestination.getId());
557
558                     // ensure the new destination was found
559                     if (newDestination == null) {
560                         throw new IllegalArgumentException("Unable to find the specified destination.");
561                     }
562                 }
563             }
564         }
565
566         // determine any new relationships
567         final Set<String> relationships = connectionDTO.getSelectedRelationships();
568         if (isNotNull(relationships)) {
569             if (relationships.isEmpty()) {
570                 throw new IllegalArgumentException("Cannot remove all relationships from Connection with ID " + connection.getIdentifier() + " -- remove the Connection instead");
571             }
572             if (existingSource == null) {
573                 throw new IllegalArgumentException("Cannot specify new relationships without including the source.");
574             }
575
576             final Connectable destination = newDestination == null ? connection.getDestination() : newDestination;
577
578             for (final String relationship : relationships) {
579                 int prevSize = newProcessorRelationships.size();
580
581                 final Relationship processorRelationshipSource = existingSource.getRelationship(relationship);
582
583                 if (processorRelationshipSource != null) {
584                     newProcessorRelationships.add(processorRelationshipSource);
585                 }
586
587                 final Relationship processorRelationshipDest = destination.getRelationship(relationship);
588
589                 if (processorRelationshipDest != null) {
590                     newProcessorRelationships.add(processorRelationshipDest);
591                 }
592
593                 if (newProcessorRelationships.size() == prevSize) {
594                     throw new IllegalArgumentException("Unable to locate " + relationship + " relationship.");
595                 }
596             }
597         }
598
599         // configure the connection
600         configureConnection(connection, connectionDTO);
601         group.onComponentModified();
602
603         // update the relationships if necessary
604         if (!newProcessorRelationships.isEmpty()) {
605             connection.setRelationships(newProcessorRelationships);
606         }
607
608         // update the destination if necessary
609         if (isNotNull(newDestination)) {
610             connection.setDestination(newDestination);
611         }
612
613         return connection;
614     }
615
616     @Override
617     public void verifyDelete(String id) {
618         final Connection connection = locateConnection(id);
619         connection.verifyCanDelete();
620     }
621
622     @Override
623     public void deleteConnection(final String id) {
624         final Connection connection = locateConnection(id);
625         connection.getProcessGroup().removeConnection(connection);
626     }
627
628     @Override
629     public DropFlowFileStatus deleteFlowFileDropRequest(String connectionId, String dropRequestId) {
630         final Connection connection = locateConnection(connectionId);
631         final FlowFileQueue queue = connection.getFlowFileQueue();
632
633         final DropFlowFileStatus dropFlowFileStatus = queue.cancelDropFlowFileRequest(dropRequestId);
634         if (dropFlowFileStatus == null) {
635             throw new ResourceNotFoundException(String.format("Unable to find drop request with id '%s'.", dropRequestId));
636         }
637
638         return dropFlowFileStatus;
639     }
640
641     @Override
642     public ListFlowFileStatus deleteFlowFileListingRequest(String connectionId, String listingRequestId) {
643         final Connection connection = locateConnection(connectionId);
644         final FlowFileQueue queue = connection.getFlowFileQueue();
645
646         final ListFlowFileStatus listFlowFileStatus = queue.cancelListFlowFileRequest(listingRequestId);
647         if (listFlowFileStatus == null) {
648             throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId));
649         }
650
651         return listFlowFileStatus;
652     }
653
654     @Override
655     public DownloadableContent getContent(String id, String flowFileUuid, String requestUri) {
656         try {
657             final NiFiUser user = NiFiUserUtils.getNiFiUser();
658
659             final Connection connection = locateConnection(id);
660             final FlowFileQueue queue = connection.getFlowFileQueue();
661             final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid);
662
663             if (flowFile == null) {
664                 throw new ResourceNotFoundException(String.format("The FlowFile with UUID %s is no longer in the active queue.", flowFileUuid));
665             }
666
667             // get the attributes and ensure appropriate access
668             final Map<String, String> attributes = flowFile.getAttributes();
669             final Authorizable dataAuthorizable = new DataAuthorizable(connection.getSourceAuthorizable());
670             dataAuthorizable.authorize(authorizer, RequestAction.READ, user, attributes);
671
672             // get the filename and fall back to the identifier (should never happen)
673             String filename = attributes.get(CoreAttributes.FILENAME.key());
674             if (filename == null) {
675                 filename = flowFileUuid;
676             }
677
678             // get the mime-type
679             final String type = attributes.get(CoreAttributes.MIME_TYPE.key());
680
681             // get the content
682             final InputStream content = flowController.getContent(flowFile, user.getIdentity(), requestUri);
683             return new DownloadableContent(filename, type, content);
684         } catch (final ContentNotFoundException cnfe) {
685             throw new ResourceNotFoundException("Unable to find the specified content.");
686         } catch (final IOException ioe) {
687             logger.error(String.format("Unable to get the content for flowfile (%s) at this time.", flowFileUuid), ioe);
688             throw new IllegalStateException("Unable to get the content at this time.");
689         }
690     }
691
692     /* setters */
693     public void setFlowController(final FlowController flowController) {
694         this.flowController = flowController;
695     }
696
697     public void setAuthorizer(Authorizer authorizer) {
698         this.authorizer = authorizer;
699     }
700 }