5e8cbbc2e296856e5b7fb16177b34ae61d94dccd
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7  * Modifications Copyright (C) 2019 Tech Mahindra
8  * Modifications Copyright (C) 2019 Bell Canada.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.controlloop.eventmanager;
25
26 import java.io.Serializable;
27 import java.time.Instant;
28 import java.util.Deque;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import java.util.stream.Collectors;
38 import lombok.AccessLevel;
39 import lombok.Getter;
40 import lombok.ToString;
41 import org.onap.policy.aai.AaiConstants;
42 import org.onap.policy.aai.AaiCqResponse;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.ControlLoopResponse;
45 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
46 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
47 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
48 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
49 import org.onap.policy.controlloop.policy.Policy;
50 import org.onap.policy.controlloop.policy.PolicyResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Manages a single Operation for a single event. Once this has been created,
56  * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
57  * continually until it returns {@code false}, indicating that all steps have completed.
58  */
59 @ToString(onlyExplicitlyIncluded = true)
60 public class ControlLoopOperationManager2 implements Serializable {
61     private static final long serialVersionUID = -3773199283624595410L;
62     private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
63     private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
64     public static final String LOCK_ACTOR = "LOCK";
65     public static final String LOCK_OPERATION = "Lock";
66     private static final String GUARD_ACTOR = "GUARD";
67     public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
68     public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
69     public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
70     public static final String PNF_NAME = "pnf.pnf-name";
71
72     // @formatter:off
73     public enum State {
74         ACTIVE,
75         LOCK_DENIED,
76         LOCK_LOST,
77         GUARD_STARTED,
78         GUARD_PERMITTED,
79         GUARD_DENIED,
80         OPERATION_STARTED,
81         OPERATION_SUCCESS,
82         OPERATION_FAILURE,
83         CONTROL_LOOP_TIMEOUT
84     }
85     // @formatter:on
86
87     private final transient ManagerContext operContext;
88     private final transient ControlLoopEventContext eventContext;
89     private final Policy policy;
90
91     @Getter
92     @ToString.Include
93     private State state = State.ACTIVE;
94
95     @ToString.Include
96     private final String requestId;
97
98     @ToString.Include
99     private final String policyId;
100
101     /**
102      * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
103      * this operation.
104      */
105     @ToString.Include
106     private int attempts = 0;
107
108     private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
109
110     /**
111      * Set to {@code true} to prevent the last item in {@link #operationHistory} from
112      * being included in the outcome of {@link #getHistory()}. Used when the operation
113      * aborts prematurely due to lock-denied, guard-denied, etc.
114      */
115     private boolean holdLast = false;
116
117     /**
118      * Queue of outcomes yet to be processed. Outcomes are added to this each time the
119      * "start" or "complete" callback is invoked.
120      */
121     @Getter(AccessLevel.PROTECTED)
122     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
123
124     /**
125      * Used to cancel the running operation.
126      */
127     @Getter(AccessLevel.PROTECTED)
128     private transient CompletableFuture<OperationOutcome> future = null;
129
130     /**
131      * Target entity. Determined after the lock is granted, though it may require the
132      * custom query to be performed first.
133      */
134     @Getter
135     private String targetEntity;
136
137     @Getter(AccessLevel.PROTECTED)
138     private final transient ControlLoopOperationParams params;
139     private final transient PipelineUtil taskUtil;
140
141     @Getter
142     private ControlLoopResponse controlLoopResponse;
143
144     /**
145      * Time when the lock was first requested.
146      */
147     private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
148
149     // values extracted from the policy
150     @Getter
151     private final String actor;
152     @Getter
153     private final String operation;
154
155
156     /**
157      * Construct an instance.
158      *
159      * @param operContext this operation's context
160      * @param context event context
161      * @param policy operation's policy
162      * @param executor executor for the Operation
163      */
164     public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
165                     Executor executor) {
166
167         this.operContext = operContext;
168         this.eventContext = context;
169         this.policy = policy;
170         this.requestId = context.getEvent().getRequestId().toString();
171         this.policyId = "" + policy.getId();
172         this.actor = policy.getActor();
173         this.operation = policy.getRecipe();
174
175         // @formatter:off
176         params = ControlLoopOperationParams.builder()
177                         .actorService(operContext.getActorService())
178                         .actor(actor)
179                         .operation(operation)
180                         .context(context)
181                         .executor(executor)
182                         .target(policy.getTarget())
183                         .startCallback(this::onStart)
184                         .completeCallback(this::onComplete)
185                         .build();
186         // @formatter:on
187
188         taskUtil = new PipelineUtil(params);
189     }
190
191     //
192     // Internal class used for tracking
193     //
194     @Getter
195     @ToString
196     private class Operation implements Serializable {
197         private static final long serialVersionUID = 1L;
198
199         private int attempt;
200         private PolicyResult policyResult;
201         private ControlLoopOperation clOperation;
202         private ControlLoopResponse clResponse;
203
204         /**
205          * Constructs the object.
206          *
207          * @param outcome outcome of the operation
208          */
209         public Operation(OperationOutcome outcome) {
210             attempt = ControlLoopOperationManager2.this.attempts;
211             policyResult = outcome.getResult();
212             clOperation = outcome.toControlLoopOperation();
213             clOperation.setTarget(policy.getTarget().toString());
214             clResponse = outcome.getControlLoopResponse();
215         }
216     }
217
218     /**
219      * Start the operation, first acquiring any locks that are needed. This should not
220      * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
221      *
222      * @param remainingMs time remaining, in milliseconds, for the control loop
223      */
224     @SuppressWarnings("unchecked")
225     public synchronized void start(long remainingMs) {
226         // this is synchronized while we update "future"
227
228         try {
229             // provide a default, in case something fails before requestLock() is called
230             lockStart.set(Instant.now());
231
232             // @formatter:off
233             future = taskUtil.sequence(
234                 this::detmTarget,
235                 this::requestLock,
236                 this::startOperation);
237             // @formatter:on
238
239             // handle any exceptions that may be thrown, set timeout, and handle timeout
240
241             // @formatter:off
242             future.exceptionally(this::handleException)
243                     .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
244                     .exceptionally(this::handleTimeout);
245             // @formatter:on
246
247         } catch (RuntimeException e) {
248             handleException(e);
249         }
250     }
251
252     /**
253      * Start the operation, after the lock has been acquired.
254      *
255      * @return
256      */
257     private CompletableFuture<OperationOutcome> startOperation() {
258         // @formatter:off
259         ControlLoopOperationParams params2 = params.toBuilder()
260                     .payload(new LinkedHashMap<>())
261                     .retry(policy.getRetry())
262                     .timeoutSec(policy.getTimeout())
263                     .targetEntity(targetEntity)
264                     .build();
265         // @formatter:on
266
267         if (policy.getPayload() != null) {
268             params2.getPayload().putAll(policy.getPayload());
269         }
270
271         return params2.start();
272     }
273
274     /**
275      * Handles exceptions that may be generated.
276      *
277      * @param thrown exception that was generated
278      * @return {@code null}
279      */
280     private OperationOutcome handleException(Throwable thrown) {
281         if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
282             return null;
283         }
284
285         logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
286         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
287         outcome.setStart(lockStart.get());
288         outcome.setEnd(Instant.now());
289         outcome.setFinalOutcome(true);
290         onComplete(outcome);
291
292         // this outcome is not used so just return "null"
293         return null;
294     }
295
296     /**
297      * Handles control loop timeout exception.
298      *
299      * @param thrown exception that was generated
300      * @return {@code null}
301      */
302     private OperationOutcome handleTimeout(Throwable thrown) {
303         logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
304
305         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
306         outcome.setActor(CL_TIMEOUT_ACTOR);
307         outcome.setOperation(null);
308         outcome.setStart(lockStart.get());
309         outcome.setEnd(Instant.now());
310         outcome.setFinalOutcome(true);
311         onComplete(outcome);
312
313         // cancel the operation, if it's still running
314         future.cancel(false);
315
316         // this outcome is not used so just return "null"
317         return null;
318     }
319
320     /**
321      * Cancels the operation.
322      */
323     public void cancel() {
324         synchronized (this) {
325             if (future == null) {
326                 return;
327             }
328         }
329
330         future.cancel(false);
331     }
332
333     /**
334      * Requests a lock on the {@link #targetEntity}.
335      *
336      * @return a future to await the lock
337      */
338     private CompletableFuture<OperationOutcome> requestLock() {
339         /*
340          * Failures are handled via the callback, and successes are discarded by
341          * sequence(), without passing them to onComplete().
342          *
343          * Return a COPY of the future so that if we try to cancel it, we'll only cancel
344          * the copy, not the original. This is done by tacking thenApply() onto the end.
345          */
346         lockStart.set(Instant.now());
347         return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
348     }
349
350     /**
351      * Indicates that the lock on the target entity is unavailable.
352      *
353      * @param outcome lock outcome
354      */
355     private void lockUnavailable(OperationOutcome outcome) {
356
357         // Note: NEVER invoke onStart() for locks; only invoke onComplete()
358         onComplete(outcome);
359
360         /*
361          * Now that we've added the lock outcome to the queue, ensure the future is
362          * canceled, which may, itself, generate an operation outcome.
363          */
364         cancel();
365     }
366
367     /**
368      * Handles responses provided via the "start" callback. Note: this is never be invoked
369      * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
370      *
371      * @param outcome outcome provided to the callback
372      */
373     private void onStart(OperationOutcome outcome) {
374         if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
375             addOutcome(outcome);
376         }
377     }
378
379     /**
380      * Handles responses provided via the "complete" callback. Note: this is never invoked
381      * for "successful" locks.
382      *
383      * @param outcome outcome provided to the callback
384      */
385     private void onComplete(OperationOutcome outcome) {
386
387         switch (outcome.getActor()) {
388             case LOCK_ACTOR:
389             case GUARD_ACTOR:
390             case CL_TIMEOUT_ACTOR:
391                 addOutcome(outcome);
392                 break;
393
394             default:
395                 if (outcome.isFor(actor, operation)) {
396                     addOutcome(outcome);
397                 }
398                 break;
399         }
400     }
401
402     /**
403      * Adds an outcome to {@link #outcomes}.
404      *
405      * @param outcome outcome to be added
406      */
407     private synchronized void addOutcome(OperationOutcome outcome) {
408         /*
409          * This is synchronized to prevent nextStep() from invoking processOutcome() at
410          * the same time.
411          */
412
413         logger.debug("added outcome={} for {}", outcome, requestId);
414         outcomes.add(outcome);
415
416         if (outcomes.peekFirst() == outcomes.peekLast()) {
417             // this is the first outcome in the queue - process it
418             processOutcome();
419         }
420     }
421
422     /**
423      * Looks for the next step in the queue.
424      *
425      * @return {@code true} if more responses are expected, {@code false} otherwise
426      */
427     public synchronized boolean nextStep() {
428         switch (state) {
429             case LOCK_DENIED:
430             case LOCK_LOST:
431             case GUARD_DENIED:
432             case CONTROL_LOOP_TIMEOUT:
433                 holdLast = false;
434                 return false;
435             default:
436                 break;
437         }
438
439         OperationOutcome outcome = outcomes.peek();
440         if (outcome == null) {
441             // empty queue
442             return true;
443         }
444
445         if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
446             controlLoopResponse = null;
447             return false;
448         }
449
450         // first item has been processed, remove it
451         outcomes.remove();
452         if (!outcomes.isEmpty()) {
453             // have a new "first" item - process it
454             processOutcome();
455         }
456
457         return true;
458     }
459
460     /**
461      * Processes the first item in {@link #outcomes}. Sets the state, increments
462      * {@link #attempts}, if appropriate, and stores the operation history in the DB.
463      */
464     private synchronized void processOutcome() {
465         OperationOutcome outcome = outcomes.peek();
466         logger.debug("process outcome={} for {}", outcome, requestId);
467
468         controlLoopResponse = null;
469
470         switch (outcome.getActor()) {
471
472             case CL_TIMEOUT_ACTOR:
473                 state = State.CONTROL_LOOP_TIMEOUT;
474                 break;
475
476             case LOCK_ACTOR:
477                 // lock is no longer available
478                 if (state == State.ACTIVE) {
479                     state = State.LOCK_DENIED;
480                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
481                 } else {
482                     state = State.LOCK_LOST;
483                     storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
484                 }
485                 break;
486
487             case GUARD_ACTOR:
488                 if (outcome.getEnd() == null) {
489                     state = State.GUARD_STARTED;
490                 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
491                     state = State.GUARD_PERMITTED;
492                 } else {
493                     state = State.GUARD_DENIED;
494                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
495                 }
496                 break;
497
498             default:
499                 if (outcome.getEnd() == null) {
500                     // operation started
501                     ++attempts;
502                     state = State.OPERATION_STARTED;
503
504                 } else {
505                     /*
506                      * Operation completed. If the last entry was a "start" (i.e., "end" field
507                      * is null), then replace it. Otherwise, just add the completion.
508                      */
509                     state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
510                                     : State.OPERATION_FAILURE);
511                     controlLoopResponse = outcome.getControlLoopResponse();
512                     if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
513                         operationHistory.removeLast();
514                     }
515                 }
516
517                 operationHistory.add(new Operation(outcome));
518                 storeOperationInDataBase();
519                 break;
520         }
521
522         // indicate that this has changed
523         operContext.updated(this);
524     }
525
526     /**
527      * Get the operation, as a message.
528      *
529      * @return the operation, as a message
530      */
531     public String getOperationMessage() {
532         Operation last = operationHistory.peekLast();
533         return (last == null ? null : last.getClOperation().toMessage());
534     }
535
536     /**
537      * Gets the operation result.
538      *
539      * @return the operation result
540      */
541     public PolicyResult getOperationResult() {
542         Operation last = operationHistory.peekLast();
543         return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
544     }
545
546     /**
547      * Get the latest operation history.
548      *
549      * @return the latest operation history
550      */
551     public String getOperationHistory() {
552         Operation last = operationHistory.peekLast();
553         return (last == null ? null : last.clOperation.toHistory());
554     }
555
556     /**
557      * Get the history.
558      *
559      * @return the list of control loop operations
560      */
561     public List<ControlLoopOperation> getHistory() {
562         Operation last = (holdLast ? operationHistory.removeLast() : null);
563
564         List<ControlLoopOperation> result = operationHistory.stream().map(Operation::getClOperation)
565                         .map(ControlLoopOperation::new).collect(Collectors.toList());
566
567         if (last != null) {
568             operationHistory.add(last);
569         }
570
571         return result;
572     }
573
574     /**
575      * Stores a failure in the DB.
576      *
577      * @param outcome operation outcome
578      * @param result result to put into the DB
579      * @param message message to put into the DB
580      */
581     private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
582         // don't include this in history yet
583         holdLast = true;
584
585         outcome.setActor(actor);
586         outcome.setOperation(operation);
587         outcome.setMessage(message);
588         outcome.setResult(result);
589
590         operationHistory.add(new Operation(outcome));
591         storeOperationInDataBase();
592     }
593
594     /**
595      * Stores the latest operation in the DB.
596      */
597     private void storeOperationInDataBase() {
598         operContext.getDataManager().store(requestId, eventContext.getEvent(), targetEntity,
599                         operationHistory.peekLast().getClOperation());
600     }
601
602     /**
603      * Determines the target entity.
604      *
605      * @return a future to determine the target entity, or {@code null} if the entity has
606      *         already been determined
607      */
608     protected CompletableFuture<OperationOutcome> detmTarget() {
609         if (policy.getTarget() == null) {
610             throw new IllegalArgumentException("The target is null");
611         }
612
613         if (policy.getTarget().getType() == null) {
614             throw new IllegalArgumentException("The target type is null");
615         }
616
617         switch (policy.getTarget().getType()) {
618             case PNF:
619                 return detmPnfTarget();
620             case VM:
621             case VNF:
622             case VFMODULE:
623                 return detmVfModuleTarget();
624             default:
625                 throw new IllegalArgumentException("The target type is not supported");
626         }
627     }
628
629     /**
630      * Determines the PNF target entity.
631      *
632      * @return a future to determine the target entity, or {@code null} if the entity has
633      *         already been determined
634      */
635     private CompletableFuture<OperationOutcome> detmPnfTarget() {
636         if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
637             throw new IllegalArgumentException("Target does not match target type");
638         }
639
640         targetEntity = eventContext.getEnrichment().get(PNF_NAME);
641         if (targetEntity == null) {
642             throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
643         }
644
645         return null;
646     }
647
648     /**
649      * Determines the VF Module target entity.
650      *
651      * @return a future to determine the target entity, or {@code null} if the entity has
652      *         already been determined
653      */
654     private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
655         String targetFieldName = eventContext.getEvent().getTarget();
656         if (targetFieldName == null) {
657             throw new IllegalArgumentException("Target is null");
658         }
659
660         switch (targetFieldName.toLowerCase()) {
661             case VSERVER_VSERVER_NAME:
662                 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
663                 break;
664             case GENERIC_VNF_VNF_ID:
665                 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
666                 break;
667             case GENERIC_VNF_VNF_NAME:
668                 return detmVnfName();
669             default:
670                 throw new IllegalArgumentException("Target does not match target type");
671         }
672
673         if (targetEntity == null) {
674             throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
675         }
676
677         return null;
678     }
679
680     /**
681      * Determines the VNF Name target entity.
682      *
683      * @return a future to determine the target entity, or {@code null} if the entity has
684      *         already been determined
685      */
686     @SuppressWarnings("unchecked")
687     private CompletableFuture<OperationOutcome> detmVnfName() {
688         // if the onset is enriched with the vnf-id, we don't need an A&AI response
689         targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
690         if (targetEntity != null) {
691             return null;
692         }
693
694         // vnf-id was not in the onset - obtain it via the custom query
695
696         // @formatter:off
697         ControlLoopOperationParams cqparams = params.toBuilder()
698                         .actor(AaiConstants.ACTOR_NAME)
699                         .operation(AaiCqResponse.OPERATION)
700                         .targetEntity("")
701                         .build();
702         // @formatter:on
703
704         // perform custom query and then extract the VNF ID from it
705         return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
706                         this::extractVnfFromCq);
707     }
708
709     /**
710      * Extracts the VNF Name target entity from the custom query data.
711      *
712      * @return {@code null}
713      */
714     private CompletableFuture<OperationOutcome> extractVnfFromCq() {
715         // already have the CQ data
716         AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
717         if (cq.getDefaultGenericVnf() == null) {
718             throw new IllegalArgumentException("No vnf-id found");
719         }
720
721         targetEntity = cq.getDefaultGenericVnf().getVnfId();
722         if (targetEntity == null) {
723             throw new IllegalArgumentException("No vnf-id found");
724         }
725
726         return null;
727     }
728 }