Add generic eventmanager classes 85/111485/2
authorJim Hahn <jrh3@att.com>
Thu, 20 Aug 2020 14:11:54 +0000 (10:11 -0400)
committerJim Hahn <jrh3@att.com>
Thu, 20 Aug 2020 15:00:06 +0000 (11:00 -0400)
Added classes that are event-agnostic and support moving control from
java into rules.
Updates per review comments:
- removed policy scope

Issue-ID: POLICY-2748-event-mgr
Change-Id: Icf811cc25a3975543fc5c725766b7b9df2bb87b0
Signed-off-by: Jim Hahn <jrh3@att.com>
controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java [new file with mode: 0644]
controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java [new file with mode: 0644]
controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java [new file with mode: 0644]
controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java [new file with mode: 0644]
controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java [new file with mode: 0644]
controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java [new file with mode: 0644]

diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java
new file mode 100644 (file)
index 0000000..2591a3f
--- /dev/null
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+public class ActorConstants {
+    public static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
+    public static final String LOCK_ACTOR = "LOCK";
+    public static final String LOCK_OPERATION = "Lock";
+
+    public static final String PAYLOAD_KEY_VF_COUNT = "vfCount";
+
+
+    private ActorConstants() {
+        // do nothing
+    }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java
new file mode 100644 (file)
index 0000000..5cf087a
--- /dev/null
@@ -0,0 +1,297 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.io.Serializable;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.ToString;
+import org.onap.policy.controlloop.ControlLoopException;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
+import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
+import org.onap.policy.controlloop.processor.ControlLoopProcessor;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for a single event. Once this has been created, the event can be retracted from
+ * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
+ * hasn't been replicated from another server). When the manager is no longer needed,
+ * {@link #destroy()} should be invoked.
+ */
+@ToString(onlyExplicitlyIncluded = true)
+public class ControlLoopEventManager implements StepContext, Serializable {
+
+    private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
+    private static final long serialVersionUID = -1216568161322872641L;
+
+    private static final String EVENT_MANAGER_SERVICE_CONFIG = "event-manager";
+
+    /**
+     * Counts the number of these objects that have been created. This is used by junit
+     * tests.
+     */
+    private static final AtomicLong createCount = new AtomicLong(0);
+
+    /**
+     * {@code True} if this object was created by this JVM instance, {@code false}
+     * otherwise. This will be {@code false} if this object is reconstituted from a
+     * persistent store or by transfer from another server.
+     */
+    private transient boolean createdByThisJvmInstance;
+
+    @Getter
+    @ToString.Include
+    public final String closedLoopControlName;
+    @Getter
+    @ToString.Include
+    private final UUID requestId;
+
+    /**
+     * Time, in milliseconds, when the control loop will time out.
+     */
+    @Getter
+    private final long endTimeMs;
+
+    // fields extracted from the ControlLoopParams
+    @Getter
+    private final String policyName;
+    @Getter
+    private final String policyVersion;
+
+    /**
+     * Maps a target entity to its lock.
+     */
+    private final transient Map<String, LockData> target2lock = new HashMap<>();
+
+    @Getter(AccessLevel.PROTECTED)
+    private final ControlLoopProcessor processor;
+
+    /**
+     * Set of properties used while processing the event.
+     */
+    private Map<String, Serializable> properties = new ConcurrentHashMap<>();
+
+    /**
+     * Unprocessed outcomes from the operations. Outcomes are added to this each time the
+     * "start" or "complete" callback is invoked, typically by an operation running in a
+     * background thread, thus it must be thread safe.
+     */
+    @Getter
+    private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param params control loop parameters
+     * @param requestId event request ID
+     * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
+     *         be created
+     */
+    public ControlLoopEventManager(ControlLoopParams params, UUID requestId) throws ControlLoopException {
+
+        createCount.incrementAndGet();
+
+        this.createdByThisJvmInstance = true;
+        this.closedLoopControlName = params.getClosedLoopControlName();
+        this.requestId = requestId;
+        this.policyName = params.getPolicyName();
+        this.policyVersion = params.getPolicyVersion();
+        this.processor = new ControlLoopProcessor(params.getToscaPolicy());
+        this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
+    }
+
+    /**
+     * Gets the number of manager objects that have been created.
+     *
+     * @return the number of manager objects that have been created
+     */
+    public static long getCreateCount() {
+        return createCount.get();
+    }
+
+    /**
+     * Determines if the manager is still active.
+     *
+     * @return {@code true} if the manager is still active, {@code false} otherwise
+     */
+    public boolean isActive() {
+        return createdByThisJvmInstance;
+    }
+
+    /**
+     * Cancels the current operation and frees all locks.
+     */
+    public void destroy() {
+        if (isActive()) {
+            getBlockingExecutor().execute(this::freeAllLocks);
+        }
+    }
+
+    /**
+     * Frees all locks.
+     */
+    private void freeAllLocks() {
+        target2lock.values().forEach(LockData::free);
+    }
+
+    /**
+     * Determines the overall control loop timeout.
+     *
+     * @return the policy timeout, in milliseconds, if specified, a default timeout
+     *         otherwise
+     */
+    private long detmControlLoopTimeoutMs() {
+        // validation checks preclude null or 0 timeout values in the policy
+        Integer timeout = processor.getControlLoop().getTimeout();
+        return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Requests a lock. This requests the lock for the time that remains before the
+     * timeout expires. This avoids having to extend the lock.
+     *
+     * @param targetEntity entity to be locked
+     * @return a future that can be used to await the lock
+     */
+    @Override
+    public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
+
+        long remainingMs = endTimeMs - System.currentTimeMillis();
+        int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
+
+        LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
+            LockData data2 = new LockData(key, requestId);
+            makeLock(targetEntity, requestId.toString(), remainingSec, data2);
+
+            data2.addUnavailableCallback(this::onComplete);
+
+            return data2;
+        });
+
+        return data.getFuture();
+    }
+
+    public void onStart(OperationOutcome outcome) {
+        outcomes.add(outcome);
+    }
+
+    public void onComplete(OperationOutcome outcome) {
+        outcomes.add(outcome);
+    }
+
+    /**
+     * Determines if the context contains a property.
+     *
+     * @param name name of the property of interest
+     * @return {@code true} if the context contains the property, {@code false} otherwise
+     */
+    public boolean contains(String name) {
+        return properties.containsKey(name);
+    }
+
+    /**
+     * Gets a property, casting it to the desired type.
+     *
+     * @param <T> desired type
+     * @param name name of the property whose value is to be retrieved
+     * @return the property's value, or {@code null} if it does not yet have a value
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T getProperty(String name) {
+        return (T) properties.get(name);
+    }
+
+    /**
+     * Sets a property's value.
+     *
+     * @param name property name
+     * @param value new property value
+     */
+    public void setProperty(String name, Serializable value) {
+        logger.error("set property {}={} manager={}", name, value, this);
+        properties.put(name, value);
+    }
+
+    /**
+     * Removes a property.
+     *
+     * @param name property name
+     */
+    public void removeProperty(String name) {
+        properties.remove(name);
+    }
+
+    /**
+     * Initializes various components, on demand.
+     */
+    private static class LazyInitData {
+        private static final OperationHistoryDataManager DATA_MANAGER;
+        private static final ActorService ACTOR_SERVICE;
+
+        static {
+            // TODO how to dynamically change data manager, depending whether or not
+            // guards are enabled?
+            EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG);
+            ACTOR_SERVICE = services.getActorService();
+            DATA_MANAGER = services.getDataManager();
+        }
+    }
+
+    // the following methods may be overridden by junit tests
+
+    public Executor getExecutor() {
+        return ForkJoinPool.commonPool();
+    }
+
+    protected ExecutorService getBlockingExecutor() {
+        return PolicyEngineConstants.getManager().getExecutorService();
+    }
+
+    protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
+        PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
+    }
+
+    public ActorService getActorService() {
+        return LazyInitData.ACTOR_SERVICE;
+    }
+
+    public OperationHistoryDataManager getDataManager() {
+        return LazyInitData.DATA_MANAGER;
+    }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java
new file mode 100644 (file)
index 0000000..01c64e5
--- /dev/null
@@ -0,0 +1,253 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
+import lombok.NonNull;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single step within a single policy. The rules typically break a policy operation down
+ * into separate steps. For instance, a policy for VF-Module-Create would be broken down
+ * into lock target, A&AI tenant query, A&AI custom query, guard, and finally
+ * VF-Module-Create.
+ */
+public class Step {
+    private static final Logger logger = LoggerFactory.getLogger(Step.class);
+
+    @Getter
+    protected ControlLoopOperationParams params;
+
+    /**
+     * Time when the first step started for the current policy. This is shared by all
+     * steps for the policy. When a step is started and finds this to be {@code null}, it
+     * sets the value. Subsequent steps leave it unchanged.
+     */
+    private final AtomicReference<Instant> startTime;
+
+    /**
+     * {@code True} if this step is for the policy's actual operation, {@code false} if it's a preprocessor step.
+     */
+    @Getter
+    private final boolean policyStep;
+
+    /**
+     * The operation for this step.
+     */
+    @Getter
+    private Operation operation = null;
+
+    /**
+     * Used to cancel the running operation.
+     */
+    protected CompletableFuture<OperationOutcome> future;
+
+
+    /**
+     * Constructs the object.  This is used when constructing the step for the policy's actual operation.
+     *
+     * @param params operation parameters
+     * @param startTime start time of the first step for the current policy, initially
+     *        containing {@code null}
+     */
+    public Step(ControlLoopOperationParams params, @NonNull AtomicReference<Instant> startTime) {
+        this.params = params;
+        this.startTime = startTime;
+        this.policyStep = true;
+    }
+
+    /**
+     * Constructs the object using information from another step.  This is used when constructing a preprocessing
+     * step.
+     *
+     * @param otherStep step whose information should be used
+     * @param actor actor name
+     * @param operation operation name
+     */
+    public Step(Step otherStep, String actor, String operation) {
+        this.params = otherStep.params.toBuilder().actor(actor).operation(operation).retry(null).timeoutSec(null)
+                        .payload(new LinkedHashMap<>()).build();
+        this.startTime = otherStep.startTime;
+        this.policyStep = false;
+    }
+
+    public String getActorName() {
+        return params.getActor();
+    }
+
+    public String getOperationName() {
+        return params.getOperation();
+    }
+
+    /**
+     * Determines if the operation has been initialized (i.e., created).
+     *
+     * @return {@code true} if the operation has been initialized, {@code false} otherwise
+     */
+    public boolean isInitialized() {
+        return (operation != null);
+    }
+
+    /**
+     * Initializes the step, creating the operation if it has not yet been created.
+     */
+    public void init() {
+        if (operation == null) {
+            operation = buildOperation();
+        }
+    }
+
+    /**
+     * Starts the operation.
+     *
+     * @param remainingMs time remaining, in milliseconds, for the control loop
+     * @return {@code true} if started, {@code false} if the step is no longer necessary
+     *         (i.e., because it was previously completed)
+     */
+    public boolean start(long remainingMs) {
+        if (!isInitialized()) {
+            throw new IllegalStateException("step has not been initialized");
+        }
+
+        if (future != null) {
+            throw new IllegalStateException("step is already running");
+        }
+
+        try {
+            initStartTime();
+            future = operation.start();
+
+            // handle any exceptions that may be thrown, set timeout, and handle timeout
+
+            // @formatter:off
+            future.exceptionally(this::handleException)
+                    .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
+                    .exceptionally(this::handleTimeout);
+            // @formatter:on
+
+        } catch (RuntimeException e) {
+            handleException(e);
+        }
+
+        return true;
+    }
+
+    /**
+     * Handles exceptions that may be generated.
+     *
+     * @param thrown exception that was generated
+     * @return {@code null}
+     */
+    private OperationOutcome handleException(Throwable thrown) {
+        if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
+            return null;
+        }
+
+        logger.warn("{}.{}: exception starting operation for {}", params.getActor(), params.getOperation(),
+                        params.getRequestId(), thrown);
+        OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown);
+        outcome.setStart(startTime.get());
+        outcome.setEnd(Instant.now());
+        outcome.setFinalOutcome(true);
+        params.getCompleteCallback().accept(outcome);
+
+        // this outcome is not used so just return "null"
+        return null;
+    }
+
+    /**
+     * Handles control loop timeout exception.
+     *
+     * @param thrown exception that was generated
+     * @return {@code null}
+     */
+    private OperationOutcome handleTimeout(Throwable thrown) {
+        logger.warn("{}.{}: control loop timeout for {}", params.getActor(), params.getOperation(),
+                        params.getRequestId(), thrown);
+
+        OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown);
+        outcome.setActor(ActorConstants.CL_TIMEOUT_ACTOR);
+        outcome.setOperation(null);
+        outcome.setStart(startTime.get());
+        outcome.setEnd(Instant.now());
+        outcome.setFinalOutcome(true);
+        params.getCompleteCallback().accept(outcome);
+
+        // cancel the operation, if it's still running
+        future.cancel(false);
+
+        // this outcome is not used so just return "null"
+        return null;
+    }
+
+    /**
+     * Cancels the operation, if it's running.
+     */
+    public void cancel() {
+        if (future != null) {
+            future.cancel(false);
+        }
+    }
+
+    /**
+     * Initializes the start time, if it's still unset.
+     */
+    private void initStartTime() {
+        if (startTime.get() == null) {
+            startTime.set(Instant.now());
+        }
+    }
+
+    /**
+     * Gets the start time.
+     *
+     * @return the start time, or {@code null} if it hasn't been initialized yet
+     */
+    public Instant getStartTime() {
+        return startTime.get();
+    }
+
+    /**
+     * Builds the operation. The default method simply invokes
+     * {@link ControlLoopOperationParams#build()}.
+     *
+     * @return a new operation
+     */
+    protected Operation buildOperation() {
+        return params.build();
+    }
+
+    @Override
+    public String toString() {
+        return "Step(actor=" + getActorName() + ", operation=" + getOperationName() + ")";
+    }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java
new file mode 100644 (file)
index 0000000..5251b7a
--- /dev/null
@@ -0,0 +1,72 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+
+/**
+ * Context used by steps to perform their work.
+ */
+public interface StepContext {
+
+    /**
+     * Determines if the context contains a property.
+     *
+     * @param name name of the property of interest
+     * @return {@code true} if the context contains the property, {@code false} otherwise
+     */
+    public boolean contains(String name);
+
+    /**
+     * Gets a property, casting it to the desired type.
+     *
+     * @param <T> desired type
+     * @param name name of the property whose value is to be retrieved
+     * @return the property's value, or {@code null} if it does not yet have a value
+     */
+    public <T> T getProperty(String name);
+
+    /**
+     * Sets a property's value.
+     *
+     * @param name property name
+     * @param value new property value
+     */
+    public void setProperty(String name, Serializable value);
+
+    /**
+     * Removes a property.
+     *
+     * @param name property name
+     */
+    public void removeProperty(String name);
+
+    /**
+     * Requests a lock. This requests the lock for the time that remains before the
+     * timeout expires. This avoids having to extend the lock.
+     *
+     * @param targetEntity entity to be locked
+     * @return a future that can be used to await the lock
+     */
+    public CompletableFuture<OperationOutcome> requestLock(String targetEntity);
+}
diff --git a/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java
new file mode 100644 (file)
index 0000000..91434d7
--- /dev/null
@@ -0,0 +1,295 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardYamlCoder;
+import org.onap.policy.common.utils.io.Serializer;
+import org.onap.policy.common.utils.resources.ResourceUtils;
+import org.onap.policy.controlloop.ControlLoopException;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.onap.policy.controlloop.policy.Target;
+import org.onap.policy.controlloop.policy.TargetType;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
+
+public class ControlLoopEventManagerTest {
+    private static final UUID REQ_ID = UUID.randomUUID();
+    private static final String CL_NAME = "my-closed-loop-name";
+    private static final String POLICY_NAME = "my-policy-name";
+    private static final String POLICY_SCOPE = "my-scope";
+    private static final String POLICY_VERSION = "1.2.3";
+    private static final String LOCK1 = "my-lock-A";
+    private static final String LOCK2 = "my-lock-B";
+    private static final Coder yamlCoder = new StandardYamlCoder();
+    private static final String MY_KEY = "def";
+
+    @Mock
+    private ExecutorService executor;
+
+    private long preCreateTimeMs;
+    private List<LockImpl> locks;
+    private Target target;
+    private ToscaPolicy tosca;
+    private ControlLoopParams params;
+    private ControlLoopEventManager mgr;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() throws ControlLoopException, CoderException {
+        MockitoAnnotations.initMocks(this);
+
+        target = new Target();
+        target.setType(TargetType.VNF);
+
+        params = new ControlLoopParams();
+        params.setClosedLoopControlName(CL_NAME);
+        params.setPolicyName(POLICY_NAME);
+        params.setPolicyScope(POLICY_SCOPE);
+        params.setPolicyVersion(POLICY_VERSION);
+
+        loadPolicy("eventManager/event-mgr-simple.yaml");
+
+        locks = new ArrayList<>();
+
+        preCreateTimeMs = System.currentTimeMillis();
+
+        MyManager.executor = executor;
+        MyManager.locks = locks;
+
+        mgr = new MyManager(params, REQ_ID);
+    }
+
+    @Test
+    public void testConstructor() {
+        assertEquals(POLICY_NAME, mgr.getPolicyName());
+
+        assertTrue(mgr.isActive());
+        assertEquals(CL_NAME, mgr.getClosedLoopControlName());
+        assertSame(REQ_ID, mgr.getRequestId());
+        assertEquals(POLICY_NAME, mgr.getPolicyName());
+        assertEquals(POLICY_VERSION, mgr.getPolicyVersion());
+        assertNotNull(mgr.getProcessor());
+        assertThat(mgr.getEndTimeMs()).isGreaterThanOrEqualTo(preCreateTimeMs);
+    }
+
+    @Test
+    public void testGetCreateCount() throws ControlLoopException {
+        long original = ControlLoopEventManager.getCreateCount();
+
+        new MyManager(params, REQ_ID);
+        assertEquals(original + 1, ControlLoopEventManager.getCreateCount());
+
+        new MyManager(params, REQ_ID);
+        assertEquals(original + 2, ControlLoopEventManager.getCreateCount());
+    }
+
+    @Test
+    public void testIsActive() throws Exception {
+        mgr = new ControlLoopEventManager(params, REQ_ID);
+        assertTrue(mgr.isActive());
+
+        ControlLoopEventManager mgr2 = Serializer.roundTrip(mgr);
+        assertFalse(mgr2.isActive());
+    }
+
+    @Test
+    public void testDestroy() throws IOException {
+        mgr.requestLock(LOCK1);
+        mgr.requestLock(LOCK2);
+        mgr.requestLock(LOCK1);
+
+        // ensure destroy() doesn't throw an exception if the object is deserialized
+        ControlLoopEventManager mgr2 = Serializer.roundTrip(mgr);
+        assertThatCode(() -> mgr2.destroy()).doesNotThrowAnyException();
+
+        // locks should not have been freed
+        for (LockImpl lock : locks) {
+            assertFalse(lock.isUnavailable());
+        }
+
+        mgr.destroy();
+
+        runExecutor();
+
+        for (LockImpl lock : locks) {
+            assertTrue(lock.isUnavailable());
+        }
+    }
+
+    @Test
+    public void testDetmControlLoopTimeoutMs() throws Exception {
+        long timeMs = 1200 * 1000L;
+        long end = mgr.getEndTimeMs();
+        assertThat(end).isGreaterThanOrEqualTo(preCreateTimeMs + timeMs);
+        assertThat(end).isLessThan(preCreateTimeMs + timeMs + 5000);
+    }
+
+    @Test
+    public void testRequestLock() {
+        final CompletableFuture<OperationOutcome> future1 = mgr.requestLock(LOCK1);
+        assertTrue(mgr.getOutcomes().isEmpty());
+
+        final CompletableFuture<OperationOutcome> future2 = mgr.requestLock(LOCK2);
+        assertTrue(mgr.getOutcomes().isEmpty());
+
+        assertSame(future1, mgr.requestLock(LOCK1));
+        assertTrue(mgr.getOutcomes().isEmpty());
+
+        assertEquals(2, locks.size());
+
+        assertTrue(future1.isDone());
+        assertTrue(future2.isDone());
+
+        // indicate that the first lock failed
+        locks.get(0).notifyUnavailable();
+
+        verifyLock(PolicyResult.FAILURE);
+        assertTrue(mgr.getOutcomes().isEmpty());
+    }
+
+    private void verifyLock(PolicyResult result) {
+        OperationOutcome outcome = mgr.getOutcomes().poll();
+        assertNotNull(outcome);
+        assertEquals(ActorConstants.LOCK_ACTOR, outcome.getActor());
+        assertEquals(ActorConstants.LOCK_OPERATION, outcome.getOperation());
+        assertNotNull(outcome.getEnd());
+        assertTrue(outcome.isFinalOutcome());
+        assertEquals(result, outcome.getResult());
+    }
+
+    @Test
+    public void testOnStart() {
+        OperationOutcome outcome1 = new OperationOutcome();
+        OperationOutcome outcome2 = new OperationOutcome();
+
+        mgr.onStart(outcome1);
+        mgr.onStart(outcome2);
+
+        assertSame(outcome1, mgr.getOutcomes().poll());
+        assertSame(outcome2, mgr.getOutcomes().poll());
+        assertTrue(mgr.getOutcomes().isEmpty());
+    }
+
+    @Test
+    public void testOnComplete() {
+        OperationOutcome outcome1 = new OperationOutcome();
+        OperationOutcome outcome2 = new OperationOutcome();
+
+        mgr.onComplete(outcome1);
+        mgr.onComplete(outcome2);
+
+        assertSame(outcome1, mgr.getOutcomes().poll());
+        assertSame(outcome2, mgr.getOutcomes().poll());
+        assertTrue(mgr.getOutcomes().isEmpty());
+    }
+
+    @Test
+    public void testContains_testGetProperty_testSetProperty_testRemoveProperty() {
+        mgr.setProperty("abc", "a string");
+        mgr.setProperty(MY_KEY, 100);
+
+        assertTrue(mgr.contains(MY_KEY));
+        assertFalse(mgr.contains("ghi"));
+
+        String strValue = mgr.getProperty("abc");
+        assertEquals("a string", strValue);
+
+        int intValue = mgr.getProperty(MY_KEY);
+        assertEquals(100, intValue);
+
+        mgr.removeProperty(MY_KEY);
+        assertFalse(mgr.contains(MY_KEY));
+    }
+
+    @Test
+    public void testToString() {
+        assertNotNull(mgr.toString());
+    }
+
+    private void loadPolicy(String fileName) throws CoderException {
+        ToscaServiceTemplate template =
+                        yamlCoder.decode(ResourceUtils.getResourceAsString(fileName), ToscaServiceTemplate.class);
+        tosca = template.getToscaTopologyTemplate().getPolicies().get(0).values().iterator().next();
+
+        params.setToscaPolicy(tosca);
+    }
+
+    private void runExecutor() {
+        ArgumentCaptor<Runnable> runCaptor = ArgumentCaptor.forClass(Runnable.class);
+        verify(executor).execute(runCaptor.capture());
+
+        runCaptor.getValue().run();
+    }
+
+
+    private static class MyManager extends ControlLoopEventManager {
+        private static final long serialVersionUID = 1L;
+
+        private static ExecutorService executor;
+        private static List<LockImpl> locks;
+
+        public MyManager(ControlLoopParams params, UUID requestId)
+                        throws ControlLoopException {
+            super(params, requestId);
+        }
+
+        @Override
+        protected ExecutorService getBlockingExecutor() {
+            return executor;
+        }
+
+        @Override
+        protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
+            LockImpl lock = new LockImpl(LockState.ACTIVE, targetEntity, requestId, holdSec, callback);
+            locks.add(lock);
+            callback.lockAvailable(lock);
+        }
+    }
+}
diff --git a/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java
new file mode 100644 (file)
index 0000000..1472adc
--- /dev/null
@@ -0,0 +1,380 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.onap.policy.controlloop.policy.Target;
+import org.onap.policy.controlloop.policy.TargetType;
+
+public class StepTest {
+    private static final UUID REQ_ID = UUID.randomUUID();
+    private static final String POLICY_ID = "my-policy";
+    private static final String POLICY_ACTOR = "my-actor";
+    private static final String POLICY_OPERATION = "my-operation";
+    private static final String MY_TARGET = "my-target";
+    private static final String PAYLOAD_KEY = "payload-key";
+    private static final String PAYLOAD_VALUE = "payload-value";
+    private static final long REMAINING_MS = 5000;
+    private static final Integer POLICY_RETRY = 3;
+    private static final Integer POLICY_TIMEOUT = 20;
+    private static final String EXPECTED_EXCEPTION = "expected exception";
+
+    @Mock
+    private Operator policyOperator;
+    @Mock
+    private Operation policyOperation;
+    @Mock
+    private Actor policyActor;
+    @Mock
+    private ActorService actors;
+
+    private CompletableFuture<OperationOutcome> future;
+    private Target target;
+    private Map<String, String> payload;
+    private Policy policy;
+    private VirtualControlLoopEvent event;
+    private ControlLoopEventContext context;
+    private BlockingQueue<OperationOutcome> starts;
+    private BlockingQueue<OperationOutcome> completions;
+    private ControlLoopOperationParams params;
+    private AtomicReference<Instant> startTime;
+    private Step step;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        future = new CompletableFuture<>();
+
+        // configure policy operation
+        when(actors.getActor(POLICY_ACTOR)).thenReturn(policyActor);
+        when(policyActor.getOperator(POLICY_OPERATION)).thenReturn(policyOperator);
+        when(policyOperator.buildOperation(any())).thenReturn(policyOperation);
+        when(policyOperation.start()).thenReturn(future);
+
+        target = new Target();
+        target.setType(TargetType.VM);
+
+        payload = Map.of(PAYLOAD_KEY, PAYLOAD_VALUE);
+
+        policy = new Policy();
+        policy.setId(POLICY_ID);
+        policy.setActor(POLICY_ACTOR);
+        policy.setRecipe(POLICY_OPERATION);
+        policy.setTarget(target);
+        policy.setPayload(payload);
+        policy.setRetry(POLICY_RETRY);
+        policy.setTimeout(POLICY_TIMEOUT);
+
+        event = new VirtualControlLoopEvent();
+        event.setRequestId(REQ_ID);
+        event.setTarget(ControlLoopOperationManager2.VSERVER_VSERVER_NAME);
+        event.setAai(new TreeMap<>(Map.of(ControlLoopOperationManager2.VSERVER_VSERVER_NAME, MY_TARGET)));
+
+        context = new ControlLoopEventContext(event);
+
+        starts = new LinkedBlockingQueue<>();
+        completions = new LinkedBlockingQueue<>();
+
+        params = ControlLoopOperationParams.builder().actor(POLICY_ACTOR).actorService(actors)
+                        .completeCallback(completions::add).context(context).executor(ForkJoinPool.commonPool())
+                        .operation(POLICY_OPERATION).payload(new TreeMap<>(payload)).startCallback(starts::add)
+                        .target(target).targetEntity(MY_TARGET).build();
+
+        startTime = new AtomicReference<>();
+
+        step = new Step(params, startTime);
+    }
+
+    @Test
+    public void testConstructor() {
+        assertTrue(step.isPolicyStep());
+        assertSame(params, step.getParams());
+
+        // check that it recorded the startTime by starting and checking it
+        step.init();
+        step.start(REMAINING_MS);
+
+        assertNotNull(startTime.get());
+
+        // try with null start time
+        assertThatThrownBy(() -> new Step(params, null)).isInstanceOf(NullPointerException.class)
+                        .hasMessageContaining("startTime");
+    }
+
+    @Test
+    public void testConstructorWithOtherStep_testInitStartTime_testGetStartTimeRef() {
+        Step step2 = new Step(step, "actorB", "operB");
+        assertFalse(step2.isPolicyStep());
+
+        ControlLoopOperationParams params2 = step2.getParams();
+        assertEquals("actorB", params2.getActor());
+        assertEquals("operB", params2.getOperation());
+        assertNull(params2.getRetry());
+        assertNull(params2.getTimeoutSec());
+        assertSame(target, params2.getTarget());
+        assertEquals(MY_TARGET, params2.getTargetEntity());
+        assertTrue(params2.getPayload().isEmpty());
+
+        when(actors.getActor(params2.getActor())).thenReturn(policyActor);
+        when(policyActor.getOperator(params2.getOperation())).thenReturn(policyOperator);
+
+        assertNull(step2.getStartTime());
+
+        // check that it recorded the startTime by starting and checking it
+        step2.init();
+        step2.start(REMAINING_MS);
+
+        Instant instant = startTime.get();
+        assertNotNull(instant);
+        assertSame(instant, step2.getStartTime());
+
+        // launch the original step, too, so we can test the other branch of
+        // initStartTime()
+        step.init();
+        step.start(REMAINING_MS);
+
+        assertSame(instant, startTime.get());
+        assertSame(instant, step.getStartTime());
+    }
+
+    @Test
+    public void testGetActorName_testGetOperationName() {
+        assertEquals(POLICY_ACTOR, step.getActorName());
+        assertEquals(POLICY_OPERATION, step.getOperationName());
+    }
+
+    @Test
+    public void testIsInitialized_testInit_testGetOperation() {
+        assertFalse(step.isInitialized());
+
+        // verify it's unchanged
+        assertFalse(step.isInitialized());
+
+        assertNull(step.getOperation());
+
+        step.init();
+
+        assertSame(policyOperation, step.getOperation());
+        assertTrue(step.isInitialized());
+
+        // repeat - should be unchanged
+        step.init();
+        assertSame(policyOperation, step.getOperation());
+        assertTrue(step.isInitialized());
+
+        // repeat without init - should be unchanged
+        assertSame(policyOperation, step.getOperation());
+        assertTrue(step.isInitialized());
+    }
+
+    @Test
+    public void testStart() {
+        assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS))
+                        .withMessage("step has not been initialized");
+
+        // initialize it, by calling getOperation(), and then try again
+        step.init();
+        assertTrue(step.start(REMAINING_MS));
+
+        assertNotNull(startTime.get());
+
+        // should fail if we try again
+        assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS))
+                        .withMessage("step is already running");
+    }
+
+    /**
+     * Tests start() when the operation.start() throws an exception.
+     */
+    @Test
+    public void testStartException() {
+        when(policyOperation.start()).thenThrow(new RuntimeException());
+        step.init();
+
+        assertTrue(step.start(REMAINING_MS));
+
+        // exception should be immediate
+        OperationOutcome outcome = completions.poll();
+        assertNotNull(outcome);
+
+        assertNotEquals(PolicyResult.SUCCESS, outcome.getResult());
+        assertEquals(POLICY_ACTOR, outcome.getActor());
+        assertTrue(outcome.isFinalOutcome());
+    }
+
+    /**
+     * Tests start() when the operation throws an asynchronous exception.
+     */
+    @Test
+    public void testStartAsyncException() {
+        step.init();
+        step.start(REMAINING_MS);
+
+        future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION));
+
+        // exception should be immediate
+        OperationOutcome outcome = completions.poll();
+        assertNotNull(outcome);
+
+        assertNotEquals(PolicyResult.SUCCESS, outcome.getResult());
+        assertEquals(POLICY_ACTOR, outcome.getActor());
+        assertTrue(outcome.isFinalOutcome());
+    }
+
+    /**
+     * Tests handleException() when the exception is a CancellationException.
+     */
+    @Test
+    public void testHandleExceptionCancellationException() {
+        step.init();
+        step.start(REMAINING_MS);
+
+        future.completeExceptionally(new CancellationException(EXPECTED_EXCEPTION));
+
+        // should not have generated an outcome
+        assertNull(completions.peek());
+    }
+
+    @Test
+    public void testHandleExceptionCauseCancellationException() {
+        step.init();
+        step.start(REMAINING_MS);
+
+        future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION, new CancellationException()));
+
+        // should not have generated an outcome
+        assertNull(completions.peek());
+    }
+
+    @Test
+    public void testHandleException() {
+        when(policyOperation.start()).thenThrow(new RuntimeException());
+
+        step.init();
+
+        assertTrue(step.start(REMAINING_MS));
+
+        // exception should be immediate
+        OperationOutcome outcome = completions.poll();
+        assertNotNull(outcome);
+
+        assertNotEquals(PolicyResult.SUCCESS, outcome.getResult());
+        assertEquals(POLICY_ACTOR, outcome.getActor());
+        assertTrue(outcome.isFinalOutcome());
+        assertEquals(POLICY_OPERATION, outcome.getOperation());
+        assertSame(startTime.get(), outcome.getStart());
+        assertNotNull(outcome.getEnd());
+        assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond());
+    }
+
+    @Test
+    public void testHandleTimeout() throws InterruptedException {
+        step.init();
+
+        long tstart = System.currentTimeMillis();
+
+        // give it a short timeout
+        step.start(100);
+
+        OperationOutcome outcome = completions.poll(5, TimeUnit.SECONDS);
+        assertNotNull(outcome);
+
+        // should not have timed out before 100ms
+        assertTrue(tstart + 100 <= System.currentTimeMillis());
+
+        // must wait for the future to complete before checking that it was cancelled
+        assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS)).isInstanceOf(Exception.class);
+
+        // verify that the future was cancelled
+        assertTrue(future.isCancelled());
+
+        assertNotEquals(PolicyResult.SUCCESS, outcome.getResult());
+        assertEquals(ActorConstants.CL_TIMEOUT_ACTOR, outcome.getActor());
+        assertTrue(outcome.isFinalOutcome());
+        assertNull(outcome.getOperation());
+        assertSame(startTime.get(), outcome.getStart());
+        assertNotNull(outcome.getEnd());
+        assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond());
+    }
+
+    @Test
+    public void testCancel() {
+        // should have no effect
+        step.cancel();
+
+        step.init();
+
+        step.start(REMAINING_MS);
+        step.cancel();
+
+        assertTrue(future.isCancelled());
+    }
+
+    @Test
+    public void testBuildOperation() {
+        assertSame(policyOperation, step.buildOperation());
+    }
+
+    @Test
+    public void testToString() {
+        assertNotNull(step.toString());
+    }
+}