/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Modifications to the original nifi code for the ONAP project are made
 * available under the Apache License, Version 2.0
 */
20 package org.apache.nifi.controller;
22 import org.apache.commons.lang3.builder.ToStringBuilder;
23 import org.apache.commons.lang3.builder.ToStringStyle;
24 import org.apache.nifi.authorization.Resource;
25 import org.apache.nifi.authorization.resource.Authorizable;
26 import org.apache.nifi.authorization.resource.ResourceFactory;
27 import org.apache.nifi.authorization.resource.ResourceType;
28 import org.apache.nifi.components.ValidationResult;
29 import org.apache.nifi.connectable.Connectable;
30 import org.apache.nifi.connectable.ConnectableType;
31 import org.apache.nifi.connectable.Connection;
32 import org.apache.nifi.connectable.Port;
33 import org.apache.nifi.connectable.Position;
34 import org.apache.nifi.groups.ProcessGroup;
35 import org.apache.nifi.processor.ProcessContext;
36 import org.apache.nifi.processor.ProcessSession;
37 import org.apache.nifi.processor.ProcessSessionFactory;
38 import org.apache.nifi.processor.Relationship;
39 import org.apache.nifi.processor.exception.ProcessException;
40 import org.apache.nifi.util.CharacterFilterUtils;
41 import org.apache.nifi.util.FormatUtils;
43 import java.util.ArrayList;
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.HashSet;
47 import java.util.List;
48 import java.util.Optional;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicBoolean;
52 import java.util.concurrent.atomic.AtomicInteger;
53 import java.util.concurrent.atomic.AtomicLong;
54 import java.util.concurrent.atomic.AtomicReference;
55 import java.util.concurrent.locks.Lock;
56 import java.util.concurrent.locks.ReentrantReadWriteLock;
58 import static java.util.Objects.requireNonNull;
60 public abstract class AbstractPort implements Port {
62 public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()
63 .description("The relationship through which all Flow Files are transferred")
67 public static final long MINIMUM_PENALIZATION_MILLIS = 0L;
68 public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
70 public static final long MINIMUM_YIELD_MILLIS = 0L;
71 public static final long DEFAULT_YIELD_PERIOD = 10000L;
72 public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS;
74 private final List<Relationship> relationships;
76 private final String id;
77 private final ConnectableType type;
78 private final AtomicReference<String> name;
79 private final AtomicReference<Position> position;
80 private final AtomicReference<String> comments;
81 private final AtomicReference<ProcessGroup> processGroup;
82 private final AtomicBoolean lossTolerant;
83 private final AtomicReference<ScheduledState> scheduledState;
84 private final AtomicInteger concurrentTaskCount;
85 private final AtomicReference<String> penalizationPeriod;
86 private final AtomicReference<String> yieldPeriod;
87 private final AtomicReference<String> schedulingPeriod;
88 private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
89 private final AtomicLong schedulingNanos;
90 private final AtomicLong yieldExpiration;
91 private final ProcessScheduler processScheduler;
93 private final Set<Connection> outgoingConnections;
94 private final List<Connection> incomingConnections;
96 private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
97 private final Lock readLock = rwLock.readLock();
98 private final Lock writeLock = rwLock.writeLock();
/**
 * Creates a port that starts out {@link ScheduledState#STOPPED}, positioned at (0, 0), not loss
 * tolerant, with one concurrent task, a "30 sec" penalization period, a "1 sec" yield period and
 * a "0 millis" scheduling period.
 *
 * @param id identifier of the port; must not be null
 * @param name initial name of the port; must not be null
 * @param processGroup the group this port initially belongs to
 * @param type the connectable type reported by this port
 * @param scheduler scheduler queried for this port's active thread count
 * @throws NullPointerException if {@code id} or {@code name} is null
 */
public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) {
    // Identity and ownership.
    this.id = requireNonNull(id);
    this.name = new AtomicReference<>(requireNonNull(name));
    this.type = type;
    this.processGroup = new AtomicReference<>(processGroup);
    this.processScheduler = scheduler;

    // Presentation attributes.
    this.position = new AtomicReference<>(new Position(0D, 0D));
    this.comments = new AtomicReference<>();

    // Connection registries and the fixed, single-entry relationship list.
    this.outgoingConnections = new HashSet<>();
    this.incomingConnections = new ArrayList<>();
    this.relationships = Collections.singletonList(PORT_RELATIONSHIP);

    // Scheduling defaults.
    this.lossTolerant = new AtomicBoolean(false);
    this.concurrentTaskCount = new AtomicInteger(1);
    this.penalizationPeriod = new AtomicReference<>("30 sec");
    this.yieldPeriod = new AtomicReference<>("1 sec");
    this.yieldExpiration = new AtomicLong(0L);
    this.schedulingPeriod = new AtomicReference<>("0 millis");
    this.schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
    this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
}
125 public String getIdentifier() {
130 public String getProcessGroupIdentifier() {
131 final ProcessGroup procGroup = getProcessGroup();
132 return procGroup == null ? null : procGroup.getIdentifier();
136 public String getName() {
141 public void setName(final String name) {
142 if (this.name.get().equals(name)) {
146 final ProcessGroup parentGroup = this.processGroup.get();
147 if (getConnectableType() == ConnectableType.INPUT_PORT) {
148 if (parentGroup.getInputPortByName(name) != null) {
149 throw new IllegalStateException("The requested new port name is not available");
151 } else if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
152 if (parentGroup.getOutputPortByName(name) != null) {
153 throw new IllegalStateException("The requested new port name is not available");
161 public Authorizable getParentAuthorizable() {
162 return getProcessGroup();
166 public Resource getResource() {
167 final ResourceType resourceType = ConnectableType.INPUT_PORT.equals(getConnectableType()) ? ResourceType.InputPort : ResourceType.OutputPort;
168 return ResourceFactory.getComponentResource(resourceType, getIdentifier(), getName());
172 public ProcessGroup getProcessGroup() {
173 return processGroup.get();
177 public void setProcessGroup(final ProcessGroup newGroup) {
178 this.processGroup.set(newGroup);
182 public String getComments() {
183 return comments.get();
187 public void setComments(final String comments) {
188 this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments));
192 public Collection<Relationship> getRelationships() {
193 return relationships;
197 public Relationship getRelationship(final String relationshipName) {
198 if (PORT_RELATIONSHIP.getName().equals(relationshipName)) {
199 return PORT_RELATIONSHIP;
205 public void addConnection(final Connection connection) throws IllegalArgumentException {
208 if (!requireNonNull(connection).getSource().equals(this)) {
209 if (connection.getDestination().equals(this)) {
210 // don't add the connection twice. This may occur if we have a self-loop because we will be told
211 // to add the connection once because we are the source and again because we are the destination.
212 if (!incomingConnections.contains(connection)) {
213 incomingConnections.add(connection);
218 throw new IllegalArgumentException("Cannot add a connection to a LocalPort for which the LocalPort is neither the Source nor the Destination");
222 /* TODO: Will commenting this out have repercussions?
223 Needed to comment this out to allow use of relationships for port to processor case which was previously not supported
224 for (final Relationship relationship : connection.getRelationships()) {
225 if (!relationship.equals(PORT_RELATIONSHIP)) {
226 throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Local Ports");
231 // don't add the connection twice. This may occur if we have a self-loop because we will be told
232 // to add the connection once because we are the source and again because we are the destination.
233 if (!outgoingConnections.contains(connection)) {
234 outgoingConnections.add(connection);
242 public boolean hasIncomingConnection() {
245 return !incomingConnections.isEmpty();
252 public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
253 final ProcessSession session = sessionFactory.createSession();
256 onTrigger(context, session);
258 } catch (final ProcessException e) {
261 } catch (final Throwable t) {
263 throw new RuntimeException(t);
267 public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
270 public void updateConnection(final Connection connection) throws IllegalStateException {
271 if (requireNonNull(connection).getSource().equals(this)) {
274 if (!outgoingConnections.remove(connection)) {
275 throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
277 outgoingConnections.add(connection);
281 } else if (connection.getDestination().equals(this)) {
284 if (!incomingConnections.remove(connection)) {
285 throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
287 incomingConnections.add(connection);
292 throw new IllegalStateException("The given connection is not currently registered for this Port");
297 public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
300 if (!requireNonNull(connection).getSource().equals(this)) {
301 final boolean existed = incomingConnections.remove(connection);
303 throw new IllegalStateException("The given connection is not currently registered for this Port");
308 if (!canConnectionBeRemoved(connection)) {
309 // TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean
310 throw new IllegalStateException("Connection " + connection.getIdentifier() + " cannot be removed");
313 final boolean removed = outgoingConnections.remove(connection);
315 throw new IllegalStateException("Connection " + connection.getIdentifier() + " is not registered with " + this.getIdentifier());
323 * Verify that removing this connection will not prevent this Port from
324 * still being connected via each relationship
326 * @param connection to test for removal
327 * @return true if can be removed
329 private boolean canConnectionBeRemoved(final Connection connection) {
330 final Connectable source = connection.getSource();
331 if (!source.isRunning()) {
332 // we don't have to verify that this Connectable is still connected because it's okay to make
333 // the source invalid since it is not running.
337 for (final Relationship relationship : source.getRelationships()) {
338 if (source.isAutoTerminated(relationship)) {
342 final Set<Connection> connectionsForRelationship = source.getConnections(relationship);
343 if (connectionsForRelationship == null || connectionsForRelationship.isEmpty()) {
352 public Set<Connection> getConnections() {
355 return Collections.unmodifiableSet(outgoingConnections);
362 public Set<Connection> getConnections(final Relationship relationship) {
365 if (relationship.equals(PORT_RELATIONSHIP)) {
366 return Collections.unmodifiableSet(outgoingConnections);
369 throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Local Ports");
376 public Position getPosition() {
377 return position.get();
381 public void setPosition(final Position position) {
382 this.position.set(position);
386 public String toString() {
387 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString();
391 public List<Connection> getIncomingConnections() {
394 return Collections.unmodifiableList(incomingConnections);
401 public abstract boolean isValid();
404 public boolean isAutoTerminated(final Relationship relationship) {
409 public boolean isLossTolerant() {
410 return lossTolerant.get();
414 public void setLossTolerant(boolean lossTolerant) {
415 this.lossTolerant.set(lossTolerant);
419 public void setMaxConcurrentTasks(final int taskCount) {
421 throw new IllegalArgumentException();
423 concurrentTaskCount.set(taskCount);
427 public int getMaxConcurrentTasks() {
428 return concurrentTaskCount.get();
432 public void shutdown() {
433 scheduledState.set(ScheduledState.STOPPED);
437 public void onSchedulingStart() {
438 scheduledState.set(ScheduledState.RUNNING);
441 public void disable() {
442 final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
444 throw new IllegalStateException("Port cannot be disabled because it is not stopped");
448 public void enable() {
449 final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED);
451 throw new IllegalStateException("Port cannot be enabled because it is not disabled");
456 public boolean isRunning() {
457 return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0;
461 public ScheduledState getScheduledState() {
462 return scheduledState.get();
466 public ConnectableType getConnectableType() {
471 public void setYieldPeriod(final String yieldPeriod) {
472 final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
473 if (yieldMillis < 0) {
474 throw new IllegalArgumentException("Yield duration must be positive");
476 this.yieldPeriod.set(yieldPeriod);
480 public void setScheduldingPeriod(final String schedulingPeriod) {
481 final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
482 if (schedulingNanos < 0) {
483 throw new IllegalArgumentException("Scheduling Period must be positive");
486 this.schedulingPeriod.set(schedulingPeriod);
487 this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
491 public long getPenalizationPeriod(final TimeUnit timeUnit) {
492 return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
496 public String getPenalizationPeriod() {
497 return penalizationPeriod.get();
501 public void yield() {
502 final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
503 yield(yieldMillis, TimeUnit.MILLISECONDS);
507 public void yield(final long yieldDuration, final TimeUnit timeUnit) {
508 final long yieldMillis = timeUnit.toMillis(yieldDuration);
509 yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis));
513 public long getYieldExpiration() {
514 return yieldExpiration.get();
518 public long getSchedulingPeriod(final TimeUnit timeUnit) {
519 return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
523 public String getSchedulingPeriod() {
524 return schedulingPeriod.get();
528 public void setPenalizationPeriod(final String penalizationPeriod) {
529 this.penalizationPeriod.set(penalizationPeriod);
533 public String getYieldPeriod() {
534 return yieldPeriod.get();
538 public long getYieldPeriod(final TimeUnit timeUnit) {
539 return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
543 public void verifyCanDelete() throws IllegalStateException {
544 verifyCanDelete(false);
548 public void verifyCanDelete(final boolean ignoreConnections) {
552 throw new IllegalStateException(this.getIdentifier() + " is running");
555 if (!ignoreConnections) {
556 for (final Connection connection : outgoingConnections) {
557 connection.verifyCanDelete();
560 for (final Connection connection : incomingConnections) {
561 if (connection.getSource().equals(this)) {
562 connection.verifyCanDelete();
564 throw new IllegalStateException(this.getIdentifier() + " is the destination of another component");
574 public void verifyCanStart() {
577 switch (scheduledState.get()) {
579 throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled");
581 throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running");
585 verifyNoActiveThreads();
587 final Collection<ValidationResult> validationResults = getValidationErrors();
588 if (!validationResults.isEmpty()) {
589 throw new IllegalStateException(this.getIdentifier() + " is not in a valid state: " + validationResults.iterator().next().getExplanation());
597 public void verifyCanStop() {
598 if (getScheduledState() != ScheduledState.RUNNING) {
599 throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run");
604 public void verifyCanUpdate() {
608 throw new IllegalStateException(this.getIdentifier() + " is not stopped");
616 public void verifyCanEnable() {
619 if (getScheduledState() != ScheduledState.DISABLED) {
620 throw new IllegalStateException(this.getIdentifier() + " is not disabled");
623 verifyNoActiveThreads();
630 public void verifyCanDisable() {
633 if (getScheduledState() != ScheduledState.STOPPED) {
634 throw new IllegalStateException(this.getIdentifier() + " is not stopped");
636 verifyNoActiveThreads();
642 private void verifyNoActiveThreads() throws IllegalStateException {
643 final int threadCount = processScheduler.getActiveThreadCount(this);
644 if (threadCount > 0) {
645 throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active");
650 public void verifyCanClearState() {
654 public Optional<String> getVersionedComponentId() {
655 return Optional.ofNullable(versionedComponentId.get());
659 public void setVersionedComponentId(final String versionedComponentId) {
660 boolean updated = false;
662 final String currentId = this.versionedComponentId.get();
664 if (currentId == null) {
665 updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
666 } else if (currentId.equals(versionedComponentId)) {
668 } else if (versionedComponentId == null) {
669 updated = this.versionedComponentId.compareAndSet(currentId, null);
671 throw new IllegalStateException(this + " is already under version control");