Add DCAE MOD design tool project
[dcaegen2/platform.git] / mod / designtool / designtool-web / src / main / java / org / apache / nifi / controller / AbstractPort.java
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * Modifications to the original nifi code for the ONAP project are made
18  * available under the Apache License, Version 2.0
19  */
20 package org.apache.nifi.controller;
21
22 import org.apache.commons.lang3.builder.ToStringBuilder;
23 import org.apache.commons.lang3.builder.ToStringStyle;
24 import org.apache.nifi.authorization.Resource;
25 import org.apache.nifi.authorization.resource.Authorizable;
26 import org.apache.nifi.authorization.resource.ResourceFactory;
27 import org.apache.nifi.authorization.resource.ResourceType;
28 import org.apache.nifi.components.ValidationResult;
29 import org.apache.nifi.connectable.Connectable;
30 import org.apache.nifi.connectable.ConnectableType;
31 import org.apache.nifi.connectable.Connection;
32 import org.apache.nifi.connectable.Port;
33 import org.apache.nifi.connectable.Position;
34 import org.apache.nifi.groups.ProcessGroup;
35 import org.apache.nifi.processor.ProcessContext;
36 import org.apache.nifi.processor.ProcessSession;
37 import org.apache.nifi.processor.ProcessSessionFactory;
38 import org.apache.nifi.processor.Relationship;
39 import org.apache.nifi.processor.exception.ProcessException;
40 import org.apache.nifi.util.CharacterFilterUtils;
41 import org.apache.nifi.util.FormatUtils;
42
43 import java.util.ArrayList;
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.HashSet;
47 import java.util.List;
48 import java.util.Optional;
49 import java.util.Set;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicBoolean;
52 import java.util.concurrent.atomic.AtomicInteger;
53 import java.util.concurrent.atomic.AtomicLong;
54 import java.util.concurrent.atomic.AtomicReference;
55 import java.util.concurrent.locks.Lock;
56 import java.util.concurrent.locks.ReentrantReadWriteLock;
57
58 import static java.util.Objects.requireNonNull;
59
60 public abstract class AbstractPort implements Port {
61
62     public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()
63             .description("The relationship through which all Flow Files are transferred")
64             .name("")
65             .build();
66
67     public static final long MINIMUM_PENALIZATION_MILLIS = 0L;
68     public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
69
70     public static final long MINIMUM_YIELD_MILLIS = 0L;
71     public static final long DEFAULT_YIELD_PERIOD = 10000L;
72     public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS;
73
74     private final List<Relationship> relationships;
75
76     private final String id;
77     private final ConnectableType type;
78     private final AtomicReference<String> name;
79     private final AtomicReference<Position> position;
80     private final AtomicReference<String> comments;
81     private final AtomicReference<ProcessGroup> processGroup;
82     private final AtomicBoolean lossTolerant;
83     private final AtomicReference<ScheduledState> scheduledState;
84     private final AtomicInteger concurrentTaskCount;
85     private final AtomicReference<String> penalizationPeriod;
86     private final AtomicReference<String> yieldPeriod;
87     private final AtomicReference<String> schedulingPeriod;
88     private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
89     private final AtomicLong schedulingNanos;
90     private final AtomicLong yieldExpiration;
91     private final ProcessScheduler processScheduler;
92
93     private final Set<Connection> outgoingConnections;
94     private final List<Connection> incomingConnections;
95
96     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
97     private final Lock readLock = rwLock.readLock();
98     private final Lock writeLock = rwLock.writeLock();
99
100     public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) {
101         this.id = requireNonNull(id);
102         this.name = new AtomicReference<>(requireNonNull(name));
103         position = new AtomicReference<>(new Position(0D, 0D));
104         outgoingConnections = new HashSet<>();
105         incomingConnections = new ArrayList<>();
106         comments = new AtomicReference<>();
107         lossTolerant = new AtomicBoolean(false);
108         concurrentTaskCount = new AtomicInteger(1);
109         processScheduler = scheduler;
110
111         final List<Relationship> relationshipList = new ArrayList<>();
112         relationshipList.add(PORT_RELATIONSHIP);
113         relationships = Collections.unmodifiableList(relationshipList);
114         this.processGroup = new AtomicReference<>(processGroup);
115         this.type = type;
116         penalizationPeriod = new AtomicReference<>("30 sec");
117         yieldPeriod = new AtomicReference<>("1 sec");
118         yieldExpiration = new AtomicLong(0L);
119         schedulingPeriod = new AtomicReference<>("0 millis");
120         schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
121         scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
122     }
123
124     @Override
125     public String getIdentifier() {
126         return id;
127     }
128
129     @Override
130     public String getProcessGroupIdentifier() {
131         final ProcessGroup procGroup = getProcessGroup();
132         return procGroup == null ? null : procGroup.getIdentifier();
133     }
134
135     @Override
136     public String getName() {
137         return name.get();
138     }
139
140     @Override
141     public void setName(final String name) {
142         if (this.name.get().equals(name)) {
143             return;
144         }
145
146         final ProcessGroup parentGroup = this.processGroup.get();
147         if (getConnectableType() == ConnectableType.INPUT_PORT) {
148             if (parentGroup.getInputPortByName(name) != null) {
149                 throw new IllegalStateException("The requested new port name is not available");
150             }
151         } else if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
152             if (parentGroup.getOutputPortByName(name) != null) {
153                 throw new IllegalStateException("The requested new port name is not available");
154             }
155         }
156
157         this.name.set(name);
158     }
159
160     @Override
161     public Authorizable getParentAuthorizable() {
162         return getProcessGroup();
163     }
164
165     @Override
166     public Resource getResource() {
167         final ResourceType resourceType = ConnectableType.INPUT_PORT.equals(getConnectableType()) ? ResourceType.InputPort : ResourceType.OutputPort;
168         return ResourceFactory.getComponentResource(resourceType, getIdentifier(), getName());
169     }
170
171     @Override
172     public ProcessGroup getProcessGroup() {
173         return processGroup.get();
174     }
175
176     @Override
177     public void setProcessGroup(final ProcessGroup newGroup) {
178         this.processGroup.set(newGroup);
179     }
180
181     @Override
182     public String getComments() {
183         return comments.get();
184     }
185
186     @Override
187     public void setComments(final String comments) {
188         this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments));
189     }
190
191     @Override
192     public Collection<Relationship> getRelationships() {
193         return relationships;
194     }
195
196     @Override
197     public Relationship getRelationship(final String relationshipName) {
198         if (PORT_RELATIONSHIP.getName().equals(relationshipName)) {
199             return PORT_RELATIONSHIP;
200         }
201         return null;
202     }
203
204     @Override
205     public void addConnection(final Connection connection) throws IllegalArgumentException {
206         writeLock.lock();
207         try {
208             if (!requireNonNull(connection).getSource().equals(this)) {
209                 if (connection.getDestination().equals(this)) {
210                     // don't add the connection twice. This may occur if we have a self-loop because we will be told
211                     // to add the connection once because we are the source and again because we are the destination.
212                     if (!incomingConnections.contains(connection)) {
213                         incomingConnections.add(connection);
214                     }
215
216                     return;
217                 } else {
218                     throw new IllegalArgumentException("Cannot add a connection to a LocalPort for which the LocalPort is neither the Source nor the Destination");
219                 }
220             }
221
222             /* TODO: Will commenting this out have repercussions?
223             Needed to comment this out to allow use of relationships for port to processor case which was previously not supported
224             for (final Relationship relationship : connection.getRelationships()) {
225                 if (!relationship.equals(PORT_RELATIONSHIP)) {
226                     throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Local Ports");
227                 }
228             }
229             */
230
231             // don't add the connection twice. This may occur if we have a self-loop because we will be told
232             // to add the connection once because we are the source and again because we are the destination.
233             if (!outgoingConnections.contains(connection)) {
234                 outgoingConnections.add(connection);
235             }
236         } finally {
237             writeLock.unlock();
238         }
239     }
240
241     @Override
242     public boolean hasIncomingConnection() {
243         readLock.lock();
244         try {
245             return !incomingConnections.isEmpty();
246         } finally {
247             readLock.unlock();
248         }
249     }
250
251     @Override
252     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
253         final ProcessSession session = sessionFactory.createSession();
254
255         try {
256             onTrigger(context, session);
257             session.commit();
258         } catch (final ProcessException e) {
259             session.rollback();
260             throw e;
261         } catch (final Throwable t) {
262             session.rollback();
263             throw new RuntimeException(t);
264         }
265     }
266
267     public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
268
269     @Override
270     public void updateConnection(final Connection connection) throws IllegalStateException {
271         if (requireNonNull(connection).getSource().equals(this)) {
272             writeLock.lock();
273             try {
274                 if (!outgoingConnections.remove(connection)) {
275                     throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
276                 }
277                 outgoingConnections.add(connection);
278             } finally {
279                 writeLock.unlock();
280             }
281         } else if (connection.getDestination().equals(this)) {
282             writeLock.lock();
283             try {
284                 if (!incomingConnections.remove(connection)) {
285                     throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
286                 }
287                 incomingConnections.add(connection);
288             } finally {
289                 writeLock.unlock();
290             }
291         } else {
292             throw new IllegalStateException("The given connection is not currently registered for this Port");
293         }
294     }
295
296     @Override
297     public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
298         writeLock.lock();
299         try {
300             if (!requireNonNull(connection).getSource().equals(this)) {
301                 final boolean existed = incomingConnections.remove(connection);
302                 if (!existed) {
303                     throw new IllegalStateException("The given connection is not currently registered for this Port");
304                 }
305                 return;
306             }
307
308             if (!canConnectionBeRemoved(connection)) {
309                 // TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean
310                 throw new IllegalStateException("Connection " + connection.getIdentifier() + " cannot be removed");
311             }
312
313             final boolean removed = outgoingConnections.remove(connection);
314             if (!removed) {
315                 throw new IllegalStateException("Connection " + connection.getIdentifier() + " is not registered with " + this.getIdentifier());
316             }
317         } finally {
318             writeLock.unlock();
319         }
320     }
321
322     /**
323      * Verify that removing this connection will not prevent this Port from
324      * still being connected via each relationship
325      *
326      * @param connection to test for removal
327      * @return true if can be removed
328      */
329     private boolean canConnectionBeRemoved(final Connection connection) {
330         final Connectable source = connection.getSource();
331         if (!source.isRunning()) {
332             // we don't have to verify that this Connectable is still connected because it's okay to make
333             // the source invalid since it is not running.
334             return true;
335         }
336
337         for (final Relationship relationship : source.getRelationships()) {
338             if (source.isAutoTerminated(relationship)) {
339                 continue;
340             }
341
342             final Set<Connection> connectionsForRelationship = source.getConnections(relationship);
343             if (connectionsForRelationship == null || connectionsForRelationship.isEmpty()) {
344                 return false;
345             }
346         }
347
348         return true;
349     }
350
351     @Override
352     public Set<Connection> getConnections() {
353         readLock.lock();
354         try {
355             return Collections.unmodifiableSet(outgoingConnections);
356         } finally {
357             readLock.unlock();
358         }
359     }
360
361     @Override
362     public Set<Connection> getConnections(final Relationship relationship) {
363         readLock.lock();
364         try {
365             if (relationship.equals(PORT_RELATIONSHIP)) {
366                 return Collections.unmodifiableSet(outgoingConnections);
367             }
368
369             throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Local Ports");
370         } finally {
371             readLock.unlock();
372         }
373     }
374
375     @Override
376     public Position getPosition() {
377         return position.get();
378     }
379
380     @Override
381     public void setPosition(final Position position) {
382         this.position.set(position);
383     }
384
385     @Override
386     public String toString() {
387         return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString();
388     }
389
390     @Override
391     public List<Connection> getIncomingConnections() {
392         readLock.lock();
393         try {
394             return Collections.unmodifiableList(incomingConnections);
395         } finally {
396             readLock.unlock();
397         }
398     }
399
400     @Override
401     public abstract boolean isValid();
402
403     @Override
404     public boolean isAutoTerminated(final Relationship relationship) {
405         return false;
406     }
407
408     @Override
409     public boolean isLossTolerant() {
410         return lossTolerant.get();
411     }
412
413     @Override
414     public void setLossTolerant(boolean lossTolerant) {
415         this.lossTolerant.set(lossTolerant);
416     }
417
418     @Override
419     public void setMaxConcurrentTasks(final int taskCount) {
420         if (taskCount < 1) {
421             throw new IllegalArgumentException();
422         }
423         concurrentTaskCount.set(taskCount);
424     }
425
426     @Override
427     public int getMaxConcurrentTasks() {
428         return concurrentTaskCount.get();
429     }
430
431     @Override
432     public void shutdown() {
433         scheduledState.set(ScheduledState.STOPPED);
434     }
435
436     @Override
437     public void onSchedulingStart() {
438         scheduledState.set(ScheduledState.RUNNING);
439     }
440
441     public void disable() {
442         final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
443         if (!updated) {
444             throw new IllegalStateException("Port cannot be disabled because it is not stopped");
445         }
446     }
447
448     public void enable() {
449         final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED);
450         if (!updated) {
451             throw new IllegalStateException("Port cannot be enabled because it is not disabled");
452         }
453     }
454
455     @Override
456     public boolean isRunning() {
457         return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0;
458     }
459
460     @Override
461     public ScheduledState getScheduledState() {
462         return scheduledState.get();
463     }
464
465     @Override
466     public ConnectableType getConnectableType() {
467         return type;
468     }
469
470     @Override
471     public void setYieldPeriod(final String yieldPeriod) {
472         final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
473         if (yieldMillis < 0) {
474             throw new IllegalArgumentException("Yield duration must be positive");
475         }
476         this.yieldPeriod.set(yieldPeriod);
477     }
478
479     @Override
480     public void setScheduldingPeriod(final String schedulingPeriod) {
481         final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
482         if (schedulingNanos < 0) {
483             throw new IllegalArgumentException("Scheduling Period must be positive");
484         }
485
486         this.schedulingPeriod.set(schedulingPeriod);
487         this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
488     }
489
490     @Override
491     public long getPenalizationPeriod(final TimeUnit timeUnit) {
492         return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
493     }
494
495     @Override
496     public String getPenalizationPeriod() {
497         return penalizationPeriod.get();
498     }
499
500     @Override
501     public void yield() {
502         final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
503         yield(yieldMillis, TimeUnit.MILLISECONDS);
504     }
505
506     @Override
507     public void yield(final long yieldDuration, final TimeUnit timeUnit) {
508         final long yieldMillis = timeUnit.toMillis(yieldDuration);
509         yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis));
510     }
511
512     @Override
513     public long getYieldExpiration() {
514         return yieldExpiration.get();
515     }
516
517     @Override
518     public long getSchedulingPeriod(final TimeUnit timeUnit) {
519         return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
520     }
521
522     @Override
523     public String getSchedulingPeriod() {
524         return schedulingPeriod.get();
525     }
526
527     @Override
528     public void setPenalizationPeriod(final String penalizationPeriod) {
529         this.penalizationPeriod.set(penalizationPeriod);
530     }
531
532     @Override
533     public String getYieldPeriod() {
534         return yieldPeriod.get();
535     }
536
537     @Override
538     public long getYieldPeriod(final TimeUnit timeUnit) {
539         return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
540     }
541
542     @Override
543     public void verifyCanDelete() throws IllegalStateException {
544         verifyCanDelete(false);
545     }
546
547     @Override
548     public void verifyCanDelete(final boolean ignoreConnections) {
549         readLock.lock();
550         try {
551             if (isRunning()) {
552                 throw new IllegalStateException(this.getIdentifier() + " is running");
553             }
554
555             if (!ignoreConnections) {
556                 for (final Connection connection : outgoingConnections) {
557                     connection.verifyCanDelete();
558                 }
559
560                 for (final Connection connection : incomingConnections) {
561                     if (connection.getSource().equals(this)) {
562                         connection.verifyCanDelete();
563                     } else {
564                         throw new IllegalStateException(this.getIdentifier() + " is the destination of another component");
565                     }
566                 }
567             }
568         } finally {
569             readLock.unlock();
570         }
571     }
572
573     @Override
574     public void verifyCanStart() {
575         readLock.lock();
576         try {
577             switch (scheduledState.get()) {
578                 case DISABLED:
579                     throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled");
580                 case RUNNING:
581                     throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running");
582                 case STOPPED:
583                     break;
584             }
585             verifyNoActiveThreads();
586
587             final Collection<ValidationResult> validationResults = getValidationErrors();
588             if (!validationResults.isEmpty()) {
589                 throw new IllegalStateException(this.getIdentifier() + " is not in a valid state: " + validationResults.iterator().next().getExplanation());
590             }
591         } finally {
592             readLock.unlock();
593         }
594     }
595
596     @Override
597     public void verifyCanStop() {
598         if (getScheduledState() != ScheduledState.RUNNING) {
599             throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run");
600         }
601     }
602
603     @Override
604     public void verifyCanUpdate() {
605         readLock.lock();
606         try {
607             if (isRunning()) {
608                 throw new IllegalStateException(this.getIdentifier() + " is not stopped");
609             }
610         } finally {
611             readLock.unlock();
612         }
613     }
614
615     @Override
616     public void verifyCanEnable() {
617         readLock.lock();
618         try {
619             if (getScheduledState() != ScheduledState.DISABLED) {
620                 throw new IllegalStateException(this.getIdentifier() + " is not disabled");
621             }
622
623             verifyNoActiveThreads();
624         } finally {
625             readLock.unlock();
626         }
627     }
628
629     @Override
630     public void verifyCanDisable() {
631         readLock.lock();
632         try {
633             if (getScheduledState() != ScheduledState.STOPPED) {
634                 throw new IllegalStateException(this.getIdentifier() + " is not stopped");
635             }
636             verifyNoActiveThreads();
637         } finally {
638             readLock.unlock();
639         }
640     }
641
642     private void verifyNoActiveThreads() throws IllegalStateException {
643         final int threadCount = processScheduler.getActiveThreadCount(this);
644         if (threadCount > 0) {
645             throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active");
646         }
647     }
648
649     @Override
650     public void verifyCanClearState() {
651     }
652
653     @Override
654     public Optional<String> getVersionedComponentId() {
655         return Optional.ofNullable(versionedComponentId.get());
656     }
657
658     @Override
659     public void setVersionedComponentId(final String versionedComponentId) {
660         boolean updated = false;
661         while (!updated) {
662             final String currentId = this.versionedComponentId.get();
663
664             if (currentId == null) {
665                 updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
666             } else if (currentId.equals(versionedComponentId)) {
667                 return;
668             } else if (versionedComponentId == null) {
669                 updated = this.versionedComponentId.compareAndSet(currentId, null);
670             } else {
671                 throw new IllegalStateException(this + " is already under version control");
672             }
673         }
674     }
675 }