6bdaa1575fb8a29cfa09e2b504a238b8617cdd61
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7  * Modifications Copyright (C) 2019 Tech Mahindra
8  * Modifications Copyright (C) 2019 Bell Canada.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.controlloop.eventmanager;
25
26 import java.io.Serializable;
27 import java.time.Instant;
28 import java.util.Deque;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import java.util.stream.Collectors;
38 import lombok.AccessLevel;
39 import lombok.Getter;
40 import lombok.ToString;
41 import org.onap.policy.aai.AaiConstants;
42 import org.onap.policy.aai.AaiCqResponse;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
45 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
46 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
47 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
48 import org.onap.policy.controlloop.policy.Policy;
49 import org.onap.policy.controlloop.policy.PolicyResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Manages a single Operation for a single event. Once this has been created,
55  * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
56  * continually until it returns {@code false}, indicating that all steps have completed.
57  */
58 @ToString(onlyExplicitlyIncluded = true)
59 public class ControlLoopOperationManager2 implements Serializable {
60     private static final long serialVersionUID = -3773199283624595410L;
61     private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
62     private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
63     public static final String LOCK_ACTOR = "LOCK";
64     public static final String LOCK_OPERATION = "Lock";
65     private static final String GUARD_ACTOR = "GUARD";
66     public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
67     public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
68     public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
69     public static final String PNF_NAME = "pnf.pnf-name";
70
71     // @formatter:off
72     public enum State {
73         ACTIVE,
74         LOCK_DENIED,
75         LOCK_LOST,
76         GUARD_STARTED,
77         GUARD_PERMITTED,
78         GUARD_DENIED,
79         OPERATION_SUCCESS,
80         OPERATION_FAILURE,
81         CONTROL_LOOP_TIMEOUT
82     }
83     // @formatter:on
84
85     private final transient ManagerContext operContext;
86     private final transient ControlLoopEventContext eventContext;
87     private final Policy policy;
88
89     @Getter
90     @ToString.Include
91     private State state = State.ACTIVE;
92
93     @ToString.Include
94     private final String requestId;
95
96     @ToString.Include
97     private final String policyId;
98
99     /**
100      * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
101      * this operation.
102      */
103     @ToString.Include
104     private int attempts = 0;
105
106     private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
107
108     /**
109      * Queue of outcomes yet to be processed. Outcomes are added to this each time the
110      * "start" or "complete" callback is invoked.
111      */
112     @Getter(AccessLevel.PROTECTED)
113     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
114
115     /**
116      * Used to cancel the running operation.
117      */
118     @Getter(AccessLevel.PROTECTED)
119     private transient CompletableFuture<OperationOutcome> future = null;
120
121     /**
122      * Target entity. Determined after the lock is granted, though it may require the
123      * custom query to be performed first.
124      */
125     @Getter
126     private String targetEntity;
127
128     @Getter(AccessLevel.PROTECTED)
129     private final transient ControlLoopOperationParams params;
130     private final transient PipelineUtil taskUtil;
131
132     /**
133      * Time when the lock was first requested.
134      */
135     private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
136
137     // values extracted from the policy
138     @Getter
139     private final String actor;
140     @Getter
141     private final String operation;
142
143
144     /**
145      * Construct an instance.
146      *
147      * @param operContext this operation's context
148      * @param context event context
149      * @param policy operation's policy
150      * @param executor executor for the Operation
151      */
152     public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
153                     Executor executor) {
154
155         this.operContext = operContext;
156         this.eventContext = context;
157         this.policy = policy;
158         this.requestId = context.getEvent().getRequestId().toString();
159         this.policyId = "" + policy.getId();
160         this.actor = policy.getActor();
161         this.operation = policy.getRecipe();
162
163         // @formatter:off
164         params = ControlLoopOperationParams.builder()
165                         .actorService(operContext.getActorService())
166                         .actor(actor)
167                         .operation(operation)
168                         .context(context)
169                         .executor(executor)
170                         .target(policy.getTarget())
171                         .startCallback(this::onStart)
172                         .completeCallback(this::onComplete)
173                         .build();
174         // @formatter:on
175
176         taskUtil = new PipelineUtil(params);
177     }
178
179     //
180     // Internal class used for tracking
181     //
182     @Getter
183     @ToString
184     private class Operation implements Serializable {
185         private static final long serialVersionUID = 1L;
186
187         private int attempt;
188         private PolicyResult policyResult;
189         private ControlLoopOperation clOperation;
190
191         /**
192          * Constructs the object.
193          *
194          * @param outcome outcome of the operation
195          */
196         public Operation(OperationOutcome outcome) {
197             attempt = ControlLoopOperationManager2.this.attempts;
198             policyResult = outcome.getResult();
199             clOperation = outcome.toControlLoopOperation();
200         }
201     }
202
203     /**
204      * Start the operation, first acquiring any locks that are needed. This should not
205      * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
206      *
207      * @param remainingMs time remaining, in milliseconds, for the control loop
208      */
209     @SuppressWarnings("unchecked")
210     public synchronized void start(long remainingMs) {
211         // this is synchronized while we update "future"
212
213         try {
214             // provide a default, in case something fails before requestLock() is called
215             lockStart.set(Instant.now());
216
217             // @formatter:off
218             future = taskUtil.sequence(
219                 this::detmTarget,
220                 this::requestLock,
221                 this::startOperation);
222             // @formatter:on
223
224             // handle any exceptions that may be thrown, set timeout, and handle timeout
225
226             // @formatter:off
227             future.exceptionally(this::handleException)
228                     .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
229                     .exceptionally(this::handleTimeout);
230             // @formatter:on
231
232         } catch (RuntimeException e) {
233             handleException(e);
234         }
235     }
236
237     /**
238      * Start the operation, after the lock has been acquired.
239      *
240      * @return
241      */
242     private CompletableFuture<OperationOutcome> startOperation() {
243         // @formatter:off
244         ControlLoopOperationParams params2 = params.toBuilder()
245                     .payload(new LinkedHashMap<>())
246                     .retry(policy.getRetry())
247                     .timeoutSec(policy.getTimeout())
248                     .targetEntity(targetEntity)
249                     .build();
250         // @formatter:on
251
252         if (policy.getPayload() != null) {
253             params2.getPayload().putAll(policy.getPayload());
254         }
255
256         return params2.start();
257     }
258
259     /**
260      * Handles exceptions that may be generated.
261      *
262      * @param thrown exception that was generated
263      * @return {@code null}
264      */
265     private OperationOutcome handleException(Throwable thrown) {
266         if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
267             return null;
268         }
269
270         logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
271         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
272         outcome.setStart(lockStart.get());
273         outcome.setEnd(Instant.now());
274         outcome.setFinalOutcome(true);
275         onComplete(outcome);
276
277         // this outcome is not used so just return "null"
278         return null;
279     }
280
281     /**
282      * Handles control loop timeout exception.
283      *
284      * @param thrown exception that was generated
285      * @return {@code null}
286      */
287     private OperationOutcome handleTimeout(Throwable thrown) {
288         logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
289
290         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
291         outcome.setActor(CL_TIMEOUT_ACTOR);
292         outcome.setOperation(null);
293         outcome.setStart(lockStart.get());
294         outcome.setEnd(Instant.now());
295         outcome.setFinalOutcome(true);
296         onComplete(outcome);
297
298         // cancel the operation, if it's still running
299         future.cancel(false);
300
301         // this outcome is not used so just return "null"
302         return null;
303     }
304
305     /**
306      * Cancels the operation.
307      */
308     public void cancel() {
309         synchronized (this) {
310             if (future == null) {
311                 return;
312             }
313         }
314
315         future.cancel(false);
316     }
317
318     /**
319      * Requests a lock on the {@link #targetEntity}.
320      *
321      * @return a future to await the lock
322      */
323     private CompletableFuture<OperationOutcome> requestLock() {
324         /*
325          * Failures are handled via the callback, and successes are discarded by
326          * sequence(), without passing them to onComplete().
327          *
328          * Return a COPY of the future so that if we try to cancel it, we'll only cancel
329          * the copy, not the original. This is done by tacking thenApply() onto the end.
330          */
331         lockStart.set(Instant.now());
332         return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
333     }
334
335     /**
336      * Indicates that the lock on the target entity is unavailable.
337      *
338      * @param outcome lock outcome
339      */
340     private void lockUnavailable(OperationOutcome outcome) {
341
342         // Note: NEVER invoke onStart() for locks; only invoke onComplete()
343         onComplete(outcome);
344
345         /*
346          * Now that we've added the lock outcome to the queue, ensure the future is
347          * canceled, which may, itself, generate an operation outcome.
348          */
349         cancel();
350     }
351
352     /**
353      * Handles responses provided via the "start" callback. Note: this is never be invoked
354      * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
355      *
356      * @param outcome outcome provided to the callback
357      */
358     private void onStart(OperationOutcome outcome) {
359         if (GUARD_ACTOR.equals(outcome.getActor())) {
360             addOutcome(outcome);
361         }
362     }
363
364     /**
365      * Handles responses provided via the "complete" callback. Note: this is never invoked
366      * for "successful" locks.
367      *
368      * @param outcome outcome provided to the callback
369      */
370     private void onComplete(OperationOutcome outcome) {
371
372         switch (outcome.getActor()) {
373             case LOCK_ACTOR:
374             case GUARD_ACTOR:
375             case CL_TIMEOUT_ACTOR:
376                 addOutcome(outcome);
377                 break;
378
379             default:
380                 if (outcome.isFor(actor, operation)) {
381                     addOutcome(outcome);
382                 }
383                 break;
384         }
385     }
386
387     /**
388      * Adds an outcome to {@link #outcomes}.
389      *
390      * @param outcome outcome to be added
391      */
392     private synchronized void addOutcome(OperationOutcome outcome) {
393         /*
394          * This is synchronized to prevent nextStep() from invoking processOutcome() at
395          * the same time.
396          */
397
398         logger.debug("added outcome={} for {}", outcome, requestId);
399         outcomes.add(outcome);
400
401         if (outcomes.peekFirst() == outcomes.peekLast()) {
402             // this is the first outcome in the queue - process it
403             processOutcome();
404         }
405     }
406
407     /**
408      * Looks for the next step in the queue.
409      *
410      * @return {@code true} if more responses are expected, {@code false} otherwise
411      */
412     public synchronized boolean nextStep() {
413         switch (state) {
414             case LOCK_DENIED:
415             case LOCK_LOST:
416             case GUARD_DENIED:
417             case CONTROL_LOOP_TIMEOUT:
418                 return false;
419             default:
420                 break;
421         }
422
423         OperationOutcome outcome = outcomes.peek();
424         if (outcome == null) {
425             // empty queue
426             return true;
427         }
428
429         if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
430             return false;
431         }
432
433         // first item has been processed, remove it
434         outcomes.remove();
435         if (!outcomes.isEmpty()) {
436             // have a new "first" item - process it
437             processOutcome();
438         }
439
440         return true;
441     }
442
443     /**
444      * Processes the first item in {@link #outcomes}. Sets the state, increments
445      * {@link #attempts}, if appropriate, and stores the operation history in the DB.
446      */
447     private synchronized void processOutcome() {
448         OperationOutcome outcome = outcomes.peek();
449         logger.debug("process outcome={} for {}", outcome, requestId);
450
451         switch (outcome.getActor()) {
452
453             case CL_TIMEOUT_ACTOR:
454                 state = State.CONTROL_LOOP_TIMEOUT;
455                 break;
456
457             case LOCK_ACTOR:
458                 // lock is no longer available
459                 if (state == State.ACTIVE) {
460                     state = State.LOCK_DENIED;
461                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
462                 } else {
463                     state = State.LOCK_LOST;
464                     storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
465                 }
466                 break;
467
468             case GUARD_ACTOR:
469                 if (outcome.getEnd() == null) {
470                     state = State.GUARD_STARTED;
471                 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
472                     state = State.GUARD_PERMITTED;
473                 } else {
474                     state = State.GUARD_DENIED;
475                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
476                 }
477                 break;
478
479             default:
480                 // operation completed
481                 ++attempts;
482                 state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
483                                 : State.OPERATION_FAILURE);
484                 operationHistory.add(new Operation(outcome));
485                 storeOperationInDataBase();
486                 break;
487         }
488
489         // indicate that this has changed
490         operContext.updated(this);
491     }
492
493     /**
494      * Get the operation, as a message.
495      *
496      * @return the operation, as a message
497      */
498     public String getOperationMessage() {
499         Operation last = operationHistory.peekLast();
500         return (last == null ? null : last.getClOperation().toMessage());
501     }
502
503     /**
504      * Gets the operation result.
505      *
506      * @return the operation result
507      */
508     public PolicyResult getOperationResult() {
509         Operation last = operationHistory.peekLast();
510         return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
511     }
512
513     /**
514      * Get the latest operation history.
515      *
516      * @return the latest operation history
517      */
518     public String getOperationHistory() {
519         Operation last = operationHistory.peekLast();
520         return (last == null ? null : last.clOperation.toHistory());
521     }
522
523     /**
524      * Get the history.
525      *
526      * @return the list of control loop operations
527      */
528     public List<ControlLoopOperation> getHistory() {
529         return operationHistory.stream().map(Operation::getClOperation).map(ControlLoopOperation::new)
530                         .collect(Collectors.toList());
531     }
532
533     /**
534      * Stores a failure in the DB.
535      *
536      * @param outcome operation outcome
537      * @param result result to put into the DB
538      * @param message message to put into the DB
539      */
540     private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
541         outcome.setActor(actor);
542         outcome.setOperation(operation);
543         outcome.setMessage(message);
544         outcome.setResult(result);
545
546         operationHistory.add(new Operation(outcome));
547         storeOperationInDataBase();
548     }
549
550     /**
551      * Stores the latest operation in the DB.
552      */
553     private void storeOperationInDataBase() {
554         operContext.getDataManager().store(requestId, eventContext.getEvent(),
555                         operationHistory.peekLast().getClOperation());
556     }
557
558     /**
559      * Determines the target entity.
560      *
561      * @return a future to determine the target entity, or {@code null} if the entity has
562      *         already been determined
563      */
564     protected CompletableFuture<OperationOutcome> detmTarget() {
565         if (policy.getTarget() == null) {
566             throw new IllegalArgumentException("The target is null");
567         }
568
569         if (policy.getTarget().getType() == null) {
570             throw new IllegalArgumentException("The target type is null");
571         }
572
573         switch (policy.getTarget().getType()) {
574             case PNF:
575                 return detmPnfTarget();
576             case VM:
577             case VNF:
578             case VFMODULE:
579                 return detmVfModuleTarget();
580             default:
581                 throw new IllegalArgumentException("The target type is not supported");
582         }
583     }
584
585     /**
586      * Determines the PNF target entity.
587      *
588      * @return a future to determine the target entity, or {@code null} if the entity has
589      *         already been determined
590      */
591     private CompletableFuture<OperationOutcome> detmPnfTarget() {
592         if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
593             throw new IllegalArgumentException("Target does not match target type");
594         }
595
596         targetEntity = eventContext.getEnrichment().get(PNF_NAME);
597         if (targetEntity == null) {
598             throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
599         }
600
601         return null;
602     }
603
604     /**
605      * Determines the VF Module target entity.
606      *
607      * @return a future to determine the target entity, or {@code null} if the entity has
608      *         already been determined
609      */
610     private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
611         String targetFieldName = eventContext.getEvent().getTarget();
612         if (targetFieldName == null) {
613             throw new IllegalArgumentException("Target is null");
614         }
615
616         switch (targetFieldName.toLowerCase()) {
617             case VSERVER_VSERVER_NAME:
618                 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
619                 break;
620             case GENERIC_VNF_VNF_ID:
621                 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
622                 break;
623             case GENERIC_VNF_VNF_NAME:
624                 return detmVnfName();
625             default:
626                 throw new IllegalArgumentException("Target does not match target type");
627         }
628
629         if (targetEntity == null) {
630             throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
631         }
632
633         return null;
634     }
635
636     /**
637      * Determines the VNF Name target entity.
638      *
639      * @return a future to determine the target entity, or {@code null} if the entity has
640      *         already been determined
641      */
642     @SuppressWarnings("unchecked")
643     private CompletableFuture<OperationOutcome> detmVnfName() {
644         // if the onset is enriched with the vnf-id, we don't need an A&AI response
645         targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
646         if (targetEntity != null) {
647             return null;
648         }
649
650         // vnf-id was not in the onset - obtain it via the custom query
651
652         // @formatter:off
653         ControlLoopOperationParams cqparams = params.toBuilder()
654                         .actor(AaiConstants.ACTOR_NAME)
655                         .operation(AaiCqResponse.OPERATION)
656                         .targetEntity("")
657                         .build();
658         // @formatter:on
659
660         // perform custom query and then extract the VNF ID from it
661         return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
662                         this::extractVnfFromCq);
663     }
664
665     /**
666      * Extracts the VNF Name target entity from the custom query data.
667      *
668      * @return {@code null}
669      */
670     private CompletableFuture<OperationOutcome> extractVnfFromCq() {
671         // already have the CQ data
672         AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
673         if (cq.getDefaultGenericVnf() == null) {
674             throw new IllegalArgumentException("No vnf-id found");
675         }
676
677         targetEntity = cq.getDefaultGenericVnf().getVnfId();
678         if (targetEntity == null) {
679             throw new IllegalArgumentException("No vnf-id found");
680         }
681
682         return null;
683     }
684 }