a096522bccfc66d1cc7f898df10b2337378787c5
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7  * Modifications Copyright (C) 2019 Tech Mahindra
8  * Modifications Copyright (C) 2019 Bell Canada.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.controlloop.eventmanager;
25
26 import java.io.Serializable;
27 import java.time.Instant;
28 import java.util.Deque;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import java.util.stream.Collectors;
38 import lombok.AccessLevel;
39 import lombok.Getter;
40 import lombok.ToString;
41 import org.onap.policy.aai.AaiConstants;
42 import org.onap.policy.aai.AaiCqResponse;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
45 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
46 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
47 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
48 import org.onap.policy.controlloop.policy.Policy;
49 import org.onap.policy.controlloop.policy.PolicyResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Manages a single Operation for a single event. Once this has been created,
55  * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
56  * continually until it returns {@code false}, indicating that all steps have completed.
57  */
58 @ToString(onlyExplicitlyIncluded = true)
59 public class ControlLoopOperationManager2 implements Serializable {
60     private static final long serialVersionUID = -3773199283624595410L;
61     private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
62     private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
63     public static final String LOCK_ACTOR = "LOCK";
64     public static final String LOCK_OPERATION = "Lock";
65     private static final String GUARD_ACTOR = "GUARD";
66     public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
67     public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
68     public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
69     public static final String PNF_NAME = "pnf.pnf-name";
70
71     // @formatter:off
72     public enum State {
73         ACTIVE,
74         LOCK_DENIED,
75         LOCK_LOST,
76         GUARD_STARTED,
77         GUARD_PERMITTED,
78         GUARD_DENIED,
79         OPERATION_STARTED,
80         OPERATION_SUCCESS,
81         OPERATION_FAILURE,
82         CONTROL_LOOP_TIMEOUT
83     }
84     // @formatter:on
85
86     private final transient ManagerContext operContext;
87     private final transient ControlLoopEventContext eventContext;
88     private final Policy policy;
89
90     @Getter
91     @ToString.Include
92     private State state = State.ACTIVE;
93
94     @ToString.Include
95     private final String requestId;
96
97     @ToString.Include
98     private final String policyId;
99
100     /**
101      * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
102      * this operation.
103      */
104     @ToString.Include
105     private int attempts = 0;
106
107     private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
108
109     /**
110      * Set to {@code true} to prevent the last item in {@link #operationHistory} from
111      * being included in the outcome of {@link #getHistory()}. Used when the operation
112      * aborts prematurely due to lock-denied, guard-denied, etc.
113      */
114     private boolean holdLast = false;
115
116     /**
117      * Queue of outcomes yet to be processed. Outcomes are added to this each time the
118      * "start" or "complete" callback is invoked.
119      */
120     @Getter(AccessLevel.PROTECTED)
121     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
122
123     /**
124      * Used to cancel the running operation.
125      */
126     @Getter(AccessLevel.PROTECTED)
127     private transient CompletableFuture<OperationOutcome> future = null;
128
129     /**
130      * Target entity. Determined after the lock is granted, though it may require the
131      * custom query to be performed first.
132      */
133     @Getter
134     private String targetEntity;
135
136     @Getter(AccessLevel.PROTECTED)
137     private final transient ControlLoopOperationParams params;
138     private final transient PipelineUtil taskUtil;
139
140     /**
141      * Time when the lock was first requested.
142      */
143     private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
144
145     // values extracted from the policy
146     @Getter
147     private final String actor;
148     @Getter
149     private final String operation;
150
151
152     /**
153      * Construct an instance.
154      *
155      * @param operContext this operation's context
156      * @param context event context
157      * @param policy operation's policy
158      * @param executor executor for the Operation
159      */
160     public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
161                     Executor executor) {
162
163         this.operContext = operContext;
164         this.eventContext = context;
165         this.policy = policy;
166         this.requestId = context.getEvent().getRequestId().toString();
167         this.policyId = "" + policy.getId();
168         this.actor = policy.getActor();
169         this.operation = policy.getRecipe();
170
171         // @formatter:off
172         params = ControlLoopOperationParams.builder()
173                         .actorService(operContext.getActorService())
174                         .actor(actor)
175                         .operation(operation)
176                         .context(context)
177                         .executor(executor)
178                         .target(policy.getTarget())
179                         .startCallback(this::onStart)
180                         .completeCallback(this::onComplete)
181                         .build();
182         // @formatter:on
183
184         taskUtil = new PipelineUtil(params);
185     }
186
187     //
188     // Internal class used for tracking
189     //
190     @Getter
191     @ToString
192     private class Operation implements Serializable {
193         private static final long serialVersionUID = 1L;
194
195         private int attempt;
196         private PolicyResult policyResult;
197         private ControlLoopOperation clOperation;
198
199         /**
200          * Constructs the object.
201          *
202          * @param outcome outcome of the operation
203          */
204         public Operation(OperationOutcome outcome) {
205             attempt = ControlLoopOperationManager2.this.attempts;
206             policyResult = outcome.getResult();
207             clOperation = outcome.toControlLoopOperation();
208             clOperation.setTarget(policy.getTarget().toString());
209         }
210     }
211
212     /**
213      * Start the operation, first acquiring any locks that are needed. This should not
214      * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
215      *
216      * @param remainingMs time remaining, in milliseconds, for the control loop
217      */
218     @SuppressWarnings("unchecked")
219     public synchronized void start(long remainingMs) {
220         // this is synchronized while we update "future"
221
222         try {
223             // provide a default, in case something fails before requestLock() is called
224             lockStart.set(Instant.now());
225
226             // @formatter:off
227             future = taskUtil.sequence(
228                 this::detmTarget,
229                 this::requestLock,
230                 this::startOperation);
231             // @formatter:on
232
233             // handle any exceptions that may be thrown, set timeout, and handle timeout
234
235             // @formatter:off
236             future.exceptionally(this::handleException)
237                     .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
238                     .exceptionally(this::handleTimeout);
239             // @formatter:on
240
241         } catch (RuntimeException e) {
242             handleException(e);
243         }
244     }
245
246     /**
247      * Start the operation, after the lock has been acquired.
248      *
249      * @return
250      */
251     private CompletableFuture<OperationOutcome> startOperation() {
252         // @formatter:off
253         ControlLoopOperationParams params2 = params.toBuilder()
254                     .payload(new LinkedHashMap<>())
255                     .retry(policy.getRetry())
256                     .timeoutSec(policy.getTimeout())
257                     .targetEntity(targetEntity)
258                     .build();
259         // @formatter:on
260
261         if (policy.getPayload() != null) {
262             params2.getPayload().putAll(policy.getPayload());
263         }
264
265         return params2.start();
266     }
267
268     /**
269      * Handles exceptions that may be generated.
270      *
271      * @param thrown exception that was generated
272      * @return {@code null}
273      */
274     private OperationOutcome handleException(Throwable thrown) {
275         if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
276             return null;
277         }
278
279         logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
280         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
281         outcome.setStart(lockStart.get());
282         outcome.setEnd(Instant.now());
283         outcome.setFinalOutcome(true);
284         onComplete(outcome);
285
286         // this outcome is not used so just return "null"
287         return null;
288     }
289
290     /**
291      * Handles control loop timeout exception.
292      *
293      * @param thrown exception that was generated
294      * @return {@code null}
295      */
296     private OperationOutcome handleTimeout(Throwable thrown) {
297         logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
298
299         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
300         outcome.setActor(CL_TIMEOUT_ACTOR);
301         outcome.setOperation(null);
302         outcome.setStart(lockStart.get());
303         outcome.setEnd(Instant.now());
304         outcome.setFinalOutcome(true);
305         onComplete(outcome);
306
307         // cancel the operation, if it's still running
308         future.cancel(false);
309
310         // this outcome is not used so just return "null"
311         return null;
312     }
313
314     /**
315      * Cancels the operation.
316      */
317     public void cancel() {
318         synchronized (this) {
319             if (future == null) {
320                 return;
321             }
322         }
323
324         future.cancel(false);
325     }
326
327     /**
328      * Requests a lock on the {@link #targetEntity}.
329      *
330      * @return a future to await the lock
331      */
332     private CompletableFuture<OperationOutcome> requestLock() {
333         /*
334          * Failures are handled via the callback, and successes are discarded by
335          * sequence(), without passing them to onComplete().
336          *
337          * Return a COPY of the future so that if we try to cancel it, we'll only cancel
338          * the copy, not the original. This is done by tacking thenApply() onto the end.
339          */
340         lockStart.set(Instant.now());
341         return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
342     }
343
344     /**
345      * Indicates that the lock on the target entity is unavailable.
346      *
347      * @param outcome lock outcome
348      */
349     private void lockUnavailable(OperationOutcome outcome) {
350
351         // Note: NEVER invoke onStart() for locks; only invoke onComplete()
352         onComplete(outcome);
353
354         /*
355          * Now that we've added the lock outcome to the queue, ensure the future is
356          * canceled, which may, itself, generate an operation outcome.
357          */
358         cancel();
359     }
360
361     /**
362      * Handles responses provided via the "start" callback. Note: this is never be invoked
363      * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
364      *
365      * @param outcome outcome provided to the callback
366      */
367     private void onStart(OperationOutcome outcome) {
368         if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
369             addOutcome(outcome);
370         }
371     }
372
373     /**
374      * Handles responses provided via the "complete" callback. Note: this is never invoked
375      * for "successful" locks.
376      *
377      * @param outcome outcome provided to the callback
378      */
379     private void onComplete(OperationOutcome outcome) {
380
381         switch (outcome.getActor()) {
382             case LOCK_ACTOR:
383             case GUARD_ACTOR:
384             case CL_TIMEOUT_ACTOR:
385                 addOutcome(outcome);
386                 break;
387
388             default:
389                 if (outcome.isFor(actor, operation)) {
390                     addOutcome(outcome);
391                 }
392                 break;
393         }
394     }
395
396     /**
397      * Adds an outcome to {@link #outcomes}.
398      *
399      * @param outcome outcome to be added
400      */
401     private synchronized void addOutcome(OperationOutcome outcome) {
402         /*
403          * This is synchronized to prevent nextStep() from invoking processOutcome() at
404          * the same time.
405          */
406
407         logger.debug("added outcome={} for {}", outcome, requestId);
408         outcomes.add(outcome);
409
410         if (outcomes.peekFirst() == outcomes.peekLast()) {
411             // this is the first outcome in the queue - process it
412             processOutcome();
413         }
414     }
415
416     /**
417      * Looks for the next step in the queue.
418      *
419      * @return {@code true} if more responses are expected, {@code false} otherwise
420      */
421     public synchronized boolean nextStep() {
422         switch (state) {
423             case LOCK_DENIED:
424             case LOCK_LOST:
425             case GUARD_DENIED:
426             case CONTROL_LOOP_TIMEOUT:
427                 holdLast = false;
428                 return false;
429             default:
430                 break;
431         }
432
433         OperationOutcome outcome = outcomes.peek();
434         if (outcome == null) {
435             // empty queue
436             return true;
437         }
438
439         if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
440             return false;
441         }
442
443         // first item has been processed, remove it
444         outcomes.remove();
445         if (!outcomes.isEmpty()) {
446             // have a new "first" item - process it
447             processOutcome();
448         }
449
450         return true;
451     }
452
453     /**
454      * Processes the first item in {@link #outcomes}. Sets the state, increments
455      * {@link #attempts}, if appropriate, and stores the operation history in the DB.
456      */
457     private synchronized void processOutcome() {
458         OperationOutcome outcome = outcomes.peek();
459         logger.debug("process outcome={} for {}", outcome, requestId);
460
461         switch (outcome.getActor()) {
462
463             case CL_TIMEOUT_ACTOR:
464                 state = State.CONTROL_LOOP_TIMEOUT;
465                 break;
466
467             case LOCK_ACTOR:
468                 // lock is no longer available
469                 if (state == State.ACTIVE) {
470                     state = State.LOCK_DENIED;
471                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
472                 } else {
473                     state = State.LOCK_LOST;
474                     storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
475                 }
476                 break;
477
478             case GUARD_ACTOR:
479                 if (outcome.getEnd() == null) {
480                     state = State.GUARD_STARTED;
481                 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
482                     state = State.GUARD_PERMITTED;
483                 } else {
484                     state = State.GUARD_DENIED;
485                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
486                 }
487                 break;
488
489             default:
490                 if (outcome.getEnd() == null) {
491                     // operation started
492                     ++attempts;
493                     state = State.OPERATION_STARTED;
494                     operationHistory.add(new Operation(outcome));
495                     break;
496                 }
497
498                 /*
499                  * Operation completed. If the last entry was a "start" (i.e., "end" field
500                  * is null), then replace it. Otherwise, just add the completion.
501                  */
502                 state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
503                                 : State.OPERATION_FAILURE);
504                 if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
505                     operationHistory.removeLast();
506                 }
507                 operationHistory.add(new Operation(outcome));
508                 storeOperationInDataBase();
509                 break;
510         }
511
512         // indicate that this has changed
513         operContext.updated(this);
514     }
515
516     /**
517      * Get the operation, as a message.
518      *
519      * @return the operation, as a message
520      */
521     public String getOperationMessage() {
522         Operation last = operationHistory.peekLast();
523         return (last == null ? null : last.getClOperation().toMessage());
524     }
525
526     /**
527      * Gets the operation result.
528      *
529      * @return the operation result
530      */
531     public PolicyResult getOperationResult() {
532         Operation last = operationHistory.peekLast();
533         return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
534     }
535
536     /**
537      * Get the latest operation history.
538      *
539      * @return the latest operation history
540      */
541     public String getOperationHistory() {
542         Operation last = operationHistory.peekLast();
543         return (last == null ? null : last.clOperation.toHistory());
544     }
545
546     /**
547      * Get the history.
548      *
549      * @return the list of control loop operations
550      */
551     public List<ControlLoopOperation> getHistory() {
552         Operation last = (holdLast ? operationHistory.removeLast() : null);
553
554         List<ControlLoopOperation> result = operationHistory.stream().map(Operation::getClOperation)
555                         .map(ControlLoopOperation::new).collect(Collectors.toList());
556
557         if (last != null) {
558             operationHistory.add(last);
559         }
560
561         return result;
562     }
563
564     /**
565      * Stores a failure in the DB.
566      *
567      * @param outcome operation outcome
568      * @param result result to put into the DB
569      * @param message message to put into the DB
570      */
571     private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
572         // don't include this in history yet
573         holdLast = true;
574
575         outcome.setActor(actor);
576         outcome.setOperation(operation);
577         outcome.setMessage(message);
578         outcome.setResult(result);
579
580         operationHistory.add(new Operation(outcome));
581         storeOperationInDataBase();
582     }
583
584     /**
585      * Stores the latest operation in the DB.
586      */
587     private void storeOperationInDataBase() {
588         operContext.getDataManager().store(requestId, eventContext.getEvent(), targetEntity,
589                         operationHistory.peekLast().getClOperation());
590     }
591
592     /**
593      * Determines the target entity.
594      *
595      * @return a future to determine the target entity, or {@code null} if the entity has
596      *         already been determined
597      */
598     protected CompletableFuture<OperationOutcome> detmTarget() {
599         if (policy.getTarget() == null) {
600             throw new IllegalArgumentException("The target is null");
601         }
602
603         if (policy.getTarget().getType() == null) {
604             throw new IllegalArgumentException("The target type is null");
605         }
606
607         switch (policy.getTarget().getType()) {
608             case PNF:
609                 return detmPnfTarget();
610             case VM:
611             case VNF:
612             case VFMODULE:
613                 return detmVfModuleTarget();
614             default:
615                 throw new IllegalArgumentException("The target type is not supported");
616         }
617     }
618
619     /**
620      * Determines the PNF target entity.
621      *
622      * @return a future to determine the target entity, or {@code null} if the entity has
623      *         already been determined
624      */
625     private CompletableFuture<OperationOutcome> detmPnfTarget() {
626         if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
627             throw new IllegalArgumentException("Target does not match target type");
628         }
629
630         targetEntity = eventContext.getEnrichment().get(PNF_NAME);
631         if (targetEntity == null) {
632             throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
633         }
634
635         return null;
636     }
637
638     /**
639      * Determines the VF Module target entity.
640      *
641      * @return a future to determine the target entity, or {@code null} if the entity has
642      *         already been determined
643      */
644     private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
645         String targetFieldName = eventContext.getEvent().getTarget();
646         if (targetFieldName == null) {
647             throw new IllegalArgumentException("Target is null");
648         }
649
650         switch (targetFieldName.toLowerCase()) {
651             case VSERVER_VSERVER_NAME:
652                 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
653                 break;
654             case GENERIC_VNF_VNF_ID:
655                 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
656                 break;
657             case GENERIC_VNF_VNF_NAME:
658                 return detmVnfName();
659             default:
660                 throw new IllegalArgumentException("Target does not match target type");
661         }
662
663         if (targetEntity == null) {
664             throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
665         }
666
667         return null;
668     }
669
670     /**
671      * Determines the VNF Name target entity.
672      *
673      * @return a future to determine the target entity, or {@code null} if the entity has
674      *         already been determined
675      */
676     @SuppressWarnings("unchecked")
677     private CompletableFuture<OperationOutcome> detmVnfName() {
678         // if the onset is enriched with the vnf-id, we don't need an A&AI response
679         targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
680         if (targetEntity != null) {
681             return null;
682         }
683
684         // vnf-id was not in the onset - obtain it via the custom query
685
686         // @formatter:off
687         ControlLoopOperationParams cqparams = params.toBuilder()
688                         .actor(AaiConstants.ACTOR_NAME)
689                         .operation(AaiCqResponse.OPERATION)
690                         .targetEntity("")
691                         .build();
692         // @formatter:on
693
694         // perform custom query and then extract the VNF ID from it
695         return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
696                         this::extractVnfFromCq);
697     }
698
699     /**
700      * Extracts the VNF Name target entity from the custom query data.
701      *
702      * @return {@code null}
703      */
704     private CompletableFuture<OperationOutcome> extractVnfFromCq() {
705         // already have the CQ data
706         AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
707         if (cq.getDefaultGenericVnf() == null) {
708             throw new IllegalArgumentException("No vnf-id found");
709         }
710
711         targetEntity = cq.getDefaultGenericVnf().getVnfId();
712         if (targetEntity == null) {
713             throw new IllegalArgumentException("No vnf-id found");
714         }
715
716         return null;
717     }
718 }