Use "outcome" to indicate pending record
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / eventmanager / ControlLoopOperationManager2.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7  * Modifications Copyright (C) 2019 Tech Mahindra
8  * Modifications Copyright (C) 2019 Bell Canada.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.controlloop.eventmanager;
25
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.ToString;
import org.onap.policy.aai.AaiConstants;
import org.onap.policy.aai.AaiCqResponse;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.ControlLoopResponse;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
import org.onap.policy.controlloop.policy.Policy;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
53
54 /**
55  * Manages a single Operation for a single event. Once this has been created,
56  * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
57  * continually until it returns {@code false}, indicating that all steps have completed.
58  */
59 @ToString(onlyExplicitlyIncluded = true)
60 public class ControlLoopOperationManager2 implements Serializable {
61     private static final long serialVersionUID = -3773199283624595410L;
62     private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
63     private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
64     public static final String LOCK_ACTOR = "LOCK";
65     public static final String LOCK_OPERATION = "Lock";
66     private static final String GUARD_ACTOR = "GUARD";
67     public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
68     public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
69     public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
70     public static final String PNF_NAME = "pnf.pnf-name";
71
72     // @formatter:off
73     public enum State {
74         ACTIVE,
75         LOCK_DENIED,
76         LOCK_LOST,
77         GUARD_STARTED,
78         GUARD_PERMITTED,
79         GUARD_DENIED,
80         OPERATION_STARTED,
81         OPERATION_SUCCESS,
82         OPERATION_FAILURE,
83         CONTROL_LOOP_TIMEOUT
84     }
85     // @formatter:on
86
87     private final transient ManagerContext operContext;
88     private final transient ControlLoopEventContext eventContext;
89     private final Policy policy;
90
91     @Getter
92     @ToString.Include
93     private State state = State.ACTIVE;
94
95     @ToString.Include
96     private final String requestId;
97
98     @ToString.Include
99     private final String policyId;
100
101     /**
102      * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
103      * this operation.
104      */
105     @ToString.Include
106     private int attempts = 0;
107
108     private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
109
110     /**
111      * Set to {@code true} to prevent the last item in {@link #operationHistory} from
112      * being included in the outcome of {@link #getHistory()}. Used when the operation
113      * aborts prematurely due to lock-denied, guard-denied, etc.
114      */
115     private boolean holdLast = false;
116
117     /**
118      * Queue of outcomes yet to be processed. Outcomes are added to this each time the
119      * "start" or "complete" callback is invoked.
120      */
121     @Getter(AccessLevel.PROTECTED)
122     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
123
124     /**
125      * Used to cancel the running operation.
126      */
127     @Getter(AccessLevel.PROTECTED)
128     private transient CompletableFuture<OperationOutcome> future = null;
129
130     /**
131      * Target entity. Determined after the lock is granted, though it may require the
132      * custom query to be performed first.
133      */
134     @Getter
135     private String targetEntity;
136
137     @Getter(AccessLevel.PROTECTED)
138     private final transient ControlLoopOperationParams params;
139     private final transient PipelineUtil taskUtil;
140
141     @Getter
142     private ControlLoopResponse controlLoopResponse;
143
144     /**
145      * Time when the lock was first requested.
146      */
147     private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
148
149     // values extracted from the policy
150     @Getter
151     private final String actor;
152     @Getter
153     private final String operation;
154
155
156     /**
157      * Construct an instance.
158      *
159      * @param operContext this operation's context
160      * @param context event context
161      * @param policy operation's policy
162      * @param executor executor for the Operation
163      */
164     public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
165                     Executor executor) {
166
167         this.operContext = operContext;
168         this.eventContext = context;
169         this.policy = policy;
170         this.requestId = context.getEvent().getRequestId().toString();
171         this.policyId = "" + policy.getId();
172         this.actor = policy.getActor();
173         this.operation = policy.getRecipe();
174
175         // @formatter:off
176         params = ControlLoopOperationParams.builder()
177                         .actorService(operContext.getActorService())
178                         .actor(actor)
179                         .operation(operation)
180                         .context(context)
181                         .executor(executor)
182                         .target(policy.getTarget())
183                         .startCallback(this::onStart)
184                         .completeCallback(this::onComplete)
185                         .build();
186         // @formatter:on
187
188         taskUtil = new PipelineUtil(params);
189     }
190
191     //
192     // Internal class used for tracking
193     //
194     @Getter
195     @ToString
196     private class Operation implements Serializable {
197         private static final long serialVersionUID = 1L;
198
199         private int attempt;
200         private PolicyResult policyResult;
201         private ControlLoopOperation clOperation;
202         private ControlLoopResponse clResponse;
203
204         /**
205          * Constructs the object.
206          *
207          * @param outcome outcome of the operation
208          */
209         public Operation(OperationOutcome outcome) {
210             attempt = ControlLoopOperationManager2.this.attempts;
211             policyResult = outcome.getResult();
212             clOperation = outcome.toControlLoopOperation();
213             clOperation.setTarget(policy.getTarget().toString());
214             clResponse = outcome.getControlLoopResponse();
215
216             if (outcome.getEnd() == null) {
217                 clOperation.setOutcome("Started");
218             } else if (clOperation.getOutcome() == null) {
219                 clOperation.setOutcome("");
220             }
221         }
222     }
223
224     /**
225      * Start the operation, first acquiring any locks that are needed. This should not
226      * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
227      *
228      * @param remainingMs time remaining, in milliseconds, for the control loop
229      */
230     @SuppressWarnings("unchecked")
231     public synchronized void start(long remainingMs) {
232         // this is synchronized while we update "future"
233
234         try {
235             // provide a default, in case something fails before requestLock() is called
236             lockStart.set(Instant.now());
237
238             // @formatter:off
239             future = taskUtil.sequence(
240                 this::detmTarget,
241                 this::requestLock,
242                 this::startOperation);
243             // @formatter:on
244
245             // handle any exceptions that may be thrown, set timeout, and handle timeout
246
247             // @formatter:off
248             future.exceptionally(this::handleException)
249                     .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
250                     .exceptionally(this::handleTimeout);
251             // @formatter:on
252
253         } catch (RuntimeException e) {
254             handleException(e);
255         }
256     }
257
258     /**
259      * Start the operation, after the lock has been acquired.
260      *
261      * @return
262      */
263     private CompletableFuture<OperationOutcome> startOperation() {
264         // @formatter:off
265         ControlLoopOperationParams params2 = params.toBuilder()
266                     .payload(new LinkedHashMap<>())
267                     .retry(policy.getRetry())
268                     .timeoutSec(policy.getTimeout())
269                     .targetEntity(targetEntity)
270                     .build();
271         // @formatter:on
272
273         if (policy.getPayload() != null) {
274             params2.getPayload().putAll(policy.getPayload());
275         }
276
277         return params2.start();
278     }
279
280     /**
281      * Handles exceptions that may be generated.
282      *
283      * @param thrown exception that was generated
284      * @return {@code null}
285      */
286     private OperationOutcome handleException(Throwable thrown) {
287         if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
288             return null;
289         }
290
291         logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
292         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
293         outcome.setStart(lockStart.get());
294         outcome.setEnd(Instant.now());
295         outcome.setFinalOutcome(true);
296         onComplete(outcome);
297
298         // this outcome is not used so just return "null"
299         return null;
300     }
301
302     /**
303      * Handles control loop timeout exception.
304      *
305      * @param thrown exception that was generated
306      * @return {@code null}
307      */
308     private OperationOutcome handleTimeout(Throwable thrown) {
309         logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
310
311         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
312         outcome.setActor(CL_TIMEOUT_ACTOR);
313         outcome.setOperation(null);
314         outcome.setStart(lockStart.get());
315         outcome.setEnd(Instant.now());
316         outcome.setFinalOutcome(true);
317         onComplete(outcome);
318
319         // cancel the operation, if it's still running
320         future.cancel(false);
321
322         // this outcome is not used so just return "null"
323         return null;
324     }
325
326     /**
327      * Cancels the operation.
328      */
329     public void cancel() {
330         synchronized (this) {
331             if (future == null) {
332                 return;
333             }
334         }
335
336         future.cancel(false);
337     }
338
339     /**
340      * Requests a lock on the {@link #targetEntity}.
341      *
342      * @return a future to await the lock
343      */
344     private CompletableFuture<OperationOutcome> requestLock() {
345         /*
346          * Failures are handled via the callback, and successes are discarded by
347          * sequence(), without passing them to onComplete().
348          *
349          * Return a COPY of the future so that if we try to cancel it, we'll only cancel
350          * the copy, not the original. This is done by tacking thenApply() onto the end.
351          */
352         lockStart.set(Instant.now());
353         return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
354     }
355
356     /**
357      * Indicates that the lock on the target entity is unavailable.
358      *
359      * @param outcome lock outcome
360      */
361     private void lockUnavailable(OperationOutcome outcome) {
362
363         // Note: NEVER invoke onStart() for locks; only invoke onComplete()
364         onComplete(outcome);
365
366         /*
367          * Now that we've added the lock outcome to the queue, ensure the future is
368          * canceled, which may, itself, generate an operation outcome.
369          */
370         cancel();
371     }
372
373     /**
374      * Handles responses provided via the "start" callback. Note: this is never be invoked
375      * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
376      *
377      * @param outcome outcome provided to the callback
378      */
379     private void onStart(OperationOutcome outcome) {
380         if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
381             addOutcome(outcome);
382         }
383     }
384
385     /**
386      * Handles responses provided via the "complete" callback. Note: this is never invoked
387      * for "successful" locks.
388      *
389      * @param outcome outcome provided to the callback
390      */
391     private void onComplete(OperationOutcome outcome) {
392
393         switch (outcome.getActor()) {
394             case LOCK_ACTOR:
395             case GUARD_ACTOR:
396             case CL_TIMEOUT_ACTOR:
397                 addOutcome(outcome);
398                 break;
399
400             default:
401                 if (outcome.isFor(actor, operation)) {
402                     addOutcome(outcome);
403                 }
404                 break;
405         }
406     }
407
408     /**
409      * Adds an outcome to {@link #outcomes}.
410      *
411      * @param outcome outcome to be added
412      */
413     private synchronized void addOutcome(OperationOutcome outcome) {
414         /*
415          * This is synchronized to prevent nextStep() from invoking processOutcome() at
416          * the same time.
417          */
418
419         logger.debug("added outcome={} for {}", outcome, requestId);
420         outcomes.add(outcome);
421
422         if (outcomes.peekFirst() == outcomes.peekLast()) {
423             // this is the first outcome in the queue - process it
424             processOutcome();
425         }
426     }
427
428     /**
429      * Looks for the next step in the queue.
430      *
431      * @return {@code true} if more responses are expected, {@code false} otherwise
432      */
433     public synchronized boolean nextStep() {
434         switch (state) {
435             case LOCK_DENIED:
436             case LOCK_LOST:
437             case GUARD_DENIED:
438             case CONTROL_LOOP_TIMEOUT:
439                 holdLast = false;
440                 return false;
441             default:
442                 break;
443         }
444
445         OperationOutcome outcome = outcomes.peek();
446         if (outcome == null) {
447             // empty queue
448             return true;
449         }
450
451         if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
452             controlLoopResponse = null;
453             return false;
454         }
455
456         // first item has been processed, remove it
457         outcomes.remove();
458         if (!outcomes.isEmpty()) {
459             // have a new "first" item - process it
460             processOutcome();
461         }
462
463         return true;
464     }
465
466     /**
467      * Processes the first item in {@link #outcomes}. Sets the state, increments
468      * {@link #attempts}, if appropriate, and stores the operation history in the DB.
469      */
470     private synchronized void processOutcome() {
471         OperationOutcome outcome = outcomes.peek();
472         logger.debug("process outcome={} for {}", outcome, requestId);
473
474         controlLoopResponse = null;
475
476         switch (outcome.getActor()) {
477
478             case CL_TIMEOUT_ACTOR:
479                 state = State.CONTROL_LOOP_TIMEOUT;
480                 break;
481
482             case LOCK_ACTOR:
483                 // lock is no longer available
484                 if (state == State.ACTIVE) {
485                     state = State.LOCK_DENIED;
486                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
487                 } else {
488                     state = State.LOCK_LOST;
489                     storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
490                 }
491                 break;
492
493             case GUARD_ACTOR:
494                 if (outcome.getEnd() == null) {
495                     state = State.GUARD_STARTED;
496                 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
497                     state = State.GUARD_PERMITTED;
498                 } else {
499                     state = State.GUARD_DENIED;
500                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
501                 }
502                 break;
503
504             default:
505                 if (outcome.getEnd() == null) {
506                     // operation started
507                     ++attempts;
508                     state = State.OPERATION_STARTED;
509
510                 } else {
511                     /*
512                      * Operation completed. If the last entry was a "start" (i.e., "end" field
513                      * is null), then replace it. Otherwise, just add the completion.
514                      */
515                     state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
516                                     : State.OPERATION_FAILURE);
517                     controlLoopResponse = outcome.getControlLoopResponse();
518                     if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
519                         operationHistory.removeLast();
520                     }
521                 }
522
523                 operationHistory.add(new Operation(outcome));
524                 storeOperationInDataBase();
525                 break;
526         }
527
528         // indicate that this has changed
529         operContext.updated(this);
530     }
531
532     /**
533      * Get the operation, as a message.
534      *
535      * @return the operation, as a message
536      */
537     public String getOperationMessage() {
538         Operation last = operationHistory.peekLast();
539         return (last == null ? null : last.getClOperation().toMessage());
540     }
541
542     /**
543      * Gets the operation result.
544      *
545      * @return the operation result
546      */
547     public PolicyResult getOperationResult() {
548         Operation last = operationHistory.peekLast();
549         return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
550     }
551
552     /**
553      * Get the latest operation history.
554      *
555      * @return the latest operation history
556      */
557     public String getOperationHistory() {
558         Operation last = operationHistory.peekLast();
559         return (last == null ? null : last.clOperation.toHistory());
560     }
561
562     /**
563      * Get the history.
564      *
565      * @return the list of control loop operations
566      */
567     public List<ControlLoopOperation> getHistory() {
568         Operation last = (holdLast ? operationHistory.removeLast() : null);
569
570         List<ControlLoopOperation> result = operationHistory.stream().map(Operation::getClOperation)
571                         .map(ControlLoopOperation::new).collect(Collectors.toList());
572
573         if (last != null) {
574             operationHistory.add(last);
575         }
576
577         return result;
578     }
579
580     /**
581      * Stores a failure in the DB.
582      *
583      * @param outcome operation outcome
584      * @param result result to put into the DB
585      * @param message message to put into the DB
586      */
587     private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
588         // don't include this in history yet
589         holdLast = true;
590
591         outcome.setActor(actor);
592         outcome.setOperation(operation);
593         outcome.setMessage(message);
594         outcome.setResult(result);
595
596         operationHistory.add(new Operation(outcome));
597         storeOperationInDataBase();
598     }
599
600     /**
601      * Stores the latest operation in the DB.
602      */
603     private void storeOperationInDataBase() {
604         operContext.getDataManager().store(requestId, eventContext.getEvent(), targetEntity,
605                         operationHistory.peekLast().getClOperation());
606     }
607
608     /**
609      * Determines the target entity.
610      *
611      * @return a future to determine the target entity, or {@code null} if the entity has
612      *         already been determined
613      */
614     protected CompletableFuture<OperationOutcome> detmTarget() {
615         if (policy.getTarget() == null) {
616             throw new IllegalArgumentException("The target is null");
617         }
618
619         if (policy.getTarget().getType() == null) {
620             throw new IllegalArgumentException("The target type is null");
621         }
622
623         switch (policy.getTarget().getType()) {
624             case PNF:
625                 return detmPnfTarget();
626             case VM:
627             case VNF:
628             case VFMODULE:
629                 return detmVfModuleTarget();
630             default:
631                 throw new IllegalArgumentException("The target type is not supported");
632         }
633     }
634
635     /**
636      * Determines the PNF target entity.
637      *
638      * @return a future to determine the target entity, or {@code null} if the entity has
639      *         already been determined
640      */
641     private CompletableFuture<OperationOutcome> detmPnfTarget() {
642         if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
643             throw new IllegalArgumentException("Target does not match target type");
644         }
645
646         targetEntity = eventContext.getEnrichment().get(PNF_NAME);
647         if (targetEntity == null) {
648             throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
649         }
650
651         return null;
652     }
653
654     /**
655      * Determines the VF Module target entity.
656      *
657      * @return a future to determine the target entity, or {@code null} if the entity has
658      *         already been determined
659      */
660     private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
661         String targetFieldName = eventContext.getEvent().getTarget();
662         if (targetFieldName == null) {
663             throw new IllegalArgumentException("Target is null");
664         }
665
666         switch (targetFieldName.toLowerCase()) {
667             case VSERVER_VSERVER_NAME:
668                 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
669                 break;
670             case GENERIC_VNF_VNF_ID:
671                 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
672                 break;
673             case GENERIC_VNF_VNF_NAME:
674                 return detmVnfName();
675             default:
676                 throw new IllegalArgumentException("Target does not match target type");
677         }
678
679         if (targetEntity == null) {
680             throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
681         }
682
683         return null;
684     }
685
686     /**
687      * Determines the VNF Name target entity.
688      *
689      * @return a future to determine the target entity, or {@code null} if the entity has
690      *         already been determined
691      */
692     @SuppressWarnings("unchecked")
693     private CompletableFuture<OperationOutcome> detmVnfName() {
694         // if the onset is enriched with the vnf-id, we don't need an A&AI response
695         targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
696         if (targetEntity != null) {
697             return null;
698         }
699
700         // vnf-id was not in the onset - obtain it via the custom query
701
702         // @formatter:off
703         ControlLoopOperationParams cqparams = params.toBuilder()
704                         .actor(AaiConstants.ACTOR_NAME)
705                         .operation(AaiCqResponse.OPERATION)
706                         .targetEntity("")
707                         .build();
708         // @formatter:on
709
710         // perform custom query and then extract the VNF ID from it
711         return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
712                         this::extractVnfFromCq);
713     }
714
715     /**
716      * Extracts the VNF Name target entity from the custom query data.
717      *
718      * @return {@code null}
719      */
720     private CompletableFuture<OperationOutcome> extractVnfFromCq() {
721         // already have the CQ data
722         AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
723         if (cq.getDefaultGenericVnf() == null) {
724             throw new IllegalArgumentException("No vnf-id found");
725         }
726
727         targetEntity = cq.getDefaultGenericVnf().getVnfId();
728         if (targetEntity == null) {
729             throw new IllegalArgumentException("No vnf-id found");
730         }
731
732         return null;
733     }
734 }