2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
7 * Modifications Copyright (C) 2019 Tech Mahindra
8 * Modifications Copyright (C) 2019 Bell Canada.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.controlloop.eventmanager;
26 import java.io.Serializable;
27 import java.time.Instant;
28 import java.util.Deque;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import java.util.stream.Collectors;
38 import lombok.AccessLevel;
40 import lombok.ToString;
41 import org.onap.policy.aai.AaiConstants;
42 import org.onap.policy.aai.AaiCqResponse;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
45 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
46 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
47 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
48 import org.onap.policy.controlloop.policy.Policy;
49 import org.onap.policy.controlloop.policy.PolicyResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
54 * Manages a single Operation for a single event. Once this has been created,
55 * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
56 * continually until it returns {@code false}, indicating that all steps have completed.
58 @ToString(onlyExplicitlyIncluded = true)
59 public class ControlLoopOperationManager2 implements Serializable {
60 private static final long serialVersionUID = -3773199283624595410L;
61 private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
62 private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
63 public static final String LOCK_ACTOR = "LOCK";
64 public static final String LOCK_OPERATION = "Lock";
65 private static final String GUARD_ACTOR = "GUARD";
66 public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
67 public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
68 public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
69 public static final String PNF_NAME = "pnf.pnf-name";
86 private final transient ManagerContext operContext;
87 private final transient ControlLoopEventContext eventContext;
88 private final Policy policy;
92 private State state = State.ACTIVE;
95 private final String requestId;
98 private final String policyId;
101 * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
105 private int attempts = 0;
107 private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();
110 * Queue of outcomes yet to be processed. Outcomes are added to this each time the
111 * "start" or "complete" callback is invoked.
113 @Getter(AccessLevel.PROTECTED)
114 private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
117 * Used to cancel the running operation.
119 @Getter(AccessLevel.PROTECTED)
120 private transient CompletableFuture<OperationOutcome> future = null;
123 * Target entity. Determined after the lock is granted, though it may require the
124 * custom query to be performed first.
127 private String targetEntity;
129 @Getter(AccessLevel.PROTECTED)
130 private final transient ControlLoopOperationParams params;
131 private final transient PipelineUtil taskUtil;
134 * Time when the lock was first requested.
136 private transient AtomicReference<Instant> lockStart = new AtomicReference<>();
138 // values extracted from the policy
140 private final String actor;
142 private final String operation;
146 * Construct an instance.
148 * @param operContext this operation's context
149 * @param context event context
150 * @param policy operation's policy
151 * @param executor executor for the Operation
153 public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
156 this.operContext = operContext;
157 this.eventContext = context;
158 this.policy = policy;
159 this.requestId = context.getEvent().getRequestId().toString();
160 this.policyId = "" + policy.getId();
161 this.actor = policy.getActor();
162 this.operation = policy.getRecipe();
165 params = ControlLoopOperationParams.builder()
166 .actorService(operContext.getActorService())
168 .operation(operation)
171 .target(policy.getTarget())
172 .startCallback(this::onStart)
173 .completeCallback(this::onComplete)
177 taskUtil = new PipelineUtil(params);
181 // Internal class used for tracking
185 private class Operation implements Serializable {
186 private static final long serialVersionUID = 1L;
189 private PolicyResult policyResult;
190 private ControlLoopOperation clOperation;
193 * Constructs the object.
195 * @param outcome outcome of the operation
197 public Operation(OperationOutcome outcome) {
198 attempt = ControlLoopOperationManager2.this.attempts;
199 policyResult = outcome.getResult();
200 clOperation = outcome.toControlLoopOperation();
201 clOperation.setTarget(policy.getTarget().toString());
206 * Start the operation, first acquiring any locks that are needed. This should not
207 * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
209 * @param remainingMs time remaining, in milliseconds, for the control loop
211 @SuppressWarnings("unchecked")
212 public synchronized void start(long remainingMs) {
213 // this is synchronized while we update "future"
216 // provide a default, in case something fails before requestLock() is called
217 lockStart.set(Instant.now());
220 future = taskUtil.sequence(
223 this::startOperation);
226 // handle any exceptions that may be thrown, set timeout, and handle timeout
229 future.exceptionally(this::handleException)
230 .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
231 .exceptionally(this::handleTimeout);
234 } catch (RuntimeException e) {
240 * Start the operation, after the lock has been acquired.
244 private CompletableFuture<OperationOutcome> startOperation() {
246 ControlLoopOperationParams params2 = params.toBuilder()
247 .payload(new LinkedHashMap<>())
248 .retry(policy.getRetry())
249 .timeoutSec(policy.getTimeout())
250 .targetEntity(targetEntity)
254 if (policy.getPayload() != null) {
255 params2.getPayload().putAll(policy.getPayload());
258 return params2.start();
262 * Handles exceptions that may be generated.
264 * @param thrown exception that was generated
265 * @return {@code null}
267 private OperationOutcome handleException(Throwable thrown) {
268 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
272 logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
273 OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
274 outcome.setStart(lockStart.get());
275 outcome.setEnd(Instant.now());
276 outcome.setFinalOutcome(true);
279 // this outcome is not used so just return "null"
284 * Handles control loop timeout exception.
286 * @param thrown exception that was generated
287 * @return {@code null}
289 private OperationOutcome handleTimeout(Throwable thrown) {
290 logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);
292 OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
293 outcome.setActor(CL_TIMEOUT_ACTOR);
294 outcome.setOperation(null);
295 outcome.setStart(lockStart.get());
296 outcome.setEnd(Instant.now());
297 outcome.setFinalOutcome(true);
300 // cancel the operation, if it's still running
301 future.cancel(false);
303 // this outcome is not used so just return "null"
308 * Cancels the operation.
310 public void cancel() {
311 synchronized (this) {
312 if (future == null) {
317 future.cancel(false);
321 * Requests a lock on the {@link #targetEntity}.
323 * @return a future to await the lock
325 private CompletableFuture<OperationOutcome> requestLock() {
327 * Failures are handled via the callback, and successes are discarded by
328 * sequence(), without passing them to onComplete().
330 * Return a COPY of the future so that if we try to cancel it, we'll only cancel
331 * the copy, not the original. This is done by tacking thenApply() onto the end.
333 lockStart.set(Instant.now());
334 return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
338 * Indicates that the lock on the target entity is unavailable.
340 * @param outcome lock outcome
342 private void lockUnavailable(OperationOutcome outcome) {
344 // Note: NEVER invoke onStart() for locks; only invoke onComplete()
348 * Now that we've added the lock outcome to the queue, ensure the future is
349 * canceled, which may, itself, generate an operation outcome.
355 * Handles responses provided via the "start" callback. Note: this is never be invoked
356 * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
358 * @param outcome outcome provided to the callback
360 private void onStart(OperationOutcome outcome) {
361 if (outcome.isFor(actor, operation) || GUARD_ACTOR.equals(outcome.getActor())) {
367 * Handles responses provided via the "complete" callback. Note: this is never invoked
368 * for "successful" locks.
370 * @param outcome outcome provided to the callback
372 private void onComplete(OperationOutcome outcome) {
374 switch (outcome.getActor()) {
377 case CL_TIMEOUT_ACTOR:
382 if (outcome.isFor(actor, operation)) {
390 * Adds an outcome to {@link #outcomes}.
392 * @param outcome outcome to be added
394 private synchronized void addOutcome(OperationOutcome outcome) {
396 * This is synchronized to prevent nextStep() from invoking processOutcome() at
400 logger.debug("added outcome={} for {}", outcome, requestId);
401 outcomes.add(outcome);
403 if (outcomes.peekFirst() == outcomes.peekLast()) {
404 // this is the first outcome in the queue - process it
410 * Looks for the next step in the queue.
412 * @return {@code true} if more responses are expected, {@code false} otherwise
414 public synchronized boolean nextStep() {
419 case CONTROL_LOOP_TIMEOUT:
425 OperationOutcome outcome = outcomes.peek();
426 if (outcome == null) {
431 if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
435 // first item has been processed, remove it
437 if (!outcomes.isEmpty()) {
438 // have a new "first" item - process it
446 * Processes the first item in {@link #outcomes}. Sets the state, increments
447 * {@link #attempts}, if appropriate, and stores the operation history in the DB.
449 private synchronized void processOutcome() {
450 OperationOutcome outcome = outcomes.peek();
451 logger.debug("process outcome={} for {}", outcome, requestId);
453 switch (outcome.getActor()) {
455 case CL_TIMEOUT_ACTOR:
456 state = State.CONTROL_LOOP_TIMEOUT;
460 // lock is no longer available
461 if (state == State.ACTIVE) {
462 state = State.LOCK_DENIED;
463 storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
465 state = State.LOCK_LOST;
466 storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
471 if (outcome.getEnd() == null) {
472 state = State.GUARD_STARTED;
473 } else if (outcome.getResult() == PolicyResult.SUCCESS) {
474 state = State.GUARD_PERMITTED;
476 state = State.GUARD_DENIED;
477 storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
482 if (outcome.getEnd() == null) {
485 state = State.OPERATION_STARTED;
486 operationHistory.add(new Operation(outcome));
491 * Operation completed. If the last entry was a "start" (i.e., "end" field
492 * is null), then replace it. Otherwise, just add the completion.
494 state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
495 : State.OPERATION_FAILURE);
496 if (!operationHistory.isEmpty() && operationHistory.peekLast().getClOperation().getEnd() == null) {
497 operationHistory.removeLast();
499 operationHistory.add(new Operation(outcome));
500 storeOperationInDataBase();
504 // indicate that this has changed
505 operContext.updated(this);
509 * Get the operation, as a message.
511 * @return the operation, as a message
513 public String getOperationMessage() {
514 Operation last = operationHistory.peekLast();
515 return (last == null ? null : last.getClOperation().toMessage());
519 * Gets the operation result.
521 * @return the operation result
523 public PolicyResult getOperationResult() {
524 Operation last = operationHistory.peekLast();
525 return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
529 * Get the latest operation history.
531 * @return the latest operation history
533 public String getOperationHistory() {
534 Operation last = operationHistory.peekLast();
535 return (last == null ? null : last.clOperation.toHistory());
541 * @return the list of control loop operations
543 public List<ControlLoopOperation> getHistory() {
544 return operationHistory.stream().map(Operation::getClOperation).map(ControlLoopOperation::new)
545 .collect(Collectors.toList());
549 * Stores a failure in the DB.
551 * @param outcome operation outcome
552 * @param result result to put into the DB
553 * @param message message to put into the DB
555 private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
556 outcome.setActor(actor);
557 outcome.setOperation(operation);
558 outcome.setMessage(message);
559 outcome.setResult(result);
561 operationHistory.add(new Operation(outcome));
562 storeOperationInDataBase();
566 * Stores the latest operation in the DB.
568 private void storeOperationInDataBase() {
569 operContext.getDataManager().store(requestId, eventContext.getEvent(),
570 operationHistory.peekLast().getClOperation());
574 * Determines the target entity.
576 * @return a future to determine the target entity, or {@code null} if the entity has
577 * already been determined
579 protected CompletableFuture<OperationOutcome> detmTarget() {
580 if (policy.getTarget() == null) {
581 throw new IllegalArgumentException("The target is null");
584 if (policy.getTarget().getType() == null) {
585 throw new IllegalArgumentException("The target type is null");
588 switch (policy.getTarget().getType()) {
590 return detmPnfTarget();
594 return detmVfModuleTarget();
596 throw new IllegalArgumentException("The target type is not supported");
601 * Determines the PNF target entity.
603 * @return a future to determine the target entity, or {@code null} if the entity has
604 * already been determined
606 private CompletableFuture<OperationOutcome> detmPnfTarget() {
607 if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
608 throw new IllegalArgumentException("Target does not match target type");
611 targetEntity = eventContext.getEnrichment().get(PNF_NAME);
612 if (targetEntity == null) {
613 throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
620 * Determines the VF Module target entity.
622 * @return a future to determine the target entity, or {@code null} if the entity has
623 * already been determined
625 private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
626 String targetFieldName = eventContext.getEvent().getTarget();
627 if (targetFieldName == null) {
628 throw new IllegalArgumentException("Target is null");
631 switch (targetFieldName.toLowerCase()) {
632 case VSERVER_VSERVER_NAME:
633 targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
635 case GENERIC_VNF_VNF_ID:
636 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
638 case GENERIC_VNF_VNF_NAME:
639 return detmVnfName();
641 throw new IllegalArgumentException("Target does not match target type");
644 if (targetEntity == null) {
645 throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
652 * Determines the VNF Name target entity.
654 * @return a future to determine the target entity, or {@code null} if the entity has
655 * already been determined
657 @SuppressWarnings("unchecked")
658 private CompletableFuture<OperationOutcome> detmVnfName() {
659 // if the onset is enriched with the vnf-id, we don't need an A&AI response
660 targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
661 if (targetEntity != null) {
665 // vnf-id was not in the onset - obtain it via the custom query
668 ControlLoopOperationParams cqparams = params.toBuilder()
669 .actor(AaiConstants.ACTOR_NAME)
670 .operation(AaiCqResponse.OPERATION)
675 // perform custom query and then extract the VNF ID from it
676 return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
677 this::extractVnfFromCq);
681 * Extracts the VNF Name target entity from the custom query data.
683 * @return {@code null}
685 private CompletableFuture<OperationOutcome> extractVnfFromCq() {
686 // already have the CQ data
687 AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
688 if (cq.getDefaultGenericVnf() == null) {
689 throw new IllegalArgumentException("No vnf-id found");
692 targetEntity = cq.getDefaultGenericVnf().getVnfId();
693 if (targetEntity == null) {
694 throw new IllegalArgumentException("No vnf-id found");