2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
 
   6  * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
 
   7  * Modifications Copyright (C) 2019 Tech Mahindra
 
   8  * Modifications Copyright (C) 2019 Bell Canada.
 
   9  * ================================================================================
 
  10  * Licensed under the Apache License, Version 2.0 (the "License");
 
  11  * you may not use this file except in compliance with the License.
 
  12  * You may obtain a copy of the License at
 
  14  *      http://www.apache.org/licenses/LICENSE-2.0
 
  16  * Unless required by applicable law or agreed to in writing, software
 
  17  * distributed under the License is distributed on an "AS IS" BASIS,
 
  18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  19  * See the License for the specific language governing permissions and
 
  20  * limitations under the License.
 
  21  * ============LICENSE_END=========================================================
 
  24 package org.onap.policy.controlloop.eventmanager;
 
  26 import java.io.Serializable;
 
  27 import java.time.Instant;
 
  28 import java.util.Deque;
 
  29 import java.util.LinkedHashMap;
 
  30 import java.util.List;
 
  31 import java.util.concurrent.CancellationException;
 
  32 import java.util.concurrent.CompletableFuture;
 
  33 import java.util.concurrent.ConcurrentLinkedDeque;
 
  34 import java.util.concurrent.Executor;
 
  35 import java.util.concurrent.TimeUnit;
 
  36 import java.util.concurrent.atomic.AtomicReference;
 
  37 import java.util.stream.Collectors;
 
  38 import lombok.AccessLevel;
 
  40 import lombok.ToString;
 
  41 import org.onap.policy.aai.AaiConstants;
 
  42 import org.onap.policy.aai.AaiCqResponse;
 
  43 import org.onap.policy.controlloop.ControlLoopOperation;
 
  44 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  45 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
 
  46 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  47 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
 
  48 import org.onap.policy.controlloop.policy.Policy;
 
  49 import org.onap.policy.controlloop.policy.PolicyResult;
 
  50 import org.slf4j.Logger;
 
  51 import org.slf4j.LoggerFactory;
 
  54  * Manages a single Operation for a single event. Once this has been created,
 
  55  * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
 
  56  * continually until it returns {@code false}, indicating that all steps have completed.
 
  58 @ToString(onlyExplicitlyIncluded = true)
 
  59 public class ControlLoopOperationManager2 implements Serializable {
 
  60     private static final long serialVersionUID = -3773199283624595410L;
 
  61     private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
 
  62     private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
 
  63     public static final String LOCK_ACTOR = "LOCK";
 
  64     public static final String LOCK_OPERATION = "Lock";
 
  65     private static final String GUARD_ACTOR = "GUARD";
 
  66     public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
 
  67     public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
 
  68     public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
 
  69     public static final String PNF_NAME = "pnf.pnf-name";
 
  86     private final transient ManagerContext operContext;
 
  87     private final transient ControlLoopEventContext eventContext;
 
  88     private final Policy policy;
 
  92     private State state = State.ACTIVE;
 
  95     private final String requestId;
 
  98     private final String policyId;
 
 101      * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
 
 105     private int attempts = 0;
 
 107     private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
 
 110      * Set to {@code true} to prevent the last item in {@link #operationHistory} from
 
 111      * being included in the outcome of {@link #getHistory()}. Used when the operation
 
 112      * aborts prematurely due to lock-denied, guard-denied, etc.
 
 114     private boolean holdLast = false;
 
 117      * Queue of outcomes yet to be processed. Outcomes are added to this each time the
 
 118      * "start" or "complete" callback is invoked.
 
 120     @Getter(AccessLevel.PROTECTED)
 
 121     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
 
 124      * Used to cancel the running operation.
 
 126     @Getter(AccessLevel.PROTECTED)
 
 127     private transient CompletableFuture<OperationOutcome> future = null;
 
 130      * Target entity. Determined after the lock is granted, though it may require the
 
 131      * custom query to be performed first.
 
 134     private String targetEntity;
 
 136     @Getter(AccessLevel.PROTECTED)
 
 137     private final transient ControlLoopOperationParams params;
 
 138     private final transient PipelineUtil taskUtil;
 
 141      * Time when the lock was first requested.
 
 143     private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
 
 145     // values extracted from the policy
 
 147     private final String actor;
 
 149     private final String operation;
 
 153      * Construct an instance.
 
 155      * @param operContext this operation's context
 
 156      * @param context event context
 
 157      * @param policy operation's policy
 
 158      * @param executor executor for the Operation
 
 160     public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
 
 163         this.operContext = operContext;
 
 164         this.eventContext = context;
 
 165         this.policy = policy;
 
 166         this.requestId = context.getEvent().getRequestId().toString();
 
 167         this.policyId = "" + policy.getId();
 
 168         this.actor = policy.getActor();
 
 169         this.operation = policy.getRecipe();
 
 172         params = ControlLoopOperationParams.builder()
 
 173                         .actorService(operContext.getActorService())
 
 175                         .operation(operation)
 
 178                         .target(policy.getTarget())
 
 179                         .startCallback(this::onStart)
 
 180                         .completeCallback(this::onComplete)
 
 184         taskUtil = new PipelineUtil(params);
 
 188     // Internal class used for tracking
 
 192     private class Operation implements Serializable {
 
 193         private static final long serialVersionUID = 1L;
 
 196         private PolicyResult policyResult;
 
 197         private ControlLoopOperation clOperation;
 
 200          * Constructs the object.
 
 202          * @param outcome outcome of the operation
 
 204         public Operation(OperationOutcome outcome) {
 
 205             attempt = ControlLoopOperationManager2.this.attempts;
 
 206             policyResult = outcome.getResult();
 
 207             clOperation = outcome.toControlLoopOperation();
 
 208             clOperation.setTarget(policy.getTarget().toString());
 
 213      * Start the operation, first acquiring any locks that are needed. This should not
 
 214      * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
 
 216      * @param remainingMs time remaining, in milliseconds, for the control loop
 
 218     @SuppressWarnings("unchecked")
 
 219     public synchronized void start(long remainingMs) {
 
 220         // this is synchronized while we update "future"
 
 223             // provide a default, in case something fails before requestLock() is called
 
 224             lockStart.set(Instant.now());
 
 227             future = taskUtil.sequence(
 
 230                 this::startOperation);
 
 233             // handle any exceptions that may be thrown, set timeout, and handle timeout
 
 236             future.exceptionally(this::handleException)
 
 237                     .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
 
 238                     .exceptionally(this::handleTimeout);
 
 241         } catch (RuntimeException e) {
 
 247      * Start the operation, after the lock has been acquired.
 
 251     private CompletableFuture<OperationOutcome> startOperation() {
 
 253         ControlLoopOperationParams params2 = params.toBuilder()
 
 254                     .payload(new LinkedHashMap<>())
 
 255                     .retry(policy.getRetry())
 
 256                     .timeoutSec(policy.getTimeout())
 
 257                     .targetEntity(targetEntity)
 
 261         if (policy.getPayload() != null) {
 
 262             params2.getPayload().putAll(policy.getPayload());
 
 265         return params2.start();
 
 269      * Handles exceptions that may be generated.
 
 271      * @param thrown exception that was generated
 
 272      * @return {@code null}
 
 274     private OperationOutcome handleException(Throwable thrown) {
 
 275         if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
 
 279         logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
 
 280         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
 
 281         outcome.setStart(lockStart.get());
 
 282         outcome.setEnd(Instant.now());
 
 283         outcome.setFinalOutcome(true);
 
 286         // this outcome is not used so just return "null"
 
 291      * Handles control loop timeout exception.
 
 293      * @param thrown exception that was generated
 
 294      * @return {@code null}
 
 296     private OperationOutcome handleTimeout(Throwable thrown) {
 
 297         logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
 
 299         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
 
 300         outcome.setActor(CL_TIMEOUT_ACTOR);
 
 301         outcome.setOperation(null);
 
 302         outcome.setStart(lockStart.get());
 
 303         outcome.setEnd(Instant.now());
 
 304         outcome.setFinalOutcome(true);
 
 307         // cancel the operation, if it's still running
 
 308         future.cancel(false);
 
 310         // this outcome is not used so just return "null"
 
 315      * Cancels the operation.
 
 317     public void cancel() {
 
 318         synchronized (this) {
 
 319             if (future == null) {
 
 324         future.cancel(false);
 
 328      * Requests a lock on the {@link #targetEntity}.
 
 330      * @return a future to await the lock
 
 332     private CompletableFuture<OperationOutcome> requestLock() {
 
 334          * Failures are handled via the callback, and successes are discarded by
 
 335          * sequence(), without passing them to onComplete().
 
 337          * Return a COPY of the future so that if we try to cancel it, we'll only cancel
 
 338          * the copy, not the original. This is done by tacking thenApply() onto the end.
 
 340         lockStart.set(Instant.now());
 
 341         return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
 
 345      * Indicates that the lock on the target entity is unavailable.
 
 347      * @param outcome lock outcome
 
 349     private void lockUnavailable(OperationOutcome outcome) {
 
 351         // Note: NEVER invoke onStart() for locks; only invoke onComplete()
 
 355          * Now that we've added the lock outcome to the queue, ensure the future is
 
 356          * canceled, which may, itself, generate an operation outcome.
 
 362      * Handles responses provided via the "start" callback. Note: this is never be invoked
 
 363      * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
 
 365      * @param outcome outcome provided to the callback
 
 367     private void onStart(OperationOutcome outcome) {
 
 368         if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
 
 374      * Handles responses provided via the "complete" callback. Note: this is never invoked
 
 375      * for "successful" locks.
 
 377      * @param outcome outcome provided to the callback
 
 379     private void onComplete(OperationOutcome outcome) {
 
 381         switch (outcome.getActor()) {
 
 384             case CL_TIMEOUT_ACTOR:
 
 389                 if (outcome.isFor(actor, operation)) {
 
 397      * Adds an outcome to {@link #outcomes}.
 
 399      * @param outcome outcome to be added
 
 401     private synchronized void addOutcome(OperationOutcome outcome) {
 
 403          * This is synchronized to prevent nextStep() from invoking processOutcome() at
 
 407         logger.debug("added outcome={} for {}", outcome, requestId);
 
 408         outcomes.add(outcome);
 
 410         if (outcomes.peekFirst() == outcomes.peekLast()) {
 
 411             // this is the first outcome in the queue - process it
 
 417      * Looks for the next step in the queue.
 
 419      * @return {@code true} if more responses are expected, {@code false} otherwise
 
 421     public synchronized boolean nextStep() {
 
 426             case CONTROL_LOOP_TIMEOUT:
 
 433         OperationOutcome outcome = outcomes.peek();
 
 434         if (outcome == null) {
 
 439         if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
 
 443         // first item has been processed, remove it
 
 445         if (!outcomes.isEmpty()) {
 
 446             // have a new "first" item - process it
 
 454      * Processes the first item in {@link #outcomes}. Sets the state, increments
 
 455      * {@link #attempts}, if appropriate, and stores the operation history in the DB.
 
 457     private synchronized void processOutcome() {
 
 458         OperationOutcome outcome = outcomes.peek();
 
 459         logger.debug("process outcome={} for {}", outcome, requestId);
 
 461         switch (outcome.getActor()) {
 
 463             case CL_TIMEOUT_ACTOR:
 
 464                 state = State.CONTROL_LOOP_TIMEOUT;
 
 468                 // lock is no longer available
 
 469                 if (state == State.ACTIVE) {
 
 470                     state = State.LOCK_DENIED;
 
 471                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
 
 473                     state = State.LOCK_LOST;
 
 474                     storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
 
 479                 if (outcome.getEnd() == null) {
 
 480                     state = State.GUARD_STARTED;
 
 481                 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
 
 482                     state = State.GUARD_PERMITTED;
 
 484                     state = State.GUARD_DENIED;
 
 485                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
 
 490                 if (outcome.getEnd() == null) {
 
 493                     state = State.OPERATION_STARTED;
 
 494                     operationHistory.add(new Operation(outcome));
 
 499                  * Operation completed. If the last entry was a "start" (i.e., "end" field
 
 500                  * is null), then replace it. Otherwise, just add the completion.
 
 502                 state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
 
 503                                 : State.OPERATION_FAILURE);
 
 504                 if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
 
 505                     operationHistory.removeLast();
 
 507                 operationHistory.add(new Operation(outcome));
 
 508                 storeOperationInDataBase();
 
 512         // indicate that this has changed
 
 513         operContext.updated(this);
 
 517      * Get the operation, as a message.
 
 519      * @return the operation, as a message
 
 521     public String getOperationMessage() {
 
 522         Operation last = operationHistory.peekLast();
 
 523         return (last == null ? null : last.getClOperation().toMessage());
 
 527      * Gets the operation result.
 
 529      * @return the operation result
 
 531     public PolicyResult getOperationResult() {
 
 532         Operation last = operationHistory.peekLast();
 
 533         return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
 
 537      * Get the latest operation history.
 
 539      * @return the latest operation history
 
 541     public String getOperationHistory() {
 
 542         Operation last = operationHistory.peekLast();
 
 543         return (last == null ? null : last.clOperation.toHistory());
 
 549      * @return the list of control loop operations
 
 551     public List<ControlLoopOperation> getHistory() {
 
 552         Operation last = (holdLast ? operationHistory.removeLast() : null);
 
 554         List<ControlLoopOperation> result = operationHistory.stream().map(Operation::getClOperation)
 
 555                         .map(ControlLoopOperation::new).collect(Collectors.toList());
 
 558             operationHistory.add(last);
 
 565      * Stores a failure in the DB.
 
 567      * @param outcome operation outcome
 
 568      * @param result result to put into the DB
 
 569      * @param message message to put into the DB
 
 571     private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
 
 572         // don't include this in history yet
 
 575         outcome.setActor(actor);
 
 576         outcome.setOperation(operation);
 
 577         outcome.setMessage(message);
 
 578         outcome.setResult(result);
 
 580         operationHistory.add(new Operation(outcome));
 
 581         storeOperationInDataBase();
 
 585      * Stores the latest operation in the DB.
 
 587     private void storeOperationInDataBase() {
 
 588         operContext.getDataManager().store(requestId, eventContext.getEvent(), targetEntity,
 
 589                         operationHistory.peekLast().getClOperation());
 
 593      * Determines the target entity.
 
 595      * @return a future to determine the target entity, or {@code null} if the entity has
 
 596      *         already been determined
 
 598     protected CompletableFuture<OperationOutcome> detmTarget() {
 
 599         if (policy.getTarget() == null) {
 
 600             throw new IllegalArgumentException("The target is null");
 
 603         if (policy.getTarget().getType() == null) {
 
 604             throw new IllegalArgumentException("The target type is null");
 
 607         switch (policy.getTarget().getType()) {
 
 609                 return detmPnfTarget();
 
 613                 return detmVfModuleTarget();
 
 615                 throw new IllegalArgumentException("The target type is not supported");
 
 620      * Determines the PNF target entity.
 
 622      * @return a future to determine the target entity, or {@code null} if the entity has
 
 623      *         already been determined
 
 625     private CompletableFuture<OperationOutcome> detmPnfTarget() {
 
 626         if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
 
 627             throw new IllegalArgumentException("Target does not match target type");
 
 630         targetEntity = eventContext.getEnrichment().get(PNF_NAME);
 
 631         if (targetEntity == null) {
 
 632             throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
 
 639      * Determines the VF Module target entity.
 
 641      * @return a future to determine the target entity, or {@code null} if the entity has
 
 642      *         already been determined
 
 644     private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
 
 645         String targetFieldName = eventContext.getEvent().getTarget();
 
 646         if (targetFieldName == null) {
 
 647             throw new IllegalArgumentException("Target is null");
 
 650         switch (targetFieldName.toLowerCase()) {
 
 651             case VSERVER_VSERVER_NAME:
 
 652                 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
 
 654             case GENERIC_VNF_VNF_ID:
 
 655                 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
 
 657             case GENERIC_VNF_VNF_NAME:
 
 658                 return detmVnfName();
 
 660                 throw new IllegalArgumentException("Target does not match target type");
 
 663         if (targetEntity == null) {
 
 664             throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
 
 671      * Determines the VNF Name target entity.
 
 673      * @return a future to determine the target entity, or {@code null} if the entity has
 
 674      *         already been determined
 
 676     @SuppressWarnings("unchecked")
 
 677     private CompletableFuture<OperationOutcome> detmVnfName() {
 
 678         // if the onset is enriched with the vnf-id, we don't need an A&AI response
 
 679         targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
 
 680         if (targetEntity != null) {
 
 684         // vnf-id was not in the onset - obtain it via the custom query
 
 687         ControlLoopOperationParams cqparams = params.toBuilder()
 
 688                         .actor(AaiConstants.ACTOR_NAME)
 
 689                         .operation(AaiCqResponse.OPERATION)
 
 694         // perform custom query and then extract the VNF ID from it
 
 695         return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
 
 696                         this::extractVnfFromCq);
 
 700      * Extracts the VNF Name target entity from the custom query data.
 
 702      * @return {@code null}
 
 704     private CompletableFuture<OperationOutcome> extractVnfFromCq() {
 
 705         // already have the CQ data
 
 706         AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
 
 707         if (cq.getDefaultGenericVnf() == null) {
 
 708             throw new IllegalArgumentException("No vnf-id found");
 
 711         targetEntity = cq.getDefaultGenericVnf().getVnfId();
 
 712         if (targetEntity == null) {
 
 713             throw new IllegalArgumentException("No vnf-id found");