Remove dmaap from models
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
index c81575f..c19ad6c 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +24,6 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -65,9 +64,7 @@ import org.slf4j.LoggerFactory;
  * Partial implementation of an operator. In general, it's preferable that subclasses
  * would override {@link #startOperationAsync(int, OperationOutcome)
  * startOperationAsync()}. However, if that proves to be too difficult, then they can
- * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
- * if the operation requires any preprocessor steps, the subclass may choose to override
- * {@link #startPreprocessorAsync()}.
+ * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
  * <p/>
  * The futures returned by the methods within this class can be canceled, and will
  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
@@ -121,7 +118,7 @@ public abstract class OperationPartial implements Operation {
      * @param config configuration for this operation
      * @param propertyNames names of properties required by this operation
      */
-    public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
+    protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
         this.params = params;
         this.config = config;
         this.fullName = params.getActor() + "." + params.getOperation();
@@ -159,153 +156,36 @@ public abstract class OperationPartial implements Operation {
         return (T) properties.get(name);
     }
 
-    @Override
-    public CompletableFuture<OperationOutcome> start() {
-        // allocate a controller for the entire operation
-        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
-        if (preproc == null) {
-            // no preprocessor required - just start the operation
-            return startOperationAttempt(controller, 1);
-        }
-
-        /*
-         * Do preprocessor first and then, if successful, start the operation. Note:
-         * operations create their own outcome, ignoring the outcome from any previous
-         * steps.
-         *
-         * Wrap the preprocessor to ensure "stop" is propagated to it.
-         */
-        // @formatter:off
-        controller.wrap(preproc)
-                        .exceptionally(fromException("preprocessor of operation"))
-                        .thenCompose(handlePreprocessorFailure(controller))
-                        .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
-                        .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
-        // @formatter:on
-
-        return controller;
-    }
-
     /**
-     * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
-     * invokes the call-backs, marks the controller complete, and returns an incomplete
-     * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
-     * received.
-     * <p/>
-     * Assumes that no callbacks have been invoked yet.
+     * Gets a property value, throwing an exception if it's missing.
      *
-     * @param controller pipeline controller
-     * @return a function that checks the outcome status and continues, if successful, or
-     *         indicates a failure otherwise
+     * @param name property name
+     * @param propertyType property type, used in an error message if the property value
+     *        is {@code null}
+     * @return the property value
      */
-    private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
-                    PipelineControllerFuture<OperationOutcome> controller) {
-
-        return outcome -> {
-
-            if (isSuccess(outcome)) {
-                logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
-                return CompletableFuture.completedFuture(outcome);
-            }
-
-            logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
-
-            final Executor executor = params.getExecutor();
-            final CallbackManager callbacks = new CallbackManager();
-
-            // propagate "stop" to the callbacks
-            controller.add(callbacks);
-
-            final OperationOutcome outcome2 = params.makeOutcome(getTargetEntity());
-
-            // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
-
-            outcome2.setFinalOutcome(true);
-            outcome2.setResult(OperationResult.FAILURE_GUARD);
-            outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
-
-            // @formatter:off
-            CompletableFuture.completedFuture(outcome2)
-                            .whenCompleteAsync(callbackStarted(callbacks), executor)
-                            .whenCompleteAsync(callbackCompleted(callbacks), executor)
-                            .whenCompleteAsync(controller.delayedComplete(), executor);
-            // @formatter:on
-
-            return new CompletableFuture<>();
-        };
-    }
-
-    /**
-     * Invokes the operation's preprocessor step(s) as a "future". This method simply
-     * returns {@code null}.
-     * <p/>
-     * This method assumes the following:
-     * <ul>
-     * <li>the operator is alive</li>
-     * <li>exceptions generated within the pipeline will be handled by the invoker</li>
-     * </ul>
-     *
-     * @return a function that will start the preprocessor and returns its outcome, or
-     *         {@code null} if this operation needs no preprocessor
-     */
-    protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
-        return null;
-    }
-
-    /**
-     * Invokes the operation's guard step(s) as a "future".
-     * <p/>
-     * This method assumes the following:
-     * <ul>
-     * <li>the operator is alive</li>
-     * <li>exceptions generated within the pipeline will be handled by the invoker</li>
-     * </ul>
-     *
-     * @return a function that will start the guard checks and returns its outcome, or
-     *         {@code null} if this operation has no guard
-     */
-    protected CompletableFuture<OperationOutcome> startGuardAsync() {
-        if (params.isPreprocessed()) {
-            return null;
+    @SuppressWarnings("unchecked")
+    public <T> T getRequiredProperty(String name, String propertyType) {
+        T value = (T) properties.get(name);
+        if (value == null) {
+            throw new IllegalStateException("missing " + propertyType);
         }
 
-        // get the guard payload
-        Map<String, Object> payload = makeGuardPayload();
-
-        /*
-         * Note: can't use constants from actor.guard, because that would create a
-         * circular dependency.
-         */
-        return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
-                        .payload(payload).build().start();
+        return value;
     }
 
-    /**
-     * Creates a payload to execute a guard operation.
-     *
-     * @return a new guard payload
-     */
-    protected Map<String, Object> makeGuardPayload() {
-        // TODO delete this once preprocessing is done by the application
-        Map<String, Object> guard = new LinkedHashMap<>();
-        guard.put("actor", params.getActor());
-        guard.put("operation", params.getOperation());
-        guard.put("target", getTargetEntity());
-        guard.put("requestId", params.getRequestId());
-
-        String clname = params.getContext().getEvent().getClosedLoopControlName();
-        if (clname != null) {
-            guard.put("clname", clname);
-        }
+    @Override
+    public CompletableFuture<OperationOutcome> start() {
+        // allocate a controller for the entire operation
+        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
-        return guard;
+        // start attempt #1
+        return startOperationAttempt(controller, 1);
     }
 
     /**
-     * Starts the operation attempt, with no preprocessor. When all retries complete, it
-     * will complete the controller.
+     * Starts the operation attempt. When all retries complete, it will complete the
+     * controller.
      *
      * @param controller controller for all operation attempts
      * @param attempt attempt number, typically starting with 1
@@ -337,7 +217,6 @@ public abstract class OperationPartial implements Operation {
     /**
      * Starts the operation attempt, without doing any retries.
      *
-     * @param params operation parameters
      * @param attempt attempt number, typically starting with 1
      * @return a future that will return the result of a single operation attempt
      */
@@ -345,9 +224,9 @@ public abstract class OperationPartial implements Operation {
 
         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
 
-        final Executor executor = params.getExecutor();
-        final OperationOutcome outcome = params.makeOutcome(getTargetEntity());
-        final CallbackManager callbacks = new CallbackManager();
+        final var executor = params.getExecutor();
+        final var outcome = makeOutcome();
+        final var callbacks = new CallbackManager();
 
         // this operation attempt gets its own controller
         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
@@ -477,7 +356,7 @@ public abstract class OperationPartial implements Operation {
                 outcome = origOutcome;
             } else {
                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
-                outcome = this.setOutcome(params.makeOutcome(getTargetEntity()), OperationResult.FAILURE);
+                outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
             }
 
             // ensure correct actor/operation
@@ -502,7 +381,7 @@ public abstract class OperationPartial implements Operation {
 
             } else {
                 /*
-                 * retries were specified and we've already tried them all - change to
+                 * retries were specified, and we've already tried them all - change to
                  * FAILURE_RETRIES
                  */
                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
@@ -576,7 +455,7 @@ public abstract class OperationPartial implements Operation {
     private Function<Throwable, OperationOutcome> fromException(String type) {
 
         return thrown -> {
-            OperationOutcome outcome = params.makeOutcome(getTargetEntity());
+            OperationOutcome outcome = makeOutcome();
 
             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
                 // do not include exception in the message, as it just clutters the log
@@ -638,7 +517,7 @@ public abstract class OperationPartial implements Operation {
             return futures[0];
         }
 
-        CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
+        CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast)
                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
         return controller;
@@ -714,14 +593,13 @@ public abstract class OperationPartial implements Operation {
      * @return an array of futures, possibly zero-length. If the array is of size one,
      *         then that one item should be returned instead of the controller
      */
+    @SuppressWarnings("unchecked")
     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
 
         if (futureMakers.isEmpty()) {
-            @SuppressWarnings("unchecked")
-            CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
-            return result;
+            return new CompletableFuture[0];
         }
 
         // the last, unadorned future that is created
@@ -751,8 +629,7 @@ public abstract class OperationPartial implements Operation {
             }
         }
 
-        @SuppressWarnings("unchecked")
-        CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
+        var result = new CompletableFuture[futures.size()];
 
         if (result.length == 1) {
             // special case - return the unadorned future
@@ -867,7 +744,7 @@ public abstract class OperationPartial implements Operation {
          * executing
          */
         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        final Executor executor = params.getExecutor();
+        final var executor = params.getExecutor();
 
         // @formatter:off
         controller.wrap(nextTask)
@@ -950,7 +827,7 @@ public abstract class OperationPartial implements Operation {
                 outcome.setEnd(null);
 
                 // pass a copy to the callback
-                OperationOutcome outcome2 = new OperationOutcome(outcome);
+                var outcome2 = new OperationOutcome(outcome);
                 outcome2.setFinalOutcome(false);
                 params.callbackStarted(outcome2);
             }
@@ -1012,6 +889,18 @@ public abstract class OperationPartial implements Operation {
         return operation;
     }
 
+    /**
+     * Makes an outcome, populating the "target" field with the contents of the target
+     * entity property.
+     *
+     * @return a new operation outcome
+     */
+    protected OperationOutcome makeOutcome() {
+        OperationOutcome outcome = params.makeOutcome();
+        outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY));
+        return outcome;
+    }
+
     /**
      * Determines if a throwable is due to a timeout.
      *
@@ -1095,16 +984,6 @@ public abstract class OperationPartial implements Operation {
         return DEFAULT_RETRY_WAIT_MS;
     }
 
-    /**
-     * Gets the target entity, first trying the properties and then the parameters.
-     *
-     * @return the target entity
-     */
-    protected String getTargetEntity() {
-        String targetEntity = getProperty(OperationProperties.AAI_TARGET_ENTITY);
-        return (targetEntity != null ? targetEntity : params.getTargetEntity());
-    }
-
     /**
      * Gets the operation timeout.
      *