b880fd190b56802306e7b2f7e2b4d245e4aee7b2
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7  * Modifications Copyright (C) 2019 Tech Mahindra
8  * Modifications Copyright (C) 2019 Bell Canada.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.controlloop.eventmanager;
25
26 import java.io.Serializable;
27 import java.time.Instant;
28 import java.util.Deque;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import java.util.stream.Collectors;
38 import lombok.AccessLevel;
39 import lombok.Getter;
40 import lombok.ToString;
41 import org.onap.policy.aai.AaiConstants;
42 import org.onap.policy.aai.AaiCqResponse;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
45 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
46 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
47 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
48 import org.onap.policy.controlloop.policy.Policy;
49 import org.onap.policy.controlloop.policy.PolicyResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Manages a single Operation for a single event. Once this has been created,
55  * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
56  * continually until it returns {@code false}, indicating that all steps have completed.
57  */
58 @ToString(onlyExplicitlyIncluded = true)
59 public class ControlLoopOperationManager2 implements Serializable {
60     private static final long serialVersionUID = -3773199283624595410L;
61     private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
62     private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
63     public static final String LOCK_ACTOR = "LOCK";
64     public static final String LOCK_OPERATION = "Lock";
65     private static final String GUARD_ACTOR = "GUARD";
66     public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
67     public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
68     public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
69     public static final String PNF_NAME = "pnf.pnf-name";
70
71     // @formatter:off
72     public enum State {
73         ACTIVE,
74         LOCK_DENIED,
75         LOCK_LOST,
76         GUARD_STARTED,
77         GUARD_PERMITTED,
78         GUARD_DENIED,
79         OPERATION_STARTED,
80         OPERATION_SUCCESS,
81         OPERATION_FAILURE,
82         CONTROL_LOOP_TIMEOUT
83     }
84     // @formatter:on
85
86     private final transient ManagerContext operContext;
87     private final transient ControlLoopEventContext eventContext;
88     private final Policy policy;
89
90     @Getter
91     @ToString.Include
92     private State state = State.ACTIVE;
93
94     @ToString.Include
95     private final String requestId;
96
97     @ToString.Include
98     private final String policyId;
99
100     /**
101      * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
102      * this operation.
103      */
104     @ToString.Include
105     private int attempts = 0;
106
107     private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
108
109     /**
110      * Queue of outcomes yet to be processed. Outcomes are added to this each time the
111      * "start" or "complete" callback is invoked.
112      */
113     @Getter(AccessLevel.PROTECTED)
114     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
115
116     /**
117      * Used to cancel the running operation.
118      */
119     @Getter(AccessLevel.PROTECTED)
120     private transient CompletableFuture<OperationOutcome> future = null;
121
122     /**
123      * Target entity. Determined after the lock is granted, though it may require the
124      * custom query to be performed first.
125      */
126     @Getter
127     private String targetEntity;
128
129     @Getter(AccessLevel.PROTECTED)
130     private final transient ControlLoopOperationParams params;
131     private final transient PipelineUtil taskUtil;
132
133     /**
134      * Time when the lock was first requested.
135      */
136     private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
137
138     // values extracted from the policy
139     @Getter
140     private final String actor;
141     @Getter
142     private final String operation;
143
144
145     /**
146      * Construct an instance.
147      *
148      * @param operContext this operation's context
149      * @param context event context
150      * @param policy operation's policy
151      * @param executor executor for the Operation
152      */
153     public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
154                     Executor executor) {
155
156         this.operContext = operContext;
157         this.eventContext = context;
158         this.policy = policy;
159         this.requestId = context.getEvent().getRequestId().toString();
160         this.policyId = "" + policy.getId();
161         this.actor = policy.getActor();
162         this.operation = policy.getRecipe();
163
164         // @formatter:off
165         params = ControlLoopOperationParams.builder()
166                         .actorService(operContext.getActorService())
167                         .actor(actor)
168                         .operation(operation)
169                         .context(context)
170                         .executor(executor)
171                         .target(policy.getTarget())
172                         .startCallback(this::onStart)
173                         .completeCallback(this::onComplete)
174                         .build();
175         // @formatter:on
176
177         taskUtil = new PipelineUtil(params);
178     }
179
180     //
181     // Internal class used for tracking
182     //
183     @Getter
184     @ToString
185     private class Operation implements Serializable {
186         private static final long serialVersionUID = 1L;
187
188         private int attempt;
189         private PolicyResult policyResult;
190         private ControlLoopOperation clOperation;
191
192         /**
193          * Constructs the object.
194          *
195          * @param outcome outcome of the operation
196          */
197         public Operation(OperationOutcome outcome) {
198             attempt = ControlLoopOperationManager2.this.attempts;
199             policyResult = outcome.getResult();
200             clOperation = outcome.toControlLoopOperation();
201             clOperation.setTarget(policy.getTarget().toString());
202         }
203     }
204
205     /**
206      * Start the operation, first acquiring any locks that are needed. This should not
207      * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
208      *
209      * @param remainingMs time remaining, in milliseconds, for the control loop
210      */
211     @SuppressWarnings("unchecked")
212     public synchronized void start(long remainingMs) {
213         // this is synchronized while we update "future"
214
215         try {
216             // provide a default, in case something fails before requestLock() is called
217             lockStart.set(Instant.now());
218
219             // @formatter:off
220             future = taskUtil.sequence(
221                 this::detmTarget,
222                 this::requestLock,
223                 this::startOperation);
224             // @formatter:on
225
226             // handle any exceptions that may be thrown, set timeout, and handle timeout
227
228             // @formatter:off
229             future.exceptionally(this::handleException)
230                     .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
231                     .exceptionally(this::handleTimeout);
232             // @formatter:on
233
234         } catch (RuntimeException e) {
235             handleException(e);
236         }
237     }
238
239     /**
240      * Start the operation, after the lock has been acquired.
241      *
242      * @return
243      */
244     private CompletableFuture<OperationOutcome> startOperation() {
245         // @formatter:off
246         ControlLoopOperationParams params2 = params.toBuilder()
247                     .payload(new LinkedHashMap<>())
248                     .retry(policy.getRetry())
249                     .timeoutSec(policy.getTimeout())
250                     .targetEntity(targetEntity)
251                     .build();
252         // @formatter:on
253
254         if (policy.getPayload() != null) {
255             params2.getPayload().putAll(policy.getPayload());
256         }
257
258         return params2.start();
259     }
260
261     /**
262      * Handles exceptions that may be generated.
263      *
264      * @param thrown exception that was generated
265      * @return {@code null}
266      */
267     private OperationOutcome handleException(Throwable thrown) {
268         if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
269             return null;
270         }
271
272         logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
273         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
274         outcome.setStart(lockStart.get());
275         outcome.setEnd(Instant.now());
276         outcome.setFinalOutcome(true);
277         onComplete(outcome);
278
279         // this outcome is not used so just return "null"
280         return null;
281     }
282
283     /**
284      * Handles control loop timeout exception.
285      *
286      * @param thrown exception that was generated
287      * @return {@code null}
288      */
289     private OperationOutcome handleTimeout(Throwable thrown) {
290         logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
291
292         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
293         outcome.setActor(CL_TIMEOUT_ACTOR);
294         outcome.setOperation(null);
295         outcome.setStart(lockStart.get());
296         outcome.setEnd(Instant.now());
297         outcome.setFinalOutcome(true);
298         onComplete(outcome);
299
300         // cancel the operation, if it's still running
301         future.cancel(false);
302
303         // this outcome is not used so just return "null"
304         return null;
305     }
306
307     /**
308      * Cancels the operation.
309      */
310     public void cancel() {
311         synchronized (this) {
312             if (future == null) {
313                 return;
314             }
315         }
316
317         future.cancel(false);
318     }
319
320     /**
321      * Requests a lock on the {@link #targetEntity}.
322      *
323      * @return a future to await the lock
324      */
325     private CompletableFuture<OperationOutcome> requestLock() {
326         /*
327          * Failures are handled via the callback, and successes are discarded by
328          * sequence(), without passing them to onComplete().
329          *
330          * Return a COPY of the future so that if we try to cancel it, we'll only cancel
331          * the copy, not the original. This is done by tacking thenApply() onto the end.
332          */
333         lockStart.set(Instant.now());
334         return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
335     }
336
337     /**
338      * Indicates that the lock on the target entity is unavailable.
339      *
340      * @param outcome lock outcome
341      */
342     private void lockUnavailable(OperationOutcome outcome) {
343
344         // Note: NEVER invoke onStart() for locks; only invoke onComplete()
345         onComplete(outcome);
346
347         /*
348          * Now that we've added the lock outcome to the queue, ensure the future is
349          * canceled, which may, itself, generate an operation outcome.
350          */
351         cancel();
352     }
353
354     /**
355      * Handles responses provided via the "start" callback. Note: this is never be invoked
356      * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
357      *
358      * @param outcome outcome provided to the callback
359      */
360     private void onStart(OperationOutcome outcome) {
361         if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
362             addOutcome(outcome);
363         }
364     }
365
366     /**
367      * Handles responses provided via the "complete" callback. Note: this is never invoked
368      * for "successful" locks.
369      *
370      * @param outcome outcome provided to the callback
371      */
372     private void onComplete(OperationOutcome outcome) {
373
374         switch (outcome.getActor()) {
375             case LOCK_ACTOR:
376             case GUARD_ACTOR:
377             case CL_TIMEOUT_ACTOR:
378                 addOutcome(outcome);
379                 break;
380
381             default:
382                 if (outcome.isFor(actor, operation)) {
383                     addOutcome(outcome);
384                 }
385                 break;
386         }
387     }
388
389     /**
390      * Adds an outcome to {@link #outcomes}.
391      *
392      * @param outcome outcome to be added
393      */
394     private synchronized void addOutcome(OperationOutcome outcome) {
395         /*
396          * This is synchronized to prevent nextStep() from invoking processOutcome() at
397          * the same time.
398          */
399
400         logger.debug("added outcome={} for {}", outcome, requestId);
401         outcomes.add(outcome);
402
403         if (outcomes.peekFirst() == outcomes.peekLast()) {
404             // this is the first outcome in the queue - process it
405             processOutcome();
406         }
407     }
408
409     /**
410      * Looks for the next step in the queue.
411      *
412      * @return {@code true} if more responses are expected, {@code false} otherwise
413      */
414     public synchronized boolean nextStep() {
415         switch (state) {
416             case LOCK_DENIED:
417             case LOCK_LOST:
418             case GUARD_DENIED:
419             case CONTROL_LOOP_TIMEOUT:
420                 return false;
421             default:
422                 break;
423         }
424
425         OperationOutcome outcome = outcomes.peek();
426         if (outcome == null) {
427             // empty queue
428             return true;
429         }
430
431         if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
432             return false;
433         }
434
435         // first item has been processed, remove it
436         outcomes.remove();
437         if (!outcomes.isEmpty()) {
438             // have a new "first" item - process it
439             processOutcome();
440         }
441
442         return true;
443     }
444
445     /**
446      * Processes the first item in {@link #outcomes}. Sets the state, increments
447      * {@link #attempts}, if appropriate, and stores the operation history in the DB.
448      */
449     private synchronized void processOutcome() {
450         OperationOutcome outcome = outcomes.peek();
451         logger.debug("process outcome={} for {}", outcome, requestId);
452
453         switch (outcome.getActor()) {
454
455             case CL_TIMEOUT_ACTOR:
456                 state = State.CONTROL_LOOP_TIMEOUT;
457                 break;
458
459             case LOCK_ACTOR:
460                 // lock is no longer available
461                 if (state == State.ACTIVE) {
462                     state = State.LOCK_DENIED;
463                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
464                 } else {
465                     state = State.LOCK_LOST;
466                     storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
467                 }
468                 break;
469
470             case GUARD_ACTOR:
471                 if (outcome.getEnd() == null) {
472                     state = State.GUARD_STARTED;
473                 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
474                     state = State.GUARD_PERMITTED;
475                 } else {
476                     state = State.GUARD_DENIED;
477                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
478                 }
479                 break;
480
481             default:
482                 if (outcome.getEnd() == null) {
483                     // operation started
484                     ++attempts;
485                     state = State.OPERATION_STARTED;
486                     operationHistory.add(new Operation(outcome));
487                     break;
488                 }
489
490                 /*
491                  * Operation completed. If the last entry was a "start" (i.e., "end" field
492                  * is null), then replace it. Otherwise, just add the completion.
493                  */
494                 state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
495                                 : State.OPERATION_FAILURE);
496                 if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
497                     operationHistory.removeLast();
498                 }
499                 operationHistory.add(new Operation(outcome));
500                 storeOperationInDataBase();
501                 break;
502         }
503
504         // indicate that this has changed
505         operContext.updated(this);
506     }
507
508     /**
509      * Get the operation, as a message.
510      *
511      * @return the operation, as a message
512      */
513     public String getOperationMessage() {
514         Operation last = operationHistory.peekLast();
515         return (last == null ? null : last.getClOperation().toMessage());
516     }
517
518     /**
519      * Gets the operation result.
520      *
521      * @return the operation result
522      */
523     public PolicyResult getOperationResult() {
524         Operation last = operationHistory.peekLast();
525         return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
526     }
527
528     /**
529      * Get the latest operation history.
530      *
531      * @return the latest operation history
532      */
533     public String getOperationHistory() {
534         Operation last = operationHistory.peekLast();
535         return (last == null ? null : last.clOperation.toHistory());
536     }
537
538     /**
539      * Get the history.
540      *
541      * @return the list of control loop operations
542      */
543     public List<ControlLoopOperation> getHistory() {
544         return operationHistory.stream().map(Operation::getClOperation).map(ControlLoopOperation::new)
545                         .collect(Collectors.toList());
546     }
547
548     /**
549      * Stores a failure in the DB.
550      *
551      * @param outcome operation outcome
552      * @param result result to put into the DB
553      * @param message message to put into the DB
554      */
555     private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
556         outcome.setActor(actor);
557         outcome.setOperation(operation);
558         outcome.setMessage(message);
559         outcome.setResult(result);
560
561         operationHistory.add(new Operation(outcome));
562         storeOperationInDataBase();
563     }
564
565     /**
566      * Stores the latest operation in the DB.
567      */
568     private void storeOperationInDataBase() {
569         operContext.getDataManager().store(requestId, eventContext.getEvent(),
570                         operationHistory.peekLast().getClOperation());
571     }
572
573     /**
574      * Determines the target entity.
575      *
576      * @return a future to determine the target entity, or {@code null} if the entity has
577      *         already been determined
578      */
579     protected CompletableFuture<OperationOutcome> detmTarget() {
580         if (policy.getTarget() == null) {
581             throw new IllegalArgumentException("The target is null");
582         }
583
584         if (policy.getTarget().getType() == null) {
585             throw new IllegalArgumentException("The target type is null");
586         }
587
588         switch (policy.getTarget().getType()) {
589             case PNF:
590                 return detmPnfTarget();
591             case VM:
592             case VNF:
593             case VFMODULE:
594                 return detmVfModuleTarget();
595             default:
596                 throw new IllegalArgumentException("The target type is not supported");
597         }
598     }
599
600     /**
601      * Determines the PNF target entity.
602      *
603      * @return a future to determine the target entity, or {@code null} if the entity has
604      *         already been determined
605      */
606     private CompletableFuture<OperationOutcome> detmPnfTarget() {
607         if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
608             throw new IllegalArgumentException("Target does not match target type");
609         }
610
611         targetEntity = eventContext.getEnrichment().get(PNF_NAME);
612         if (targetEntity == null) {
613             throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
614         }
615
616         return null;
617     }
618
619     /**
620      * Determines the VF Module target entity.
621      *
622      * @return a future to determine the target entity, or {@code null} if the entity has
623      *         already been determined
624      */
625     private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
626         String targetFieldName = eventContext.getEvent().getTarget();
627         if (targetFieldName == null) {
628             throw new IllegalArgumentException("Target is null");
629         }
630
631         switch (targetFieldName.toLowerCase()) {
632             case VSERVER_VSERVER_NAME:
633                 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
634                 break;
635             case GENERIC_VNF_VNF_ID:
636                 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
637                 break;
638             case GENERIC_VNF_VNF_NAME:
639                 return detmVnfName();
640             default:
641                 throw new IllegalArgumentException("Target does not match target type");
642         }
643
644         if (targetEntity == null) {
645             throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
646         }
647
648         return null;
649     }
650
651     /**
652      * Determines the VNF Name target entity.
653      *
654      * @return a future to determine the target entity, or {@code null} if the entity has
655      *         already been determined
656      */
657     @SuppressWarnings("unchecked")
658     private CompletableFuture<OperationOutcome> detmVnfName() {
659         // if the onset is enriched with the vnf-id, we don't need an A&AI response
660         targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
661         if (targetEntity != null) {
662             return null;
663         }
664
665         // vnf-id was not in the onset - obtain it via the custom query
666
667         // @formatter:off
668         ControlLoopOperationParams cqparams = params.toBuilder()
669                         .actor(AaiConstants.ACTOR_NAME)
670                         .operation(AaiCqResponse.OPERATION)
671                         .targetEntity("")
672                         .build();
673         // @formatter:on
674
675         // perform custom query and then extract the VNF ID from it
676         return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
677                         this::extractVnfFromCq);
678     }
679
680     /**
681      * Extracts the VNF Name target entity from the custom query data.
682      *
683      * @return {@code null}
684      */
685     private CompletableFuture<OperationOutcome> extractVnfFromCq() {
686         // already have the CQ data
687         AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
688         if (cq.getDefaultGenericVnf() == null) {
689             throw new IllegalArgumentException("No vnf-id found");
690         }
691
692         targetEntity = cq.getDefaultGenericVnf().getVnfId();
693         if (targetEntity == null) {
694             throw new IllegalArgumentException("No vnf-id found");
695         }
696
697         return null;
698     }
699 }