2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * Modifications to the original nifi code for the ONAP project are made
18 * available under the Apache License, Version 2.0
20 package org.apache.nifi.web;
22 import com.google.common.collect.Sets;
23 import org.apache.commons.collections4.CollectionUtils;
24 import org.apache.nifi.action.Action;
25 import org.apache.nifi.action.Component;
26 import org.apache.nifi.action.FlowChangeAction;
27 import org.apache.nifi.action.Operation;
28 import org.apache.nifi.action.details.FlowChangePurgeDetails;
29 import org.apache.nifi.admin.service.AuditService;
30 import org.apache.nifi.authorization.AccessDeniedException;
31 import org.apache.nifi.authorization.AccessPolicy;
32 import org.apache.nifi.authorization.AuthorizableLookup;
33 import org.apache.nifi.authorization.AuthorizationRequest;
34 import org.apache.nifi.authorization.AuthorizationResult;
35 import org.apache.nifi.authorization.AuthorizationResult.Result;
36 import org.apache.nifi.authorization.AuthorizeAccess;
37 import org.apache.nifi.authorization.Authorizer;
38 import org.apache.nifi.authorization.Group;
39 import org.apache.nifi.authorization.RequestAction;
40 import org.apache.nifi.authorization.Resource;
41 import org.apache.nifi.authorization.User;
42 import org.apache.nifi.authorization.UserContextKeys;
43 import org.apache.nifi.authorization.resource.Authorizable;
44 import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource;
45 import org.apache.nifi.authorization.resource.OperationAuthorizable;
46 import org.apache.nifi.authorization.resource.ResourceFactory;
47 import org.apache.nifi.authorization.user.NiFiUser;
48 import org.apache.nifi.authorization.user.NiFiUserUtils;
49 import org.apache.nifi.bundle.BundleCoordinate;
50 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
51 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
52 import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
53 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
54 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
55 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
56 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
57 import org.apache.nifi.cluster.coordination.node.OffloadCode;
58 import org.apache.nifi.cluster.event.NodeEvent;
59 import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
60 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
61 import org.apache.nifi.cluster.protocol.NodeIdentifier;
62 import org.apache.nifi.components.ConfigurableComponent;
63 import org.apache.nifi.components.PropertyDescriptor;
64 import org.apache.nifi.components.RequiredPermission;
65 import org.apache.nifi.components.ValidationResult;
66 import org.apache.nifi.components.Validator;
67 import org.apache.nifi.components.state.Scope;
68 import org.apache.nifi.components.state.StateMap;
69 import org.apache.nifi.connectable.Connectable;
70 import org.apache.nifi.connectable.Connection;
71 import org.apache.nifi.connectable.Funnel;
72 import org.apache.nifi.connectable.Port;
73 import org.apache.nifi.controller.ComponentNode;
74 import org.apache.nifi.controller.Counter;
75 import org.apache.nifi.controller.FlowController;
76 import org.apache.nifi.controller.ProcessorNode;
77 import org.apache.nifi.controller.ReportingTaskNode;
78 import org.apache.nifi.controller.ScheduledState;
79 import org.apache.nifi.controller.Snippet;
80 import org.apache.nifi.controller.Template;
81 import org.apache.nifi.controller.label.Label;
82 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
83 import org.apache.nifi.controller.repository.claim.ContentDirection;
84 import org.apache.nifi.controller.service.ControllerServiceNode;
85 import org.apache.nifi.controller.service.ControllerServiceReference;
86 import org.apache.nifi.controller.service.ControllerServiceState;
87 import org.apache.nifi.controller.status.ProcessGroupStatus;
88 import org.apache.nifi.controller.status.ProcessorStatus;
89 import org.apache.nifi.diagnostics.SystemDiagnostics;
90 import org.apache.nifi.events.BulletinFactory;
91 import org.apache.nifi.groups.ProcessGroup;
92 import org.apache.nifi.groups.ProcessGroupCounts;
93 import org.apache.nifi.groups.RemoteProcessGroup;
94 import org.apache.nifi.history.History;
95 import org.apache.nifi.history.HistoryQuery;
96 import org.apache.nifi.history.PreviousValue;
97 import org.apache.nifi.registry.ComponentVariableRegistry;
98 import org.apache.nifi.registry.authorization.Permissions;
99 import org.apache.nifi.registry.bucket.Bucket;
100 import org.apache.nifi.registry.client.NiFiRegistryException;
101 import org.apache.nifi.registry.flow.FlowRegistry;
102 import org.apache.nifi.registry.flow.FlowRegistryClient;
103 import org.apache.nifi.registry.flow.VersionControlInformation;
104 import org.apache.nifi.registry.flow.VersionedComponent;
105 import org.apache.nifi.registry.flow.VersionedConnection;
106 import org.apache.nifi.registry.flow.VersionedFlow;
107 import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
108 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
109 import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
110 import org.apache.nifi.registry.flow.VersionedFlowState;
111 import org.apache.nifi.registry.flow.VersionedProcessGroup;
112 import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
113 import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
114 import org.apache.nifi.registry.flow.diff.DifferenceType;
115 import org.apache.nifi.registry.flow.diff.FlowComparator;
116 import org.apache.nifi.registry.flow.diff.FlowComparison;
117 import org.apache.nifi.registry.flow.diff.FlowDifference;
118 import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
119 import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
120 import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
121 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
122 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
123 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
124 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
125 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
126 import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
127 import org.apache.nifi.remote.RemoteGroupPort;
128 import org.apache.nifi.remote.RootGroupPort;
129 import org.apache.nifi.reporting.Bulletin;
130 import org.apache.nifi.reporting.BulletinQuery;
131 import org.apache.nifi.reporting.BulletinRepository;
132 import org.apache.nifi.reporting.ComponentType;
133 import org.apache.nifi.util.BundleUtils;
134 import org.apache.nifi.util.FlowDifferenceFilters;
135 import org.apache.nifi.util.NiFiProperties;
136 import org.apache.nifi.web.api.dto.AccessPolicyDTO;
137 import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
138 import org.apache.nifi.web.api.dto.AffectedComponentDTO;
139 import org.apache.nifi.web.api.dto.BucketDTO;
140 import org.apache.nifi.web.api.dto.BulletinBoardDTO;
141 import org.apache.nifi.web.api.dto.BulletinDTO;
142 import org.apache.nifi.web.api.dto.BulletinQueryDTO;
143 import org.apache.nifi.web.api.dto.BundleDTO;
144 import org.apache.nifi.web.api.dto.ClusterDTO;
145 import org.apache.nifi.web.api.dto.ComponentDTO;
146 import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
147 import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
148 import org.apache.nifi.web.api.dto.ComponentReferenceDTO;
149 import org.apache.nifi.web.api.dto.ComponentRestrictionPermissionDTO;
150 import org.apache.nifi.web.api.dto.ComponentStateDTO;
151 import org.apache.nifi.web.api.dto.ConnectionDTO;
152 import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
153 import org.apache.nifi.web.api.dto.ControllerDTO;
154 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
155 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
156 import org.apache.nifi.web.api.dto.CounterDTO;
157 import org.apache.nifi.web.api.dto.CountersDTO;
158 import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
159 import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
160 import org.apache.nifi.web.api.dto.DropRequestDTO;
161 import org.apache.nifi.web.api.dto.DtoFactory;
162 import org.apache.nifi.web.api.dto.EntityFactory;
163 import org.apache.nifi.web.api.dto.FlowConfigurationDTO;
164 import org.apache.nifi.web.api.dto.FlowFileDTO;
165 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
166 import org.apache.nifi.web.api.dto.FunnelDTO;
167 import org.apache.nifi.web.api.dto.LabelDTO;
168 import org.apache.nifi.web.api.dto.ListingRequestDTO;
169 import org.apache.nifi.web.api.dto.NodeDTO;
170 import org.apache.nifi.web.api.dto.PermissionsDTO;
171 import org.apache.nifi.web.api.dto.PortDTO;
172 import org.apache.nifi.web.api.dto.PreviousValueDTO;
173 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
174 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
175 import org.apache.nifi.web.api.dto.ProcessorDTO;
176 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
177 import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
178 import org.apache.nifi.web.api.dto.RegistryDTO;
179 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
180 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
181 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
182 import org.apache.nifi.web.api.dto.RequiredPermissionDTO;
183 import org.apache.nifi.web.api.dto.ResourceDTO;
184 import org.apache.nifi.web.api.dto.RevisionDTO;
185 import org.apache.nifi.web.api.dto.SnippetDTO;
186 import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
187 import org.apache.nifi.web.api.dto.TemplateDTO;
188 import org.apache.nifi.web.api.dto.UserDTO;
189 import org.apache.nifi.web.api.dto.UserGroupDTO;
190 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
191 import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
192 import org.apache.nifi.web.api.dto.VersionedFlowDTO;
193 import org.apache.nifi.web.api.dto.action.HistoryDTO;
194 import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
195 import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
196 import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
197 import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO;
198 import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
199 import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
200 import org.apache.nifi.web.api.dto.flow.FlowDTO;
201 import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
202 import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
203 import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
204 import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
205 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
206 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
207 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
208 import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
209 import org.apache.nifi.web.api.dto.status.PortStatusDTO;
210 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
211 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
212 import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
213 import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
214 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
215 import org.apache.nifi.web.api.entity.AccessPolicyEntity;
216 import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
217 import org.apache.nifi.web.api.entity.ActionEntity;
218 import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
219 import org.apache.nifi.web.api.entity.AffectedComponentEntity;
220 import org.apache.nifi.web.api.entity.BucketEntity;
221 import org.apache.nifi.web.api.entity.BulletinEntity;
222 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
223 import org.apache.nifi.web.api.entity.ConnectionEntity;
224 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
225 import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
226 import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
227 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
228 import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
229 import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
230 import org.apache.nifi.web.api.entity.CurrentUserEntity;
231 import org.apache.nifi.web.api.entity.FlowComparisonEntity;
232 import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
233 import org.apache.nifi.web.api.entity.FlowEntity;
234 import org.apache.nifi.web.api.entity.FunnelEntity;
235 import org.apache.nifi.web.api.entity.LabelEntity;
236 import org.apache.nifi.web.api.entity.PortEntity;
237 import org.apache.nifi.web.api.entity.PortStatusEntity;
238 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
239 import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
240 import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
241 import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
242 import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
243 import org.apache.nifi.web.api.entity.ProcessorEntity;
244 import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
245 import org.apache.nifi.web.api.entity.RegistryClientEntity;
246 import org.apache.nifi.web.api.entity.RegistryEntity;
247 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
248 import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
249 import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
250 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
251 import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
252 import org.apache.nifi.web.api.entity.SnippetEntity;
253 import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
254 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
255 import org.apache.nifi.web.api.entity.TemplateEntity;
256 import org.apache.nifi.web.api.entity.TenantEntity;
257 import org.apache.nifi.web.api.entity.UserEntity;
258 import org.apache.nifi.web.api.entity.UserGroupEntity;
259 import org.apache.nifi.web.api.entity.VariableEntity;
260 import org.apache.nifi.web.api.entity.VariableRegistryEntity;
261 import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
262 import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
263 import org.apache.nifi.web.api.entity.VersionedFlowEntity;
264 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
265 import org.apache.nifi.web.controller.ControllerFacade;
266 import org.apache.nifi.web.dao.AccessPolicyDAO;
267 import org.apache.nifi.web.dao.ConnectionDAO;
268 import org.apache.nifi.web.dao.ControllerServiceDAO;
269 import org.apache.nifi.web.dao.FunnelDAO;
270 import org.apache.nifi.web.dao.LabelDAO;
271 import org.apache.nifi.web.dao.PortDAO;
272 import org.apache.nifi.web.dao.ProcessGroupDAO;
273 import org.apache.nifi.web.dao.ProcessorDAO;
274 import org.apache.nifi.web.dao.RegistryDAO;
275 import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
276 import org.apache.nifi.web.dao.ReportingTaskDAO;
277 import org.apache.nifi.web.dao.SnippetDAO;
278 import org.apache.nifi.web.dao.TemplateDAO;
279 import org.apache.nifi.web.dao.UserDAO;
280 import org.apache.nifi.web.dao.UserGroupDAO;
281 import org.apache.nifi.web.revision.DeleteRevisionTask;
282 import org.apache.nifi.web.revision.ExpiredRevisionClaimException;
283 import org.apache.nifi.web.revision.RevisionClaim;
284 import org.apache.nifi.web.revision.RevisionManager;
285 import org.apache.nifi.web.revision.RevisionUpdate;
286 import org.apache.nifi.web.revision.StandardRevisionClaim;
287 import org.apache.nifi.web.revision.StandardRevisionUpdate;
288 import org.apache.nifi.web.revision.UpdateRevisionTask;
289 import org.apache.nifi.web.util.SnippetUtils;
290 import org.slf4j.Logger;
291 import org.slf4j.LoggerFactory;
293 import javax.ws.rs.WebApplicationException;
294 import javax.ws.rs.core.Response;
295 import java.io.IOException;
296 import java.nio.charset.StandardCharsets;
297 import java.util.ArrayList;
298 import java.util.Arrays;
299 import java.util.Collection;
300 import java.util.Collections;
301 import java.util.Comparator;
302 import java.util.Date;
303 import java.util.HashMap;
304 import java.util.HashSet;
305 import java.util.LinkedHashMap;
306 import java.util.LinkedHashSet;
307 import java.util.List;
308 import java.util.ListIterator;
309 import java.util.Map;
310 import java.util.Objects;
311 import java.util.Optional;
312 import java.util.Set;
313 import java.util.UUID;
314 import java.util.concurrent.TimeUnit;
315 import java.util.function.Function;
316 import java.util.function.Predicate;
317 import java.util.function.Supplier;
318 import java.util.stream.Collectors;
319 import java.util.stream.Stream;
322 * Implementation of NiFiServiceFacade that performs revision checking.
324 public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// Logger shared by all instances of this facade.
325 private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
// Timeout, in milliseconds, handed to ComponentNode#getValidationStatus by awaitValidationCompletion.
326 private static final int VALIDATION_WAIT_MILLIS = 50;
328 // nifi core components
329 private ControllerFacade controllerFacade;
330 private SnippetUtils snippetUtils;
// Source of the current Revision for each component id (see verifyRevision).
333 private RevisionManager revisionManager;
// Queried for the bulletins of a component when building its entity (see updateProcessor).
334 private BulletinRepository bulletinRepository;
336 // data access objects
337 private ProcessorDAO processorDAO;
338 private ProcessGroupDAO processGroupDAO;
339 private RemoteProcessGroupDAO remoteProcessGroupDAO;
340 private LabelDAO labelDAO;
341 private FunnelDAO funnelDAO;
342 private SnippetDAO snippetDAO;
343 private PortDAO inputPortDAO;
344 private PortDAO outputPortDAO;
345 private ConnectionDAO connectionDAO;
346 private ControllerServiceDAO controllerServiceDAO;
347 private ReportingTaskDAO reportingTaskDAO;
348 private TemplateDAO templateDAO;
349 private UserDAO userDAO;
350 private UserGroupDAO userGroupDAO;
351 private AccessPolicyDAO accessPolicyDAO;
352 private RegistryDAO registryDAO;
353 private ClusterCoordinator clusterCoordinator;
354 private HeartbeatMonitor heartbeatMonitor;
355 private LeaderElectionManager leaderElectionManager;
357 // administrative services
358 private AuditService auditService;
361 private FlowRegistryClient flowRegistryClient;
364 private NiFiProperties properties;
// Factories used to turn components into DTOs and DTOs into REST entities.
365 private DtoFactory dtoFactory;
366 private EntityFactory entityFactory;
368 private Authorizer authorizer;
// Passed to AuthorizeAccess callbacks and used to resolve Authorizables (see authorizeAccess).
370 private AuthorizableLookup authorizableLookup;
372 // -----------------------------------------
373 // Synchronization methods
374 // -----------------------------------------
376 public void authorizeAccess(final AuthorizeAccess authorizeAccess) {
377 authorizeAccess.authorize(authorizableLookup);
381 public void verifyRevision(final Revision revision, final NiFiUser user) {
382 final Revision curRevision = revisionManager.getRevision(revision.getComponentId());
383 if (revision.equals(curRevision)) {
387 throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified");
391 public void verifyRevisions(final Set<Revision> revisions, final NiFiUser user) {
392 for (final Revision revision : revisions) {
393 verifyRevision(revision, user);
398 public Set<Revision> getRevisionsFromGroup(final String groupId, final Function<ProcessGroup, Set<String>> getComponents) {
399 final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
400 final Set<String> componentIds = getComponents.apply(group);
401 return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet());
405 public Set<Revision> getRevisionsFromSnippet(final String snippetId) {
406 final Snippet snippet = snippetDAO.getSnippet(snippetId);
407 final Set<String> componentIds = new HashSet<>();
408 componentIds.addAll(snippet.getProcessors().keySet());
409 componentIds.addAll(snippet.getFunnels().keySet());
410 componentIds.addAll(snippet.getLabels().keySet());
411 componentIds.addAll(snippet.getConnections().keySet());
412 componentIds.addAll(snippet.getInputPorts().keySet());
413 componentIds.addAll(snippet.getOutputPorts().keySet());
414 componentIds.addAll(snippet.getProcessGroups().keySet());
415 componentIds.addAll(snippet.getRemoteProcessGroups().keySet());
416 return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet());
419 // -----------------------------------------
420 // Verification Operations
421 // -----------------------------------------
424 public void verifyListQueue(final String connectionId) {
425 connectionDAO.verifyList(connectionId);
429 public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) {
430 connectionDAO.verifyCreate(groupId, connectionDTO);
434 public void verifyUpdateConnection(final ConnectionDTO connectionDTO) {
435 // if connection does not exist, then the update request is likely creating it
436 // so we don't verify since it will fail
437 if (connectionDAO.hasConnection(connectionDTO.getId())) {
438 connectionDAO.verifyUpdate(connectionDTO);
440 connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO);
445 public void verifyDeleteConnection(final String connectionId) {
446 connectionDAO.verifyDelete(connectionId);
450 public void verifyDeleteFunnel(final String funnelId) {
451 funnelDAO.verifyDelete(funnelId);
455 public void verifyUpdateInputPort(final PortDTO inputPortDTO) {
456 // if connection does not exist, then the update request is likely creating it
457 // so we don't verify since it will fail
458 if (inputPortDAO.hasPort(inputPortDTO.getId())) {
459 inputPortDAO.verifyUpdate(inputPortDTO);
464 public void verifyDeleteInputPort(final String inputPortId) {
465 inputPortDAO.verifyDelete(inputPortId);
469 public void verifyUpdateOutputPort(final PortDTO outputPortDTO) {
470 // if connection does not exist, then the update request is likely creating it
471 // so we don't verify since it will fail
472 if (outputPortDAO.hasPort(outputPortDTO.getId())) {
473 outputPortDAO.verifyUpdate(outputPortDTO);
478 public void verifyDeleteOutputPort(final String outputPortId) {
479 outputPortDAO.verifyDelete(outputPortId);
483 public void verifyCreateProcessor(ProcessorDTO processorDTO) {
484 processorDAO.verifyCreate(processorDTO);
488 public void verifyUpdateProcessor(final ProcessorDTO processorDTO) {
489 // if group does not exist, then the update request is likely creating it
490 // so we don't verify since it will fail
491 if (processorDAO.hasProcessor(processorDTO.getId())) {
492 processorDAO.verifyUpdate(processorDTO);
494 verifyCreateProcessor(processorDTO);
499 public void verifyDeleteProcessor(final String processorId) {
500 processorDAO.verifyDelete(processorId);
504 public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
505 processGroupDAO.verifyScheduleComponents(groupId, state, componentIds);
509 public void verifyEnableComponents(String processGroupId, ScheduledState state, Set<String> componentIds) {
510 processGroupDAO.verifyEnableComponents(processGroupId, state, componentIds);
514 public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
515 processGroupDAO.verifyActivateControllerServices(state, serviceIds);
519 public void verifyDeleteProcessGroup(final String groupId) {
520 processGroupDAO.verifyDelete(groupId);
524 public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) {
525 // if remote group does not exist, then the update request is likely creating it
526 // so we don't verify since it will fail
527 if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) {
528 remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO);
533 public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
534 remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
538 public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
539 remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
543 public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) {
544 remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
548 public void verifyCreateControllerService(ControllerServiceDTO controllerServiceDTO) {
549 controllerServiceDAO.verifyCreate(controllerServiceDTO);
553 public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) {
554 // if service does not exist, then the update request is likely creating it
555 // so we don't verify since it will fail
556 if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) {
557 controllerServiceDAO.verifyUpdate(controllerServiceDTO);
559 verifyCreateControllerService(controllerServiceDTO);
564 public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
565 controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
569 public void verifyDeleteControllerService(final String controllerServiceId) {
570 controllerServiceDAO.verifyDelete(controllerServiceId);
574 public void verifyCreateReportingTask(ReportingTaskDTO reportingTaskDTO) {
575 reportingTaskDAO.verifyCreate(reportingTaskDTO);
579 public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) {
580 // if tasks does not exist, then the update request is likely creating it
581 // so we don't verify since it will fail
582 if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) {
583 reportingTaskDAO.verifyUpdate(reportingTaskDTO);
585 verifyCreateReportingTask(reportingTaskDTO);
590 public void verifyDeleteReportingTask(final String reportingTaskId) {
591 reportingTaskDAO.verifyDelete(reportingTaskId);
594 // -----------------------------------------
596 // -----------------------------------------
599 public AccessPolicyEntity updateAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) {
600 final Authorizable authorizable = authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId());
601 final RevisionUpdate<AccessPolicyDTO> snapshot = updateComponent(revision,
603 () -> accessPolicyDAO.updateAccessPolicy(accessPolicyDTO),
605 final Set<TenantEntity> users = accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
606 final Set<TenantEntity> userGroups = accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
607 final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
608 return dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference);
611 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizable);
612 return entityFactory.createAccessPolicyEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
616 public UserEntity updateUser(final Revision revision, final UserDTO userDTO) {
617 final Authorizable usersAuthorizable = authorizableLookup.getTenant();
618 final Set<Group> groups = userGroupDAO.getUserGroupsForUser(userDTO.getId());
619 final Set<AccessPolicy> policies = userGroupDAO.getAccessPoliciesForUser(userDTO.getId());
620 final RevisionUpdate<UserDTO> snapshot = updateComponent(revision,
622 () -> userDAO.updateUser(userDTO),
624 final Set<TenantEntity> tenantEntities = groups.stream().map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
625 final Set<AccessPolicySummaryEntity> policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
626 return dtoFactory.createUserDto(user, tenantEntities, policyEntities);
629 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(usersAuthorizable);
630 return entityFactory.createUserEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
634 public UserGroupEntity updateUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) {
635 final Authorizable userGroupsAuthorizable = authorizableLookup.getTenant();
636 final Set<AccessPolicy> policies = userGroupDAO.getAccessPoliciesForUserGroup(userGroupDTO.getId());
637 final RevisionUpdate<UserGroupDTO> snapshot = updateComponent(revision,
638 userGroupsAuthorizable,
639 () -> userGroupDAO.updateUserGroup(userGroupDTO),
641 final Set<TenantEntity> tenantEntities = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
642 final Set<AccessPolicySummaryEntity> policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
643 return dtoFactory.createUserGroupDto(userGroup, tenantEntities, policyEntities);
647 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(userGroupsAuthorizable);
648 return entityFactory.createUserGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
652 public ConnectionEntity updateConnection(final Revision revision, final ConnectionDTO connectionDTO) {
653 final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId());
655 final RevisionUpdate<ConnectionDTO> snapshot = updateComponent(
658 () -> connectionDAO.updateConnection(connectionDTO),
659 connection -> dtoFactory.createConnectionDto(connection));
661 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectionNode);
662 final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionNode.getIdentifier()));
663 return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status);
667 public ProcessorEntity updateProcessor(final Revision revision, final ProcessorDTO processorDTO) {
668 // get the component, ensure we have access to it, and perform the update request
669 final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId());
670 final RevisionUpdate<ProcessorDTO> snapshot = updateComponent(revision,
672 () -> processorDAO.updateProcessor(processorDTO),
674 awaitValidationCompletion(proc);
675 return dtoFactory.createProcessorDto(proc);
678 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processorNode);
679 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processorNode));
680 final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier()));
681 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorNode.getIdentifier()));
682 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
683 return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
686 private void awaitValidationCompletion(final ComponentNode component) {
687 component.getValidationStatus(VALIDATION_WAIT_MILLIS, TimeUnit.MILLISECONDS);
691 public LabelEntity updateLabel(final Revision revision, final LabelDTO labelDTO) {
692 final Label labelNode = labelDAO.getLabel(labelDTO.getId());
693 final RevisionUpdate<LabelDTO> snapshot = updateComponent(revision,
695 () -> labelDAO.updateLabel(labelDTO),
696 label -> dtoFactory.createLabelDto(label));
698 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(labelNode);
699 return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
703 public FunnelEntity updateFunnel(final Revision revision, final FunnelDTO funnelDTO) {
704 final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId());
705 final RevisionUpdate<FunnelDTO> snapshot = updateComponent(revision,
707 () -> funnelDAO.updateFunnel(funnelDTO),
708 funnel -> dtoFactory.createFunnelDto(funnel));
710 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnelNode);
711 return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
716 * Updates a component with the given revision, using the provided supplier to call
717 * into the appropriate DAO and the provided function to convert the component into a DTO.
719 * @param revision the current revision
720 * @param daoUpdate a Supplier that will update the component via the appropriate DAO
721 * @param dtoCreation a Function to convert a component into a dao
722 * @param <D> the DTO Type of the updated component
723 * @param <C> the Component Type of the updated component
724 * @return A RevisionUpdate that represents the new configuration
726 private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
728 final NiFiUser user = NiFiUserUtils.getNiFiUser();
730 final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() {
732 public RevisionUpdate<D> update() {
733 // get the updated component
734 final C component = daoUpdate.get();
736 // save updated controller
737 controllerFacade.save();
739 final D dto = dtoCreation.apply(component);
741 final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
742 final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
743 return new StandardRevisionUpdate<>(dto, lastModification);
747 return updatedComponent;
748 } catch (final ExpiredRevisionClaimException erce) {
749 throw new InvalidRevisionException("Failed to update component " + authorizable, erce);
755 public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set<String> affectedComponentIds) {
756 // if snippet does not exist, then the update request is likely creating it
757 // so we don't verify since it will fail
758 if (snippetDAO.hasSnippet(snippetDto.getId())) {
759 snippetDAO.verifyUpdateSnippetComponent(snippetDto);
764 public SnippetEntity updateSnippet(final Set<Revision> revisions, final SnippetDTO snippetDto) {
765 final NiFiUser user = NiFiUserUtils.getNiFiUser();
766 final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
768 final RevisionUpdate<SnippetDTO> snapshot;
770 snapshot = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() {
772 public RevisionUpdate<SnippetDTO> update() {
773 // get the updated component
774 final Snippet snippet = snippetDAO.updateSnippetComponents(snippetDto);
777 snippetDAO.dropSnippet(snippet.getId());
779 // save updated controller
780 controllerFacade.save();
782 // increment the revisions
783 final Set<Revision> updatedRevisions = revisions.stream().map(revision -> {
784 final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
785 return currentRevision.incrementRevision(revision.getClientId());
786 }).collect(Collectors.toSet());
788 final SnippetDTO dto = dtoFactory.createSnippetDto(snippet);
789 return new StandardRevisionUpdate<>(dto, null, updatedRevisions);
792 } catch (final ExpiredRevisionClaimException e) {
793 throw new InvalidRevisionException("Failed to update Snippet", e);
796 return entityFactory.createSnippetEntity(snapshot.getComponent());
800 public PortEntity updateInputPort(final Revision revision, final PortDTO inputPortDTO) {
801 final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId());
802 final RevisionUpdate<PortDTO> snapshot = updateComponent(revision,
804 () -> inputPortDAO.updatePort(inputPortDTO),
805 port -> dtoFactory.createPortDto(port));
807 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPortNode);
808 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPortNode));
809 final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier()));
810 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortNode.getIdentifier()));
811 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
812 return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
816 public PortEntity updateOutputPort(final Revision revision, final PortDTO outputPortDTO) {
817 final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId());
818 final RevisionUpdate<PortDTO> snapshot = updateComponent(revision,
820 () -> outputPortDAO.updatePort(outputPortDTO),
821 port -> dtoFactory.createPortDto(port));
823 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPortNode);
824 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPortNode), NiFiUserUtils.getNiFiUser());
825 final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier()));
826 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortNode.getIdentifier()));
827 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
828 return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
832 public RemoteProcessGroupEntity updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
833 final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
834 final RevisionUpdate<RemoteProcessGroupDTO> snapshot = updateComponent(
836 remoteProcessGroupNode,
837 () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO),
838 remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
840 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
841 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
842 final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
843 final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupNode,
844 controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier()));
845 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroupNode.getIdentifier()));
846 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
847 return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, permissions, operatePermissions, status, bulletinEntities);
851 public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort(
852 final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
854 final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId());
855 final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent(
857 remoteProcessGroupNode,
858 () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO),
859 remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
861 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
862 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
863 final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
864 return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions);
868 public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort(
869 final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
871 final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId());
872 final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent(
874 remoteProcessGroupNode,
875 () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO),
876 remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
878 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
879 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
880 final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
881 return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions);
885 public Set<AffectedComponentDTO> getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
886 final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
888 throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
891 final Map<String, String> variableMap = new HashMap<>();
892 variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
893 .map(VariableEntity::getVariable)
894 .forEach(var -> variableMap.put(var.getName(), var.getValue()));
896 final Set<AffectedComponentDTO> affectedComponentDtos = new HashSet<>();
898 final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap);
899 for (final String variableName : updatedVariableNames) {
900 final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName);
902 for (final ComponentNode component : affectedComponents) {
903 if (component instanceof ProcessorNode) {
904 final ProcessorNode procNode = (ProcessorNode) component;
905 if (procNode.isRunning()) {
906 affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode));
908 } else if (component instanceof ControllerServiceNode) {
909 final ControllerServiceNode serviceNode = (ControllerServiceNode) component;
910 if (serviceNode.isActive()) {
911 affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode));
914 throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable");
919 return affectedComponentDtos;
923 public Set<AffectedComponentEntity> getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
924 final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
926 throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
929 final Map<String, String> variableMap = new HashMap<>();
930 variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
931 .map(VariableEntity::getVariable)
932 .forEach(var -> variableMap.put(var.getName(), var.getValue()));
934 final Set<AffectedComponentEntity> affectedComponentEntities = new HashSet<>();
936 final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap);
937 for (final String variableName : updatedVariableNames) {
938 final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName);
939 affectedComponentEntities.addAll(dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager));
942 return affectedComponentEntities;
945 private Set<String> getUpdatedVariables(final ProcessGroup group, final Map<String, String> newVariableValues) {
946 final Set<String> updatedVariableNames = new HashSet<>();
948 final ComponentVariableRegistry registry = group.getVariableRegistry();
949 for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) {
950 final String varName = entry.getKey();
951 final String newValue = entry.getValue();
953 final String curValue = registry.getVariableValue(varName);
954 if (!Objects.equals(newValue, curValue)) {
955 updatedVariableNames.add(varName);
959 return updatedVariableNames;
964 public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) {
965 final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
966 final RevisionUpdate<VariableRegistryDTO> snapshot = updateComponent(revision,
968 () -> processGroupDAO.updateVariableRegistry(variableRegistryDto),
969 processGroup -> dtoFactory.createVariableRegistryDto(processGroup, revisionManager));
971 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
972 final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
973 return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions);
978 public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) {
979 final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId());
980 final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision,
982 () -> processGroupDAO.updateProcessGroup(processGroupDTO),
983 processGroup -> dtoFactory.createProcessGroupDto(processGroup));
985 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
986 final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
987 final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
988 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
989 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
990 return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
994 public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
995 if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
996 processGroupDAO.verifyUpdate(processGroupDTO);
1001 public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions) {
1002 final NiFiUser user = NiFiUserUtils.getNiFiUser();
1004 final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
1005 UpdateRevisionTask<ScheduleComponentsEntity>() {
1007 public RevisionUpdate<ScheduleComponentsEntity> update() {
1008 // schedule the components
1009 processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet());
1011 // update the revisions
1012 final Map<String, Revision> updatedRevisions = new HashMap<>();
1013 for (final Revision revision : componentRevisions.values()) {
1014 final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
1015 updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
1019 controllerFacade.save();
1021 // gather details for response
1022 final ScheduleComponentsEntity entity = new ScheduleComponentsEntity();
1023 entity.setId(processGroupId);
1024 entity.setState(state.name());
1025 return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
1029 return updatedComponent.getComponent();
1033 public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
1034 final NiFiUser user = NiFiUserUtils.getNiFiUser();
1035 final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
1036 UpdateRevisionTask<ScheduleComponentsEntity>() {
1038 public RevisionUpdate<ScheduleComponentsEntity> update() {
1039 // schedule the components
1040 processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
1042 // update the revisions
1043 final Map<String, Revision> updatedRevisions = new HashMap<>();
1044 for (final Revision revision : componentRevisions.values()) {
1045 final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
1046 updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
1050 controllerFacade.save();
1052 // gather details for response
1053 final ScheduleComponentsEntity entity = new ScheduleComponentsEntity();
1054 entity.setId(processGroupId);
1055 entity.setState(state.name());
1056 return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
1060 return updatedComponent.getComponent();
1064 public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) {
1065 final NiFiUser user = NiFiUserUtils.getNiFiUser();
1066 final RevisionUpdate<ActivateControllerServicesEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user,
1067 new UpdateRevisionTask<ActivateControllerServicesEntity>() {
1069 public RevisionUpdate<ActivateControllerServicesEntity> update() {
1070 // schedule the components
1071 processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet());
1073 // update the revisions
1074 final Map<String, Revision> updatedRevisions = new HashMap<>();
1075 for (final Revision revision : serviceRevisions.values()) {
1076 final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
1077 updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
1081 controllerFacade.save();
1083 // gather details for response
1084 final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity();
1085 entity.setId(processGroupId);
1086 entity.setState(state.name());
1087 return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
1091 return updatedComponent.getComponent();
1096 public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
1097 final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = updateComponent(
1101 if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
1102 controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
1104 if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
1105 controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
1108 return controllerConfigurationDTO;
1110 controller -> dtoFactory.createControllerConfigurationDto(controllerFacade));
1112 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade);
1113 final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(updatedComponent.getLastModification());
1114 return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions);
1119 public NodeDTO updateNode(final NodeDTO nodeDTO) {
1120 final NiFiUser user = NiFiUserUtils.getNiFiUser();
1122 throw new WebApplicationException(new Throwable("Unable to access details for current user."));
1124 final String userDn = user.getIdentity();
1126 final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeDTO.getNodeId());
1127 if (nodeId == null) {
1128 throw new UnknownNodeException("No node exists with ID " + nodeDTO.getNodeId());
1132 if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
1133 clusterCoordinator.requestNodeConnect(nodeId, userDn);
1134 } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
1135 clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED,
1136 "User " + userDn + " requested that node be offloaded");
1137 } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
1138 clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED,
1139 "User " + userDn + " requested that node be disconnected from cluster");
1142 return getNode(nodeId);
1146 public CounterDTO updateCounter(final String counterId) {
1147 return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId));
1151 public void verifyCanClearProcessorState(final String processorId) {
1152 processorDAO.verifyClearState(processorId);
1156 public void clearProcessorState(final String processorId) {
1157 processorDAO.clearState(processorId);
1161 public void verifyCanClearControllerServiceState(final String controllerServiceId) {
1162 controllerServiceDAO.verifyClearState(controllerServiceId);
1166 public void clearControllerServiceState(final String controllerServiceId) {
1167 controllerServiceDAO.clearState(controllerServiceId);
1171 public void verifyCanClearReportingTaskState(final String reportingTaskId) {
1172 reportingTaskDAO.verifyClearState(reportingTaskId);
1176 public void clearReportingTaskState(final String reportingTaskId) {
1177 reportingTaskDAO.clearState(reportingTaskId);
1181 public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) {
1182 final Connection connection = connectionDAO.getConnection(connectionId);
1183 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
1184 final ConnectionDTO snapshot = deleteComponent(
1186 connection.getResource(),
1187 () -> connectionDAO.deleteConnection(connectionId),
1188 false, // no policies to remove
1189 dtoFactory.createConnectionDto(connection));
1191 return entityFactory.createConnectionEntity(snapshot, null, permissions, null);
1195 public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) {
1196 return dtoFactory.createDropRequestDTO(connectionDAO.deleteFlowFileDropRequest(connectionId, dropRequestId));
1200 public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) {
1201 final Connection connection = connectionDAO.getConnection(connectionId);
1202 final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(connectionId, listingRequestId));
1204 // include whether the source and destination are running
1205 if (connection.getSource() != null) {
1206 listRequest.setSourceRunning(connection.getSource().isRunning());
1208 if (connection.getDestination() != null) {
1209 listRequest.setDestinationRunning(connection.getDestination().isRunning());
1216 public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) {
1217 final ProcessorNode processor = processorDAO.getProcessor(processorId);
1218 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
1219 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
1220 final ProcessorDTO snapshot = deleteComponent(
1222 processor.getResource(),
1223 () -> processorDAO.deleteProcessor(processorId),
1225 dtoFactory.createProcessorDto(processor));
1227 return entityFactory.createProcessorEntity(snapshot, null, permissions, operatePermissions, null, null);
1231 public ProcessorEntity terminateProcessor(final String processorId) {
1232 processorDAO.terminate(processorId);
1233 return getProcessor(processorId);
1237 public void verifyTerminateProcessor(final String processorId) {
1238 processorDAO.verifyTerminate(processorId);
1242 public LabelEntity deleteLabel(final Revision revision, final String labelId) {
1243 final Label label = labelDAO.getLabel(labelId);
1244 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
1245 final LabelDTO snapshot = deleteComponent(
1247 label.getResource(),
1248 () -> labelDAO.deleteLabel(labelId),
1250 dtoFactory.createLabelDto(label));
1252 return entityFactory.createLabelEntity(snapshot, null, permissions);
1256 public UserEntity deleteUser(final Revision revision, final String userId) {
1257 final User user = userDAO.getUser(userId);
1258 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
1259 final Set<TenantEntity> userGroups = user != null ? userGroupDAO.getUserGroupsForUser(userId).stream()
1260 .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
1261 final Set<AccessPolicySummaryEntity> policyEntities = user != null ? userGroupDAO.getAccessPoliciesForUser(userId).stream()
1262 .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()) : null;
1264 final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userId;
1265 final UserDTO snapshot = deleteComponent(
1269 public String getIdentifier() {
1270 return resourceIdentifier;
1274 public String getName() {
1275 return resourceIdentifier;
1279 public String getSafeDescription() {
1280 return "User " + userId;
1283 () -> userDAO.deleteUser(userId),
1284 false, // no user specific policies to remove
1285 dtoFactory.createUserDto(user, userGroups, policyEntities));
1287 return entityFactory.createUserEntity(snapshot, null, permissions);
1291 public UserGroupEntity deleteUserGroup(final Revision revision, final String userGroupId) {
1292 final Group userGroup = userGroupDAO.getUserGroup(userGroupId);
1293 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
1294 final Set<TenantEntity> users = userGroup != null ? userGroup.getUsers().stream()
1295 .map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
1296 final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream()
1297 .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
1299 final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userGroupId;
1300 final UserGroupDTO snapshot = deleteComponent(
1304 public String getIdentifier() {
1305 return resourceIdentifier;
1309 public String getName() {
1310 return resourceIdentifier;
1314 public String getSafeDescription() {
1315 return "User Group " + userGroupId;
1318 () -> userGroupDAO.deleteUserGroup(userGroupId),
1319 false, // no user group specific policies to remove
1320 dtoFactory.createUserGroupDto(userGroup, users, policyEntities));
1322 return entityFactory.createUserGroupEntity(snapshot, null, permissions);
1326 public AccessPolicyEntity deleteAccessPolicy(final Revision revision, final String accessPolicyId) {
1327 final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId);
1328 final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
1329 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyId));
1330 final Set<TenantEntity> userGroups = accessPolicy != null ? accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
1331 final Set<TenantEntity> users = accessPolicy != null ? accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
1332 final AccessPolicyDTO snapshot = deleteComponent(
1336 public String getIdentifier() {
1337 return accessPolicy.getResource();
1341 public String getName() {
1342 return accessPolicy.getResource();
1346 public String getSafeDescription() {
1347 return "Policy " + accessPolicyId;
1350 () -> accessPolicyDAO.deleteAccessPolicy(accessPolicyId),
1351 false, // no need to clean up any policies as it's already been removed above
1352 dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference));
1354 return entityFactory.createAccessPolicyEntity(snapshot, null, permissions);
1358 public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) {
1359 final Funnel funnel = funnelDAO.getFunnel(funnelId);
1360 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
1361 final FunnelDTO snapshot = deleteComponent(
1363 funnel.getResource(),
1364 () -> funnelDAO.deleteFunnel(funnelId),
1366 dtoFactory.createFunnelDto(funnel));
1368 return entityFactory.createFunnelEntity(snapshot, null, permissions);
1372 * Deletes a component using the Optimistic Locking Manager
1374 * @param revision the current revision
1375 * @param resource the resource being removed
1376 * @param deleteAction the action that deletes the component via the appropriate DAO object
1377 * @param cleanUpPolicies whether or not the policies for this resource should be removed as well - not necessary when there are
1378 * no component specific policies or if the policies of the component are inherited
1379 * @return a dto that represents the new configuration
1381 private <D, C> D deleteComponent(final Revision revision, final Resource resource, final Runnable deleteAction, final boolean cleanUpPolicies, final D dto) {
1382 final RevisionClaim claim = new StandardRevisionClaim(revision);
1383 final NiFiUser user = NiFiUserUtils.getNiFiUser();
1385 return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<D>() {
1387 public D performTask() {
1388 logger.debug("Attempting to delete component {} with claim {}", resource.getIdentifier(), claim);
1390 // run the delete action
1394 controllerFacade.save();
1395 logger.debug("Deletion of component {} was successful", resource.getIdentifier());
1397 if (cleanUpPolicies) {
1398 cleanUpPolicies(resource);
1407 * Clean up the policies for the specified component resource.
1409 * @param componentResource the resource for the component
1411 private void cleanUpPolicies(final Resource componentResource) {
1412 // ensure the authorizer supports configuration
1413 if (accessPolicyDAO.supportsConfigurableAuthorizer()) {
1414 final List<Resource> resources = new ArrayList<>();
1415 resources.add(componentResource);
1416 resources.add(ResourceFactory.getDataResource(componentResource));
1417 resources.add(ResourceFactory.getProvenanceDataResource(componentResource));
1418 resources.add(ResourceFactory.getDataTransferResource(componentResource));
1419 resources.add(ResourceFactory.getPolicyResource(componentResource));
1421 for (final Resource resource : resources) {
1422 for (final RequestAction action : RequestAction.values()) {
1424 // since the component is being deleted, also delete any relevant access policies
1425 final AccessPolicy readPolicy = accessPolicyDAO.getAccessPolicy(action, resource.getIdentifier());
1426 if (readPolicy != null) {
1427 accessPolicyDAO.deleteAccessPolicy(readPolicy.getIdentifier());
1429 } catch (final Exception e) {
1430 logger.warn(String.format("Unable to remove access policy for %s %s after component removal.", action, resource.getIdentifier()), e);
1438 public void verifyDeleteSnippet(final String snippetId, final Set<String> affectedComponentIds) {
1439 snippetDAO.verifyDeleteSnippetComponents(snippetId);
1443 public SnippetEntity deleteSnippet(final Set<Revision> revisions, final String snippetId) {
1444 final Snippet snippet = snippetDAO.getSnippet(snippetId);
1446 // grab the resources in the snippet so we can delete the policies afterwards
1447 final Set<Resource> snippetResources = new HashSet<>();
1448 snippet.getProcessors().keySet().forEach(id -> snippetResources.add(processorDAO.getProcessor(id).getResource()));
1449 snippet.getInputPorts().keySet().forEach(id -> snippetResources.add(inputPortDAO.getPort(id).getResource()));
1450 snippet.getOutputPorts().keySet().forEach(id -> snippetResources.add(outputPortDAO.getPort(id).getResource()));
1451 snippet.getFunnels().keySet().forEach(id -> snippetResources.add(funnelDAO.getFunnel(id).getResource()));
1452 snippet.getLabels().keySet().forEach(id -> snippetResources.add(labelDAO.getLabel(id).getResource()));
1453 snippet.getRemoteProcessGroups().keySet().forEach(id -> snippetResources.add(remoteProcessGroupDAO.getRemoteProcessGroup(id).getResource()));
1454 snippet.getProcessGroups().keySet().forEach(id -> {
1455 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id);
1457 // add the process group
1458 snippetResources.add(processGroup.getResource());
1460 // add each encapsulated component
1461 processGroup.findAllProcessors().forEach(processor -> snippetResources.add(processor.getResource()));
1462 processGroup.findAllInputPorts().forEach(inputPort -> snippetResources.add(inputPort.getResource()));
1463 processGroup.findAllOutputPorts().forEach(outputPort -> snippetResources.add(outputPort.getResource()));
1464 processGroup.findAllFunnels().forEach(funnel -> snippetResources.add(funnel.getResource()));
1465 processGroup.findAllLabels().forEach(label -> snippetResources.add(label.getResource()));
1466 processGroup.findAllProcessGroups().forEach(childGroup -> snippetResources.add(childGroup.getResource()));
1467 processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> snippetResources.add(remoteProcessGroup.getResource()));
1468 processGroup.findAllTemplates().forEach(template -> snippetResources.add(template.getResource()));
1469 processGroup.findAllControllerServices().forEach(controllerService -> snippetResources.add(controllerService.getResource()));
1472 final NiFiUser user = NiFiUserUtils.getNiFiUser();
1473 final RevisionClaim claim = new StandardRevisionClaim(revisions);
1474 final SnippetDTO dto = revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<SnippetDTO>() {
1476 public SnippetDTO performTask() {
1477 // delete the components in the snippet
1478 snippetDAO.deleteSnippetComponents(snippetId);
1481 snippetDAO.dropSnippet(snippetId);
1484 controllerFacade.save();
1486 // create the dto for the snippet that was just removed
1487 return dtoFactory.createSnippetDto(snippet);
1491 // clean up component policies
1492 snippetResources.forEach(resource -> cleanUpPolicies(resource));
1494 return entityFactory.createSnippetEntity(dto);
1498 public PortEntity deleteInputPort(final Revision revision, final String inputPortId) {
1499 final Port port = inputPortDAO.getPort(inputPortId);
1500 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
1501 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
1502 final PortDTO snapshot = deleteComponent(
1505 () -> inputPortDAO.deletePort(inputPortId),
1507 dtoFactory.createPortDto(port));
1509 return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null);
1513 public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) {
1514 final Port port = outputPortDAO.getPort(outputPortId);
1515 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
1516 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
1517 final PortDTO snapshot = deleteComponent(
1520 () -> outputPortDAO.deletePort(outputPortId),
1522 dtoFactory.createPortDto(port));
1524 return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null);
1528 public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) {
1529 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
1530 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
1532 // grab the resources in the snippet so we can delete the policies afterwards
1533 final Set<Resource> groupResources = new HashSet<>();
1534 processGroup.findAllProcessors().forEach(processor -> groupResources.add(processor.getResource()));
1535 processGroup.findAllInputPorts().forEach(inputPort -> groupResources.add(inputPort.getResource()));
1536 processGroup.findAllOutputPorts().forEach(outputPort -> groupResources.add(outputPort.getResource()));
1537 processGroup.findAllFunnels().forEach(funnel -> groupResources.add(funnel.getResource()));
1538 processGroup.findAllLabels().forEach(label -> groupResources.add(label.getResource()));
1539 processGroup.findAllProcessGroups().forEach(childGroup -> groupResources.add(childGroup.getResource()));
1540 processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> groupResources.add(remoteProcessGroup.getResource()));
1541 processGroup.findAllTemplates().forEach(template -> groupResources.add(template.getResource()));
1542 processGroup.findAllControllerServices().forEach(controllerService -> groupResources.add(controllerService.getResource()));
1544 final ProcessGroupDTO snapshot = deleteComponent(
1546 processGroup.getResource(),
1547 () -> processGroupDAO.deleteProcessGroup(groupId),
1549 dtoFactory.createProcessGroupDto(processGroup));
1551 // delete all applicable component policies
1552 groupResources.forEach(groupResource -> cleanUpPolicies(groupResource));
1554 return entityFactory.createProcessGroupEntity(snapshot, null, permissions, null, null);
1558 public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) {
1559 final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
1560 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
1561 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup));
1562 final RemoteProcessGroupDTO snapshot = deleteComponent(
1564 remoteProcessGroup.getResource(),
1565 () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId),
1567 dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
1569 return entityFactory.createRemoteProcessGroupEntity(snapshot, null, permissions, operatePermissions, null, null);
1573 public void deleteTemplate(final String id) {
1574 // delete the template and save the flow
1575 templateDAO.deleteTemplate(id);
1576 controllerFacade.save();
1580 public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
1581 final RevisionUpdate<ConnectionDTO> snapshot = createComponent(
1584 () -> connectionDAO.createConnection(groupId, connectionDTO),
1585 connection -> dtoFactory.createConnectionDto(connection));
1587 final Connection connection = connectionDAO.getConnection(connectionDTO.getId());
1588 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
1589 final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId()));
1590 return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status);
1594 public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) {
1595 return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId));
1599 public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) {
1600 final Connection connection = connectionDAO.getConnection(connectionId);
1601 final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(connectionId, listingRequestId));
1603 // include whether the source and destination are running
1604 if (connection.getSource() != null) {
1605 listRequest.setSourceRunning(connection.getSource().isRunning());
1607 if (connection.getDestination() != null) {
1608 listRequest.setDestinationRunning(connection.getDestination().isRunning());
1615 public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
1616 final RevisionUpdate<ProcessorDTO> snapshot = createComponent(
1619 () -> processorDAO.createProcessor(groupId, processorDTO),
1621 awaitValidationCompletion(processor);
1622 return dtoFactory.createProcessorDto(processor);
1625 final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId());
1626 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
1627 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
1628 final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId()));
1629 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId()));
1630 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
1631 return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
1635 public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
1636 final RevisionUpdate<LabelDTO> snapshot = createComponent(
1639 () -> labelDAO.createLabel(groupId, labelDTO),
1640 label -> dtoFactory.createLabelDto(label));
1642 final Label label = labelDAO.getLabel(labelDTO.getId());
1643 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
1644 return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
1648 * Creates a component using the optimistic locking manager.
1650 * @param componentDto the DTO that will be used to create the component
1651 * @param daoCreation A Supplier that will create the NiFi Component to use
1652 * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO
1653 * @param <D> the DTO Type
1654 * @param <C> the NiFi Component Type
1655 * @return a RevisionUpdate that represents the updated configuration
1657 private <D, C> RevisionUpdate<D> createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier<C> daoCreation, final Function<C, D> dtoCreation) {
1658 final NiFiUser user = NiFiUserUtils.getNiFiUser();
1660 // read lock on the containing group
1661 // request claim for component to be created... revision already verified (version == 0)
1662 final RevisionClaim claim = new StandardRevisionClaim(revision);
1664 // update revision through revision manager
1665 return revisionManager.updateRevision(claim, user, () -> {
1666 // add the component
1667 final C component = daoCreation.get();
1670 controllerFacade.save();
1672 final D dto = dtoCreation.apply(component);
1673 final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
1674 return new StandardRevisionUpdate<>(dto, lastMod);
1679 public BulletinEntity createBulletin(final BulletinDTO bulletinDTO, final Boolean canRead){
1680 final Bulletin bulletin = BulletinFactory.createBulletin(bulletinDTO.getCategory(),bulletinDTO.getLevel(),bulletinDTO.getMessage());
1681 bulletinRepository.addBulletin(bulletin);
1682 return entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin),canRead);
1686 public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
1687 final RevisionUpdate<FunnelDTO> snapshot = createComponent(
1690 () -> funnelDAO.createFunnel(groupId, funnelDTO),
1691 funnel -> dtoFactory.createFunnelDto(funnel));
1693 final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId());
1694 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
1695 return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
1699 public AccessPolicyEntity createAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) {
1700 final Authorizable tenantAuthorizable = authorizableLookup.getTenant();
1701 final String creator = NiFiUserUtils.getNiFiUserIdentity();
1703 final AccessPolicy newAccessPolicy = accessPolicyDAO.createAccessPolicy(accessPolicyDTO);
1704 final ComponentReferenceEntity componentReference = createComponentReferenceEntity(newAccessPolicy.getResource());
1705 final AccessPolicyDTO newAccessPolicyDto = dtoFactory.createAccessPolicyDto(newAccessPolicy,
1706 newAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()),
1707 newAccessPolicy.getUsers().stream().map(userId -> {
1708 final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId));
1709 return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(userDAO.getUser(userId)), userRevision,
1710 dtoFactory.createPermissionsDto(tenantAuthorizable));
1711 }).collect(Collectors.toSet()), componentReference);
1713 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId()));
1714 return entityFactory.createAccessPolicyEntity(newAccessPolicyDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
1718 public UserEntity createUser(final Revision revision, final UserDTO userDTO) {
1719 final String creator = NiFiUserUtils.getNiFiUserIdentity();
1720 final User newUser = userDAO.createUser(userDTO);
1721 final Set<TenantEntity> tenantEntities = userGroupDAO.getUserGroupsForUser(newUser.getIdentifier()).stream()
1722 .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
1723 final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUser(newUser.getIdentifier()).stream()
1724 .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
1725 final UserDTO newUserDto = dtoFactory.createUserDto(newUser, tenantEntities, policyEntities);
1727 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
1728 return entityFactory.createUserEntity(newUserDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
1731 private ComponentReferenceEntity createComponentReferenceEntity(final String resource) {
1732 ComponentReferenceEntity componentReferenceEntity = null;
1734 // get the component authorizable
1735 Authorizable componentAuthorizable = authorizableLookup.getAuthorizableFromResource(resource);
1737 // if this represents an authorizable whose policy permissions are enforced through the base resource,
1738 // get the underlying base authorizable for the component reference
1739 if (componentAuthorizable instanceof EnforcePolicyPermissionsThroughBaseResource) {
1740 componentAuthorizable = ((EnforcePolicyPermissionsThroughBaseResource) componentAuthorizable).getBaseAuthorizable();
1743 final ComponentReferenceDTO componentReference = dtoFactory.createComponentReferenceDto(componentAuthorizable);
1744 if (componentReference != null) {
1745 final PermissionsDTO componentReferencePermissions = dtoFactory.createPermissionsDto(componentAuthorizable);
1746 final RevisionDTO componentReferenceRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(componentReference.getId()));
1747 componentReferenceEntity = entityFactory.createComponentReferenceEntity(componentReference, componentReferenceRevision, componentReferencePermissions);
1749 } catch (final ResourceNotFoundException e) {
1750 // component not found for the specified resource
1753 return componentReferenceEntity;
1756 private AccessPolicySummaryEntity createAccessPolicySummaryEntity(final AccessPolicy ap) {
1757 final ComponentReferenceEntity componentReference = createComponentReferenceEntity(ap.getResource());
1758 final AccessPolicySummaryDTO apSummary = dtoFactory.createAccessPolicySummaryDto(ap, componentReference);
1759 final PermissionsDTO apPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(ap.getIdentifier()));
1760 final RevisionDTO apRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(ap.getIdentifier()));
1761 return entityFactory.createAccessPolicySummaryEntity(apSummary, apRevision, apPermissions);
1765 public UserGroupEntity createUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) {
1766 final String creator = NiFiUserUtils.getNiFiUserIdentity();
1767 final Group newUserGroup = userGroupDAO.createUserGroup(userGroupDTO);
1768 final Set<TenantEntity> tenantEntities = newUserGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
1769 final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(newUserGroup.getIdentifier()).stream()
1770 .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
1771 final UserGroupDTO newUserGroupDto = dtoFactory.createUserGroupDto(newUserGroup, tenantEntities, policyEntities);
1773 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
1774 return entityFactory.createUserGroupEntity(newUserGroupDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
1777 private void validateSnippetContents(final FlowSnippetDTO flow) {
1778 // validate any processors
1779 if (flow.getProcessors() != null) {
1780 for (final ProcessorDTO processorDTO : flow.getProcessors()) {
1781 final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId());
1782 processorDTO.setValidationStatus(processorNode.getValidationStatus().name());
1784 final Collection<ValidationResult> validationErrors = processorNode.getValidationErrors();
1785 if (validationErrors != null && !validationErrors.isEmpty()) {
1786 final List<String> errors = new ArrayList<>();
1787 for (final ValidationResult validationResult : validationErrors) {
1788 errors.add(validationResult.toString());
1790 processorDTO.setValidationErrors(errors);
1795 if (flow.getInputPorts() != null) {
1796 for (final PortDTO portDTO : flow.getInputPorts()) {
1797 final Port port = inputPortDAO.getPort(portDTO.getId());
1798 final Collection<ValidationResult> validationErrors = port.getValidationErrors();
1799 if (validationErrors != null && !validationErrors.isEmpty()) {
1800 final List<String> errors = new ArrayList<>();
1801 for (final ValidationResult validationResult : validationErrors) {
1802 errors.add(validationResult.toString());
1804 portDTO.setValidationErrors(errors);
1809 if (flow.getOutputPorts() != null) {
1810 for (final PortDTO portDTO : flow.getOutputPorts()) {
1811 final Port port = outputPortDAO.getPort(portDTO.getId());
1812 final Collection<ValidationResult> validationErrors = port.getValidationErrors();
1813 if (validationErrors != null && !validationErrors.isEmpty()) {
1814 final List<String> errors = new ArrayList<>();
1815 for (final ValidationResult validationResult : validationErrors) {
1816 errors.add(validationResult.toString());
1818 portDTO.setValidationErrors(errors);
1823 // get any remote process group issues
1824 if (flow.getRemoteProcessGroups() != null) {
1825 for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) {
1826 final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
1828 if (remoteProcessGroup.getAuthorizationIssue() != null) {
1829 remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue()));
1836 public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) {
1837 // create the new snippet
1838 final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed);
1841 controllerFacade.save();
1844 snippetDAO.dropSnippet(snippetId);
1846 // post process new flow snippet
1847 final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet);
1849 final FlowEntity flowEntity = new FlowEntity();
1850 flowEntity.setFlow(flowDto);
1855 public SnippetEntity createSnippet(final SnippetDTO snippetDTO) {
1856 // add the component
1857 final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
1860 controllerFacade.save();
1862 final SnippetDTO dto = dtoFactory.createSnippetDto(snippet);
1863 final RevisionUpdate<SnippetDTO> snapshot = new StandardRevisionUpdate<>(dto, null);
1865 return entityFactory.createSnippetEntity(snapshot.getComponent());
1869 public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
1870 final RevisionUpdate<PortDTO> snapshot = createComponent(
1873 () -> inputPortDAO.createPort(groupId, inputPortDTO),
1874 port -> dtoFactory.createPortDto(port));
1876 final Port port = inputPortDAO.getPort(inputPortDTO.getId());
1877 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
1878 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
1879 final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
1880 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
1881 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
1882 return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
1886 public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
1887 final RevisionUpdate<PortDTO> snapshot = createComponent(
1890 () -> outputPortDAO.createPort(groupId, outputPortDTO),
1891 port -> dtoFactory.createPortDto(port));
1893 final Port port = outputPortDAO.getPort(outputPortDTO.getId());
1894 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
1895 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
1896 final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
1897 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
1898 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
1899 return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
1903 public ProcessGroupEntity createProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
1904 final RevisionUpdate<ProcessGroupDTO> snapshot = createComponent(
1907 () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO),
1908 processGroup -> dtoFactory.createProcessGroupDto(processGroup));
1910 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId());
1911 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
1912 final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
1913 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
1914 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
1915 return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities);
1919 public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
1920 final RevisionUpdate<RemoteProcessGroupDTO> snapshot = createComponent(
1922 remoteProcessGroupDTO,
1923 () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO),
1924 remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
1926 final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
1927 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
1928 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup));
1929 final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier()));
1930 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier()));
1931 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
1932 return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()),
1933 permissions, operatePermissions, status, bulletinEntities);
1937 public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) {
1938 final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
1939 RemoteGroupPort port = rpg.getInputPort(remotePortId);
1941 return port.hasIncomingConnection();
1944 port = rpg.getOutputPort(remotePortId);
1946 return !port.getConnections().isEmpty();
1949 throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId);
1953 public void verifyCanAddTemplate(String groupId, String name) {
1954 templateDAO.verifyCanAddTemplate(name, groupId);
1958 public void verifyComponentTypes(FlowSnippetDTO snippet) {
1959 templateDAO.verifyComponentTypes(snippet);
1963 public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) {
1964 controllerFacade.verifyComponentTypes(versionedGroup);
1968 public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) {
1969 final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
1970 verifyImportProcessGroup(versionControlInfo, contents, group);
1973 private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) {
1974 if (group == null) {
1978 final VersionControlInformation vci = group.getVersionControlInformation();
1980 // Note that we do not compare the Registry ID here because there could be two registry clients
1981 // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance)..
1982 if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
1983 && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) {
1985 throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
1986 + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A.");
1990 final Set<VersionedProcessGroup> childGroups = contents.getProcessGroups();
1991 if (childGroups != null) {
1992 for (final VersionedProcessGroup childGroup : childGroups) {
1993 final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates();
1994 if (childCoordinates != null) {
1995 final VersionControlInformationDTO childVci = new VersionControlInformationDTO();
1996 childVci.setBucketId(childCoordinates.getBucketId());
1997 childVci.setFlowId(childCoordinates.getFlowId());
1998 verifyImportProcessGroup(childVci, childGroup, group);
2003 verifyImportProcessGroup(vciDto, contents, group.getParent());
2007 public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) {
2008 // get the specified snippet
2009 final Snippet snippet = snippetDAO.getSnippet(snippetId);
2011 // create the template
2012 final TemplateDTO templateDTO = new TemplateDTO();
2013 templateDTO.setName(name);
2014 templateDTO.setDescription(description);
2015 templateDTO.setTimestamp(new Date());
2016 templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true, true));
2017 templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION);
2019 // set the id based on the specified seed
2020 final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
2021 templateDTO.setId(uuid);
2023 // create the template
2024 final Template template = templateDAO.createTemplate(templateDTO, groupId);
2027 snippetDAO.dropSnippet(snippetId);
2030 controllerFacade.save();
2032 return dtoFactory.createTemplateDTO(template);
2036 * Ensures default values are populated for all components in this snippet. This is necessary to handle old templates without default values
2037 * and when existing properties have default values introduced.
2039 * @param snippet snippet
2041 private void ensureDefaultPropertyValuesArePopulated(final FlowSnippetDTO snippet) {
2042 if (snippet != null) {
2043 if (snippet.getControllerServices() != null) {
2044 snippet.getControllerServices().forEach(dto -> {
2045 if (dto.getProperties() == null) {
2046 dto.setProperties(new LinkedHashMap<>());
2050 final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle());
2051 configurableComponent.getPropertyDescriptors().forEach(descriptor -> {
2052 if (dto.getProperties().get(descriptor.getName()) == null) {
2053 dto.getProperties().put(descriptor.getName(), descriptor.getDefaultValue());
2056 } catch (final Exception e) {
2057 logger.warn(String.format("Unable to create ControllerService of type %s to populate default values.", dto.getType()));
2062 if (snippet.getProcessors() != null) {
2063 snippet.getProcessors().forEach(dto -> {
2064 if (dto.getConfig() == null) {
2065 dto.setConfig(new ProcessorConfigDTO());
2068 final ProcessorConfigDTO config = dto.getConfig();
2069 if (config.getProperties() == null) {
2070 config.setProperties(new LinkedHashMap<>());
2074 final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle());
2075 configurableComponent.getPropertyDescriptors().forEach(descriptor -> {
2076 if (config.getProperties().get(descriptor.getName()) == null) {
2077 config.getProperties().put(descriptor.getName(), descriptor.getDefaultValue());
2080 } catch (final Exception e) {
2081 logger.warn(String.format("Unable to create Processor of type %s to populate default values.", dto.getType()));
2086 if (snippet.getProcessGroups() != null) {
2087 snippet.getProcessGroups().forEach(processGroup -> {
2088 ensureDefaultPropertyValuesArePopulated(processGroup.getContents());
2095 public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional<String> idGenerationSeed) {
2097 final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
2098 templateDTO.setId(uuid);
2100 // mark the timestamp
2101 templateDTO.setTimestamp(new Date());
2103 // ensure default values are populated
2104 ensureDefaultPropertyValuesArePopulated(templateDTO.getSnippet());
2106 // import the template
2107 final Template template = templateDAO.importTemplate(templateDTO, groupId);
2110 controllerFacade.save();
2112 // return the template dto
2113 return dtoFactory.createTemplateDTO(template);
2117 * Post processes a new flow snippet including validation, removing the snippet, and DTO conversion.
2119 * @param groupId group id
2120 * @param snippet snippet
2123 private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) {
2124 // validate the new snippet
2125 validateSnippetContents(snippet);
2127 // identify all components added
2128 final Set<String> identifiers = new HashSet<>();
2129 snippet.getProcessors().stream()
2130 .map(proc -> proc.getId())
2131 .forEach(id -> identifiers.add(id));
2132 snippet.getConnections().stream()
2133 .map(conn -> conn.getId())
2134 .forEach(id -> identifiers.add(id));
2135 snippet.getInputPorts().stream()
2136 .map(port -> port.getId())
2137 .forEach(id -> identifiers.add(id));
2138 snippet.getOutputPorts().stream()
2139 .map(port -> port.getId())
2140 .forEach(id -> identifiers.add(id));
2141 snippet.getProcessGroups().stream()
2142 .map(group -> group.getId())
2143 .forEach(id -> identifiers.add(id));
2144 snippet.getRemoteProcessGroups().stream()
2145 .map(remoteGroup -> remoteGroup.getId())
2146 .forEach(id -> identifiers.add(id));
2147 snippet.getRemoteProcessGroups().stream()
2148 .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null)
2149 .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream())
2150 .map(remoteInputPort -> remoteInputPort.getId())
2151 .forEach(id -> identifiers.add(id));
2152 snippet.getRemoteProcessGroups().stream()
2153 .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null)
2154 .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream())
2155 .map(remoteOutputPort -> remoteOutputPort.getId())
2156 .forEach(id -> identifiers.add(id));
2157 snippet.getLabels().stream()
2158 .map(label -> label.getId())
2159 .forEach(id -> identifiers.add(id));
2161 final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
2162 final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
2163 return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins);
2167 public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateEncodingVersion,
2168 final FlowSnippetDTO requestSnippet, final String idGenerationSeed) {
2170 // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
2171 // was copied and this dto is only used to instantiate it's components (which as already completed)
2172 final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateEncodingVersion, requestSnippet, idGenerationSeed);
2175 controllerFacade.save();
2177 // post process the new flow snippet
2178 final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet);
2180 final FlowEntity flowEntity = new FlowEntity();
2181 flowEntity.setFlow(flowDto);
2186 public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) {
2187 controllerServiceDTO.setParentGroupId(groupId);
2189 final NiFiUser user = NiFiUserUtils.getNiFiUser();
2191 // request claim for component to be created... revision already verified (version == 0)
2192 final RevisionClaim claim = new StandardRevisionClaim(revision);
2194 final RevisionUpdate<ControllerServiceDTO> snapshot;
2195 if (groupId == null) {
2196 // update revision through revision manager
2197 snapshot = revisionManager.updateRevision(claim, user, () -> {
2198 // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock
2199 // on the group. The Controller Service may or may not have a Process Group (it won't if it's controller-scoped).
2200 final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
2201 controllerFacade.save();
2203 awaitValidationCompletion(controllerService);
2204 final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService);
2206 final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
2207 return new StandardRevisionUpdate<>(dto, lastMod);
2210 snapshot = revisionManager.updateRevision(claim, user, () -> {
2211 final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
2212 controllerFacade.save();
2214 awaitValidationCompletion(controllerService);
2215 final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService);
2217 final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
2218 return new StandardRevisionUpdate<>(dto, lastMod);
2222 final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
2223 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
2224 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
2225 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId()));
2226 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
2227 return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
/**
 * Updates the controller service identified by the DTO and returns an entity built from the updated DTO, the
 * resulting revision, the read/operate permissions of the service and the bulletins found for its id.
 *
 * NOTE(review): this listing carries embedded line numbers and skips 2235, 2237 and 2244-2246; at least one
 * argument of the updateComponent(...) call and the header of the callback that binds cs are therefore not
 * visible here - confirm against the complete file before changing this method.
 */
2231 public ControllerServiceEntity updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
2232 // get the component, ensure we have access to it, and perform the update request
2233 final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
2234 final RevisionUpdate<ControllerServiceDTO> snapshot = updateComponent(revision,
// the DAO call that applies the requested changes
2236 () -> controllerServiceDAO.updateControllerService(controllerServiceDTO),
// cs is bound on a line that is not shown; presumably it is the updated node handed to this callback - TODO confirm
2238 awaitValidationCompletion(cs);
2239 final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(cs);
// the references are read from the node fetched before the update (controllerService), not from cs
2240 final ControllerServiceReference ref = controllerService.getReferences();
2241 final ControllerServiceReferencingComponentsEntity referencingComponentsEntity =
2242 createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerService.getIdentifier()));
2243 dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
// permissions and bulletins are resolved after the update; bulletins are wrapped according to read permission
2247 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
2248 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
2249 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId()));
2250 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
2251 return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
2256 public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents(
2257 final Map<String, Revision> referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
2259 final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values());
2261 final NiFiUser user = NiFiUserUtils.getNiFiUser();
2262 final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, user,
2263 new UpdateRevisionTask<ControllerServiceReferencingComponentsEntity>() {
2265 public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
2266 final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
2267 final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();
2269 // get the revisions of the updated components
2270 final Map<String, Revision> updatedRevisions = new HashMap<>();
2271 for (final ComponentNode component : updated) {
2272 final Revision currentRevision = revisionManager.getRevision(component.getIdentifier());
2273 final Revision requestRevision = referenceRevisions.get(component.getIdentifier());
2274 updatedRevisions.put(component.getIdentifier(), currentRevision.incrementRevision(requestRevision.getClientId()));
2277 // ensure the revision for all referencing components is included regardless of whether they were updated in this request
2278 for (final ComponentNode component : updatedReference.findRecursiveReferences(ComponentNode.class)) {
2279 updatedRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
2282 final ControllerServiceReferencingComponentsEntity entity = createControllerServiceReferencingComponentsEntity(updatedReference, updatedRevisions);
2283 return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
2287 return update.getComponent();
2291 * Finds the identifiers for all components referencing a ControllerService.
2293 * @param reference ControllerServiceReference
2294 * @param visited ControllerServices we've already visited
2296 private void findControllerServiceReferencingComponentIdentifiers(final ControllerServiceReference reference, final Set<ControllerServiceNode> visited) {
2297 for (final ComponentNode component : reference.getReferencingComponents()) {
2299 // if this is a ControllerService consider it's referencing components
2300 if (component instanceof ControllerServiceNode) {
2301 final ControllerServiceNode node = (ControllerServiceNode) component;
2302 if (!visited.contains(node)) {
2304 findControllerServiceReferencingComponentIdentifiers(node.getReferences(), visited);
2311 * Creates entities for components referencing a ControllerService using their current revision.
2313 * @param reference ControllerServiceReference
2314 * @return The entity
2316 private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set<String> lockedIds) {
2317 final Set<ControllerServiceNode> visited = new HashSet<>();
2318 visited.add(reference.getReferencedComponent());
2319 findControllerServiceReferencingComponentIdentifiers(reference, visited);
2321 final Map<String, Revision> referencingRevisions = new HashMap<>();
2322 for (final ComponentNode component : reference.getReferencingComponents()) {
2323 referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
2326 return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions);
2330 * Creates entities for components referencing a ControllerService using the specified revisions.
2332 * @param reference ControllerServiceReference
2333 * @param revisions The revisions
2334 * @return The entity
2336 private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(
2337 final ControllerServiceReference reference, final Map<String, Revision> revisions) {
2338 final Set<ControllerServiceNode> visited = new HashSet<>();
2339 visited.add(reference.getReferencedComponent());
2340 return createControllerServiceReferencingComponentsEntity(reference, revisions, visited);
2344 * Creates entities for components referencing a ControllerServcie using the specified revisions.
2346 * @param reference ControllerServiceReference
2347 * @param revisions The revisions
2348 * @param visited Which services we've already considered (in case of cycle)
2349 * @return The entity
2351 private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(
2352 final ControllerServiceReference reference, final Map<String, Revision> revisions, final Set<ControllerServiceNode> visited) {
2354 final String modifier = NiFiUserUtils.getNiFiUserIdentity();
2355 final Set<ComponentNode> referencingComponents = reference.getReferencingComponents();
2357 final Set<ControllerServiceReferencingComponentEntity> componentEntities = new HashSet<>();
2358 for (final ComponentNode refComponent : referencingComponents) {
2359 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(refComponent);
2360 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(refComponent));
2362 final Revision revision = revisions.get(refComponent.getIdentifier());
2363 final FlowModification flowMod = new FlowModification(revision, modifier);
2364 final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(flowMod);
2365 final ControllerServiceReferencingComponentDTO dto = dtoFactory.createControllerServiceReferencingComponentDTO(refComponent);
2367 if (refComponent instanceof ControllerServiceNode) {
2368 final ControllerServiceNode node = (ControllerServiceNode) refComponent;
2370 // indicate if we've hit a cycle
2371 dto.setReferenceCycle(visited.contains(node));
2373 // mark node as visited before building the reference cycle
2376 // if we haven't encountered this service before include it's referencing components
2377 if (!dto.getReferenceCycle()) {
2378 final ControllerServiceReference refReferences = node.getReferences();
2379 final Map<String, Revision> referencingRevisions = new HashMap<>(revisions);
2380 for (final ComponentNode component : refReferences.getReferencingComponents()) {
2381 referencingRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
2383 final ControllerServiceReferencingComponentsEntity references = createControllerServiceReferencingComponentsEntity(refReferences, referencingRevisions, visited);
2384 dto.setReferencingComponents(references.getControllerServiceReferencingComponents());
2388 componentEntities.add(entityFactory.createControllerServiceReferencingComponentEntity(refComponent.getIdentifier(), dto, revisionDto, permissions, operatePermissions));
2391 final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity();
2392 entity.setControllerServiceReferencingComponents(componentEntities);
/**
 * Deletes the controller service with the given id and returns an entity built from the DTO produced by
 * deleteComponent(...) and the read/operate permissions of the service; revision and bulletins are passed as null.
 *
 * NOTE(review): this listing carries embedded line numbers and skips 2402 and 2405 inside the
 * deleteComponent(...) call, so some arguments of that call are not visible here - confirm against the
 * complete file before changing this method.
 */
2397 public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) {
// the node and both permission DTOs are resolved before the delete is issued
2398 final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
2399 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
2400 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
2401 final ControllerServiceDTO snapshot = deleteComponent(
2403 controllerService.getResource(),
// the DAO call that removes the service
2404 () -> controllerServiceDAO.deleteControllerService(controllerServiceId),
// this DTO is created while the argument list is evaluated, i.e. before deleteComponent itself runs
2406 dtoFactory.createControllerServiceDto(controllerService));
2408 return entityFactory.createControllerServiceEntity(snapshot, null, permissions, operatePermissions, null);
2413 public RegistryClientEntity createRegistryClient(Revision revision, RegistryDTO registryDTO) {
2414 final NiFiUser user = NiFiUserUtils.getNiFiUser();
2416 // request claim for component to be created... revision already verified (version == 0)
2417 final RevisionClaim claim = new StandardRevisionClaim(revision);
2419 // update revision through revision manager
2420 final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(claim, user, () -> {
2421 // add the component
2422 final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO);
2425 controllerFacade.save();
2427 final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
2428 return new StandardRevisionUpdate<>(registry, lastMod);
2431 final FlowRegistry registry = revisionUpdate.getComponent();
2432 return createRegistryClientEntity(registry);
2436 public RegistryClientEntity getRegistryClient(final String registryId) {
2437 final FlowRegistry registry = registryDAO.getFlowRegistry(registryId);
2438 return createRegistryClientEntity(registry);
2441 private RegistryClientEntity createRegistryClientEntity(final FlowRegistry flowRegistry) {
2442 if (flowRegistry == null) {
2446 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(flowRegistry.getIdentifier()));
2447 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getController());
2448 final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry);
2450 return entityFactory.createRegistryClientEntity(dto, revision, permissions);
2453 private VersionedFlowEntity createVersionedFlowEntity(final String registryId, final VersionedFlow versionedFlow) {
2454 if (versionedFlow == null) {
2458 final VersionedFlowDTO dto = new VersionedFlowDTO();
2459 dto.setRegistryId(registryId);
2460 dto.setBucketId(versionedFlow.getBucketIdentifier());
2461 dto.setFlowId(versionedFlow.getIdentifier());
2462 dto.setFlowName(versionedFlow.getName());
2463 dto.setDescription(versionedFlow.getDescription());
2465 final VersionedFlowEntity entity = new VersionedFlowEntity();
2466 entity.setVersionedFlow(dto);
2471 private VersionedFlowSnapshotMetadataEntity createVersionedFlowSnapshotMetadataEntity(final String registryId, final VersionedFlowSnapshotMetadata metadata) {
2472 if (metadata == null) {
2476 final VersionedFlowSnapshotMetadataEntity entity = new VersionedFlowSnapshotMetadataEntity();
2477 entity.setRegistryId(registryId);
2478 entity.setVersionedFlowMetadata(metadata);
2484 public Set<RegistryClientEntity> getRegistryClients() {
2485 return registryDAO.getFlowRegistries().stream()
2486 .map(this::createRegistryClientEntity)
2487 .collect(Collectors.toSet());
2491 public Set<RegistryEntity> getRegistriesForUser(final NiFiUser user) {
2492 return registryDAO.getFlowRegistriesForUser(user).stream()
2493 .map(flowRegistry -> entityFactory.createRegistryEntity(dtoFactory.createRegistryDto(flowRegistry)))
2494 .collect(Collectors.toSet());
2498 public Set<BucketEntity> getBucketsForUser(final String registryId, final NiFiUser user) {
2499 return registryDAO.getBucketsForUser(registryId, user).stream()
2501 if (bucket == null) {
2505 final BucketDTO dto = new BucketDTO();
2506 dto.setId(bucket.getIdentifier());
2507 dto.setName(bucket.getName());
2508 dto.setDescription(bucket.getDescription());
2509 dto.setCreated(bucket.getCreatedTimestamp());
2511 final Permissions regPermissions = bucket.getPermissions();
2512 final PermissionsDTO permissions = new PermissionsDTO();
2513 permissions.setCanRead(regPermissions.getCanRead());
2514 permissions.setCanWrite(regPermissions.getCanWrite());
2516 return entityFactory.createBucketEntity(dto, permissions);
2518 .collect(Collectors.toSet());
2522 public Set<VersionedFlowEntity> getFlowsForUser(String registryId, String bucketId, NiFiUser user) {
2523 return registryDAO.getFlowsForUser(registryId, bucketId, user).stream()
2524 .map(vf -> createVersionedFlowEntity(registryId, vf))
2525 .collect(Collectors.toSet());
2529 public Set<VersionedFlowSnapshotMetadataEntity> getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) {
2530 return registryDAO.getFlowVersionsForUser(registryId, bucketId, flowId, user).stream()
2531 .map(md -> createVersionedFlowSnapshotMetadataEntity(registryId, md))
2532 .collect(Collectors.toSet());
2536 public RegistryClientEntity updateRegistryClient(Revision revision, RegistryDTO registryDTO) {
2537 final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
2538 final NiFiUser user = NiFiUserUtils.getNiFiUser();
2540 final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId());
2541 final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> {
2542 final boolean duplicateName = registryDAO.getFlowRegistries().stream()
2543 .anyMatch(reg -> reg.getName().equals(registryDTO.getName()) && !reg.getIdentifier().equals(registryDTO.getId()));
2545 if (duplicateName) {
2546 throw new IllegalStateException("Cannot update Flow Registry because a Flow Registry already exists with the name " + registryDTO.getName());
2549 registry.setDescription(registryDTO.getDescription());
2550 registry.setName(registryDTO.getName());
2551 registry.setURL(registryDTO.getUri());
2553 controllerFacade.save();
2555 final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
2556 final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
2558 return new StandardRevisionUpdate<>(registry, lastModification);
2561 final FlowRegistry updatedReg = revisionUpdate.getComponent();
2562 return createRegistryClientEntity(updatedReg);
2566 public void verifyDeleteRegistry(String registryId) {
2567 processGroupDAO.verifyDeleteFlowRegistry(registryId);
2571 public RegistryClientEntity deleteRegistryClient(final Revision revision, final String registryId) {
2572 final RevisionClaim claim = new StandardRevisionClaim(revision);
2573 final NiFiUser user = NiFiUserUtils.getNiFiUser();
2575 final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> {
2576 final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId);
2577 controllerFacade.save();
2581 return createRegistryClientEntity(registry);
2585 public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
2586 final NiFiUser user = NiFiUserUtils.getNiFiUser();
2588 // request claim for component to be created... revision already verified (version == 0)
2589 final RevisionClaim claim = new StandardRevisionClaim(revision);
2591 // update revision through revision manager
2592 final RevisionUpdate<ReportingTaskDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
2593 // create the reporting task
2594 final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO);
2597 controllerFacade.save();
2598 awaitValidationCompletion(reportingTask);
2600 final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask);
2601 final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
2602 return new StandardRevisionUpdate<>(dto, lastMod);
2605 final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
2606 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
2607 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
2608 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
2609 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
2610 return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
/**
 * Updates the reporting task identified by the DTO and returns an entity built from the updated DTO, the
 * resulting revision, the read/operate permissions of the task and the bulletins found for its identifier.
 *
 * NOTE(review): this listing carries embedded line numbers and skips 2618, 2620, 2623 and 2624; at least one
 * argument of the updateComponent(...) call and the header of the callback that binds rt are therefore not
 * visible here - confirm against the complete file before changing this method.
 */
2614 public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
2615 // get the component, ensure we have access to it, and perform the update request
2616 final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
2617 final RevisionUpdate<ReportingTaskDTO> snapshot = updateComponent(revision,
// the DAO call that applies the requested changes
2619 () -> reportingTaskDAO.updateReportingTask(reportingTaskDTO),
// rt is bound on a line that is not shown; presumably it is the updated node handed to this callback - TODO confirm
2621 awaitValidationCompletion(rt);
2622 return dtoFactory.createReportingTaskDto(rt);
// permissions and bulletins are resolved after the update; bulletins are wrapped according to read permission
2625 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
2626 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
2627 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
2628 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
2629 return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
/**
 * Deletes the reporting task with the given id and returns an entity built from the DTO produced by
 * deleteComponent(...) and the read/operate permissions of the task; revision and bulletins are passed as null.
 *
 * NOTE(review): this listing carries embedded line numbers and skips 2638 and 2641 inside the
 * deleteComponent(...) call, so some arguments of that call are not visible here - confirm against the
 * complete file before changing this method.
 */
2633 public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) {
// the node and both permission DTOs are resolved before the delete is issued
2634 final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
2635 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
2636 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
2637 final ReportingTaskDTO snapshot = deleteComponent(
2639 reportingTask.getResource(),
// the DAO call that removes the task
2640 () -> reportingTaskDAO.deleteReportingTask(reportingTaskId),
// this DTO is created while the argument list is evaluated, i.e. before deleteComponent itself runs
2642 dtoFactory.createReportingTaskDto(reportingTask));
2644 return entityFactory.createReportingTaskEntity(snapshot, null, permissions, operatePermissions, null);
2648 public void deleteActions(final Date endDate) {
2649 // get the user from the request
2650 final NiFiUser user = NiFiUserUtils.getNiFiUser();
2652 throw new WebApplicationException(new Throwable("Unable to access details for current user."));
2655 // create the purge details
2656 final FlowChangePurgeDetails details = new FlowChangePurgeDetails();
2657 details.setEndDate(endDate);
2659 // create a purge action to record that records are being removed
2660 final FlowChangeAction purgeAction = new FlowChangeAction();
2661 purgeAction.setUserIdentity(user.getIdentity());
2662 purgeAction.setOperation(Operation.Purge);
2663 purgeAction.setTimestamp(new Date());
2664 purgeAction.setSourceId("Flow Controller");
2665 purgeAction.setSourceName("History");
2666 purgeAction.setSourceType(Component.Controller);
2667 purgeAction.setActionDetails(details);
2669 // purge corresponding actions
2670 auditService.purgeActions(endDate, purgeAction);
2674 public ProvenanceDTO submitProvenance(final ProvenanceDTO query) {
2675 return controllerFacade.submitProvenance(query);
2679 public void deleteProvenance(final String queryId) {
2680 controllerFacade.deleteProvenanceQuery(queryId);
2684 public LineageDTO submitLineage(final LineageDTO lineage) {
2685 return controllerFacade.submitLineage(lineage);
2689 public void deleteLineage(final String lineageId) {
2690 controllerFacade.deleteLineage(lineageId);
2694 public ProvenanceEventDTO submitReplay(final Long eventId) {
2695 return controllerFacade.submitReplay(eventId);
2698 // -----------------------------------------
2700 // -----------------------------------------
2703 public SearchResultsDTO searchController(final String query) {
2704 return controllerFacade.search(query);
2708 public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) {
2709 return connectionDAO.getContent(connectionId, flowFileUuid, uri);
2713 public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) {
2714 return controllerFacade.getContent(eventId, uri, contentDirection);
2718 public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) {
2719 return controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults);
2723 public LineageDTO getLineage(final String lineageId) {
2724 return controllerFacade.getLineage(lineageId);
2728 public ProvenanceOptionsDTO getProvenanceSearchOptions() {
2729 return controllerFacade.getProvenanceSearchOptions();
2733 public ProvenanceEventDTO getProvenanceEvent(final Long id) {
2734 return controllerFacade.getProvenanceEvent(id);
2738 public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) {
2739 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
2740 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
2741 final ProcessGroupStatusDTO dto = dtoFactory.createProcessGroupStatusDto(processGroup, controllerFacade.getProcessGroupStatus(groupId));
2743 // prune the response as necessary
2745 pruneChildGroups(dto.getAggregateSnapshot());
2746 if (dto.getNodeSnapshots() != null) {
2747 for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : dto.getNodeSnapshots()) {
2748 pruneChildGroups(nodeSnapshot.getStatusSnapshot());
2753 return entityFactory.createProcessGroupStatusEntity(dto, permissions);
2756 private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) {
2757 for (final ProcessGroupStatusSnapshotEntity childProcessGroupStatusEntity : snapshot.getProcessGroupStatusSnapshots()) {
2758 final ProcessGroupStatusSnapshotDTO childProcessGroupStatus = childProcessGroupStatusEntity.getProcessGroupStatusSnapshot();
2759 childProcessGroupStatus.setConnectionStatusSnapshots(null);
2760 childProcessGroupStatus.setProcessGroupStatusSnapshots(null);
2761 childProcessGroupStatus.setInputPortStatusSnapshots(null);
2762 childProcessGroupStatus.setOutputPortStatusSnapshots(null);
2763 childProcessGroupStatus.setProcessorStatusSnapshots(null);
2764 childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null);
2769 public ControllerStatusDTO getControllerStatus() {
2770 return controllerFacade.getControllerStatus();
2774 public ComponentStateDTO getProcessorState(final String processorId) {
2775 final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null;
2776 final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL);
2778 // processor will be non null as it was already found when getting the state
2779 final ProcessorNode processor = processorDAO.getProcessor(processorId);
2780 return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState);
2784 public ComponentStateDTO getControllerServiceState(final String controllerServiceId) {
2785 final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null;
2786 final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL);
2788 // controller service will be non null as it was already found when getting the state
2789 final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
2790 return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState);
2794 public ComponentStateDTO getReportingTaskState(final String reportingTaskId) {
2795 final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null;
2796 final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL);
2798 // reporting task will be non null as it was already found when getting the state
2799 final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
2800 return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState);
2804 public CountersDTO getCounters() {
2805 final List<Counter> counters = controllerFacade.getCounters();
2806 final Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
2807 for (final Counter counter : counters) {
2808 counterDTOs.add(dtoFactory.createCounterDto(counter));
2811 final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs);
2812 final CountersDTO countersDto = new CountersDTO();
2813 countersDto.setAggregateSnapshot(snapshotDto);
2818 private ConnectionEntity createConnectionEntity(final Connection connection) {
2819 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier()));
2820 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
2821 final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier()));
2822 return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, permissions, status);
2826 public Set<ConnectionEntity> getConnections(final String groupId) {
2827 final Set<Connection> connections = connectionDAO.getConnections(groupId);
2828 return connections.stream()
2829 .map(connection -> createConnectionEntity(connection))
2830 .collect(Collectors.toSet());
2834 public ConnectionEntity getConnection(final String connectionId) {
2835 final Connection connection = connectionDAO.getConnection(connectionId);
2836 return createConnectionEntity(connection);
2840 public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) {
2841 return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId));
2845 public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) {
2846 final Connection connection = connectionDAO.getConnection(connectionId);
2847 final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(connectionId, listingRequestId));
2849 // include whether the source and destination are running
2850 if (connection.getSource() != null) {
2851 listRequest.setSourceRunning(connection.getSource().isRunning());
2853 if (connection.getDestination() != null) {
2854 listRequest.setDestinationRunning(connection.getDestination().isRunning());
2861 public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) {
2862 return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid));
2866 public ConnectionStatusEntity getConnectionStatus(final String connectionId) {
2867 final Connection connection = connectionDAO.getConnection(connectionId);
2868 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
2869 final ConnectionStatusDTO dto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId));
2870 return entityFactory.createConnectionStatusEntity(dto, permissions);
2874 public StatusHistoryEntity getConnectionStatusHistory(final String connectionId) {
2875 final Connection connection = connectionDAO.getConnection(connectionId);
2876 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
2877 final StatusHistoryDTO dto = controllerFacade.getConnectionStatusHistory(connectionId);
2878 return entityFactory.createStatusHistoryEntity(dto, permissions);
2881 private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) {
2882 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
2883 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
2884 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
2885 final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
2886 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier()));
2887 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
2888 return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities);
2892 public Set<ProcessorEntity> getProcessors(final String groupId, final boolean includeDescendants) {
2893 final Set<ProcessorNode> processors = processorDAO.getProcessors(groupId, includeDescendants);
2894 final NiFiUser user = NiFiUserUtils.getNiFiUser();
2895 return processors.stream()
2896 .map(processor -> createProcessorEntity(processor, user))
2897 .collect(Collectors.toSet());
2901 public TemplateDTO exportTemplate(final String id) {
2902 final Template template = templateDAO.getTemplate(id);
2903 final TemplateDTO templateDetails = template.getDetails();
2905 final TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template);
2906 templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet()));
2911 public TemplateDTO getTemplate(final String id) {
2912 return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id));
2916 public Set<TemplateEntity> getTemplates() {
2917 return templateDAO.getTemplates().stream()
2919 final TemplateDTO dto = dtoFactory.createTemplateDTO(template);
2920 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(template);
2922 final TemplateEntity entity = new TemplateEntity();
2923 entity.setId(dto.getId());
2924 entity.setPermissions(permissions);
2925 entity.setTemplate(dto);
2927 }).collect(Collectors.toSet());
2931 public Set<DocumentedTypeDTO> getWorkQueuePrioritizerTypes() {
2932 return controllerFacade.getFlowFileComparatorTypes();
2936 public Set<DocumentedTypeDTO> getProcessorTypes(final String bundleGroup, final String bundleArtifact, final String type) {
2937 return controllerFacade.getFlowFileProcessorTypes(bundleGroup, bundleArtifact, type);
2941 public Set<DocumentedTypeDTO> getControllerServiceTypes(final String serviceType, final String serviceBundleGroup, final String serviceBundleArtifact, final String serviceBundleVersion,
2942 final String bundleGroup, final String bundleArtifact, final String type) {
2943 return controllerFacade.getControllerServiceTypes(serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion, bundleGroup, bundleArtifact, type);
2947 public Set<DocumentedTypeDTO> getReportingTaskTypes(final String bundleGroup, final String bundleArtifact, final String type) {
2948 return controllerFacade.getReportingTaskTypes(bundleGroup, bundleArtifact, type);
2952 public ProcessorEntity getProcessor(final String id) {
2953 final ProcessorNode processor = processorDAO.getProcessor(id);
2954 return createProcessorEntity(processor, NiFiUserUtils.getNiFiUser());
2958 public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) {
2959 final ProcessorNode processor = processorDAO.getProcessor(id);
2960 PropertyDescriptor descriptor = processor.getPropertyDescriptor(property);
2962 // return an invalid descriptor if the processor doesn't support this property
2963 if (descriptor == null) {
2964 descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
2967 return dtoFactory.createPropertyDescriptorDto(descriptor, processor.getProcessGroup().getIdentifier());
2971 public ProcessorStatusEntity getProcessorStatus(final String id) {
2972 final ProcessorNode processor = processorDAO.getProcessor(id);
2973 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
2974 final ProcessorStatusDTO dto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id));
2975 return entityFactory.createProcessorStatusEntity(dto, permissions);
2979 public StatusHistoryEntity getProcessorStatusHistory(final String id) {
2980 final ProcessorNode processor = processorDAO.getProcessor(id);
2981 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
2982 final StatusHistoryDTO dto = controllerFacade.getProcessorStatusHistory(id);
2983 return entityFactory.createStatusHistoryEntity(dto, permissions);
2986 private boolean authorizeBulletin(final Bulletin bulletin) {
2987 final String sourceId = bulletin.getSourceId();
2988 final ComponentType type = bulletin.getSourceType();
2990 final Authorizable authorizable;
2994 authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable();
2996 case REPORTING_TASK:
2997 authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable();
2999 case CONTROLLER_SERVICE:
3000 authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable();
3002 case FLOW_CONTROLLER:
3003 authorizable = controllerFacade;
3006 authorizable = authorizableLookup.getInputPort(sourceId);
3009 authorizable = authorizableLookup.getOutputPort(sourceId);
3011 case REMOTE_PROCESS_GROUP:
3012 authorizable = authorizableLookup.getRemoteProcessGroup(sourceId);
3015 throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build());
3017 } catch (final ResourceNotFoundException e) {
3018 // if the underlying component is gone, disallow
3022 // perform the authorization
3023 final AuthorizationResult result = authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
3024 return Result.Approved.equals(result.getResult());
3028 public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) {
3030 final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder()
3031 .groupIdMatches(query.getGroupId())
3032 .sourceIdMatches(query.getSourceId())
3033 .nameMatches(query.getName())
3034 .messageMatches(query.getMessage())
3035 .after(query.getAfter())
3036 .limit(query.getLimit());
3038 // perform the query
3039 final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build());
3041 // perform the query and generate the results - iterating in reverse order since we are
3042 // getting the most recent results by ordering by timestamp desc above. this gets the
3043 // exact results we want but in reverse order
3044 final List<BulletinEntity> bulletinEntities = new ArrayList<>();
3045 for (final ListIterator<Bulletin> bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious(); ) {
3046 final Bulletin bulletin = bulletinIter.previous();
3047 bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin)));
3050 // create the bulletin board
3051 final BulletinBoardDTO bulletinBoard = new BulletinBoardDTO();
3052 bulletinBoard.setBulletins(bulletinEntities);
3053 bulletinBoard.setGenerated(new Date());
3054 return bulletinBoard;
3058 public SystemDiagnosticsDTO getSystemDiagnostics() {
3059 final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics();
3060 return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics);
3064 public List<ResourceDTO> getResources() {
3065 final List<Resource> resources = controllerFacade.getResources();
3066 final List<ResourceDTO> resourceDtos = new ArrayList<>(resources.size());
3067 for (final Resource resource : resources) {
3068 resourceDtos.add(dtoFactory.createResourceDto(resource));
3070 return resourceDtos;
3074 public void discoverCompatibleBundles(VersionedProcessGroup versionedGroup) {
3075 BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), versionedGroup);
3079 public BundleCoordinate getCompatibleBundle(String type, BundleDTO bundleDTO) {
3080 return BundleUtils.getCompatibleBundle(controllerFacade.getExtensionManager(), type, bundleDTO);
3084 public ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate) {
3085 return controllerFacade.getExtensionManager().getTempComponent(classType, bundleCoordinate);
3089 * Ensures the specified user has permission to access the specified port. This method does
3090 * not utilize the DataTransferAuthorizable as that will enforce the entire chain is
3091 * authorized for the transfer. This method is only invoked when obtaining the site to site
3092 * details so the entire chain isn't necessary.
3094 private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) {
3095 final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure());
3097 // if site to site is not secure, allow all users
3098 if (!isSiteToSiteSecure) {
3102 final Map<String, String> userContext;
3103 if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
3104 userContext = new HashMap<>();
3105 userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
3110 final AuthorizationRequest request = new AuthorizationRequest.Builder()
3111 .resource(ResourceFactory.getDataTransferResource(port.getResource()))
3112 .identity(user.getIdentity())
3113 .groups(user.getGroups())
3114 .anonymous(user.isAnonymous())
3115 .accessAttempt(false)
3116 .action(RequestAction.WRITE)
3117 .userContext(userContext)
3118 .explanationSupplier(() -> "Unable to retrieve port details.")
3121 final AuthorizationResult result = authorizer.authorize(request);
3122 return Result.Approved.equals(result.getResult());
3126 public ControllerDTO getSiteToSiteDetails() {
3127 final NiFiUser user = NiFiUserUtils.getNiFiUser();
3129 throw new WebApplicationException(new Throwable("Unable to access details for current user."));
3132 // serialize the input ports this NiFi has access to
3133 final Set<PortDTO> inputPortDtos = new LinkedHashSet<>();
3134 final Set<RootGroupPort> inputPorts = controllerFacade.getInputPorts();
3135 for (final RootGroupPort inputPort : inputPorts) {
3136 if (isUserAuthorized(user, inputPort)) {
3137 final PortDTO dto = new PortDTO();
3138 dto.setId(inputPort.getIdentifier());
3139 dto.setName(inputPort.getName());
3140 dto.setComments(inputPort.getComments());
3141 dto.setState(inputPort.getScheduledState().toString());
3142 inputPortDtos.add(dto);
3146 // serialize the output ports this NiFi has access to
3147 final Set<PortDTO> outputPortDtos = new LinkedHashSet<>();
3148 for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) {
3149 if (isUserAuthorized(user, outputPort)) {
3150 final PortDTO dto = new PortDTO();
3151 dto.setId(outputPort.getIdentifier());
3152 dto.setName(outputPort.getName());
3153 dto.setComments(outputPort.getComments());
3154 dto.setState(outputPort.getScheduledState().toString());
3155 outputPortDtos.add(dto);
3159 // get the root group
3160 final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
3161 final ProcessGroupCounts counts = rootGroup.getCounts();
3163 // create the controller dto
3164 final ControllerDTO controllerDTO = new ControllerDTO();
3165 controllerDTO.setId(controllerFacade.getRootGroupId());
3166 controllerDTO.setInstanceId(controllerFacade.getInstanceId());
3167 controllerDTO.setName(controllerFacade.getName());
3168 controllerDTO.setComments(controllerFacade.getComments());
3169 controllerDTO.setInputPorts(inputPortDtos);
3170 controllerDTO.setOutputPorts(outputPortDtos);
3171 controllerDTO.setInputPortCount(inputPortDtos.size());
3172 controllerDTO.setOutputPortCount(outputPortDtos.size());
3173 controllerDTO.setRunningCount(counts.getRunningCount());
3174 controllerDTO.setStoppedCount(counts.getStoppedCount());
3175 controllerDTO.setInvalidCount(counts.getInvalidCount());
3176 controllerDTO.setDisabledCount(counts.getDisabledCount());
3178 // determine the site to site configuration
3179 controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
3180 controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
3181 controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
3183 return controllerDTO;
3187 public ControllerConfigurationEntity getControllerConfiguration() {
3188 final Revision rev = revisionManager.getRevision(FlowController.class.getSimpleName());
3189 final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade);
3190 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade);
3191 final RevisionDTO revision = dtoFactory.createRevisionDTO(rev);
3192 return entityFactory.createControllerConfigurationEntity(dto, revision, permissions);
3196 public ControllerBulletinsEntity getControllerBulletins() {
3197 final NiFiUser user = NiFiUserUtils.getNiFiUser();
3198 final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity();
3200 final List<BulletinEntity> controllerBulletinEntities = new ArrayList<>();
3202 final Authorizable controllerAuthorizable = authorizableLookup.getController();
3203 final boolean authorized = controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
3204 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController());
3205 controllerBulletinEntities.addAll(bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList()));
3207 // get the controller service bulletins
3208 final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
3209 final List<Bulletin> allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery);
3210 final List<BulletinEntity> controllerServiceBulletinEntities = new ArrayList<>();
3211 for (final Bulletin bulletin : allControllerServiceBulletins) {
3213 final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable();
3214 final boolean controllerServiceAuthorized = controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
3216 final BulletinEntity controllerServiceBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized);
3217 controllerServiceBulletinEntities.add(controllerServiceBulletin);
3218 controllerBulletinEntities.add(controllerServiceBulletin);
3219 } catch (final ResourceNotFoundException e) {
3220 // controller service missing.. skip
3223 controllerBulletinsEntity.setControllerServiceBulletins(controllerServiceBulletinEntities);
3225 // get the reporting task bulletins
3226 final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
3227 final List<Bulletin> allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery);
3228 final List<BulletinEntity> reportingTaskBulletinEntities = new ArrayList<>();
3229 for (final Bulletin bulletin : allReportingTaskBulletins) {
3231 final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable();
3232 final boolean reportingTaskAuthorizableAuthorized = reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
3234 final BulletinEntity reportingTaskBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized);
3235 reportingTaskBulletinEntities.add(reportingTaskBulletin);
3236 controllerBulletinEntities.add(reportingTaskBulletin);
3237 } catch (final ResourceNotFoundException e) {
3238 // reporting task missing.. skip
3241 controllerBulletinsEntity.setReportingTaskBulletins(reportingTaskBulletinEntities);
3243 controllerBulletinsEntity.setBulletins(pruneAndSortBulletins(controllerBulletinEntities, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER));
3244 return controllerBulletinsEntity;
3248 public FlowConfigurationEntity getFlowConfiguration() {
3249 final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval(),
3250 properties.getDefaultBackPressureObjectThreshold(), properties.getDefaultBackPressureDataSizeThreshold(),properties.getDcaeDistributorApiHostname());
3251 final FlowConfigurationEntity entity = new FlowConfigurationEntity();
3252 entity.setFlowConfiguration(dto);
3257 public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) {
3258 final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId);
3259 return createAccessPolicyEntity(accessPolicy);
3263 public AccessPolicyEntity getAccessPolicy(final RequestAction requestAction, final String resource) {
3264 Authorizable authorizable;
3266 authorizable = authorizableLookup.getAuthorizableFromResource(resource);
3267 } catch (final ResourceNotFoundException e) {
3268 // unable to find the underlying authorizable... user authorized based on top level /policies... create
3269 // an anonymous authorizable to attempt to locate an existing policy for this resource
3270 authorizable = new Authorizable() {
3272 public Authorizable getParentAuthorizable() {
3277 public Resource getResource() {
3278 return new Resource() {
3280 public String getIdentifier() {
3285 public String getName() {
3290 public String getSafeDescription() {
3291 return "Policy " + resource;
3298 final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(requestAction, authorizable);
3299 return createAccessPolicyEntity(accessPolicy);
3302 private AccessPolicyEntity createAccessPolicyEntity(final AccessPolicy accessPolicy) {
3303 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicy.getIdentifier()));
3304 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicy.getIdentifier()));
3305 final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
3306 return entityFactory.createAccessPolicyEntity(
3307 dtoFactory.createAccessPolicyDto(accessPolicy,
3308 accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()),
3309 accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()), componentReference),
3310 revision, permissions);
3314 public UserEntity getUser(final String userId) {
3315 final User user = userDAO.getUser(userId);
3316 return createUserEntity(user, true);
3320 public Set<UserEntity> getUsers() {
3321 final Set<User> users = userDAO.getUsers();
3322 return users.stream()
3323 .map(user -> createUserEntity(user, false))
3324 .collect(Collectors.toSet());
3327 private UserEntity createUserEntity(final User user, final boolean enforceUserExistence) {
3328 final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier()));
3329 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
3330 final Set<TenantEntity> userGroups = userGroupDAO.getUserGroupsForUser(user.getIdentifier()).stream()
3331 .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(enforceUserExistence)).collect(Collectors.toSet());
3332 final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUser(user.getIdentifier()).stream()
3333 .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
3334 return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups, policyEntities), userRevision, permissions);
3337 private UserGroupEntity createUserGroupEntity(final Group userGroup, final boolean enforceGroupExistence) {
3338 final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier()));
3339 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
3340 final Set<TenantEntity> users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(enforceGroupExistence)).collect(Collectors.toSet());
3341 final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream()
3342 .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
3343 return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users, policyEntities), userGroupRevision, permissions);
3347 public UserGroupEntity getUserGroup(final String userGroupId) {
3348 final Group userGroup = userGroupDAO.getUserGroup(userGroupId);
3349 return createUserGroupEntity(userGroup, true);
3353 public Set<UserGroupEntity> getUserGroups() {
3354 final Set<Group> userGroups = userGroupDAO.getUserGroups();
3355 return userGroups.stream()
3356 .map(userGroup -> createUserGroupEntity(userGroup, false))
3357 .collect(Collectors.toSet());
3360 private LabelEntity createLabelEntity(final Label label) {
3361 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier()));
3362 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
3363 return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), revision, permissions);
3367 public Set<LabelEntity> getLabels(final String groupId) {
3368 final Set<Label> labels = labelDAO.getLabels(groupId);
3369 return labels.stream()
3370 .map(label -> createLabelEntity(label))
3371 .collect(Collectors.toSet());
3375 public LabelEntity getLabel(final String labelId) {
3376 final Label label = labelDAO.getLabel(labelId);
3377 return createLabelEntity(label);
3380 private FunnelEntity createFunnelEntity(final Funnel funnel) {
3381 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(funnel.getIdentifier()));
3382 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
3383 return entityFactory.createFunnelEntity(dtoFactory.createFunnelDto(funnel), revision, permissions);
3387 public Set<FunnelEntity> getFunnels(final String groupId) {
3388 final Set<Funnel> funnels = funnelDAO.getFunnels(groupId);
3389 return funnels.stream()
3390 .map(funnel -> createFunnelEntity(funnel))
3391 .collect(Collectors.toSet());
3395 public FunnelEntity getFunnel(final String funnelId) {
3396 final Funnel funnel = funnelDAO.getFunnel(funnelId);
3397 return createFunnelEntity(funnel);
3400 private PortEntity createInputPortEntity(final Port port) {
3401 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
3402 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, NiFiUserUtils.getNiFiUser());
3403 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), NiFiUserUtils.getNiFiUser());
3404 final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
3405 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
3406 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
3407 return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities);
3410 private PortEntity createOutputPortEntity(final Port port) {
3411 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
3412 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, NiFiUserUtils.getNiFiUser());
3413 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), NiFiUserUtils.getNiFiUser());
3414 final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
3415 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
3416 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
3417 return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities);
3421 public Set<PortEntity> getInputPorts(final String groupId) {
3422 final Set<Port> inputPorts = inputPortDAO.getPorts(groupId);
3423 return inputPorts.stream()
3424 .map(port -> createInputPortEntity(port))
3425 .collect(Collectors.toSet());
3429 public Set<PortEntity> getOutputPorts(final String groupId) {
3430 final Set<Port> ports = outputPortDAO.getPorts(groupId);
3431 return ports.stream()
3432 .map(port -> createOutputPortEntity(port))
3433 .collect(Collectors.toSet());
3436 private ProcessGroupEntity createProcessGroupEntity(final ProcessGroup group) {
3437 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier()));
3438 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(group);
3439 final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(group.getIdentifier()));
3440 final List<BulletinEntity> bulletins = getProcessGroupBulletins(group);
3441 return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, permissions, status, bulletins);
3444 private List<BulletinEntity> getProcessGroupBulletins(final ProcessGroup group) {
3445 final List<Bulletin> bulletins = new ArrayList<>(bulletinRepository.findBulletinsForGroupBySource(group.getIdentifier()));
3447 for (final ProcessGroup descendantGroup : group.findAllProcessGroups()) {
3448 bulletins.addAll(bulletinRepository.findBulletinsForGroupBySource(descendantGroup.getIdentifier()));
3451 List<BulletinEntity> bulletinEntities = new ArrayList<>();
3452 for (final Bulletin bulletin : bulletins) {
3453 bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin)));
3456 return pruneAndSortBulletins(bulletinEntities, BulletinRepository.MAX_BULLETINS_PER_COMPONENT);
3459 private List<BulletinEntity> pruneAndSortBulletins(final List<BulletinEntity> bulletinEntities, final int maxBulletins) {
3460 // sort the bulletins
3461 Collections.sort(bulletinEntities, new Comparator<BulletinEntity>() {
3463 public int compare(BulletinEntity o1, BulletinEntity o2) {
3464 if (o1 == null && o2 == null) {
3474 return -Long.compare(o1.getId(), o2.getId());
3478 // prune the response to only include the max number of bulletins
3479 if (bulletinEntities.size() > maxBulletins) {
3480 return bulletinEntities.subList(0, maxBulletins);
3482 return bulletinEntities;
3487 public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId) {
3488 final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId);
3489 return groups.stream()
3490 .map(group -> createProcessGroupEntity(group))
3491 .collect(Collectors.toSet());
3494 private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg, final NiFiUser user) {
3495 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpg.getIdentifier()));
3496 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg, user);
3497 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(rpg), user);
3498 final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(rpg, controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier()));
3499 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier()));
3500 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
3501 return entityFactory.createRemoteProcessGroupEntity(dtoFactory.createRemoteProcessGroupDto(rpg), revision, permissions, operatePermissions, status, bulletinEntities);
3505 public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId) {
3506 final NiFiUser user = NiFiUserUtils.getNiFiUser();
3507 final Set<RemoteProcessGroup> rpgs = remoteProcessGroupDAO.getRemoteProcessGroups(groupId);
3508 return rpgs.stream()
3509 .map(rpg -> createRemoteGroupEntity(rpg, user))
3510 .collect(Collectors.toSet());
3514 public PortEntity getInputPort(final String inputPortId) {
3515 final Port port = inputPortDAO.getPort(inputPortId);
3516 return createInputPortEntity(port);
3520 public PortStatusEntity getInputPortStatus(final String inputPortId) {
3521 final Port inputPort = inputPortDAO.getPort(inputPortId);
3522 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPort);
3523 final PortStatusDTO dto = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortId));
3524 return entityFactory.createPortStatusEntity(dto, permissions);
3528 public PortEntity getOutputPort(final String outputPortId) {
3529 final Port port = outputPortDAO.getPort(outputPortId);
3530 return createOutputPortEntity(port);
3534 public PortStatusEntity getOutputPortStatus(final String outputPortId) {
3535 final Port outputPort = outputPortDAO.getPort(outputPortId);
3536 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPort);
3537 final PortStatusDTO dto = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortId));
3538 return entityFactory.createPortStatusEntity(dto, permissions);
3542 public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) {
3543 final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
3544 return createRemoteGroupEntity(rpg, NiFiUserUtils.getNiFiUser());
3548 public RemoteProcessGroupStatusEntity getRemoteProcessGroupStatus(final String id) {
3549 final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id);
3550 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
3551 final RemoteProcessGroupStatusDTO dto = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(id));
3552 return entityFactory.createRemoteProcessGroupStatusEntity(dto, permissions);
3556 public StatusHistoryEntity getRemoteProcessGroupStatusHistory(final String id) {
3557 final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id);
3558 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
3559 final StatusHistoryDTO dto = controllerFacade.getRemoteProcessGroupStatusHistory(id);
3560 return entityFactory.createStatusHistoryEntity(dto, permissions);
3564 public CurrentUserEntity getCurrentUser() {
3565 final NiFiUser user = NiFiUserUtils.getNiFiUser();
3566 final CurrentUserEntity entity = new CurrentUserEntity();
3567 entity.setIdentity(user.getIdentity());
3568 entity.setAnonymous(user.isAnonymous());
3569 entity.setProvenancePermissions(dtoFactory.createPermissionsDto(authorizableLookup.getProvenance()));
3570 entity.setCountersPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getCounters()));
3571 entity.setTenantsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
3572 entity.setControllerPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getController()));
3573 entity.setPoliciesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPolicies()));
3574 entity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem()));
3575 entity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers()));
3577 entity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents()));
3579 final Set<ComponentRestrictionPermissionDTO> componentRestrictionPermissions = new HashSet<>();
3580 Arrays.stream(RequiredPermission.values()).forEach(requiredPermission -> {
3581 final PermissionsDTO restrictionPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents(requiredPermission));
3583 final RequiredPermissionDTO requiredPermissionDto = new RequiredPermissionDTO();
3584 requiredPermissionDto.setId(requiredPermission.getPermissionIdentifier());
3585 requiredPermissionDto.setLabel(requiredPermission.getPermissionLabel());
3587 final ComponentRestrictionPermissionDTO componentRestrictionPermissionDto = new ComponentRestrictionPermissionDTO();
3588 componentRestrictionPermissionDto.setRequiredPermission(requiredPermissionDto);
3589 componentRestrictionPermissionDto.setPermissions(restrictionPermissions);
3591 componentRestrictionPermissions.add(componentRestrictionPermissionDto);
3593 entity.setComponentRestrictionPermissions(componentRestrictionPermissions);
3599 public ProcessGroupFlowEntity getProcessGroupFlow(final String groupId) {
3600 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
3602 // Get the Process Group Status but we only need a status depth of one because for any child process group,
3603 // we ignore the status of each individual components. I.e., if Process Group A has child Group B, and child Group B
3604 // has a Processor, we don't care about the individual stats of that Processor because the ProcessGroupFlowEntity
3605 // doesn't include that anyway. So we can avoid including the information in the status that is returned.
3606 final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId, 1);
3607 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
3608 return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager, this::getProcessGroupBulletins), permissions);
3612 public ProcessGroupEntity getProcessGroup(final String groupId) {
3613 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
3614 return createProcessGroupEntity(processGroup);
3617 private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) {
3618 final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(serviceNode);
3620 final ControllerServiceReference ref = serviceNode.getReferences();
3621 final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref, serviceIds);
3622 dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
3624 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()));
3625 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode, NiFiUserUtils.getNiFiUser());
3626 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(serviceNode), NiFiUserUtils.getNiFiUser());
3627 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier()));
3628 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
3629 return entityFactory.createControllerServiceEntity(dto, revision, permissions, operatePermissions, bulletinEntities);
3633 public VariableRegistryEntity getVariableRegistry(final String groupId, final boolean includeAncestorGroups) {
3634 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
3635 if (processGroup == null) {
3636 throw new ResourceNotFoundException("Could not find group with ID " + groupId);
3639 return createVariableRegistryEntity(processGroup, includeAncestorGroups);
3642 private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) {
3643 final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup, revisionManager);
3644 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
3645 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
3647 if (includeAncestorGroups) {
3648 ProcessGroup parent = processGroup.getParent();
3649 while (parent != null) {
3650 final PermissionsDTO parentPerms = dtoFactory.createPermissionsDto(parent);
3651 if (Boolean.TRUE.equals(parentPerms.getCanRead())) {
3652 final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent, revisionManager);
3653 final Set<VariableEntity> parentVariables = parentRegistryDto.getVariables();
3654 registryDto.getVariables().addAll(parentVariables);
3657 parent = parent.getParent();
3661 return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
3665 public VariableRegistryEntity populateAffectedComponents(final VariableRegistryDTO variableRegistryDto) {
3666 final String groupId = variableRegistryDto.getProcessGroupId();
3667 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
3668 if (processGroup == null) {
3669 throw new ResourceNotFoundException("Could not find group with ID " + groupId);
3672 final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup, revisionManager);
3673 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
3674 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
3675 return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
3679 public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
3680 final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
3681 final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet());
3683 return serviceNodes.stream()
3684 .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds))
3685 .collect(Collectors.toSet());
3689 public ControllerServiceEntity getControllerService(final String controllerServiceId) {
3690 final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
3691 return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId));
3695 public PropertyDescriptorDTO getControllerServicePropertyDescriptor(final String id, final String property) {
3696 final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id);
3697 PropertyDescriptor descriptor = controllerService.getControllerServiceImplementation().getPropertyDescriptor(property);
3699 // return an invalid descriptor if the controller service doesn't support this property
3700 if (descriptor == null) {
3701 descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
3704 final String groupId = controllerService.getProcessGroup() == null ? null : controllerService.getProcessGroup().getIdentifier();
3705 return dtoFactory.createPropertyDescriptorDto(descriptor, groupId);
3709 public ControllerServiceReferencingComponentsEntity getControllerServiceReferencingComponents(final String controllerServiceId) {
3710 final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId);
3711 final ControllerServiceReference ref = service.getReferences();
3712 return createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerServiceId));
3715 private ReportingTaskEntity createReportingTaskEntity(final ReportingTaskNode reportingTask) {
3716 final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(reportingTask.getIdentifier()));
3717 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
3718 final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
3719 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
3720 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
3721 return entityFactory.createReportingTaskEntity(dtoFactory.createReportingTaskDto(reportingTask), revision, permissions, operatePermissions, bulletinEntities);
3725 public Set<ReportingTaskEntity> getReportingTasks() {
3726 final Set<ReportingTaskNode> reportingTasks = reportingTaskDAO.getReportingTasks();
3727 return reportingTasks.stream()
3728 .map(reportingTask -> createReportingTaskEntity(reportingTask))
3729 .collect(Collectors.toSet());
3733 public ReportingTaskEntity getReportingTask(final String reportingTaskId) {
3734 final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
3735 return createReportingTaskEntity(reportingTask);
3739 public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(final String id, final String property) {
3740 final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(id);
3741 PropertyDescriptor descriptor = reportingTask.getReportingTask().getPropertyDescriptor(property);
3743 // return an invalid descriptor if the reporting task doesn't support this property
3744 if (descriptor == null) {
3745 descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
3748 return dtoFactory.createPropertyDescriptorDto(descriptor, null);
3752 public StatusHistoryEntity getProcessGroupStatusHistory(final String groupId) {
3753 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
3754 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
3755 final StatusHistoryDTO dto = controllerFacade.getProcessGroupStatusHistory(groupId);
3756 return entityFactory.createStatusHistoryEntity(dto, permissions);
3760 public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
3761 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
3763 final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
3764 final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
3766 // Create a VersionedProcessGroup snapshot of the flow as it is currently.
3767 final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
3769 final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();
3770 final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
3772 final VersionedFlow versionedFlow = new VersionedFlow();
3773 versionedFlow.setBucketIdentifier(versionedFlowDto.getBucketId());
3774 versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
3775 versionedFlow.setDescription(versionedFlowDto.getDescription());
3776 versionedFlow.setModifiedTimestamp(versionedFlow.getCreatedTimestamp());
3777 versionedFlow.setName(versionedFlowDto.getFlowName());
3778 versionedFlow.setIdentifier(flowId);
3780 // Add the Versioned Flow and first snapshot to the Flow Registry
3781 final String registryId = requestEntity.getVersionedFlow().getRegistryId();
3782 final VersionedFlowSnapshot registeredSnapshot;
3783 final VersionedFlow registeredFlow;
3785 String action = "create the flow";
3787 // first, create the flow in the registry, if necessary
3788 if (versionedFlowDto.getFlowId() == null) {
3789 registeredFlow = registerVersionedFlow(registryId, versionedFlow);
3791 registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId());
3794 action = "add the local flow to the Flow Registry as the first Snapshot";
3796 // add first snapshot to the flow in the registry
3797 registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion);
3798 } catch (final NiFiRegistryException e) {
3799 throw new IllegalArgumentException(e.getLocalizedMessage());
3800 } catch (final IOException ioe) {
3801 throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to " + action);
3804 final Bucket bucket = registeredSnapshot.getBucket();
3805 final VersionedFlow flow = registeredSnapshot.getFlow();
3807 // Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
3808 final VersionControlInformationDTO vci = new VersionControlInformationDTO();
3809 vci.setBucketId(bucket.getIdentifier());
3810 vci.setBucketName(bucket.getName());
3811 vci.setFlowId(flow.getIdentifier());
3812 vci.setFlowName(flow.getName());
3813 vci.setFlowDescription(flow.getDescription());
3814 vci.setGroupId(groupId);
3815 vci.setRegistryId(registryId);
3816 vci.setRegistryName(getFlowRegistryName(registryId));
3817 vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
3818 vci.setState(VersionedFlowState.UP_TO_DATE.name());
3820 final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
3822 final Revision groupRevision = revisionManager.getRevision(groupId);
3823 final RevisionDTO groupRevisionDto = dtoFactory.createRevisionDTO(groupRevision);
3825 final VersionControlComponentMappingEntity entity = new VersionControlComponentMappingEntity();
3826 entity.setVersionControlInformation(vci);
3827 entity.setProcessGroupRevision(groupRevisionDto);
3828 entity.setVersionControlComponentMapping(mapping);
3833 public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) {
3834 final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
3835 if (registry == null) {
3836 throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId);
3840 return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
3841 } catch (final IOException | NiFiRegistryException e) {
3842 throw new NiFiCoreException("Failed to remove flow from Flow Registry due to " + e.getMessage(), e);
3847 public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
3848 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
3849 final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
3850 if (versionControlInfo == null) {
3854 final VersionControlInformationDTO versionControlDto = dtoFactory.createVersionControlInformationDto(processGroup);
3855 final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId));
3856 return entityFactory.createVersionControlInformationEntity(versionControlDto, groupRevision);
3859 private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
3860 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
3861 final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
3862 final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
3863 return versionedGroup;
3867 public FlowComparisonEntity getLocalModifications(final String processGroupId) {
3868 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
3869 final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
3870 if (versionControlInfo == null) {
3871 throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control");
3874 final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryIdentifier());
3875 if (flowRegistry == null) {
3876 throw new IllegalStateException("Process Group with ID " + processGroupId + " is tracking to a flow in Flow Registry with ID " + versionControlInfo.getRegistryIdentifier()
3877 + " but cannot find a Flow Registry with that identifier");
3880 final VersionedFlowSnapshot versionedFlowSnapshot;
3882 versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
3883 versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true, NiFiUserUtils.getNiFiUser());
3884 } catch (final IOException | NiFiRegistryException e) {
3885 throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e);
3888 final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
3889 final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
3890 final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
3892 final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
3893 final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
3895 final Set<String> ancestorServiceIds = getAncestorGroupServiceIds(processGroup);
3896 final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
3897 final FlowComparison flowComparison = flowComparator.compare();
3899 final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
3901 final FlowComparisonEntity entity = new FlowComparisonEntity();
3902 entity.setComponentDifferences(differenceDtos);
3906 private Set<String> getAncestorGroupServiceIds(final ProcessGroup group) {
3907 final Set<String> ancestorServiceIds;
3908 ProcessGroup parentGroup = group.getParent();
3910 if (parentGroup == null) {
3911 ancestorServiceIds = Collections.emptySet();
3913 ancestorServiceIds = parentGroup.getControllerServices(true).stream()
3915 // We want to map the Controller Service to its Versioned Component ID, if it has one.
3916 // If it does not have one, we want to generate it in the same way that our Flow Mapper does
3917 // because this allows us to find the Controller Service when doing a Flow Diff.
3918 final Optional<String> versionedId = cs.getVersionedComponentId();
3919 if (versionedId.isPresent()) {
3920 return versionedId.get();
3923 return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
3925 .collect(Collectors.toSet());
3928 return ancestorServiceIds;
3932 public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) {
3933 final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
3934 if (registry == null) {
3935 throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
3939 return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
3940 } catch (final IOException | NiFiRegistryException e) {
3941 throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
3945 private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
3946 final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
3947 if (registry == null) {
3948 throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
3951 return registry.getVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
3955 public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
3956 final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) {
3957 final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
3958 if (registry == null) {
3959 throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
3963 return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
3964 } catch (final IOException | NiFiRegistryException e) {
3965 throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
3970 public VersionControlInformationEntity setVersionControlInformation(final Revision revision, final String processGroupId,
3971 final VersionControlInformationDTO versionControlInfo, final Map<String, String> versionedComponentMapping) {
3973 final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
3975 final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision,
3977 () -> processGroupDAO.updateVersionControlInformation(versionControlInfo, versionedComponentMapping),
3978 processGroup -> dtoFactory.createVersionControlInformationDto(processGroup));
3980 return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()));
3984 public VersionControlInformationEntity deleteVersionControl(final Revision revision, final String processGroupId) {
3985 final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
3987 final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision,
3989 () -> processGroupDAO.disconnectVersionControl(processGroupId),
3990 processGroup -> dtoFactory.createVersionControlInformationDto(group));
3992 return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()));
3996 public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
3997 final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
3998 group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
4002 public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) {
4003 final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
4004 group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId);
4008 public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) {
4009 final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
4010 group.verifyCanRevertLocalModifications();
4012 // verify that the process group can be updated to the given snapshot. We do not verify that connections can
4013 // be removed, because the flow may still be running, and it only matters that the connections can be removed once the components
4014 // have been stopped.
4015 group.verifyCanUpdate(versionedFlowSnapshot, false, false);
/**
 * Determines which local components are affected by changing the given process group to the
 * supplied versioned flow snapshot.
 *
 * The local group is mapped to its versioned form and compared against the snapshot contents.
 * The result is assembled in three passes over the differences:
 * 1. each remaining difference (after the filters below) is mapped to an entity for its local component;
 * 2. removed process groups contribute all of their contained components, and controller service
 *    differences contribute every service and processor that references the service recursively;
 * 3. differences on connections contribute the connection's source and destination components.
 *
 * @param processGroupId identifier of the local process group
 * @param updatedSnapshot the versioned flow snapshot to compare against
 * @return the set of affected component entities
 */
4019 public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot) {
4020 final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
// Build the versioned representation of the local flow so it can be compared with the snapshot.
4022 final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
4023 final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
4025 final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
4026 final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents());
4028 final Set<String> ancestorGroupServiceIds = getAncestorGroupServiceIds(group);
4029 final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor());
4030 final FlowComparison comparison = flowComparator.compare();
// Pass 1: map each relevant difference to an entity describing the local component (component A).
4032 final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
4033 .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
4034 .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
4035 .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
4036 .filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
4037 .map(difference -> {
4038 final VersionedComponent localComponent = difference.getComponentA();
// The state is looked up from the live component, by component type.
// NOTE(review): some case labels, the breaks and any default branch are not visible in this listing.
4041 switch (localComponent.getComponentType()) {
4042 case CONTROLLER_SERVICE:
4043 final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
4044 state = controllerServiceDAO.getControllerService(serviceId).getState().name();
4047 final String processorId = ((InstantiatedVersionedProcessor) localComponent).getInstanceId();
4048 state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name();
4050 case REMOTE_INPUT_PORT:
4051 final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
4052 state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name();
4054 case REMOTE_OUTPUT_PORT:
4055 final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
4056 state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name();
4063 return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state);
4065 .collect(Collectors.toCollection(HashSet::new));
// Pass 2: add components that are affected indirectly (children of removed groups, and
// components referencing a changed controller service).
4067 for (final FlowDifference difference : comparison.getDifferences()) {
4068 // Ignore these as local differences for now because we can't do anything with it
4069 if (difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED) {
4073 // Ignore differences for adding remote ports
4074 if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
4078 if (FlowDifferenceFilters.isIgnorableVersionedFlowCoordinateChange(difference)) {
4082 final VersionedComponent localComponent = difference.getComponentA();
4083 if (localComponent == null) {
4087 // If any Process Group is removed, consider all components below that Process Group as an affected component
4088 if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP) {
4089 final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId();
4090 final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId);
4092 localGroup.findAllProcessors().stream()
4093 .map(comp -> createAffectedComponentEntity(comp))
4094 .forEach(affectedComponents::add);
4095 localGroup.findAllFunnels().stream()
4096 .map(comp -> createAffectedComponentEntity(comp))
4097 .forEach(affectedComponents::add);
4098 localGroup.findAllInputPorts().stream()
4099 .map(comp -> createAffectedComponentEntity(comp))
4100 .forEach(affectedComponents::add);
4101 localGroup.findAllOutputPorts().stream()
4102 .map(comp -> createAffectedComponentEntity(comp))
4103 .forEach(affectedComponents::add);
4104 localGroup.findAllRemoteProcessGroups().stream()
4105 .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
4106 .map(comp -> createAffectedComponentEntity(comp))
4107 .forEach(affectedComponents::add);
4108 localGroup.findAllControllerServices().stream()
4109 .map(comp -> createAffectedComponentEntity(comp))
4110 .forEach(affectedComponents::add);
// A difference on a controller service also affects everything that references the service, recursively.
4113 if (localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE) {
4114 final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
4115 final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
4117 final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
4118 for (final ControllerServiceNode referencingService : referencingServices) {
4119 affectedComponents.add(createAffectedComponentEntity(referencingService));
4122 final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
4123 for (final ProcessorNode referencingProcessor : referencingProcessors) {
4124 affectedComponents.add(createAffectedComponentEntity(referencingProcessor));
4129 // Create a map of all connectable components by versioned component ID to the connectable component itself
4130 final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>();
4131 mapToConnectableId(group.findAllFunnels(), connectablesByVersionId);
4132 mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId);
4133 mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId);
4134 mapToConnectableId(group.findAllProcessors(), connectablesByVersionId);
// Remote group ports are gathered from every remote process group and mapped the same way.
4136 final List<RemoteGroupPort> remotePorts = new ArrayList<>();
4137 for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) {
4138 remotePorts.addAll(rpg.getInputPorts());
4139 remotePorts.addAll(rpg.getOutputPorts());
4141 mapToConnectableId(remotePorts, connectablesByVersionId);
// Pass 3: connection differences.
4143 // If any connection is added or modified, we need to stop both the source (if it exists in the flow currently)
4144 // and the destination (if it exists in the flow currently).
4145 for (final FlowDifference difference : comparison.getDifferences()) {
// Fall back to component B when the difference has no local component (component A).
4146 VersionedComponent component = difference.getComponentA();
4147 if (component == null) {
4148 component = difference.getComponentB();
4151 if (component.getComponentType() != org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
4155 final VersionedConnection connection = (VersionedConnection) component;
// Endpoints are resolved through the versioned-id map; ids with no local connectable are skipped.
4157 final String sourceVersionedId = connection.getSource().getId();
4158 final List<Connectable> sources = connectablesByVersionId.get(sourceVersionedId);
4159 if (sources != null) {
4160 for (final Connectable source : sources) {
4161 affectedComponents.add(createAffectedComponentEntity(source));
4165 final String destinationVersionId = connection.getDestination().getId();
4166 final List<Connectable> destinations = connectablesByVersionId.get(destinationVersionId);
4167 if (destinations != null) {
4168 for (final Connectable destination : destinations) {
4169 affectedComponents.add(createAffectedComponentEntity(destination));
4174 return affectedComponents;
4177 private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) {
4178 for (final Connectable connectable : connectables) {
4179 final Optional<String> versionedIdOption = connectable.getVersionedComponentId();
4181 // Determine the Versioned ID by using the ID that is assigned, if one is. Otherwise,
4182 // we will calculate the Versioned ID. This allows us to map connectables that currently are not under
4183 // version control. We have to do this so that if we are changing flow versions and have a component that is running and it does not exist
4184 // in the Versioned Flow, we still need to be able to create an AffectedComponentDTO for it.
4185 final String versionedId;
4186 if (versionedIdOption.isPresent()) {
4187 versionedId = versionedIdOption.get();
4189 versionedId = UUID.nameUUIDFromBytes(connectable.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
4192 final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId, key -> new ArrayList<>());
4193 byVersionedId.add(connectable);
4198 private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable) {
4199 final AffectedComponentEntity entity = new AffectedComponentEntity();
4200 entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectable.getIdentifier())));
4201 entity.setId(connectable.getIdentifier());
4203 final Authorizable authorizable = getAuthorizable(connectable);
4204 final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
4205 entity.setPermissions(permissionsDto);
4207 final AffectedComponentDTO dto = new AffectedComponentDTO();
4208 dto.setId(connectable.getIdentifier());
4209 dto.setReferenceType(connectable.getConnectableType().name());
4210 dto.setState(connectable.getScheduledState().name());
4212 final String groupId = connectable instanceof RemoteGroupPort ? ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : connectable.getProcessGroupIdentifier();
4213 dto.setProcessGroupId(groupId);
4215 entity.setComponent(dto);
4219 private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode) {
4220 final AffectedComponentEntity entity = new AffectedComponentEntity();
4221 entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())));
4222 entity.setId(serviceNode.getIdentifier());
4224 final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable();
4225 final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
4226 entity.setPermissions(permissionsDto);
4228 final AffectedComponentDTO dto = new AffectedComponentDTO();
4229 dto.setId(serviceNode.getIdentifier());
4230 dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
4231 dto.setProcessGroupId(serviceNode.getProcessGroupIdentifier());
4232 dto.setState(serviceNode.getState().name());
4234 entity.setComponent(dto);
4238 private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState) {
4239 final AffectedComponentEntity entity = new AffectedComponentEntity();
4240 entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId())));
4241 entity.setId(instance.getInstanceId());
4243 final Authorizable authorizable = getAuthorizable(componentTypeName, instance);
4244 final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
4245 entity.setPermissions(permissionsDto);
4247 final AffectedComponentDTO dto = new AffectedComponentDTO();
4248 dto.setId(instance.getInstanceId());
4249 dto.setReferenceType(componentTypeName);
4250 dto.setProcessGroupId(instance.getInstanceGroupId());
4251 dto.setState(componentState);
4253 entity.setComponent(dto);
/**
 * Resolves the authorizable that governs access to the given connectable.
 *
 * Remote input and output ports are authorized through their remote process group.
 * The final return uses the local connectable lookup.
 * NOTE(review): the label guarding that final return (presumably a default branch) is not
 * visible in this listing.
 *
 * @param connectable the connectable to resolve
 * @return the authorizable for the connectable
 */
4258 private Authorizable getAuthorizable(final Connectable connectable) {
4259 switch (connectable.getConnectableType()) {
4260 case REMOTE_INPUT_PORT:
4261 case REMOTE_OUTPUT_PORT:
4262 final String rpgId = ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier();
4263 return authorizableLookup.getRemoteProcessGroup(rpgId);
4265 return authorizableLookup.getLocalConnectable(connectable.getIdentifier());
/**
 * Resolves the authorizable for an instantiated versioned component by comparing the given
 * type name against the names of the registry {@code ComponentType} constants.
 *
 * Remote input and output ports resolve to the remote process group identified by the
 * component's instance group id; all other visible types are looked up by instance id.
 * NOTE(review): the result when no type name matches is not visible in this listing.
 *
 * @param componentTypeName name of the component type
 * @param versionedComponent the instantiated component to resolve
 * @return the authorizable for the component
 */
4269 private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) {
4270 final String componentId = versionedComponent.getInstanceId();
4272 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE.name())) {
4273 return authorizableLookup.getControllerService(componentId).getAuthorizable();
4276 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONNECTION.name())) {
4277 return authorizableLookup.getConnection(componentId).getAuthorizable();
4280 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.FUNNEL.name())) {
4281 return authorizableLookup.getFunnel(componentId);
4284 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.INPUT_PORT.name())) {
4285 return authorizableLookup.getInputPort(componentId);
4288 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.OUTPUT_PORT.name())) {
4289 return authorizableLookup.getOutputPort(componentId);
4292 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.LABEL.name())) {
4293 return authorizableLookup.getLabel(componentId);
4296 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP.name())) {
4297 return authorizableLookup.getProcessGroup(componentId).getAuthorizable();
4300 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESSOR.name())) {
4301 return authorizableLookup.getProcessor(componentId).getAuthorizable();
// Remote ports are authorized via their owning remote process group (the instance group id).
4304 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_INPUT_PORT.name())) {
4305 return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
4308 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_OUTPUT_PORT.name())) {
4309 return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
4312 if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) {
4313 return authorizableLookup.getRemoteProcessGroup(componentId);
4320 public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
4321 final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
4322 if (flowRegistry == null) {
4323 throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
4326 final VersionedFlowSnapshot snapshot;
4328 snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
4329 } catch (final NiFiRegistryException | IOException e) {
4330 logger.error(e.getMessage(), e);
4331 throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
4332 + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
4339 public String getFlowRegistryName(final String flowRegistryId) {
4340 final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId);
4341 return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
4344 private List<Revision> getComponentRevisions(final ProcessGroup processGroup, final boolean includeGroupRevision) {
4345 final List<Revision> revisions = new ArrayList<>();
4346 if (includeGroupRevision) {
4347 revisions.add(revisionManager.getRevision(processGroup.getIdentifier()));
4350 processGroup.findAllConnections().stream()
4351 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4352 .forEach(revisions::add);
4353 processGroup.findAllControllerServices().stream()
4354 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4355 .forEach(revisions::add);
4356 processGroup.findAllFunnels().stream()
4357 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4358 .forEach(revisions::add);
4359 processGroup.findAllInputPorts().stream()
4360 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4361 .forEach(revisions::add);
4362 processGroup.findAllOutputPorts().stream()
4363 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4364 .forEach(revisions::add);
4365 processGroup.findAllLabels().stream()
4366 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4367 .forEach(revisions::add);
4368 processGroup.findAllProcessGroups().stream()
4369 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4370 .forEach(revisions::add);
4371 processGroup.findAllProcessors().stream()
4372 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4373 .forEach(revisions::add);
4374 processGroup.findAllRemoteProcessGroups().stream()
4375 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4376 .forEach(revisions::add);
4377 processGroup.findAllRemoteProcessGroups().stream()
4378 .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
4379 .map(component -> revisionManager.getRevision(component.getIdentifier()))
4380 .forEach(revisions::add);
/**
 * Updates the contents of a process group to the proposed versioned flow snapshot and returns
 * the resulting process group entity.
 *
 * The revisions of all components beneath the group (without the group's own revision) plus the
 * supplied revision form the revision claim. The DAO update runs inside the revision manager's
 * update task, after which each claimed revision is incremented with the client id of the
 * supplied revision and the flow is saved through the controller facade.
 *
 * @param revision the client revision for the process group
 * @param groupId identifier of the process group to update
 * @param versionControlInfo version control information passed to the DAO update
 * @param proposedFlowSnapshot the flow snapshot to apply
 * @param componentIdSeed passed through to the DAO update
 * @param verifyNotModified passed through to the DAO update
 * @param updateSettings passed through to the DAO update
 * @param updateDescendantVersionedFlows passed through to the DAO update
 * @return the entity for the updated group, with revision, permissions, status and bulletins
 */
4386 public ProcessGroupEntity updateProcessGroupContents(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
4387 final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
4389 final NiFiUser user = NiFiUserUtils.getNiFiUser();
// Claim the revisions of every component below the group, plus the caller-supplied revision.
4391 final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
4392 final List<Revision> revisions = getComponentRevisions(processGroup, false);
4393 revisions.add(revision);
4395 final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
4397 final RevisionUpdate<ProcessGroupDTO> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<ProcessGroupDTO>() {
4399 public RevisionUpdate<ProcessGroupDTO> update() {
4400 // update the Process Group
4401 processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
4403 // update the revisions
4404 final Set<Revision> updatedRevisions = revisions.stream()
4405 .map(rev -> revisionManager.getRevision(rev.getComponentId()).incrementRevision(revision.getClientId()))
4406 .collect(Collectors.toSet());
4409 controllerFacade.save();
4411 // gather details for response
4412 final ProcessGroupDTO dto = dtoFactory.createProcessGroupDto(processGroup);
// The last modification records the incremented group revision and the identity of the acting user.
4414 final Revision updatedRevision = revisionManager.getRevision(groupId).incrementRevision(revision.getClientId());
4415 final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
4416 return new StandardRevisionUpdate<>(dto, lastModification, updatedRevisions);
4420 final FlowModification lastModification = revisionUpdate.getLastModification();
// Assemble the response entity; bulletins are created with the caller's read permission on the group.
4422 final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
4423 final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(lastModification);
4424 final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
4425 final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
4426 final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
4427 return entityFactory.createProcessGroupEntity(revisionUpdate.getComponent(), updatedRevision, permissions, status, bulletinEntities);
/**
 * Checks whether the current user may READ the component that is the source of the given action.
 *
 * The authorizable is chosen from the action's source type and source id. An unexpected source
 * type results in a {@code WebApplicationException} with a server-error response. If the
 * underlying component no longer exists ({@code ResourceNotFoundException}), the controller is
 * used as the authorizable instead.
 * NOTE(review): most case labels of the type dispatch are not visible in this listing.
 *
 * @param action the audited action to authorize
 * @return the result of the READ authorization check
 */
4430 private AuthorizationResult authorizeAction(final Action action) {
4431 final String sourceId = action.getSourceId();
4432 final Component type = action.getSourceType();
4434 Authorizable authorizable;
4438 authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable();
4441 authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable();
4443 case ControllerService:
4444 authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable();
4447 authorizable = controllerFacade;
4450 authorizable = authorizableLookup.getInputPort(sourceId);
4453 authorizable = authorizableLookup.getOutputPort(sourceId);
4456 authorizable = authorizableLookup.getProcessGroup(sourceId).getAuthorizable();
4458 case RemoteProcessGroup:
4459 authorizable = authorizableLookup.getRemoteProcessGroup(sourceId);
4462 authorizable = authorizableLookup.getFunnel(sourceId);
4465 authorizable = authorizableLookup.getConnection(sourceId).getAuthorizable();
4468 authorizable = authorizableLookup.getAccessPolicyById(sourceId);
4472 authorizable = authorizableLookup.getTenant();
4475 throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this action.").build());
4477 } catch (final ResourceNotFoundException e) {
4478 // if the underlying component is gone, use the controller to see if permissions should be allowed
4479 authorizable = controllerFacade;
4482 // perform the authorization
4483 return authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
/**
 * Queries the audit service for actions matching the given criteria.
 *
 * Every action returned by the audit service is included in the result; each action entity is
 * created with a flag saying whether the current user's READ authorization for the action's
 * source was approved.
 *
 * @param historyQueryDto the query criteria (date range, source id, user identity, paging and sorting)
 * @return the history DTO, with action entities set when the audit service returned actions
 */
4487 public HistoryDTO getActions(final HistoryQueryDTO historyQueryDto) {
4488 // extract the query criteria
4489 final HistoryQuery historyQuery = new HistoryQuery();
4490 historyQuery.setStartDate(historyQueryDto.getStartDate());
4491 historyQuery.setEndDate(historyQueryDto.getEndDate());
4492 historyQuery.setSourceId(historyQueryDto.getSourceId());
4493 historyQuery.setUserIdentity(historyQueryDto.getUserIdentity());
4494 historyQuery.setOffset(historyQueryDto.getOffset());
4495 historyQuery.setCount(historyQueryDto.getCount());
4496 historyQuery.setSortColumn(historyQueryDto.getSortColumn());
4497 historyQuery.setSortOrder(historyQueryDto.getSortOrder());
4499 // perform the query
4500 final History history = auditService.getActions(historyQuery);
4502 // only retain authorized actions
4503 final HistoryDTO historyDto = dtoFactory.createHistoryDto(history);
4504 if (history.getActions() != null) {
4505 final List<ActionEntity> actionEntities = new ArrayList<>();
4506 for (final Action action : history.getActions()) {
// The authorization outcome is passed to the entity factory as a flag; the action itself is always added.
4507 final AuthorizationResult result = authorizeAction(action);
4508 actionEntities.add(entityFactory.createActionEntity(dtoFactory.createActionDto(action), Result.Approved.equals(result.getResult())));
4510 historyDto.setActions(actionEntities);
4513 // create the response
/**
 * Looks up a single audited action by id and returns it as an entity.
 *
 * @param actionId identifier of the action
 * @return the action entity, created with the outcome of the authorization check
 * @throws ResourceNotFoundException if the audit service has no action with the given id
 * @throws AccessDeniedException carrying the explanation of the authorization result
 *         (NOTE(review): the condition guarding this throw is not visible in this listing;
 *         presumably it applies only when the action is not authorized)
 */
4518 public ActionEntity getAction(final Integer actionId) {
4520 final Action action = auditService.getAction(actionId);
4522 // ensure the action was found
4523 if (action == null) {
4524 throw new ResourceNotFoundException(String.format("Unable to find action with id '%s'.", actionId));
4527 final AuthorizationResult result = authorizeAction(action);
4528 final boolean authorized = Result.Approved.equals(result.getResult());
4530 throw new AccessDeniedException(result.getExplanation());
4533 // return the action
4534 return entityFactory.createActionEntity(dtoFactory.createActionDto(action), authorized);
4538 public ComponentHistoryDTO getComponentHistory(final String componentId) {
4539 final Map<String, PropertyHistoryDTO> propertyHistoryDtos = new LinkedHashMap<>();
4540 final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(componentId);
4542 for (final Map.Entry<String, List<PreviousValue>> entry : propertyHistory.entrySet()) {
4543 final List<PreviousValueDTO> previousValueDtos = new ArrayList<>();
4545 for (final PreviousValue previousValue : entry.getValue()) {
4546 final PreviousValueDTO dto = new PreviousValueDTO();
4547 dto.setPreviousValue(previousValue.getPreviousValue());
4548 dto.setTimestamp(previousValue.getTimestamp());
4549 dto.setUserIdentity(previousValue.getUserIdentity());
4550 previousValueDtos.add(dto);
4553 if (!previousValueDtos.isEmpty()) {
4554 final PropertyHistoryDTO propertyHistoryDto = new PropertyHistoryDTO();
4555 propertyHistoryDto.setPreviousValues(previousValueDtos);
4556 propertyHistoryDtos.put(entry.getKey(), propertyHistoryDto);
4560 final ComponentHistoryDTO history = new ComponentHistoryDTO();
4561 history.setComponentId(componentId);
4562 history.setPropertyHistory(propertyHistoryDtos);
/**
 * Produces diagnostics for a processor, with the parts the current user may not READ removed.
 *
 * Controller and system diagnostics are nulled out of the aggregate JVM snapshot when the user
 * lacks READ on the controller or system resource. Incoming and outgoing connections and the
 * referenced controller services are filtered to those the user may READ, and referencing
 * components are cleared from the retained services.
 * NOTE(review): the condition guarding the flow-diagnostics removal is not visible in this
 * listing; presumably it mirrors the controller and system checks using {@code canReadFlow}.
 *
 * @param id identifier of the processor
 * @return the diagnostics entity with revision, permissions, status and bulletins
 */
4568 public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
4569 final ProcessorNode processor = processorDAO.getProcessor(id);
4570 final ProcessorStatus processorStatus = controllerFacade.getProcessorStatus(id);
4572 // Generate Processor Diagnostics
4573 final NiFiUser user = NiFiUserUtils.getNiFiUser();
// The callback turns a controller service id into an entity without any referencing components.
4574 final ProcessorDiagnosticsDTO dto = controllerFacade.getProcessorDiagnostics(processor, processorStatus, bulletinRepository, serviceId -> {
4575 final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
4576 return createControllerServiceEntity(serviceNode, Collections.emptySet());
4579 // Filter anything out of diagnostics that the user is not authorized to see.
4580 final List<JVMDiagnosticsSnapshotDTO> jvmDiagnosticsSnaphots = new ArrayList<>();
4581 final JVMDiagnosticsDTO jvmDiagnostics = dto.getJvmDiagnostics();
4582 jvmDiagnosticsSnaphots.add(jvmDiagnostics.getAggregateSnapshot());
4584 // filter controller-related information
4585 final boolean canReadController = authorizableLookup.getController().isAuthorized(authorizer, RequestAction.READ, user);
4586 if (!canReadController) {
4587 for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
4588 snapshot.setControllerDiagnostics(null);
4592 // filter system diagnostics information
4593 final boolean canReadSystem = authorizableLookup.getSystem().isAuthorized(authorizer, RequestAction.READ, user);
4594 if (!canReadSystem) {
4595 for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
4596 snapshot.setSystemDiagnosticsDto(null);
4600 final boolean canReadFlow = authorizableLookup.getFlow().isAuthorized(authorizer, RequestAction.READ, user);
4602 for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
4603 snapshot.setFlowDiagnosticsDto(null);
4607 // filter connections
4608 final Predicate<ConnectionDiagnosticsDTO> connectionAuthorized = connectionDiagnostics -> {
4609 final String connectionId = connectionDiagnostics.getConnection().getId();
4610 return authorizableLookup.getConnection(connectionId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user);
4613 // Filter incoming connections by what user is authorized to READ
4614 final Set<ConnectionDiagnosticsDTO> incoming = dto.getIncomingConnections();
4615 final Set<ConnectionDiagnosticsDTO> filteredIncoming = incoming.stream()
4616 .filter(connectionAuthorized)
4617 .collect(Collectors.toSet());
4619 dto.setIncomingConnections(filteredIncoming);
4621 // Filter outgoing connections by what user is authorized to READ
4622 final Set<ConnectionDiagnosticsDTO> outgoing = dto.getOutgoingConnections();
4623 final Set<ConnectionDiagnosticsDTO> filteredOutgoing = outgoing.stream()
4624 .filter(connectionAuthorized)
4625 .collect(Collectors.toSet());
4626 dto.setOutgoingConnections(filteredOutgoing);
4628 // Filter out any controller services that are referenced by the Processor
4629 final Set<ControllerServiceDiagnosticsDTO> referencedServices = dto.getReferencedControllerServices();
4630 final Set<ControllerServiceDiagnosticsDTO> filteredReferencedServices = referencedServices.stream()
4631 .filter(csDiagnostics -> {
4632 final String csId = csDiagnostics.getControllerService().getId();
4633 return authorizableLookup.getControllerService(csId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user);
4635 .map(csDiagnostics -> {
4636 // Filter out any referencing components because those are generally not relevant from this context.
4637 final ControllerServiceDTO serviceDto = csDiagnostics.getControllerService().getComponent();
4638 if (serviceDto != null) {
4639 serviceDto.setReferencingComponents(null);
4641 return csDiagnostics;
4643 .collect(Collectors.toSet());
4644 dto.setReferencedControllerServices(filteredReferencedServices);
// Assemble the entity: revision, permissions, bulletins (honoring the read permission) and status.
4646 final Revision revision = revisionManager.getRevision(id);
4647 final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revision);
4648 final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(processor);
4649 final List<BulletinEntity> bulletins = bulletinRepository.findBulletinsForSource(id).stream()
4650 .map(bulletin -> dtoFactory.createBulletinDto(bulletin))
4651 .map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissionsDto.getCanRead()))
4652 .collect(Collectors.toList());
// Note: the processor status is fetched from the controller facade a second time here.
4654 final ProcessorStatusDTO processorStatusDto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
4655 return entityFactory.createProcessorDiagnosticsEntity(dto, revisionDto, permissionsDto, processorStatusDto, bulletins);
/**
 * Reports whether this instance is clustered, as answered by the controller facade.
 *
 * @return the value of {@code controllerFacade.isClustered()}
 */
4659 public boolean isClustered() {
4660 return controllerFacade.isClustered();
/**
 * Returns the id of this node as reported by the controller facade.
 *
 * NOTE(review): the result when the controller facade reports no node identifier is not visible
 * in this listing.
 *
 * @return the node id when a node identifier is available
 */
4664 public String getNodeId() {
4665 final NodeIdentifier nodeId = controllerFacade.getNodeId();
4666 if (nodeId != null) {
4667 return nodeId.getId();
4674 public ClusterDTO getCluster() {
4675 // create cluster summary dto
4676 final ClusterDTO clusterDto = new ClusterDTO();
4679 clusterDto.setGenerated(new Date());
4682 final List<NodeDTO> nodeDtos = clusterCoordinator.getNodeIdentifiers().stream()
4683 .map(nodeId -> getNode(nodeId))
4684 .collect(Collectors.toList());
4685 clusterDto.setNodes(nodeDtos);
4691 public NodeDTO getNode(final String nodeId) {
4692 final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId);
4693 return getNode(nodeIdentifier);
4696 private NodeDTO getNode(final NodeIdentifier nodeId) {
4697 final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId);
4698 final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId);
4699 final Set<String> roles = getRoles(nodeId);
4700 final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);
4701 return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, roles);
4704 private Set<String> getRoles(final NodeIdentifier nodeId) {
4705 final Set<String> roles = new HashSet<>();
4706 final String nodeAddress = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort();
4708 for (final String roleName : ClusterRoles.getAllRoles()) {
4709 final String leader = leaderElectionManager.getLeader(roleName);
4710 if (leader == null) {
4714 if (leader.equals(nodeAddress)) {
4715 roles.add(roleName);
/**
 * Removes a node from the cluster and drops its heartbeat.
 *
 * The node must be known to the cluster coordinator and must be in the OFFLOADED or
 * DISCONNECTED state.
 *
 * @param nodeId identifier of the node to remove
 * @throws WebApplicationException with the message "Unable to access details for current user."
 *         (NOTE(review): the condition guarding this throw is not visible in this listing;
 *         presumably it applies when no current user is available)
 * @throws UnknownNodeException if the node is not part of the cluster
 * @throws IllegalNodeDeletionException if the node is neither disconnected nor offloaded
 */
4723 public void deleteNode(final String nodeId) {
4724 final NiFiUser user = NiFiUserUtils.getNiFiUser();
4726 throw new WebApplicationException(new Throwable("Unable to access details for current user."));
4729 final String userDn = user.getIdentity();
4730 final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId);
4731 if (nodeIdentifier == null) {
4732 throw new UnknownNodeException("Cannot remove Node with ID " + nodeId + " because it is not part of the cluster");
// Only nodes that are OFFLOADED or DISCONNECTED may be removed.
4735 final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier);
4736 if (!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && !nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) {
4737 throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId +
4738 " because it is not disconnected or offloaded, current state = " + nodeConnectionStatus.getState());
4741 clusterCoordinator.removeNode(nodeIdentifier, userDn);
4742 heartbeatMonitor.removeHeartbeat(nodeIdentifier);
4745 /* reusable function declarations for converting ids to tenant entities */
4746 private Function<String, TenantEntity> mapUserGroupIdToTenantEntity(final boolean enforceGroupExistence) {
4747 return userGroupId -> {
4748 final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroupId));
4751 if (enforceGroupExistence || userGroupDAO.hasUserGroup(userGroupId)) {
4752 group = userGroupDAO.getUserGroup(userGroupId);
4754 group = new Group.Builder().identifier(userGroupId).name("Group ID - " + userGroupId + " (removed externally)").build();
4757 return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(group), userGroupRevision,
4758 dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
4762 private Function<String, TenantEntity> mapUserIdToTenantEntity(final boolean enforceUserExistence) {
4764 final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId));
4767 if (enforceUserExistence || userDAO.hasUser(userId)) {
4768 user = userDAO.getUser(userId);
4770 user = new User.Builder().identifier(userId).identity("User ID - " + userId + " (removed externally)").build();
4773 return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(user), userRevision,
4774 dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
4780 public void setProperties(final NiFiProperties properties) {
4781 this.properties = properties;
4784 public void setControllerFacade(final ControllerFacade controllerFacade) {
4785 this.controllerFacade = controllerFacade;
4788 public void setRemoteProcessGroupDAO(final RemoteProcessGroupDAO remoteProcessGroupDAO) {
4789 this.remoteProcessGroupDAO = remoteProcessGroupDAO;
4792 public void setLabelDAO(final LabelDAO labelDAO) {
4793 this.labelDAO = labelDAO;
4796 public void setFunnelDAO(final FunnelDAO funnelDAO) {
4797 this.funnelDAO = funnelDAO;
4800 public void setSnippetDAO(final SnippetDAO snippetDAO) {
4801 this.snippetDAO = snippetDAO;
4804 public void setProcessorDAO(final ProcessorDAO processorDAO) {
4805 this.processorDAO = processorDAO;
4808 public void setConnectionDAO(final ConnectionDAO connectionDAO) {
4809 this.connectionDAO = connectionDAO;
4812 public void setAuditService(final AuditService auditService) {
4813 this.auditService = auditService;
4816 public void setRevisionManager(final RevisionManager revisionManager) {
4817 this.revisionManager = revisionManager;
4820 public void setDtoFactory(final DtoFactory dtoFactory) {
4821 this.dtoFactory = dtoFactory;
4824 public void setEntityFactory(final EntityFactory entityFactory) {
4825 this.entityFactory = entityFactory;
4828 public void setInputPortDAO(final PortDAO inputPortDAO) {
4829 this.inputPortDAO = inputPortDAO;
4832 public void setOutputPortDAO(final PortDAO outputPortDAO) {
4833 this.outputPortDAO = outputPortDAO;
4836 public void setProcessGroupDAO(final ProcessGroupDAO processGroupDAO) {
4837 this.processGroupDAO = processGroupDAO;
4840 public void setControllerServiceDAO(final ControllerServiceDAO controllerServiceDAO) {
4841 this.controllerServiceDAO = controllerServiceDAO;
4844 public void setReportingTaskDAO(final ReportingTaskDAO reportingTaskDAO) {
4845 this.reportingTaskDAO = reportingTaskDAO;
4848 public void setTemplateDAO(final TemplateDAO templateDAO) {
4849 this.templateDAO = templateDAO;
4852 public void setSnippetUtils(final SnippetUtils snippetUtils) {
4853 this.snippetUtils = snippetUtils;
4856 public void setAuthorizableLookup(final AuthorizableLookup authorizableLookup) {
4857 this.authorizableLookup = authorizableLookup;
4860 public void setAuthorizer(final Authorizer authorizer) {
4861 this.authorizer = authorizer;
4864 public void setUserDAO(final UserDAO userDAO) {
4865 this.userDAO = userDAO;
4868 public void setUserGroupDAO(final UserGroupDAO userGroupDAO) {
4869 this.userGroupDAO = userGroupDAO;
4872 public void setAccessPolicyDAO(final AccessPolicyDAO accessPolicyDAO) {
4873 this.accessPolicyDAO = accessPolicyDAO;
4876 public void setClusterCoordinator(final ClusterCoordinator coordinator) {
4877 this.clusterCoordinator = coordinator;
4880 public void setHeartbeatMonitor(final HeartbeatMonitor heartbeatMonitor) {
4881 this.heartbeatMonitor = heartbeatMonitor;
4884 public void setBulletinRepository(final BulletinRepository bulletinRepository) {
4885 this.bulletinRepository = bulletinRepository;
4888 public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) {
4889 this.leaderElectionManager = leaderElectionManager;
4892 public void setRegistryDAO(RegistryDAO registryDao) {
4893 this.registryDAO = registryDao;
4896 public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) {
4897 this.flowRegistryClient = flowRegistryClient;