2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7 * Modifications Copyright (C) 2019 Tech Mahindra
8 * Modifications Copyright (C) 2019 Bell Canada.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.controlloop.eventmanager;
26 import java.io.Serializable;
27 import java.time.Instant;
28 import java.util.Deque;
29 import java.util.LinkedHashMap;
30 import java.util.List;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.stream.Collectors;
39 import lombok.AccessLevel;
41 import lombok.ToString;
42 import org.onap.policy.aai.AaiConstants;
43 import org.onap.policy.aai.AaiCqResponse;
44 import org.onap.policy.controlloop.ControlLoopOperation;
45 import org.onap.policy.controlloop.ControlLoopResponse;
46 import org.onap.policy.controlloop.VirtualControlLoopEvent;
47 import org.onap.policy.controlloop.actor.guard.GuardActor;
48 import org.onap.policy.controlloop.actor.sdnr.SdnrActor;
49 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
50 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
51 import org.onap.policy.controlloop.actorserviceprovider.TargetType;
52 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
53 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
54 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
55 import org.onap.policy.drools.domain.models.operational.OperationalTarget;
56 import org.onap.policy.sdnr.PciMessage;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
61 * Manages a single Operation for a single event. Once this has been created,
62 * {@link #start(long)} should be invoked, and then {@link #nextStep()} should be invoked
63 * continually until it returns {@code false}, indicating that all steps have completed.
65 @ToString(onlyExplicitlyIncluded = true)
66 public class ControlLoopOperationManager2 implements Serializable {
67 private static final long serialVersionUID = -3773199283624595410L;
68 private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
69 private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
70 public static final String LOCK_ACTOR = "LOCK";
71 public static final String LOCK_OPERATION = "Lock";
72 private static final String GUARD_ACTOR = GuardActor.NAME;
73 public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
74 public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
75 public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
76 public static final String PNF_NAME = "pnf.pnf-name";
93 private final transient ManagerContext operContext;
94 private final transient ControlLoopEventContext eventContext;
95 private final org.onap.policy.drools.domain.models.operational.Operation policy;
99 private State state = State.ACTIVE;
102 private final String requestId;
105 private final String policyId;
108 * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
112 private int attempts = 0;
114 private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
117 * Set to {@code true} to prevent the last item in {@link #operationHistory} from
118 * being included in the outcome of {@link #getHistory()}. Used when the operation
119 * aborts prematurely due to lock-denied, guard-denied, etc.
121 private boolean holdLast = false;
124 * Queue of outcomes yet to be processed. Outcomes are added to this each time the
125 * "start" or "complete" callback is invoked.
127 @Getter(AccessLevel.PROTECTED)
128 private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
131 * Used to cancel the running operation.
133 @Getter(AccessLevel.PROTECTED)
134 private transient CompletableFuture<OperationOutcome> future = null;
137 * Target entity. Determined after the lock is granted, though it may require the
138 * custom query to be performed first.
141 private String targetEntity;
143 @Getter(AccessLevel.PROTECTED)
144 private final transient ControlLoopOperationParams params;
145 private final transient PipelineUtil taskUtil;
148 private ControlLoopResponse controlLoopResponse;
151 * Time when the lock was first requested.
153 private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
155 // values extracted from the policy
157 private final String actor;
159 private final String operation;
161 private final String targetStr;
162 private final OperationalTarget target;
166 * Construct an instance.
168 * @param operContext this operation's context
169 * @param context event context
170 * @param operation2 operation's policy
171 * @param executor executor for the Operation
173 public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context,
174 org.onap.policy.drools.domain.models.operational.Operation operation2, Executor executor) {
176 this.operContext = operContext;
177 this.eventContext = context;
178 this.policy = operation2;
179 this.requestId = context.getEvent().getRequestId().toString();
180 this.policyId = "" + operation2.getId();
181 this.actor = operation2.getActorOperation().getActor();
182 this.operation = operation2.getActorOperation().getOperation();
183 this.target = operation2.getActorOperation().getTarget();
185 String targetType = (target != null ? target.getTargetType() : null);
186 Map<String, String> entityIds = (target != null ? target.getEntityIds() : null);
189 this.targetStr = (target != null ? target.toString() : null);
192 params = ControlLoopOperationParams.builder()
193 .actorService(operContext.getActorService())
195 .operation(operation)
198 .targetType(TargetType.toTargetType(targetType))
199 .targetEntityIds(entityIds)
200 .startCallback(this::onStart)
201 .completeCallback(this::onComplete)
205 taskUtil = new PipelineUtil(params);
209 // Internal class used for tracking
213 private class Operation implements Serializable {
214 private static final long serialVersionUID = 1L;
217 private OperationResult policyResult;
218 private ControlLoopOperation clOperation;
219 private ControlLoopResponse clResponse;
222 * Constructs the object.
224 * @param outcome outcome of the operation
226 public Operation(OperationOutcome outcome) {
227 attempt = ControlLoopOperationManager2.this.attempts;
228 policyResult = outcome.getResult();
229 clOperation = outcome.toControlLoopOperation();
230 clOperation.setTarget(targetStr);
231 clResponse = makeControlLoopResponse(outcome);
233 if (outcome.getEnd() == null) {
234 clOperation.setOutcome("Started");
235 } else if (clOperation.getOutcome() == null) {
236 clOperation.setOutcome("");
242 * Start the operation, first acquiring any locks that are needed. This should not
243 * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
245 * @param remainingMs time remaining, in milliseconds, for the control loop
247 @SuppressWarnings("unchecked")
248 public synchronized void start(long remainingMs) {
249 // this is synchronized while we update "future"
252 // provide a default, in case something fails before requestLock() is called
253 lockStart.set(Instant.now());
256 future = taskUtil.sequence(
259 this::startOperation);
262 // handle any exceptions that may be thrown, set timeout, and handle timeout
265 future.exceptionally(this::handleException)
266 .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
267 .exceptionally(this::handleTimeout);
270 } catch (RuntimeException e) {
276 * Start the operation, after the lock has been acquired.
278 * @return CompletableFuture for the operation being started
280 private CompletableFuture<OperationOutcome> startOperation() {
282 ControlLoopOperationParams params2 = params.toBuilder()
283 .payload(new LinkedHashMap<>())
284 .retry(policy.getRetries())
285 .timeoutSec(policy.getTimeout())
286 .targetEntity(targetEntity)
290 if (policy.getActorOperation().getPayload() != null) {
291 params2.getPayload().putAll(policy.getActorOperation().getPayload());
294 return params2.start();
298 * Handles exceptions that may be generated.
300 * @param thrown exception that was generated
301 * @return {@code null}
303 private OperationOutcome handleException(Throwable thrown) { // NOSONAR
305 * disabling sonar about returning the same value because we prefer the code to be
306 * structured this way
309 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
313 logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
314 OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
315 outcome.setStart(lockStart.get());
316 outcome.setEnd(Instant.now());
317 outcome.setFinalOutcome(true);
320 // this outcome is not used so just return "null"
325 * Handles control loop timeout exception.
327 * @param thrown exception that was generated
328 * @return {@code null}
330 private OperationOutcome handleTimeout(Throwable thrown) {
331 logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
333 OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
334 outcome.setActor(CL_TIMEOUT_ACTOR);
335 outcome.setOperation(null);
336 outcome.setStart(lockStart.get());
337 outcome.setEnd(Instant.now());
338 outcome.setFinalOutcome(true);
341 // cancel the operation, if it's still running
342 future.cancel(false);
344 // this outcome is not used so just return "null"
349 * Cancels the operation.
351 public void cancel() {
352 synchronized (this) {
353 if (future == null) {
358 future.cancel(false);
362 * Requests a lock on the {@link #targetEntity}.
364 * @return a future to await the lock
366 private CompletableFuture<OperationOutcome> requestLock() {
368 * Failures are handled via the callback, and successes are discarded by
369 * sequence(), without passing them to onComplete().
371 * Return a COPY of the future so that if we try to cancel it, we'll only cancel
372 * the copy, not the original. This is done by tacking thenApply() onto the end.
374 lockStart.set(Instant.now());
375 return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
379 * Indicates that the lock on the target entity is unavailable.
381 * @param outcome lock outcome
383 private void lockUnavailable(OperationOutcome outcome) {
385 // Note: NEVER invoke onStart() for locks; only invoke onComplete()
389 * Now that we've added the lock outcome to the queue, ensure the future is
390 * canceled, which may, itself, generate an operation outcome.
396 * Handles responses provided via the "start" callback. Note: this is never invoked
397 * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
399 * @param outcome outcome provided to the callback
401 private void onStart(OperationOutcome outcome) {
402 if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
408 * Handles responses provided via the "complete" callback. Note: this is never invoked
409 * for "successful" locks.
411 * @param outcome outcome provided to the callback
413 private void onComplete(OperationOutcome outcome) {
415 switch (outcome.getActor()) {
418 case CL_TIMEOUT_ACTOR:
423 if (outcome.isFor(actor, operation)) {
431 * Adds an outcome to {@link #outcomes}.
433 * @param outcome outcome to be added
435 private synchronized void addOutcome(OperationOutcome outcome) {
437 * This is synchronized to prevent nextStep() from invoking processOutcome() at
441 logger.debug("added outcome={} for {}", outcome, requestId);
442 outcomes.add(outcome);
444 if (outcomes.peekFirst() == outcomes.peekLast()) {
445 // this is the first outcome in the queue - process it
451 * Looks for the next step in the queue.
453 * @return {@code true} if more responses are expected, {@code false} otherwise
455 public synchronized boolean nextStep() {
460 case CONTROL_LOOP_TIMEOUT:
467 OperationOutcome outcome = outcomes.peek();
468 if (outcome == null) {
473 if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
474 controlLoopResponse = null;
478 // first item has been processed, remove it
480 if (!outcomes.isEmpty()) {
481 // have a new "first" item - process it
489 * Processes the first item in {@link #outcomes}. Sets the state, increments
490 * {@link #attempts}, if appropriate, and stores the operation history in the DB.
492 private synchronized void processOutcome() {
493 OperationOutcome outcome = outcomes.peek();
494 logger.debug("process outcome={} for {}", outcome, requestId);
496 controlLoopResponse = null;
498 switch (outcome.getActor()) {
500 case CL_TIMEOUT_ACTOR:
501 state = State.CONTROL_LOOP_TIMEOUT;
502 processAbort(outcome, OperationResult.FAILURE, "Control loop timed out");
506 // lock is no longer available
507 if (state == State.ACTIVE) {
508 state = State.LOCK_DENIED;
509 storeFailureInDataBase(outcome, OperationResult.FAILURE_GUARD, "Operation denied by Lock");
511 state = State.LOCK_LOST;
512 processAbort(outcome, OperationResult.FAILURE, "Operation aborted by Lock");
517 if (outcome.getEnd() == null) {
518 state = State.GUARD_STARTED;
519 } else if (outcome.getResult() == OperationResult.SUCCESS) {
520 state = State.GUARD_PERMITTED;
522 state = State.GUARD_DENIED;
523 storeFailureInDataBase(outcome, OperationResult.FAILURE_GUARD, "Operation denied by Guard");
528 if (outcome.getEnd() == null) {
531 state = State.OPERATION_STARTED;
535 * Operation completed. If the last entry was a "start" (i.e., "end" field
536 * is null), then replace it. Otherwise, just add the completion.
538 state = (outcome.getResult() == OperationResult.SUCCESS ? State.OPERATION_SUCCESS
539 : State.OPERATION_FAILURE);
540 controlLoopResponse = makeControlLoopResponse(outcome);
541 if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
542 operationHistory.removeLast();
546 operationHistory.add(new Operation(outcome));
547 storeOperationInDataBase();
551 // indicate that this has changed
552 operContext.updated(this);
556 * Processes an operation abort, updating the DB record, if an operation has been
559 * @param outcome operation outcome
560 * @param result result to put into the DB
561 * @param message message to put into the DB
563 private void processAbort(OperationOutcome outcome, OperationResult result, String message) {
564 if (operationHistory.isEmpty() || operationHistory.peekLast().getClOperation().getEnd() != null) {
565 // last item was not a "start" operation
567 // NOTE: do NOT generate control loop response since operation was not started
569 storeFailureInDataBase(outcome, result, message);
573 // last item was a "start" operation - replace it with a failure
574 final Operation operOrig = operationHistory.removeLast();
576 // use start time from the operation, itself
577 if (operOrig != null && operOrig.getClOperation() != null) {
578 outcome.setStart(operOrig.getClOperation().getStart());
581 controlLoopResponse = makeControlLoopResponse(outcome);
583 storeFailureInDataBase(outcome, result, message);
587 * Makes a control loop response.
589 * @param outcome operation outcome
590 * @return a new control loop response, or {@code null} if none is required
592 protected ControlLoopResponse makeControlLoopResponse(OperationOutcome outcome) {
594 // only generate response for certain actors.
595 if (outcome == null || !actor.equals(SdnrActor.NAME)) {
599 VirtualControlLoopEvent event = eventContext.getEvent();
601 ControlLoopResponse clRsp = new ControlLoopResponse();
602 clRsp.setFrom(actor);
603 clRsp.setTarget("DCAE");
604 clRsp.setClosedLoopControlName(event.getClosedLoopControlName());
605 clRsp.setPolicyName(event.getPolicyName());
606 clRsp.setPolicyVersion(event.getPolicyVersion());
607 clRsp.setRequestId(event.getRequestId());
608 clRsp.setVersion(event.getVersion());
610 PciMessage msg = outcome.getResponse();
611 if (msg != null && msg.getBody() != null && msg.getBody().getOutput() != null) {
612 clRsp.setPayload(msg.getBody().getOutput().getPayload());
619 * Get the operation, as a message.
621 * @return the operation, as a message
623 public String getOperationMessage() {
624 Operation last = operationHistory.peekLast();
625 return (last == null ? null : last.getClOperation().toMessage());
629 * Gets the operation result.
631 * @return the operation result
633 public OperationResult getOperationResult() {
634 Operation last = operationHistory.peekLast();
635 return (last == null ? OperationResult.FAILURE_EXCEPTION : last.getPolicyResult());
639 * Get the latest operation history.
641 * @return the latest operation history
643 public String getOperationHistory() {
644 Operation last = operationHistory.peekLast();
645 return (last == null ? null : last.clOperation.toHistory());
651 * @return the list of control loop operations
653 public List<ControlLoopOperation> getHistory() {
654 Operation last = (holdLast ? operationHistory.removeLast() : null);
656 List<ControlLoopOperation> result = operationHistory.stream().map(Operation::getClOperation)
657 .map(ControlLoopOperation::new).collect(Collectors.toList());
660 operationHistory.add(last);
667 * Stores a failure in the DB.
669 * @param outcome operation outcome
670 * @param result result to put into the DB
671 * @param message message to put into the DB
673 private void storeFailureInDataBase(OperationOutcome outcome, OperationResult result, String message) {
674 // don't include this in history yet
677 outcome.setActor(actor);
678 outcome.setOperation(operation);
679 outcome.setMessage(message);
680 outcome.setResult(result);
682 operationHistory.add(new Operation(outcome));
683 storeOperationInDataBase();
687 * Stores the latest operation in the DB.
689 private void storeOperationInDataBase() {
690 operContext.getDataManager().store(requestId, eventContext.getEvent(), targetEntity,
691 operationHistory.peekLast().getClOperation());
695 * Determines the target entity.
697 * @return a future to determine the target entity, or {@code null} if the entity has
698 * already been determined
700 protected CompletableFuture<OperationOutcome> detmTarget() {
701 if (target == null) {
702 throw new IllegalArgumentException("The target is null");
705 if (target.getTargetType() == null) {
706 throw new IllegalArgumentException("The target type is null");
709 switch (TargetType.toTargetType(target.getTargetType())) {
711 return detmPnfTarget();
715 return detmVfModuleTarget();
717 throw new IllegalArgumentException("The target type is not supported");
722 * Determines the PNF target entity.
724 * @return a future to determine the target entity, or {@code null} if the entity has
725 * already been determined
727 private CompletableFuture<OperationOutcome> detmPnfTarget() {
728 if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
729 throw new IllegalArgumentException("Target does not match target type");
732 targetEntity = eventContext.getEnrichment().get(PNF_NAME);
733 if (targetEntity == null) {
734 throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
741 * Determines the VF Module target entity.
743 * @return a future to determine the target entity, or {@code null} if the entity has
744 * already been determined
746 private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
747 String targetFieldName = eventContext.getEvent().getTarget();
748 if (targetFieldName == null) {
749 throw new IllegalArgumentException("Target is null");
752 switch (targetFieldName.toLowerCase()) {
753 case VSERVER_VSERVER_NAME:
754 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
756 case GENERIC_VNF_VNF_ID:
757 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
759 case GENERIC_VNF_VNF_NAME:
760 return detmVnfName();
762 throw new IllegalArgumentException("Target does not match target type");
765 if (targetEntity == null) {
766 throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
773 * Determines the VNF Name target entity.
775 * @return a future to determine the target entity, or {@code null} if the entity has
776 * already been determined
778 @SuppressWarnings("unchecked")
779 private CompletableFuture<OperationOutcome> detmVnfName() {
780 // if the onset is enriched with the vnf-id, we don't need an A&AI response
781 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
782 if (targetEntity != null) {
786 // vnf-id was not in the onset - obtain it via the custom query
789 ControlLoopOperationParams cqparams = params.toBuilder()
790 .actor(AaiConstants.ACTOR_NAME)
791 .operation(AaiCqResponse.OPERATION)
796 // perform custom query and then extract the VNF ID from it
797 return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
798 this::extractVnfFromCq);
802 * Extracts the VNF Name target entity from the custom query data.
804 * @return {@code null}
806 private CompletableFuture<OperationOutcome> extractVnfFromCq() {
807 // already have the CQ data
808 AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
809 if (cq.getDefaultGenericVnf() == null) {
810 throw new IllegalArgumentException("No vnf-id found");
813 targetEntity = cq.getDefaultGenericVnf().getVnfId();
814 if (targetEntity == null) {
815 throw new IllegalArgumentException("No vnf-id found");