2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7 * Modifications Copyright (C) 2019 Tech Mahindra
8 * Modifications Copyright (C) 2019 Bell Canada.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.controlloop.eventmanager;
26 import java.io.Serializable;
27 import java.time.Instant;
28 import java.util.Deque;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import java.util.stream.Collectors;
38 import lombok.AccessLevel;
40 import lombok.ToString;
41 import org.onap.policy.aai.AaiConstants;
42 import org.onap.policy.aai.AaiCqResponse;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.ControlLoopResponse;
45 import org.onap.policy.controlloop.VirtualControlLoopEvent;
46 import org.onap.policy.controlloop.actor.guard.GuardActorServiceProvider;
47 import org.onap.policy.controlloop.actor.sdnr.SdnrActorServiceProvider;
48 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
49 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
50 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
51 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
52 import org.onap.policy.controlloop.policy.Policy;
53 import org.onap.policy.controlloop.policy.PolicyResult;
54 import org.onap.policy.sdnr.PciMessage;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
59 * Manages a single Operation for a single event. Once this has been created,
60 * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
61 * continually until it returns {@code false}, indicating that all steps have completed.
63 @ToString(onlyExplicitlyIncluded = true)
64 public class ControlLoopOperationManager2 implements Serializable {
65 private static final long serialVersionUID = -3773199283624595410L;
66 private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
67 private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
68 public static final String LOCK_ACTOR = "LOCK";
69 public static final String LOCK_OPERATION = "Lock";
70 private static final String GUARD_ACTOR = GuardActorServiceProvider.NAME;
71 public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
72 public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
73 public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
74 public static final String PNF_NAME = "pnf.pnf-name";
91 private final transient ManagerContext operContext;
92 private final transient ControlLoopEventContext eventContext;
93 private final Policy policy;
97 private State state = State.ACTIVE;
100 private final String requestId;
103 private final String policyId;
106 * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
110 private int attempts = 0;
112 private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
115 * Set to {@code true} to prevent the last item in {@link #operationHistory} from
116 * being included in the outcome of {@link #getHistory()}. Used when the operation
117 * aborts prematurely due to lock-denied, guard-denied, etc.
119 private boolean holdLast = false;
122 * Queue of outcomes yet to be processed. Outcomes are added to this each time the
123 * "start" or "complete" callback is invoked.
125 @Getter(AccessLevel.PROTECTED)
126 private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
129 * Used to cancel the running operation.
131 @Getter(AccessLevel.PROTECTED)
132 private transient CompletableFuture<OperationOutcome> future = null;
135 * Target entity. Determined after the lock is granted, though it may require the
136 * custom query to be performed first.
139 private String targetEntity;
141 @Getter(AccessLevel.PROTECTED)
142 private final transient ControlLoopOperationParams params;
143 private final transient PipelineUtil taskUtil;
146 private ControlLoopResponse controlLoopResponse;
149 * Time when the lock was first requested.
151 private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
153 // values extracted from the policy
155 private final String actor;
157 private final String operation;
161 * Construct an instance.
163 * @param operContext this operation's context
164 * @param context event context
165 * @param policy operation's policy
166 * @param executor executor for the Operation
168 public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
171 this.operContext = operContext;
172 this.eventContext = context;
173 this.policy = policy;
174 this.requestId = context.getEvent().getRequestId().toString();
175 this.policyId = "" + policy.getId();
176 this.actor = policy.getActor();
177 this.operation = policy.getRecipe();
180 params = ControlLoopOperationParams.builder()
181 .actorService(operContext.getActorService())
183 .operation(operation)
186 .target(policy.getTarget())
187 .startCallback(this::onStart)
188 .completeCallback(this::onComplete)
192 taskUtil = new PipelineUtil(params);
196 // Internal class used for tracking
200 private class Operation implements Serializable {
201 private static final long serialVersionUID = 1L;
204 private PolicyResult policyResult;
205 private ControlLoopOperation clOperation;
206 private ControlLoopResponse clResponse;
209 * Constructs the object.
211 * @param outcome outcome of the operation
213 public Operation(OperationOutcome outcome) {
214 attempt = ControlLoopOperationManager2.this.attempts;
215 policyResult = outcome.getResult();
216 clOperation = outcome.toControlLoopOperation();
217 clOperation.setTarget(policy.getTarget().toString());
218 clResponse = makeControlLoopResponse(outcome);
220 if (outcome.getEnd() == null) {
221 clOperation.setOutcome("Started");
222 } else if (clOperation.getOutcome() == null) {
223 clOperation.setOutcome("");
229 * Start the operation, first acquiring any locks that are needed. This should not
230 * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
232 * @param remainingMs time remaining, in milliseconds, for the control loop
234 @SuppressWarnings("unchecked")
235 public synchronized void start(long remainingMs) {
236 // this is synchronized while we update "future"
239 // provide a default, in case something fails before requestLock() is called
240 lockStart.set(Instant.now());
243 future = taskUtil.sequence(
246 this::startOperation);
249 // handle any exceptions that may be thrown, set timeout, and handle timeout
252 future.exceptionally(this::handleException)
253 .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
254 .exceptionally(this::handleTimeout);
257 } catch (RuntimeException e) {
263 * Start the operation, after the lock has been acquired.
267 private CompletableFuture<OperationOutcome> startOperation() {
269 ControlLoopOperationParams params2 = params.toBuilder()
270 .payload(new LinkedHashMap<>())
271 .retry(policy.getRetry())
272 .timeoutSec(policy.getTimeout())
273 .targetEntity(targetEntity)
277 if (policy.getPayload() != null) {
278 params2.getPayload().putAll(policy.getPayload());
281 return params2.start();
285 * Handles exceptions that may be generated.
287 * @param thrown exception that was generated
288 * @return {@code null}
290 private OperationOutcome handleException(Throwable thrown) {
291 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
295 logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
296 OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
297 outcome.setStart(lockStart.get());
298 outcome.setEnd(Instant.now());
299 outcome.setFinalOutcome(true);
302 // this outcome is not used so just return "null"
307 * Handles control loop timeout exception.
309 * @param thrown exception that was generated
310 * @return {@code null}
312 private OperationOutcome handleTimeout(Throwable thrown) {
313 logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
315 OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
316 outcome.setActor(CL_TIMEOUT_ACTOR);
317 outcome.setOperation(null);
318 outcome.setStart(lockStart.get());
319 outcome.setEnd(Instant.now());
320 outcome.setFinalOutcome(true);
323 // cancel the operation, if it's still running
324 future.cancel(false);
326 // this outcome is not used so just return "null"
331 * Cancels the operation.
333 public void cancel() {
334 synchronized (this) {
335 if (future == null) {
340 future.cancel(false);
344 * Requests a lock on the {@link #targetEntity}.
346 * @return a future to await the lock
348 private CompletableFuture<OperationOutcome> requestLock() {
350 * Failures are handled via the callback, and successes are discarded by
351 * sequence(), without passing them to onComplete().
353 * Return a COPY of the future so that if we try to cancel it, we'll only cancel
354 * the copy, not the original. This is done by tacking thenApply() onto the end.
356 lockStart.set(Instant.now());
357 return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
361 * Indicates that the lock on the target entity is unavailable.
363 * @param outcome lock outcome
365 private void lockUnavailable(OperationOutcome outcome) {
367 // Note: NEVER invoke onStart() for locks; only invoke onComplete()
371 * Now that we've added the lock outcome to the queue, ensure the future is
372 * canceled, which may, itself, generate an operation outcome.
378 * Handles responses provided via the "start" callback. Note: this is never invoked
379 * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
381 * @param outcome outcome provided to the callback
383 private void onStart(OperationOutcome outcome) {
384 if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
390 * Handles responses provided via the "complete" callback. Note: this is never invoked
391 * for "successful" locks.
393 * @param outcome outcome provided to the callback
395 private void onComplete(OperationOutcome outcome) {
397 switch (outcome.getActor()) {
400 case CL_TIMEOUT_ACTOR:
405 if (outcome.isFor(actor, operation)) {
413 * Adds an outcome to {@link #outcomes}.
415 * @param outcome outcome to be added
417 private synchronized void addOutcome(OperationOutcome outcome) {
419 * This is synchronized to prevent nextStep() from invoking processOutcome() at
423 logger.debug("added outcome={} for {}", outcome, requestId);
424 outcomes.add(outcome);
426 if (outcomes.peekFirst() == outcomes.peekLast()) {
427 // this is the first outcome in the queue - process it
433 * Looks for the next step in the queue.
435 * @return {@code true} if more responses are expected, {@code false} otherwise
437 public synchronized boolean nextStep() {
442 case CONTROL_LOOP_TIMEOUT:
449 OperationOutcome outcome = outcomes.peek();
450 if (outcome == null) {
455 if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
456 controlLoopResponse = null;
460 // first item has been processed, remove it
462 if (!outcomes.isEmpty()) {
463 // have a new "first" item - process it
471 * Processes the first item in {@link #outcomes}. Sets the state, increments
472 * {@link #attempts}, if appropriate, and stores the operation history in the DB.
474 private synchronized void processOutcome() {
475 OperationOutcome outcome = outcomes.peek();
476 logger.debug("process outcome={} for {}", outcome, requestId);
478 controlLoopResponse = null;
480 switch (outcome.getActor()) {
482 case CL_TIMEOUT_ACTOR:
483 state = State.CONTROL_LOOP_TIMEOUT;
484 processAbort(outcome, PolicyResult.FAILURE, "Control loop timed out");
488 // lock is no longer available
489 if (state == State.ACTIVE) {
490 state = State.LOCK_DENIED;
491 storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
493 state = State.LOCK_LOST;
494 processAbort(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
499 if (outcome.getEnd() == null) {
500 state = State.GUARD_STARTED;
501 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
502 state = State.GUARD_PERMITTED;
504 state = State.GUARD_DENIED;
505 storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
510 if (outcome.getEnd() == null) {
513 state = State.OPERATION_STARTED;
517 * Operation completed. If the last entry was a "start" (i.e., "end" field
518 * is null), then replace it. Otherwise, just add the completion.
520 state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
521 : State.OPERATION_FAILURE);
522 controlLoopResponse = makeControlLoopResponse(outcome);
523 if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
524 operationHistory.removeLast();
528 operationHistory.add(new Operation(outcome));
529 storeOperationInDataBase();
533 // indicate that this has changed
534 operContext.updated(this);
538 * Processes an operation abort, updating the DB record, if an operation has been
541 * @param outcome operation outcome
542 * @param result result to put into the DB
543 * @param message message to put into the DB
545 private void processAbort(OperationOutcome outcome, PolicyResult result, String message) {
546 if (operationHistory.isEmpty() || operationHistory.peekLast().getClOperation().getEnd() != null) {
547 // last item was not a "start" operation
549 // NOTE: do NOT generate control loop response since operation was not started
551 storeFailureInDataBase(outcome, result, message);
555 // last item was a "start" operation - replace it with a failure
556 final Operation operOrig = operationHistory.removeLast();
558 // use start time from the operation, itself
559 if (operOrig != null && operOrig.getClOperation() != null) {
560 outcome.setStart(operOrig.getClOperation().getStart());
563 controlLoopResponse = makeControlLoopResponse(outcome);
565 storeFailureInDataBase(outcome, result, message);
569 * Makes a control loop response.
571 * @param outcome operation outcome
572 * @return a new control loop response, or {@code null} if none is required
574 protected ControlLoopResponse makeControlLoopResponse(OperationOutcome outcome) {
576 // only generate response for certain actors.
577 if (outcome == null || !actor.equals(SdnrActorServiceProvider.NAME)) {
581 VirtualControlLoopEvent event = eventContext.getEvent();
583 ControlLoopResponse clRsp = new ControlLoopResponse();
584 clRsp.setFrom(actor);
585 clRsp.setTarget("DCAE");
586 clRsp.setClosedLoopControlName(event.getClosedLoopControlName());
587 clRsp.setPolicyName(event.getPolicyName());
588 clRsp.setPolicyVersion(event.getPolicyVersion());
589 clRsp.setRequestId(event.getRequestId());
590 clRsp.setVersion(event.getVersion());
592 PciMessage msg = outcome.getResponse();
593 if (msg != null && msg.getBody() != null && msg.getBody().getOutput() != null) {
594 clRsp.setPayload(msg.getBody().getOutput().getPayload());
601 * Get the operation, as a message.
603 * @return the operation, as a message
605 public String getOperationMessage() {
606 Operation last = operationHistory.peekLast();
607 return (last == null ? null : last.getClOperation().toMessage());
611 * Gets the operation result.
613 * @return the operation result
615 public PolicyResult getOperationResult() {
616 Operation last = operationHistory.peekLast();
617 return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
621 * Get the latest operation history.
623 * @return the latest operation history
625 public String getOperationHistory() {
626 Operation last = operationHistory.peekLast();
627 return (last == null ? null : last.clOperation.toHistory());
633 * @return the list of control loop operations
635 public List<ControlLoopOperation> getHistory() {
636 Operation last = (holdLast ? operationHistory.removeLast() : null);
638 List<ControlLoopOperation> result = operationHistory.stream().map(Operation::getClOperation)
639 .map(ControlLoopOperation::new).collect(Collectors.toList());
642 operationHistory.add(last);
649 * Stores a failure in the DB.
651 * @param outcome operation outcome
652 * @param result result to put into the DB
653 * @param message message to put into the DB
655 private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
656 // don't include this in history yet
659 outcome.setActor(actor);
660 outcome.setOperation(operation);
661 outcome.setMessage(message);
662 outcome.setResult(result);
664 operationHistory.add(new Operation(outcome));
665 storeOperationInDataBase();
669 * Stores the latest operation in the DB.
671 private void storeOperationInDataBase() {
672 operContext.getDataManager().store(requestId, eventContext.getEvent(), targetEntity,
673 operationHistory.peekLast().getClOperation());
677 * Determines the target entity.
679 * @return a future to determine the target entity, or {@code null} if the entity has
680 * already been determined
682 protected CompletableFuture<OperationOutcome> detmTarget() {
683 if (policy.getTarget() == null) {
684 throw new IllegalArgumentException("The target is null");
687 if (policy.getTarget().getType() == null) {
688 throw new IllegalArgumentException("The target type is null");
691 switch (policy.getTarget().getType()) {
693 return detmPnfTarget();
697 return detmVfModuleTarget();
699 throw new IllegalArgumentException("The target type is not supported");
704 * Determines the PNF target entity.
706 * @return a future to determine the target entity, or {@code null} if the entity has
707 * already been determined
709 private CompletableFuture<OperationOutcome> detmPnfTarget() {
710 if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
711 throw new IllegalArgumentException("Target does not match target type");
714 targetEntity = eventContext.getEnrichment().get(PNF_NAME);
715 if (targetEntity == null) {
716 throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
723 * Determines the VF Module target entity.
725 * @return a future to determine the target entity, or {@code null} if the entity has
726 * already been determined
728 private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
729 String targetFieldName = eventContext.getEvent().getTarget();
730 if (targetFieldName == null) {
731 throw new IllegalArgumentException("Target is null");
734 switch (targetFieldName.toLowerCase()) {
735 case VSERVER_VSERVER_NAME:
736 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
738 case GENERIC_VNF_VNF_ID:
739 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
741 case GENERIC_VNF_VNF_NAME:
742 return detmVnfName();
744 throw new IllegalArgumentException("Target does not match target type");
747 if (targetEntity == null) {
748 throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
755 * Determines the VNF Name target entity.
757 * @return a future to determine the target entity, or {@code null} if the entity has
758 * already been determined
760 @SuppressWarnings("unchecked")
761 private CompletableFuture<OperationOutcome> detmVnfName() {
762 // if the onset is enriched with the vnf-id, we don't need an A&AI response
763 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
764 if (targetEntity != null) {
768 // vnf-id was not in the onset - obtain it via the custom query
771 ControlLoopOperationParams cqparams = params.toBuilder()
772 .actor(AaiConstants.ACTOR_NAME)
773 .operation(AaiCqResponse.OPERATION)
778 // perform custom query and then extract the VNF ID from it
779 return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
780 this::extractVnfFromCq);
784 * Extracts the VNF Name target entity from the custom query data.
786 * @return {@code null}
788 private CompletableFuture<OperationOutcome> extractVnfFromCq() {
789 // already have the CQ data
790 AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
791 if (cq.getDefaultGenericVnf() == null) {
792 throw new IllegalArgumentException("No vnf-id found");
795 targetEntity = cq.getDefaultGenericVnf().getVnfId();
796 if (targetEntity == null) {
797 throw new IllegalArgumentException("No vnf-id found");