b2a8dd308f4ee06679ad5bf51ec541d1f3e24e85
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7  * Modifications Copyright (C) 2019 Tech Mahindra
8  * Modifications Copyright (C) 2019 Bell Canada.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.controlloop.eventmanager;
25
26 import java.io.Serializable;
27 import java.time.Instant;
28 import java.util.Deque;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import java.util.stream.Collectors;
38 import lombok.AccessLevel;
39 import lombok.Getter;
40 import lombok.ToString;
41 import org.onap.policy.aai.AaiConstants;
42 import org.onap.policy.aai.AaiCqResponse;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.ControlLoopResponse;
45 import org.onap.policy.controlloop.VirtualControlLoopEvent;
46 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
47 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
48 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
49 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
50 import org.onap.policy.controlloop.policy.Policy;
51 import org.onap.policy.controlloop.policy.PolicyResult;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * Manages a single Operation for a single event. Once this has been created,
57  * {@link #start(long)} should be invoked, and then {@link #nextStep()} should be invoked
58  * continually until it returns {@code false}, indicating that all steps have completed.
59  */
60 @ToString(onlyExplicitlyIncluded = true)
61 public class ControlLoopOperationManager2 implements Serializable {
62     private static final long serialVersionUID = -3773199283624595410L;
63     private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
64     private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
65     public static final String LOCK_ACTOR = "LOCK";
66     public static final String LOCK_OPERATION = "Lock";
67     private static final String GUARD_ACTOR = "GUARD";
68     public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
69     public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
70     public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
71     public static final String PNF_NAME = "pnf.pnf-name";
72
73     // @formatter:off
74     public enum State {
75         ACTIVE,
76         LOCK_DENIED,
77         LOCK_LOST,
78         GUARD_STARTED,
79         GUARD_PERMITTED,
80         GUARD_DENIED,
81         OPERATION_STARTED,
82         OPERATION_SUCCESS,
83         OPERATION_FAILURE,
84         CONTROL_LOOP_TIMEOUT
85     }
86     // @formatter:on
87
88     private final transient ManagerContext operContext;
89     private final transient ControlLoopEventContext eventContext;
90     private final Policy policy;
91
92     @Getter
93     @ToString.Include
94     private State state = State.ACTIVE;
95
96     @ToString.Include
97     private final String requestId;
98
99     @ToString.Include
100     private final String policyId;
101
102     /**
103      * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
104      * this operation.
105      */
106     @ToString.Include
107     private int attempts = 0;
108
109     private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
110
111     /**
112      * Set to {@code true} to prevent the last item in {@link #operationHistory} from
113      * being included in the outcome of {@link #getHistory()}. Used when the operation
114      * aborts prematurely due to lock-denied, guard-denied, etc.
115      */
116     private boolean holdLast = false;
117
118     /**
119      * Queue of outcomes yet to be processed. Outcomes are added to this each time the
120      * "start" or "complete" callback is invoked.
121      */
122     @Getter(AccessLevel.PROTECTED)
123     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
124
125     /**
126      * Used to cancel the running operation.
127      */
128     @Getter(AccessLevel.PROTECTED)
129     private transient CompletableFuture<OperationOutcome> future = null;
130
131     /**
132      * Target entity. Determined after the lock is granted, though it may require the
133      * custom query to be performed first.
134      */
135     @Getter
136     private String targetEntity;
137
138     @Getter(AccessLevel.PROTECTED)
139     private final transient ControlLoopOperationParams params;
140     private final transient PipelineUtil taskUtil;
141
142     @Getter
143     private ControlLoopResponse controlLoopResponse;
144
145     /**
146      * Time when the lock was first requested.
147      */
148     private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
149
150     // values extracted from the policy
151     @Getter
152     private final String actor;
153     @Getter
154     private final String operation;
155
156
157     /**
158      * Construct an instance.
159      *
160      * @param operContext this operation's context
161      * @param context event context
162      * @param policy operation's policy
163      * @param executor executor for the Operation
164      */
165     public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
166                     Executor executor) {
167
168         this.operContext = operContext;
169         this.eventContext = context;
170         this.policy = policy;
171         this.requestId = context.getEvent().getRequestId().toString();
172         this.policyId = "" + policy.getId();
173         this.actor = policy.getActor();
174         this.operation = policy.getRecipe();
175
176         // @formatter:off
177         params = ControlLoopOperationParams.builder()
178                         .actorService(operContext.getActorService())
179                         .actor(actor)
180                         .operation(operation)
181                         .context(context)
182                         .executor(executor)
183                         .target(policy.getTarget())
184                         .startCallback(this::onStart)
185                         .completeCallback(this::onComplete)
186                         .build();
187         // @formatter:on
188
189         taskUtil = new PipelineUtil(params);
190     }
191
192     //
193     // Internal class used for tracking
194     //
195     @Getter
196     @ToString
197     private class Operation implements Serializable {
198         private static final long serialVersionUID = 1L;
199
200         private int attempt;
201         private PolicyResult policyResult;
202         private ControlLoopOperation clOperation;
203         private ControlLoopResponse clResponse;
204
205         /**
206          * Constructs the object.
207          *
208          * @param outcome outcome of the operation
209          */
210         public Operation(OperationOutcome outcome) {
211             attempt = ControlLoopOperationManager2.this.attempts;
212             policyResult = outcome.getResult();
213             clOperation = outcome.toControlLoopOperation();
214             clOperation.setTarget(policy.getTarget().toString());
215             clResponse = outcome.getControlLoopResponse();
216
217             if (outcome.getEnd() == null) {
218                 clOperation.setOutcome("Started");
219             } else if (clOperation.getOutcome() == null) {
220                 clOperation.setOutcome("");
221             }
222         }
223     }
224
225     /**
226      * Start the operation, first acquiring any locks that are needed. This should not
227      * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
228      *
229      * @param remainingMs time remaining, in milliseconds, for the control loop
230      */
231     @SuppressWarnings("unchecked")
232     public synchronized void start(long remainingMs) {
233         // this is synchronized while we update "future"
234
235         try {
236             // provide a default, in case something fails before requestLock() is called
237             lockStart.set(Instant.now());
238
239             // @formatter:off
240             future = taskUtil.sequence(
241                 this::detmTarget,
242                 this::requestLock,
243                 this::startOperation);
244             // @formatter:on
245
246             // handle any exceptions that may be thrown, set timeout, and handle timeout
247
248             // @formatter:off
249             future.exceptionally(this::handleException)
250                     .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
251                     .exceptionally(this::handleTimeout);
252             // @formatter:on
253
254         } catch (RuntimeException e) {
255             handleException(e);
256         }
257     }
258
259     /**
260      * Start the operation, after the lock has been acquired.
261      *
262      * @return
263      */
264     private CompletableFuture<OperationOutcome> startOperation() {
265         // @formatter:off
266         ControlLoopOperationParams params2 = params.toBuilder()
267                     .payload(new LinkedHashMap<>())
268                     .retry(policy.getRetry())
269                     .timeoutSec(policy.getTimeout())
270                     .targetEntity(targetEntity)
271                     .build();
272         // @formatter:on
273
274         if (policy.getPayload() != null) {
275             params2.getPayload().putAll(policy.getPayload());
276         }
277
278         return params2.start();
279     }
280
281     /**
282      * Handles exceptions that may be generated.
283      *
284      * @param thrown exception that was generated
285      * @return {@code null}
286      */
287     private OperationOutcome handleException(Throwable thrown) {
288         if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
289             return null;
290         }
291
292         logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
293         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
294         outcome.setStart(lockStart.get());
295         outcome.setEnd(Instant.now());
296         outcome.setFinalOutcome(true);
297         onComplete(outcome);
298
299         // this outcome is not used so just return "null"
300         return null;
301     }
302
303     /**
304      * Handles control loop timeout exception.
305      *
306      * @param thrown exception that was generated
307      * @return {@code null}
308      */
309     private OperationOutcome handleTimeout(Throwable thrown) {
310         logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
311
312         OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
313         outcome.setActor(CL_TIMEOUT_ACTOR);
314         outcome.setOperation(null);
315         outcome.setStart(lockStart.get());
316         outcome.setEnd(Instant.now());
317         outcome.setFinalOutcome(true);
318         onComplete(outcome);
319
320         // cancel the operation, if it's still running
321         future.cancel(false);
322
323         // this outcome is not used so just return "null"
324         return null;
325     }
326
327     /**
328      * Cancels the operation.
329      */
330     public void cancel() {
331         synchronized (this) {
332             if (future == null) {
333                 return;
334             }
335         }
336
337         future.cancel(false);
338     }
339
340     /**
341      * Requests a lock on the {@link #targetEntity}.
342      *
343      * @return a future to await the lock
344      */
345     private CompletableFuture<OperationOutcome> requestLock() {
346         /*
347          * Failures are handled via the callback, and successes are discarded by
348          * sequence(), without passing them to onComplete().
349          *
350          * Return a COPY of the future so that if we try to cancel it, we'll only cancel
351          * the copy, not the original. This is done by tacking thenApply() onto the end.
352          */
353         lockStart.set(Instant.now());
354         return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
355     }
356
357     /**
358      * Indicates that the lock on the target entity is unavailable.
359      *
360      * @param outcome lock outcome
361      */
362     private void lockUnavailable(OperationOutcome outcome) {
363
364         // Note: NEVER invoke onStart() for locks; only invoke onComplete()
365         onComplete(outcome);
366
367         /*
368          * Now that we've added the lock outcome to the queue, ensure the future is
369          * canceled, which may, itself, generate an operation outcome.
370          */
371         cancel();
372     }
373
374     /**
375      * Handles responses provided via the "start" callback. Note: this is never be invoked
376      * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
377      *
378      * @param outcome outcome provided to the callback
379      */
380     private void onStart(OperationOutcome outcome) {
381         if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
382             addOutcome(outcome);
383         }
384     }
385
386     /**
387      * Handles responses provided via the "complete" callback. Note: this is never invoked
388      * for "successful" locks.
389      *
390      * @param outcome outcome provided to the callback
391      */
392     private void onComplete(OperationOutcome outcome) {
393
394         switch (outcome.getActor()) {
395             case LOCK_ACTOR:
396             case GUARD_ACTOR:
397             case CL_TIMEOUT_ACTOR:
398                 addOutcome(outcome);
399                 break;
400
401             default:
402                 if (outcome.isFor(actor, operation)) {
403                     addOutcome(outcome);
404                 }
405                 break;
406         }
407     }
408
409     /**
410      * Adds an outcome to {@link #outcomes}.
411      *
412      * @param outcome outcome to be added
413      */
414     private synchronized void addOutcome(OperationOutcome outcome) {
415         /*
416          * This is synchronized to prevent nextStep() from invoking processOutcome() at
417          * the same time.
418          */
419
420         logger.debug("added outcome={} for {}", outcome, requestId);
421         outcomes.add(outcome);
422
423         if (outcomes.peekFirst() == outcomes.peekLast()) {
424             // this is the first outcome in the queue - process it
425             processOutcome();
426         }
427     }
428
429     /**
430      * Looks for the next step in the queue.
431      *
432      * @return {@code true} if more responses are expected, {@code false} otherwise
433      */
434     public synchronized boolean nextStep() {
435         switch (state) {
436             case LOCK_DENIED:
437             case LOCK_LOST:
438             case GUARD_DENIED:
439             case CONTROL_LOOP_TIMEOUT:
440                 holdLast = false;
441                 return false;
442             default:
443                 break;
444         }
445
446         OperationOutcome outcome = outcomes.peek();
447         if (outcome == null) {
448             // empty queue
449             return true;
450         }
451
452         if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
453             controlLoopResponse = null;
454             return false;
455         }
456
457         // first item has been processed, remove it
458         outcomes.remove();
459         if (!outcomes.isEmpty()) {
460             // have a new "first" item - process it
461             processOutcome();
462         }
463
464         return true;
465     }
466
467     /**
468      * Processes the first item in {@link #outcomes}. Sets the state, increments
469      * {@link #attempts}, if appropriate, and stores the operation history in the DB.
470      */
471     private synchronized void processOutcome() {
472         OperationOutcome outcome = outcomes.peek();
473         logger.debug("process outcome={} for {}", outcome, requestId);
474
475         controlLoopResponse = null;
476
477         switch (outcome.getActor()) {
478
479             case CL_TIMEOUT_ACTOR:
480                 state = State.CONTROL_LOOP_TIMEOUT;
481                 processAbort(outcome, PolicyResult.FAILURE, "Control loop timed out");
482                 break;
483
484             case LOCK_ACTOR:
485                 // lock is no longer available
486                 if (state == State.ACTIVE) {
487                     state = State.LOCK_DENIED;
488                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
489                 } else {
490                     state = State.LOCK_LOST;
491                     processAbort(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
492                 }
493                 break;
494
495             case GUARD_ACTOR:
496                 if (outcome.getEnd() == null) {
497                     state = State.GUARD_STARTED;
498                 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
499                     state = State.GUARD_PERMITTED;
500                 } else {
501                     state = State.GUARD_DENIED;
502                     storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
503                 }
504                 break;
505
506             default:
507                 if (outcome.getEnd() == null) {
508                     // operation started
509                     ++attempts;
510                     state = State.OPERATION_STARTED;
511
512                 } else {
513                     /*
514                      * Operation completed. If the last entry was a "start" (i.e., "end" field
515                      * is null), then replace it. Otherwise, just add the completion.
516                      */
517                     state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
518                                     : State.OPERATION_FAILURE);
519                     controlLoopResponse = makeControlLoopResponse(outcome.getControlLoopResponse());
520                     if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
521                         operationHistory.removeLast();
522                     }
523                 }
524
525                 operationHistory.add(new Operation(outcome));
526                 storeOperationInDataBase();
527                 break;
528         }
529
530         // indicate that this has changed
531         operContext.updated(this);
532     }
533
534     /**
535      * Processes an operation abort, updating the DB record, if an operation has been
536      * started.
537      *
538      * @param outcome operation outcome
539      * @param result result to put into the DB
540      * @param message message to put into the DB
541      */
542     private void processAbort(OperationOutcome outcome, PolicyResult result, String message) {
543         if (operationHistory.isEmpty() || operationHistory.peekLast().getClOperation().getEnd() != null) {
544             // last item was not a "start" operation
545
546             // NOTE: do NOT generate control loop response since operation was not started
547
548             storeFailureInDataBase(outcome, result, message);
549             return;
550         }
551
552         // last item was a "start" operation - replace it with a failure
553         final Operation operOrig = operationHistory.removeLast();
554
555         // use start time from the operation, itself
556         if (operOrig != null && operOrig.getClOperation() != null) {
557             outcome.setStart(operOrig.getClOperation().getStart());
558         }
559
560         controlLoopResponse = makeControlLoopResponse(outcome.getControlLoopResponse());
561
562         storeFailureInDataBase(outcome, result, message);
563     }
564
565     /**
566      * Makes a control loop response.
567      *
568      * @param source original control loop response or {@code null}
569      * @return a new control loop response, or {@code null} if none is required
570      */
571     protected ControlLoopResponse makeControlLoopResponse(ControlLoopResponse source) {
572         if (source != null) {
573             return source;
574         }
575
576         // only generate response for certain actors.
577         if (!actor.equals("SDNR")) {
578             return null;
579         }
580
581         VirtualControlLoopEvent event = eventContext.getEvent();
582
583         ControlLoopResponse clRsp = new ControlLoopResponse();
584         clRsp.setFrom(actor);
585         clRsp.setTarget("DCAE");
586         clRsp.setClosedLoopControlName(event.getClosedLoopControlName());
587         clRsp.setPolicyName(event.getPolicyName());
588         clRsp.setPolicyVersion(event.getPolicyVersion());
589         clRsp.setRequestId(event.getRequestId());
590         clRsp.setVersion(event.getVersion());
591
592         return clRsp;
593     }
594
595     /**
596      * Get the operation, as a message.
597      *
598      * @return the operation, as a message
599      */
600     public String getOperationMessage() {
601         Operation last = operationHistory.peekLast();
602         return (last == null ? null : last.getClOperation().toMessage());
603     }
604
605     /**
606      * Gets the operation result.
607      *
608      * @return the operation result
609      */
610     public PolicyResult getOperationResult() {
611         Operation last = operationHistory.peekLast();
612         return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
613     }
614
615     /**
616      * Get the latest operation history.
617      *
618      * @return the latest operation history
619      */
620     public String getOperationHistory() {
621         Operation last = operationHistory.peekLast();
622         return (last == null ? null : last.clOperation.toHistory());
623     }
624
625     /**
626      * Get the history.
627      *
628      * @return the list of control loop operations
629      */
630     public List<ControlLoopOperation> getHistory() {
631         Operation last = (holdLast ? operationHistory.removeLast() : null);
632
633         List<ControlLoopOperation> result = operationHistory.stream().map(Operation::getClOperation)
634                         .map(ControlLoopOperation::new).collect(Collectors.toList());
635
636         if (last != null) {
637             operationHistory.add(last);
638         }
639
640         return result;
641     }
642
643     /**
644      * Stores a failure in the DB.
645      *
646      * @param outcome operation outcome
647      * @param result result to put into the DB
648      * @param message message to put into the DB
649      */
650     private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
651         // don't include this in history yet
652         holdLast = true;
653
654         outcome.setActor(actor);
655         outcome.setOperation(operation);
656         outcome.setMessage(message);
657         outcome.setResult(result);
658
659         operationHistory.add(new Operation(outcome));
660         storeOperationInDataBase();
661     }
662
663     /**
664      * Stores the latest operation in the DB.
665      */
666     private void storeOperationInDataBase() {
667         operContext.getDataManager().store(requestId, eventContext.getEvent(), targetEntity,
668                         operationHistory.peekLast().getClOperation());
669     }
670
671     /**
672      * Determines the target entity.
673      *
674      * @return a future to determine the target entity, or {@code null} if the entity has
675      *         already been determined
676      */
677     protected CompletableFuture<OperationOutcome> detmTarget() {
678         if (policy.getTarget() == null) {
679             throw new IllegalArgumentException("The target is null");
680         }
681
682         if (policy.getTarget().getType() == null) {
683             throw new IllegalArgumentException("The target type is null");
684         }
685
686         switch (policy.getTarget().getType()) {
687             case PNF:
688                 return detmPnfTarget();
689             case VM:
690             case VNF:
691             case VFMODULE:
692                 return detmVfModuleTarget();
693             default:
694                 throw new IllegalArgumentException("The target type is not supported");
695         }
696     }
697
698     /**
699      * Determines the PNF target entity.
700      *
701      * @return a future to determine the target entity, or {@code null} if the entity has
702      *         already been determined
703      */
704     private CompletableFuture<OperationOutcome> detmPnfTarget() {
705         if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
706             throw new IllegalArgumentException("Target does not match target type");
707         }
708
709         targetEntity = eventContext.getEnrichment().get(PNF_NAME);
710         if (targetEntity == null) {
711             throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
712         }
713
714         return null;
715     }
716
717     /**
718      * Determines the VF Module target entity.
719      *
720      * @return a future to determine the target entity, or {@code null} if the entity has
721      *         already been determined
722      */
723     private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
724         String targetFieldName = eventContext.getEvent().getTarget();
725         if (targetFieldName == null) {
726             throw new IllegalArgumentException("Target is null");
727         }
728
729         switch (targetFieldName.toLowerCase()) {
730             case VSERVER_VSERVER_NAME:
731                 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
732                 break;
733             case GENERIC_VNF_VNF_ID:
734                 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
735                 break;
736             case GENERIC_VNF_VNF_NAME:
737                 return detmVnfName();
738             default:
739                 throw new IllegalArgumentException("Target does not match target type");
740         }
741
742         if (targetEntity == null) {
743             throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
744         }
745
746         return null;
747     }
748
749     /**
750      * Determines the VNF Name target entity.
751      *
752      * @return a future to determine the target entity, or {@code null} if the entity has
753      *         already been determined
754      */
755     @SuppressWarnings("unchecked")
756     private CompletableFuture<OperationOutcome> detmVnfName() {
757         // if the onset is enriched with the vnf-id, we don't need an A&AI response
758         targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
759         if (targetEntity != null) {
760             return null;
761         }
762
763         // vnf-id was not in the onset - obtain it via the custom query
764
765         // @formatter:off
766         ControlLoopOperationParams cqparams = params.toBuilder()
767                         .actor(AaiConstants.ACTOR_NAME)
768                         .operation(AaiCqResponse.OPERATION)
769                         .targetEntity("")
770                         .build();
771         // @formatter:on
772
773         // perform custom query and then extract the VNF ID from it
774         return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
775                         this::extractVnfFromCq);
776     }
777
778     /**
779      * Extracts the VNF Name target entity from the custom query data.
780      *
781      * @return {@code null}
782      */
783     private CompletableFuture<OperationOutcome> extractVnfFromCq() {
784         // already have the CQ data
785         AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
786         if (cq.getDefaultGenericVnf() == null) {
787             throw new IllegalArgumentException("No vnf-id found");
788         }
789
790         targetEntity = cq.getDefaultGenericVnf().getVnfId();
791         if (targetEntity == null) {
792             throw new IllegalArgumentException("No vnf-id found");
793         }
794
795         return null;
796     }
797 }