Add Topic Actor superclasses 14/101714/3
authorJim Hahn <jrh3@att.com>
Thu, 13 Feb 2020 17:34:46 +0000 (12:34 -0500)
committerJim Hahn <jrh3@att.com>
Fri, 14 Feb 2020 01:31:33 +0000 (20:31 -0500)
Issue-ID: POLICY-2363
Change-Id: I5d29d85f6c5f40fb6c8f1bf678d9c718760a7558
Signed-off-by: Jim Hahn <jrh3@att.com>
23 files changed:
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java [moved from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java with 85% similarity]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParamsTest.java [moved from models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParamsTest.java with 83% similarity]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKeyTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml

index c4bf5f4..ba75f0b 100644 (file)
@@ -181,9 +181,9 @@ public abstract class HttpOperation<T> extends OperationPartial {
             try {
                 response = makeCoder().decode(strResponse, responseClass);
             } catch (CoderException e) {
-                logger.warn("{}.{} cannot decode response with http error code {} for {}", params.getActor(),
-                                params.getOperation(), rawResponse.getStatus(), params.getRequestId(), e);
-                return setOutcome(outcome, PolicyResult.FAILURE_EXCEPTION);
+                logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
+                                params.getRequestId(), e);
+                throw new IllegalArgumentException("cannot decode response");
             }
         }
 
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java
new file mode 100644 (file)
index 0000000..c3e1e5c
--- /dev/null
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
+
+/**
+ * Actor that uses a topic pair. The actor's parameters must be a
+ * {@link TopicPairActorParams}.
+ */
+public class TopicPairActor extends ActorImpl implements TopicPairManager {
+
+    /**
+     * Maps a topic source and target name to its topic pair.
+     */
+    private final Map<Pair<String, String>, TopicPair> params2topic = new ConcurrentHashMap<>();
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param name actor's name
+     */
+    public TopicPairActor(String name) {
+        super(name);
+    }
+
+    @Override
+    protected void doStart() {
+        params2topic.values().forEach(TopicPair::start);
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() {
+        params2topic.values().forEach(TopicPair::stop);
+        super.doStop();
+    }
+
+    @Override
+    protected void doShutdown() {
+        params2topic.values().forEach(TopicPair::shutdown);
+        params2topic.clear();
+        super.doShutdown();
+    }
+
+    @Override
+    public TopicPair getTopicPair(String source, String target) {
+        Pair<String, String> key = Pair.of(source, target);
+        return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target));
+    }
+
+    /**
+     * Translates the parameters to a {@link TopicPairActorParams} and then creates a
+     * function that will extract operator-specific parameters.
+     */
+    @Override
+    protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+        String actorName = getName();
+
+        TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class);
+        ValidationResult result = params.validate(getName());
+        if (!result.isValid()) {
+            throw new ParameterValidationRuntimeException("invalid parameters", result);
+        }
+
+        // create a map of the default parameters
+        Map<String, Object> defaultParams = Util.translateToMap(getName(), params.getDefaults());
+        Map<String, Map<String, Object>> operations = params.getOperation();
+
+        return operationName -> {
+            Map<String, Object> specificParams = operations.get(operationName);
+            if (specificParams == null) {
+                return null;
+            }
+
+            // start with a copy of defaults and overlay with specific
+            Map<String, Object> subparams = new TreeMap<>(defaultParams);
+            subparams.putAll(specificParams);
+
+            return Util.translateToMap(getName() + "." + operationName, subparams);
+        };
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java
new file mode 100644 (file)
index 0000000..6b584d7
--- /dev/null
@@ -0,0 +1,316 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.apache.commons.lang3.tuple.Triple;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Operation that uses a Topic pair.
+ *
+ * @param <S> response type
+ */
+@Getter
+public abstract class TopicPairOperation<Q, S> extends OperationPartial {
+    private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
+    private static final Coder coder = new StandardCoder();
+
+    // fields extracted from the operator
+
+    private final TopicPair topicPair;
+    private final Forwarder forwarder;
+    private final TopicPairParams pairParams;
+    private final long timeoutMs;
+
+    /**
+     * Response class.
+     */
+    private final Class<S> responseClass;
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param params operation parameters
+     * @param operator operator that created this operation
+     * @param clazz response class
+     */
+    public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
+        super(params, operator);
+        this.topicPair = operator.getTopicPair();
+        this.forwarder = operator.getForwarder();
+        this.pairParams = operator.getParams();
+        this.responseClass = clazz;
+        this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS);
+    }
+
+    /**
+     * If no timeout is specified, then it returns the default timeout.
+     */
+    @Override
+    protected long getTimeoutMs(Integer timeoutSec) {
+        // TODO move this method to the superclass
+        return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
+    }
+
+    /**
+     * Publishes the request and arranges to receive the response.
+     */
+    @Override
+    protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
+
+        final Q request = makeRequest(attempt);
+        final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
+
+        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+        final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
+                        new CompletableFuture<>();
+        final Executor executor = params.getExecutor();
+
+        // register a listener BEFORE publishing
+
+        // @formatter:off
+        TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
+            (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
+        // @formatter:on
+
+        // TODO this currently only allows a single matching response
+
+        forwarder.register(expectedKeyValues, listener);
+
+        // ensure listener is unregistered if the controller is canceled
+        controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
+
+        // publish the request
+        try {
+            publishRequest(request);
+        } catch (RuntimeException e) {
+            logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
+            forwarder.unregister(expectedKeyValues, listener);
+            throw e;
+        }
+
+
+        // once "future" completes, process the response, and then complete the controller
+
+        // @formatter:off
+        future.thenApplyAsync(
+            triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
+                            executor)
+                        .whenCompleteAsync(controller.delayedComplete(), executor);
+        // @formatter:on
+
+        return controller;
+    }
+
+    /**
+     * Makes the request.
+     *
+     * @param attempt operation attempt
+     * @return a new request
+     */
+    protected abstract Q makeRequest(int attempt);
+
+    /**
+     * Gets values, expected in the response, that should match the selector keys.
+     *
+     * @param attempt operation attempt
+     * @param request request to be published
+     * @return a list of the values to be matched by the selector keys
+     */
+    protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
+
+    /**
+     * Publishes the request. Encodes the request, if it is not already a String.
+     *
+     * @param request request to be published
+     */
+    protected void publishRequest(Q request) {
+        String json;
+        try {
+            if (request instanceof String) {
+                json = request.toString();
+            } else {
+                json = makeCoder().encode(request);
+            }
+        } catch (CoderException e) {
+            throw new IllegalArgumentException("cannot encode request", e);
+        }
+
+        List<CommInfrastructure> list = topicPair.publish(json);
+        if (list.isEmpty()) {
+            throw new IllegalStateException("nothing published");
+        }
+
+        logTopicRequest(list, request);
+    }
+
+    /**
+     * Processes a response.
+     *
+     * @param infra communication infrastructure on which the response was received
+     * @param outcome outcome to be populated
+     * @param response raw response to process
+     * @param scoResponse response, as a {@link StandardCoderObject}
+     * @return the outcome
+     */
+    protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
+                    StandardCoderObject scoResponse) {
+
+        logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
+
+        logTopicResponse(infra, rawResponse);
+
+        S response;
+        if (responseClass == String.class) {
+            response = responseClass.cast(rawResponse);
+
+        } else if (responseClass == StandardCoderObject.class) {
+            response = responseClass.cast(scoResponse);
+
+        } else {
+            try {
+                response = makeCoder().decode(rawResponse, responseClass);
+            } catch (CoderException e) {
+                logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
+                                params.getRequestId());
+                throw new IllegalArgumentException("cannot decode response", e);
+            }
+        }
+
+        if (!isSuccess(rawResponse, response)) {
+            logger.info("{}.{} request failed  for {}", params.getActor(), params.getOperation(),
+                            params.getRequestId());
+            return setOutcome(outcome, PolicyResult.FAILURE);
+        }
+
+        logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
+        setOutcome(outcome, PolicyResult.SUCCESS);
+        postProcessResponse(outcome, rawResponse, response);
+
+        return outcome;
+    }
+
+    /**
+     * Processes a successful response.
+     *
+     * @param outcome outcome to be populated
+     * @param rawResponse raw response
+     * @param response decoded response
+     */
+    protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
+        // do nothing
+    }
+
+    /**
+     * Determines if the response indicates success.
+     *
+     * @param rawResponse raw response
+     * @param response decoded response
+     * @return {@code true} if the response indicates success, {@code false} otherwise
+     */
+    protected abstract boolean isSuccess(String rawResponse, S response);
+
+    /**
+     * Logs a TOPIC request. If the request is not of type, String, then it attempts to
+     * pretty-print it into JSON before logging.
+     *
+     * @param infrastructures list of communication infrastructures on which it was
+     *        published
+     * @param request request to be logged
+     */
+    protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
+        if (infrastructures.isEmpty()) {
+            return;
+        }
+
+        String json;
+        try {
+            if (request == null) {
+                json = null;
+            } else if (request instanceof String) {
+                json = request.toString();
+            } else {
+                json = makeCoder().encode(request, true);
+            }
+
+        } catch (CoderException e) {
+            logger.warn("cannot pretty-print request", e);
+            json = request.toString();
+        }
+
+        for (CommInfrastructure infra : infrastructures) {
+            logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
+        }
+    }
+
+    /**
+     * Logs a TOPIC response. If the response is not of type, String, then it attempts to
+     * pretty-print it into JSON before logging.
+     *
+     * @param infra communication infrastructure on which the response was received
+     * @param response response to be logged
+     */
+    protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
+        String json;
+        try {
+            if (response == null) {
+                json = null;
+            } else if (response instanceof String) {
+                json = response.toString();
+            } else {
+                json = makeCoder().encode(response, true);
+            }
+
+        } catch (CoderException e) {
+            logger.warn("cannot pretty-print response", e);
+            json = response.toString();
+        }
+
+        logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
+    }
+
+    // these may be overridden by junit tests
+
+    protected Coder makeCoder() {
+        return coder;
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java
new file mode 100644 (file)
index 0000000..8ce0133
--- /dev/null
@@ -0,0 +1,156 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import lombok.Getter;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
+
+/**
+ * Operator that uses a pair of topics, one for publishing the request, and another for
+ * receiving the response. Topic operators may share a {@link TopicPair}.
+ */
+public abstract class TopicPairOperator extends OperatorPartial {
+
+    /**
+     * Manager from which to get the topic pair.
+     */
+    private final TopicPairManager pairManager;
+
+    /**
+     * Keys used to extract the fields used to select responses for this operator.
+     */
+    private final List<SelectorKey> selectorKeys;
+
+    /*
+     * The remaining fields are initialized when configure() is invoked, thus they may
+     * change.
+     */
+
+    /**
+     * Current parameters. While {@link params} may change, the values contained within it
+     * will not, thus operations may copy it.
+     */
+    @Getter
+    private TopicPairParams params;
+
+    /**
+     * Topic pair associated with the parameters.
+     */
+    @Getter
+    private TopicPair topicPair;
+
+    /**
+     * Forwarder associated with the parameters.
+     */
+    @Getter
+    private Forwarder forwarder;
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param actorName name of the actor with which this operator is associated
+     * @param name operation name
+     * @param pairManager manager from which to get the topic pair
+     * @param selectorKeys keys used to extract the fields used to select responses for
+     *        this operator
+     */
+    public TopicPairOperator(String actorName, String name, TopicPairManager pairManager,
+                    List<SelectorKey> selectorKeys) {
+        super(actorName, name);
+        this.pairManager = pairManager;
+        this.selectorKeys = selectorKeys;
+    }
+
+    @Override
+    protected void doConfigure(Map<String, Object> parameters) {
+        params = Util.translate(getFullName(), parameters, TopicPairParams.class);
+        ValidationResult result = params.validate(getFullName());
+        if (!result.isValid()) {
+            throw new ParameterValidationRuntimeException("invalid parameters", result);
+        }
+
+        topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget());
+        forwarder = topicPair.addForwarder(selectorKeys);
+    }
+
+    /**
+     * Makes an operator that will construct operations.
+     *
+     * @param <Q> request type
+     * @param <S> response type
+     * @param actorName actor name
+     * @param operation operation name
+     * @param pairManager manager from which to get the topic pair
+     * @param operationMaker function to make an operation
+     * @param keys keys used to extract the fields used to select responses for this
+     *        operator
+     * @return a new operator
+     */
+    // @formatter:off
+    public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
+                    BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker,
+                    SelectorKey... keys) {
+        // @formatter:off
+
+        return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker);
+    }
+
+    /**
+     * Makes an operator that will construct operations.
+     *
+     * @param <Q> request type
+     * @param <S> response type
+     * @param actorName actor name
+     * @param operation operation name
+     * @param pairManager manager from which to get the topic pair
+     * @param keys keys used to extract the fields used to select responses for
+     *        this operator
+     * @param operationMaker function to make an operation
+     * @return a new operator
+     */
+    // @formatter:off
+    public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
+                    List<SelectorKey> keys,
+                    BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) {
+        // @formatter:on
+
+        return new TopicPairOperator(actorName, operation, pairManager, keys) {
+            @Override
+            public synchronized Operation buildOperation(ControlLoopOperationParams params) {
+                return operationMaker.apply(params, this);
+            }
+        };
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java
new file mode 100644 (file)
index 0000000..42a44ee
--- /dev/null
@@ -0,0 +1,93 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+/**
+ * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests
+ * and the other to receive responses.
+ */
+@NotNull
+@NotBlank
+@Data
+@Builder
+public class TopicPairActorParams {
+
+    /**
+     * This contains the default parameters that are used when an operation doesn't
+     * specify them. Note: each operation to be used must still have an entry in
+     * {@link #operation}, even if it's empty. Otherwise, the given operation will not be
+     * started.
+     */
+    private TopicPairParams defaults;
+
+    /**
+     * Maps an operation name to its individual parameters.
+     */
+    private Map<String, Map<String, Object>> operation;
+
+
+    /**
+     * Validates the parameters.
+     *
+     * @param name name of the object containing these parameters
+     * @return "this"
+     * @throws IllegalArgumentException if the parameters are invalid
+     */
+    public TopicPairActorParams doValidation(String name) {
+        ValidationResult result = validate(name);
+        if (!result.isValid()) {
+            throw new ParameterValidationRuntimeException("invalid parameters", result);
+        }
+
+        return this;
+    }
+
+    /**
+     * Validates the parameters.
+     *
+     * @param resultName name of the result
+     *
+     * @return the validation result
+     */
+    public ValidationResult validate(String resultName) {
+        BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
+
+        if (defaults != null) {
+            result.addResult(defaults.validate("defaults"));
+        }
+
+        // @formatter:off
+        result.validateMap("operation", operation,
+            (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
+        // @formatter:on
+
+        return result;
+    }
+}
@@ -29,34 +29,34 @@ import org.onap.policy.common.parameters.annotations.NotBlank;
 import org.onap.policy.common.parameters.annotations.NotNull;
 
 /**
- * Parameters used by Operators that connect to a server via DMaaP.
+ * Parameters used by Operators that use a pair of Topics, one to publish requests and the
+ * other to receive responses.
  */
 @NotNull
 @NotBlank
 @Data
 @Builder(toBuilder = true)
-public class TopicParams {
+public class TopicPairParams {
 
     /**
-     * Name of the target topic end point to which requests should be published.
+     * Source topic end point, from which to read responses.
      */
-    private String target;
+    private String source;
 
     /**
-     * Source topic end point, from which to read responses.
+     * Name of the target topic end point to which requests should be published.
      */
-    private String source;
+    private String target;
 
     /**
-     * Amount of time, in seconds to wait for the response, where zero indicates that it
-     * should wait forever. The default is zero.
+     * Amount of time, in seconds to wait for the response. The default is five minutes.
      */
-    @Min(0)
+    @Min(1)
     @Builder.Default
-    private int timeoutSec = 0;
+    private int timeoutSec = 300;
 
     /**
-     * Validates both the publisher and the subscriber parameters.
+     * Validates the parameters.
      *
      * @param resultName name of the result
      *
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java
new file mode 100644 (file)
index 0000000..8e9109c
--- /dev/null
@@ -0,0 +1,141 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forwarder that selectively forwards message to listeners based on the content of the
+ * message. Each forwarder is associated with a single set of selector keys. Listeners are
+ * then registered with that forwarder for a particular set of values for the given keys.
+ */
+public class Forwarder {
+    private static final Logger logger = LoggerFactory.getLogger(Forwarder.class);
+
+    /**
+     * Maps a set of field values to one or more listeners.
+     */
+    // @formatter:off
+    private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>>
+                values2listeners = new ConcurrentHashMap<>();
+    // @formatter:on
+
+    /**
+     * Keys used to extract the field values from the {@link StandardCoderObject}.
+     */
+    private final List<SelectorKey> keys;
+
+    /**
+     * Constructs the object.
+     *
+     * @param keys keys used to extract the field's value from the
+     *        {@link StandardCoderObject}
+     */
+    public Forwarder(List<SelectorKey> keys) {
+        this.keys = keys;
+    }
+
+    /**
+     * Registers a listener for messages containing the given field values.
+     *
+     * @param values field values of interest, in one-to-one correspondence with the keys
+     * @param listener listener to register
+     */
+    public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+        if (keys.size() != values.size()) {
+            throw new IllegalArgumentException("key/value mismatch");
+        }
+
+        values2listeners.compute(values, (key, listeners) -> {
+            Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners;
+            if (map == null) {
+                map = new ConcurrentHashMap<>();
+            }
+
+            map.put(listener, "");
+            return map;
+        });
+    }
+
+    /**
+     * Unregisters a listener for messages containing the given field values.
+     *
+     * @param values field values of interest, in one-to-one correspondence with the keys
+     * @param listener listener to unregister
+     */
+    public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+        values2listeners.computeIfPresent(values, (key, listeners) -> {
+            listeners.remove(listener);
+            return (listeners.isEmpty() ? null : listeners);
+        });
+    }
+
+    /**
+     * Processes a message, forwarding it to the appropriate listeners, if any.
+     *
+     * @param infra communication infrastructure on which the response was received
+     * @param textMessage original text message that was received
+     * @param scoMessage decoded text message
+     */
+    public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) {
+        // extract the key values from the message
+        List<String> values = new ArrayList<>(keys.size());
+        for (SelectorKey key : keys) {
+            String value = key.extractField(scoMessage);
+            if (value == null) {
+                /*
+                 * No value for this field, so this message is not relevant to this
+                 * forwarder.
+                 */
+                return;
+            }
+
+            values.add(value);
+        }
+
+        // get the listeners for this set of values
+        Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners =
+                        values2listeners.get(values);
+        if (listeners == null) {
+            // no listeners for this particular list of values
+            return;
+        }
+
+
+        // forward the message to each listener
+        for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) {
+            try {
+                listener.accept(infra, textMessage, scoMessage);
+            } catch (RuntimeException e) {
+                logger.warn("exception thrown by listener {}", Util.ident(listener), e);
+            }
+        }
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java
new file mode 100644 (file)
index 0000000..fc57273
--- /dev/null
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import lombok.EqualsAndHashCode;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+
+/**
+ * Selector key, which contains a hierarchical list of Strings and Integers that are used
+ * to extract the content of a field, typically from a {@link StandardCoderObject}.
+ */
+@EqualsAndHashCode
+public class SelectorKey {
+
+    /**
+     * Names and indices used to extract the field's value.
+     */
+    private final Object[] fieldIdentifiers;
+
+    /**
+     * Constructs the object.
+     *
+     * @param fieldIdentifiers names and indices used to extract the field's value
+     */
+    public SelectorKey(Object... fieldIdentifiers) {
+        this.fieldIdentifiers = fieldIdentifiers;
+    }
+
+    /**
+     * Extracts the given field from an object.
+     *
+     * @param object object from which to extract the field
+     * @return the extracted value, or {@code null} if the object does not contain the
+     *         field
+     */
+    public String extractField(StandardCoderObject object) {
+        return object.getString(fieldIdentifiers);
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java
new file mode 100644 (file)
index 0000000..eb805ca
--- /dev/null
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A topic listener. When a message arrives on a topic, it is forwarded to listeners based
+ * on the content of fields found within the message. However, depending on the message
+ * type, the relevant fields might be found in different places within the message's
+ * object hierarchy. For each different list of keys, this class maintains a
+ * {@link Forwarder}, which is used to forward the message to all relevant listeners.
+ * <p/>
+ * Once a selector has been added, it is not removed until {@link #shutdown()} is invoked.
+ * As selectors are typically only added by Operators, and not by individual Operations,
+ * this should not pose a problem.
+ */
+public class TopicListenerImpl implements TopicListener {
+    private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class);
+    private static StandardCoder coder = new StandardCoder();
+
+    /**
+     * Maps selector to a forwarder.
+     */
+    private final Map<List<SelectorKey>, Forwarder> selector2forwarder = new ConcurrentHashMap<>();
+
+
+    /**
+     * Removes all forwarders.
+     */
+    public void shutdown() {
+        selector2forwarder.clear();
+    }
+
+    /**
+     * Adds a forwarder, if it doesn't already exist.
+     *
+     * @param keys the selector keys
+     * @return the forwarder associated with the given selector keys
+     */
+    public Forwarder addForwarder(SelectorKey... keys) {
+        return addForwarder(Arrays.asList(keys));
+    }
+
+    /**
+     * Adds a forwarder, if it doesn't already exist.
+     *
+     * @param keys the selector keys
+     * @return the forwarder associated with the given selector keys
+     */
+    public Forwarder addForwarder(List<SelectorKey> keys) {
+        return selector2forwarder.computeIfAbsent(keys, key -> new Forwarder(keys));
+    }
+
+    /**
+     * Decodes the message and then forwards it to each forwarder for processing.
+     */
+    @Override
+    public void onTopicEvent(CommInfrastructure infra, String topic, String message) {
+        StandardCoderObject object;
+        try {
+            object = coder.decode(message, StandardCoderObject.class);
+        } catch (CoderException e) {
+            logger.warn("cannot decode message", e);
+            return;
+        }
+
+        /*
+         * We don't know which selector is appropriate for the message, so we just let
+         * them all take a crack at it.
+         */
+        for (Forwarder forwarder : selector2forwarder.values()) {
+            forwarder.onMessage(infra, message, object);
+        }
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java
new file mode 100644 (file)
index 0000000..c0cfe25
--- /dev/null
@@ -0,0 +1,122 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A pair of topics, one of which is used to publish requests and the other to receive
+ * responses.
+ */
+public class TopicPair extends TopicListenerImpl {
+    private static final Logger logger = LoggerFactory.getLogger(TopicPair.class);
+
+    @Getter
+    private final String source;
+
+    @Getter
+    private final String target;
+
+    private final List<TopicSink> publishers;
+    private final List<TopicSource> subscribers;
+
+    /**
+     * Constructs the object.
+     *
+     * @param source source topic name
+     * @param target target topic name
+     */
+    public TopicPair(String source, String target) {
+        this.source = source;
+        this.target = target;
+
+        publishers = getTopicEndpointManager().getTopicSinks(target);
+        if (publishers.isEmpty()) {
+            throw new IllegalArgumentException("no sinks for topic: " + target);
+        }
+
+        subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source));
+        if (subscribers.isEmpty()) {
+            throw new IllegalArgumentException("no sources for topic: " + source);
+        }
+    }
+
+    /**
+     * Starts listening on the source topic(s).
+     */
+    public void start() {
+        subscribers.forEach(topic -> topic.register(this));
+    }
+
+    /**
+     * Stops listening on the source topic(s).
+     */
+    public void stop() {
+        subscribers.forEach(topic -> topic.unregister(this));
+    }
+
+    /**
+     * Stops listening on the source topic(s) and clears all of the forwarders.
+     */
+    @Override
+    public void shutdown() {
+        stop();
+        super.shutdown();
+    }
+
+    /**
+     * Publishes a message to the target topic.
+     *
+     * @param message message to be published
+     * @return a list of the infrastructures on which it was published
+     */
+    public List<CommInfrastructure> publish(String message) {
+        List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size());
+
+        for (TopicSink topic : publishers) {
+            try {
+                topic.send(message);
+                infrastructures.add(topic.getTopicCommInfrastructure());
+
+            } catch (RuntimeException e) {
+                logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e);
+            }
+        }
+
+        return infrastructures;
+    }
+
+    // these may be overridden by junit tests
+
+    protected TopicEndpoint getTopicEndpointManager() {
+        return TopicEndpointManager.getManager();
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java
new file mode 100644 (file)
index 0000000..c351f95
--- /dev/null
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+/**
+ * Manages topic pairs.
+ */
+@FunctionalInterface
+public interface TopicPairManager {
+
+    /**
+     * Gets the topic pair for the given parameters, creating one if it does not exist.
+     *
+     * @param source source topic name
+     * @param target target topic name
+     * @return the topic pair associated with the given source and target topics
+     */
+    TopicPair getTopicPair(String source, String target);
+}
index 19f781d..39d6fd4 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.policy.controlloop.actorserviceprovider.impl;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -359,8 +360,7 @@ public class HttpOperationTest {
     public void testProcessResponseDecodeExcept() throws CoderException {
         MyGetOperation<Integer> oper2 = new MyGetOperation<>(Integer.class);
 
-        assertSame(outcome, oper2.processResponse(outcome, PATH, response));
-        assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
+        assertThatIllegalArgumentException().isThrownBy(() -> oper2.processResponse(outcome, PATH, response));
     }
 
     @Test
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java
new file mode 100644 (file)
index 0000000..6515eb3
--- /dev/null
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
+/**
+ * Executor that will run tasks until the queue is empty or a maximum number of tasks have
+ * been executed. Doesn't actually run anything until {@link #runAll()} is invoked.
+ */
+public class MyExec implements Executor {
+
+    // TODO move this to policy-common/utils-test
+
+    private final int maxTasks;
+    private final Queue<Runnable> commands = new LinkedList<>();
+
+    public MyExec(int maxTasks) {
+        this.maxTasks = maxTasks;
+    }
+
+    public int getQueueLength() {
+        return commands.size();
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        commands.add(command);
+    }
+
+    /**
+     * Runs all tasks until the queue is empty or the maximum number of tasks have been
+     * reached.
+     *
+     * @return {@code true} if the queue is empty, {@code false} if the maximum number of
+     *         tasks have been reached before the queue was completed
+     */
+    public boolean runAll() {
+        for (int count = 0; count < maxTasks && !commands.isEmpty(); ++count) {
+            commands.remove().run();
+        }
+
+        return commands.isEmpty();
+    }
+}
index 0d5cb24..f28c1f6 100644 (file)
@@ -36,7 +36,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -105,7 +104,7 @@ public class OperationPartialTest {
         event.setRequestId(REQ_ID);
 
         context = new ControlLoopEventContext(event);
-        executor = new MyExec();
+        executor = new MyExec(100 * MAX_PARALLEL_REQUESTS);
 
         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
                         .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
@@ -1267,36 +1266,4 @@ public class OperationPartialTest {
             return 0L;
         }
     }
-
-    /**
-     * Executor that will run tasks until the queue is empty or a maximum number of tasks
-     * have been executed. Doesn't actually run anything until {@link #runAll()} is
-     * invoked.
-     */
-    private static class MyExec implements Executor {
-        private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
-
-        private Queue<Runnable> commands = new LinkedList<>();
-
-        public MyExec() {
-            // do nothing
-        }
-
-        public int getQueueLength() {
-            return commands.size();
-        }
-
-        @Override
-        public void execute(Runnable command) {
-            commands.add(command);
-        }
-
-        public boolean runAll() {
-            for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
-                commands.remove().run();
-            }
-
-            return commands.isEmpty();
-        }
-    }
 }
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java
new file mode 100644 (file)
index 0000000..4e45b1a
--- /dev/null
@@ -0,0 +1,503 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import ch.qos.logback.classic.Logger;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.Setter;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.LoggerFactory;
+
+public class TopicPairOperationTest {
+    private static final List<CommInfrastructure> INFRA_LIST =
+                    Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB);
+    private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
+    private static final String ACTOR = "my-actor";
+    private static final String OPERATION = "my-operation";
+    private static final String REQ_ID = "my-request-id";
+    private static final String MY_SOURCE = "my-source";
+    private static final String MY_TARGET = "my-target";
+    private static final String TEXT = "some text";
+    private static final int TIMEOUT_SEC = 10;
+    private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
+
+    private static final StandardCoder coder = new StandardCoder();
+
+    /**
+     * Used to attach an appender to the class' logger.
+     */
+    private static final Logger logger = (Logger) LoggerFactory.getLogger(TopicPairOperation.class);
+    private static final ExtractAppender appender = new ExtractAppender();
+
+    @Mock
+    private TopicPairOperator operator;
+    @Mock
+    private TopicPair pair;
+    @Mock
+    private Forwarder forwarder;
+
+    @Captor
+    private ArgumentCaptor<TriConsumer<CommInfrastructure, String, StandardCoderObject>> listenerCaptor;
+
+    private ControlLoopOperationParams params;
+    private TopicPairParams topicParams;
+    private OperationOutcome outcome;
+    private StandardCoderObject stdResponse;
+    private String responseText;
+    private MyExec executor;
+    private TopicPairOperation<MyRequest, MyResponse> oper;
+
+    /**
+     * Attaches the appender to the logger.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        /**
+         * Attach appender to the logger.
+         */
+        appender.setContext(logger.getLoggerContext());
+        appender.start();
+
+        logger.addAppender(appender);
+    }
+
+    /**
+     * Stops the appender.
+     */
+    @AfterClass
+    public static void tearDownAfterClass() {
+        appender.stop();
+    }
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() throws CoderException {
+        MockitoAnnotations.initMocks(this);
+
+        appender.clearExtractions();
+
+        topicParams = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build();
+
+        when(operator.getActorName()).thenReturn(ACTOR);
+        when(operator.getName()).thenReturn(OPERATION);
+        when(operator.getTopicPair()).thenReturn(pair);
+        when(operator.getForwarder()).thenReturn(forwarder);
+        when(operator.getParams()).thenReturn(topicParams);
+        when(operator.isAlive()).thenReturn(true);
+
+        when(pair.publish(any())).thenReturn(INFRA_LIST);
+
+        executor = new MyExec(100);
+
+        params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
+        outcome = params.makeOutcome();
+
+        responseText = coder.encode(new MyResponse());
+        stdResponse = coder.decode(responseText, StandardCoderObject.class);
+
+        oper = new MyOperation();
+    }
+
+    @Test
+    public void testTopicPairOperation_testGetTopicPair_testGetForwarder_testGetPairParams() {
+        assertEquals(ACTOR, oper.getActorName());
+        assertEquals(OPERATION, oper.getName());
+        assertSame(pair, oper.getTopicPair());
+        assertSame(forwarder, oper.getForwarder());
+        assertSame(topicParams, oper.getPairParams());
+        assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
+        assertSame(MyResponse.class, oper.getResponseClass());
+    }
+
+    @Test
+    public void testStartOperationAsync() throws Exception {
+        CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
+        assertFalse(future.isDone());
+
+        verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
+
+        verify(forwarder, never()).unregister(any(), any());
+
+        verify(pair).publish(any());
+
+        // provide the response
+        listenerCaptor.getValue().accept(CommInfrastructure.NOOP, responseText, stdResponse);
+
+        // run the tasks
+        assertTrue(executor.runAll());
+
+        assertTrue(future.isDone());
+
+        assertSame(outcome, future.get(5, TimeUnit.SECONDS));
+        assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+
+        verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
+    }
+
+    /**
+     * Tests startOperationAsync() when the publisher throws an exception.
+     */
+    @Test
+    public void testStartOperationAsyncException() throws Exception {
+        // indicate that nothing was published
+        when(pair.publish(any())).thenReturn(Arrays.asList());
+
+        assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
+
+        verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
+
+        // must still unregister
+        verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
+    }
+
+    @Test
+    public void testGetTimeoutMsInteger() {
+        // use default
+        assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null));
+        assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0));
+
+        // use provided value
+        assertEquals(5000, oper.getTimeoutMs(5));
+    }
+
+    @Test
+    public void testPublishRequest() {
+        oper.publishRequest(new MyRequest());
+        assertEquals(INFRA_LIST.size(), appender.getExtracted().size());
+    }
+
+    /**
+     * Tests publishRequest() when nothing is published.
+     */
+    @Test
+    public void testPublishRequestUnpublished() {
+        when(pair.publish(any())).thenReturn(Arrays.asList());
+        assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
+    }
+
+    /**
+     * Tests publishRequest() when the request type is a String.
+     */
+    @Test
+    public void testPublishRequestString() {
+        MyStringOperation oper2 = new MyStringOperation();
+        oper2.publishRequest(TEXT);
+        assertEquals(INFRA_LIST.size(), appender.getExtracted().size());
+    }
+
+    /**
+     * Tests publishRequest() when the coder throws an exception.
+     */
+    @Test
+    public void testPublishRequestException() {
+        setOperCoderException();
+        assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
+    }
+
+    /**
+     * Tests processResponse() when it's a success and the response type is a String.
+     */
+    @Test
+    public void testProcessResponseSuccessString() {
+        MyStringOperation oper2 = new MyStringOperation();
+
+        assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, TEXT, null));
+        assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+    }
+
+    /**
+     * Tests processResponse() when it's a success and the response type is a
+     * StandardCoderObject.
+     */
+    @Test
+    public void testProcessResponseSuccessSco() {
+        MyScoOperation oper2 = new MyScoOperation();
+
+        assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+    }
+
+    /**
+     * Tests processResponse() when it's a failure.
+     */
+    @Test
+    public void testProcessResponseFailure() throws CoderException {
+        // indicate error in the response
+        MyResponse resp = new MyResponse();
+        resp.setOutput("error");
+
+        responseText = coder.encode(resp);
+        stdResponse = coder.decode(responseText, StandardCoderObject.class);
+
+        assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertEquals(PolicyResult.FAILURE, outcome.getResult());
+    }
+
+    /**
+     * Tests processResponse() when the decoder succeeds.
+     */
+    @Test
+    public void testProcessResponseDecodeOk() throws CoderException {
+        assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+    }
+
+    /**
+     * Tests processResponse() when the decoder throws an exception.
+     */
+    @Test
+    public void testProcessResponseDecodeExcept() throws CoderException {
+        // @formatter:off
+        assertThatIllegalArgumentException().isThrownBy(
+            () -> oper.processResponse(CommInfrastructure.NOOP, outcome, "{invalid json", stdResponse));
+        // @formatter:on
+    }
+
+    @Test
+    public void testPostProcessResponse() {
+        assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void testLogTopicRequest() {
+        // nothing to log
+        appender.clearExtractions();
+        oper.logTopicRequest(Arrays.asList(), new MyRequest());
+        assertEquals(0, appender.getExtracted().size());
+
+        // log structured data
+        appender.clearExtractions();
+        oper.logTopicRequest(INFRA_LIST, new MyRequest());
+        List<String> output = appender.getExtracted();
+        assertEquals(2, output.size());
+
+        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString())
+                        .contains("{\n  \"theRequestId\": \"my-request-id\"\n}");
+
+        assertThat(output.get(1)).contains(CommInfrastructure.UEB.toString())
+                        .contains("{\n  \"theRequestId\": \"my-request-id\"\n}");
+
+        // log a plain string
+        appender.clearExtractions();
+        new MyStringOperation().logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), TEXT);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT);
+
+        // log a null request
+        appender.clearExtractions();
+        oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), null);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null");
+
+        // exception from coder
+        setOperCoderException();
+
+        appender.clearExtractions();
+        oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), new MyRequest());
+        output = appender.getExtracted();
+        assertEquals(2, output.size());
+        assertThat(output.get(0)).contains("cannot pretty-print request");
+        assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString());
+    }
+
+    @Test
+    public void testLogTopicResponse() {
+        // log structured data
+        appender.clearExtractions();
+        oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse());
+        List<String> output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString())
+                        .contains("{\n  \"requestId\": \"my-request-id\"\n}");
+
+        // log a plain string
+        appender.clearExtractions();
+        new MyStringOperation().logTopicResponse(CommInfrastructure.NOOP, TEXT);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT);
+
+        // log a null response
+        appender.clearExtractions();
+        oper.logTopicResponse(CommInfrastructure.NOOP, null);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null");
+
+        // exception from coder
+        setOperCoderException();
+
+        appender.clearExtractions();
+        oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse());
+        output = appender.getExtracted();
+        assertEquals(2, output.size());
+        assertThat(output.get(0)).contains("cannot pretty-print response");
+        assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString());
+    }
+
+    @Test
+    public void testMakeCoder() {
+        assertNotNull(oper.makeCoder());
+    }
+
+    /**
+     * Creates a new {@link #oper} whose coder will throw an exception.
+     */
+    private void setOperCoderException() {
+        oper = new MyOperation() {
+            @Override
+            protected Coder makeCoder() {
+                return new StandardCoder() {
+                    @Override
+                    public String encode(Object object, boolean pretty) throws CoderException {
+                        throw new CoderException(EXPECTED_EXCEPTION);
+                    }
+                };
+            }
+        };
+    }
+
+    @Getter
+    @Setter
+    public static class MyRequest {
+        private String theRequestId = REQ_ID;
+        private String input;
+    }
+
+    @Getter
+    @Setter
+    public static class MyResponse {
+        private String requestId = REQ_ID;
+        private String output;
+    }
+
+
+    private class MyStringOperation extends TopicPairOperation<String, String> {
+        public MyStringOperation() {
+            super(TopicPairOperationTest.this.params, operator, String.class);
+        }
+
+        @Override
+        protected String makeRequest(int attempt) {
+            return TEXT;
+        }
+
+        @Override
+        protected List<String> getExpectedKeyValues(int attempt, String request) {
+            return Arrays.asList(REQ_ID);
+        }
+
+        @Override
+        protected boolean isSuccess(String rawResponse, String response) {
+            return (response != null);
+        }
+    }
+
+
+    private class MyScoOperation extends TopicPairOperation<MyRequest, StandardCoderObject> {
+        public MyScoOperation() {
+            super(TopicPairOperationTest.this.params, operator, StandardCoderObject.class);
+        }
+
+        @Override
+        protected MyRequest makeRequest(int attempt) {
+            return new MyRequest();
+        }
+
+        @Override
+        protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
+            return Arrays.asList(REQ_ID);
+        }
+
+        @Override
+        protected boolean isSuccess(String rawResponse, StandardCoderObject response) {
+            return (response.getString("output") == null);
+        }
+    }
+
+
+    private class MyOperation extends TopicPairOperation<MyRequest, MyResponse> {
+        public MyOperation() {
+            super(TopicPairOperationTest.this.params, operator, MyResponse.class);
+        }
+
+        @Override
+        protected MyRequest makeRequest(int attempt) {
+            return new MyRequest();
+        }
+
+        @Override
+        protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
+            return Arrays.asList(REQ_ID);
+        }
+
+        @Override
+        protected boolean isSuccess(String rawResponse, MyResponse response) {
+            return (response.getOutput() == null);
+        }
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java
new file mode 100644 (file)
index 0000000..dd25902
--- /dev/null
@@ -0,0 +1,140 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
+
+public class TopicPairOperatorTest {
+    private static final String ACTOR = "my-actor";
+    private static final String OPERATION = "my-operation";
+    private static final String MY_SOURCE = "my-source";
+    private static final String MY_TARGET = "my-target";
+    private static final int TIMEOUT_SEC = 10;
+
+    @Mock
+    private TopicPairManager mgr;
+    @Mock
+    private TopicPair pair;
+    @Mock
+    private Forwarder forwarder;
+    @Mock
+    private TopicPairOperation<String, Integer> operation;
+
+    private List<SelectorKey> keys;
+    private TopicPairParams params;
+    private MyOperator oper;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        keys = List.of(new SelectorKey(""));
+
+        when(mgr.getTopicPair(MY_SOURCE, MY_TARGET)).thenReturn(pair);
+        when(pair.addForwarder(keys)).thenReturn(forwarder);
+
+        oper = new MyOperator(keys);
+
+        params = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build();
+        oper.configure(Util.translateToMap(OPERATION, params));
+        oper.start();
+    }
+
+    @Test
+    public void testTopicPairOperator_testGetParams_testGetTopicPair_testGetForwarder() {
+        assertEquals(ACTOR, oper.getActorName());
+        assertEquals(OPERATION, oper.getName());
+        assertEquals(params, oper.getParams());
+        assertSame(pair, oper.getTopicPair());
+        assertSame(forwarder, oper.getForwarder());
+    }
+
+    @Test
+    public void testDoConfigure() {
+        oper.stop();
+
+        // invalid parameters
+        params.setSource(null);
+        assertThatThrownBy(() -> oper.configure(Util.translateToMap(OPERATION, params)))
+                        .isInstanceOf(ParameterValidationRuntimeException.class);
+    }
+
+    @Test
+    public void testMakeOperator() {
+        AtomicReference<ControlLoopOperationParams> paramsRef = new AtomicReference<>();
+        AtomicReference<TopicPairOperator> operRef = new AtomicReference<>();
+
+        // @formatter:off
+        BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<String, Integer>> maker =
+            (params, operator) -> {
+                paramsRef.set(params);
+                operRef.set(operator);
+                return operation;
+            };
+        // @formatter:on
+
+        TopicPairOperator oper2 = TopicPairOperator.makeOperator(ACTOR, OPERATION, mgr, maker, new SelectorKey(""));
+
+        assertEquals(ACTOR, oper2.getActorName());
+        assertEquals(OPERATION, oper2.getName());
+
+        ControlLoopOperationParams params2 = ControlLoopOperationParams.builder().build();
+
+        assertSame(operation, oper2.buildOperation(params2));
+        assertSame(params2, paramsRef.get());
+        assertSame(oper2, operRef.get());
+    }
+
+
+    private class MyOperator extends TopicPairOperator {
+        public MyOperator(List<SelectorKey> selectorKeys) {
+            super(ACTOR, OPERATION, mgr, selectorKeys);
+        }
+
+        @Override
+        public Operation buildOperation(ControlLoopOperationParams params) {
+            return null;
+        }
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java
new file mode 100644 (file)
index 0000000..4322c5f
--- /dev/null
@@ -0,0 +1,132 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+public class TopicPairActorParamsTest {
+    private static final String MY_NAME = "my-name";
+
+    private static final String DFLT_SOURCE = "default-source";
+    private static final String DFLT_TARGET = "default-target";
+    private static final int DFLT_TIMEOUT = 10;
+
+    private static final String OPER1_NAME = "oper A";
+    private static final String OPER1_SOURCE = "source A";
+    private static final String OPER1_TARGET = "target A";
+    private static final int OPER1_TIMEOUT = 20;
+
+    // oper2 uses some default values
+    private static final String OPER2_NAME = "oper B";
+    private static final String OPER2_SOURCE = "source B";
+
+    // oper3 uses default values for everything
+    private static final String OPER3_NAME = "oper C";
+
+    private TopicPairParams defaults;
+    private Map<String, Map<String, Object>> operMap;
+    private TopicPairActorParams params;
+
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        defaults = TopicPairParams.builder().source(DFLT_SOURCE).target(DFLT_TARGET).timeoutSec(DFLT_TIMEOUT).build();
+
+        TopicPairParams oper1 = TopicPairParams.builder().source(OPER1_SOURCE).target(OPER1_TARGET)
+                        .timeoutSec(OPER1_TIMEOUT).build();
+
+        Map<String, Object> oper1Map = Util.translateToMap(OPER1_NAME, oper1);
+        Map<String, Object> oper2Map = Map.of("source", OPER2_SOURCE);
+        Map<String, Object> oper3Map = Collections.emptyMap();
+        operMap = Map.of(OPER1_NAME, oper1Map, OPER2_NAME, oper2Map, OPER3_NAME, oper3Map);
+
+        params = TopicPairActorParams.builder().defaults(defaults).operation(operMap).build();
+
+    }
+
+    @Test
+    public void testTopicPairActorParams() {
+        assertSame(defaults, params.getDefaults());
+        assertSame(operMap, params.getOperation());
+    }
+
+    @Test
+    public void testDoValidation() {
+        assertSame(params, params.doValidation(MY_NAME));
+
+        // test with invalid parameters
+        defaults.setTimeoutSec(-1);
+        assertThatThrownBy(() -> params.doValidation(MY_NAME)).isInstanceOf(ParameterValidationRuntimeException.class);
+    }
+
+    @Test
+    public void testValidate() {
+        ValidationResult result;
+
+        // null defaults
+        params.setDefaults(null);
+        result = params.validate(MY_NAME);
+        assertFalse(result.isValid());
+        assertThat(result.getResult()).contains("defaults").contains("null");
+        params.setDefaults(defaults);
+
+        // invalid value in defaults
+        defaults.setTimeoutSec(-1);
+        result = params.validate(MY_NAME);
+        assertFalse(result.isValid());
+        assertThat(result.getResult()).contains("defaults").contains("timeoutSec");
+        defaults.setTimeoutSec(DFLT_TIMEOUT);
+
+        // null map
+        params.setOperation(null);
+        result = params.validate(MY_NAME);
+        assertFalse(result.isValid());
+        assertThat(result.getResult()).contains("operation");
+        params.setOperation(operMap);
+
+        // null entry in the map
+        Map<String, Map<String, Object>> map2 = new TreeMap<>(operMap);
+        map2.put(OPER2_NAME, null);
+        params.setOperation(map2);
+        result = params.validate(MY_NAME);
+        assertFalse(result.isValid());
+        assertThat(result.getResult()).contains("operation").contains("null");
+        params.setOperation(operMap);
+
+        // test success case
+        assertTrue(params.validate(MY_NAME).isValid());
+    }
+}
@@ -29,20 +29,20 @@ import java.util.function.Function;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.common.parameters.ValidationResult;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicParams.TopicParamsBuilder;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams.TopicPairParamsBuilder;
 
-public class TopicParamsTest {
+public class TopicPairParamsTest {
 
     private static final String CONTAINER = "my-container";
     private static final String TARGET = "my-target";
     private static final String SOURCE = "my-source";
     private static final int TIMEOUT = 10;
 
-    private TopicParams params;
+    private TopicPairParams params;
 
     @Before
     public void setUp() {
-        params = TopicParams.builder().target(TARGET).source(SOURCE).timeoutSec(TIMEOUT).build();
+        params = TopicPairParams.builder().target(TARGET).source(SOURCE).timeoutSec(TIMEOUT).build();
     }
 
     @Test
@@ -52,8 +52,11 @@ public class TopicParamsTest {
         testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1));
 
         // check edge cases
-        assertTrue(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid());
+        assertFalse(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid());
         assertTrue(params.toBuilder().timeoutSec(1).build().validate(CONTAINER).isValid());
+
+        // some default values should be valid
+        assertTrue(TopicPairParams.builder().target(TARGET).source(SOURCE).build().validate(CONTAINER).isValid());
     }
 
     @Test
@@ -66,7 +69,7 @@ public class TopicParamsTest {
     }
 
     private void testValidateField(String fieldName, String expected,
-                    Function<TopicParamsBuilder, TopicParamsBuilder> makeInvalid) {
+                    Function<TopicPairParamsBuilder, TopicPairParamsBuilder> makeInvalid) {
 
         // original params should be valid
         ValidationResult result = params.validate(CONTAINER);
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java
new file mode 100644 (file)
index 0000000..24f8b70
--- /dev/null
@@ -0,0 +1,201 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+public class ForwarderTest {
+    private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
+    private static final String TEXT = "some text";
+
+    private static final String KEY1 = "requestId";
+    private static final String KEY2 = "container";
+    private static final String SUBKEY = "subRequestId";
+
+    private static final String VALUEA_REQID = "hello";
+    private static final String VALUEA_SUBREQID = "world";
+
+    // request id is shared with value A
+    private static final String VALUEB_REQID = "hello";
+    private static final String VALUEB_SUBREQID = "another world";
+
+    // unique values
+    private static final String VALUEC_REQID = "bye";
+    private static final String VALUEC_SUBREQID = "bye-bye";
+
+    @Mock
+    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1;
+
+    @Mock
+    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1b;
+
+    @Mock
+    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener2;
+
+    @Mock
+    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener3;
+
+    private Forwarder forwarder;
+
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        forwarder = new Forwarder(Arrays.asList(new SelectorKey(KEY1), new SelectorKey(KEY2, SUBKEY)));
+
+        forwarder.register(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1);
+        forwarder.register(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1b);
+        forwarder.register(Arrays.asList(VALUEB_REQID, VALUEB_SUBREQID), listener2);
+        forwarder.register(Arrays.asList(VALUEC_REQID, VALUEC_SUBREQID), listener3);
+    }
+
+    @Test
+    public void testRegister() {
+        // key size mismatches
+        assertThatIllegalArgumentException().isThrownBy(() -> forwarder.register(Arrays.asList(), listener1))
+                        .withMessage("key/value mismatch");
+        assertThatIllegalArgumentException()
+                        .isThrownBy(() -> forwarder.register(Arrays.asList(VALUEA_REQID), listener1))
+                        .withMessage("key/value mismatch");
+    }
+
+    @Test
+    public void testUnregister() {
+        // remove listener1b
+        forwarder.unregister(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1b);
+
+        StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
+        forwarder.onMessage(INFRA, TEXT, sco);
+
+        verify(listener1).accept(INFRA, TEXT, sco);
+        verify(listener1b, never()).accept(any(), any(), any());
+
+        // remove listener1
+        forwarder.unregister(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1);
+        forwarder.onMessage(INFRA, TEXT, sco);
+
+        // route a message to listener2
+        sco = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEB_SUBREQID)));
+        forwarder.onMessage(INFRA, TEXT, sco);
+        verify(listener2).accept(INFRA, TEXT, sco);
+
+        // no more messages to listener1 or 1b
+        verify(listener1).accept(any(), any(), any());
+        verify(listener1b, never()).accept(any(), any(), any());
+    }
+
+    @Test
+    public void testOnMessage() {
+        StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
+        forwarder.onMessage(INFRA, TEXT, sco);
+
+        verify(listener1).accept(INFRA, TEXT, sco);
+        verify(listener1b).accept(INFRA, TEXT, sco);
+
+        // repeat - counts should increment
+        forwarder.onMessage(INFRA, TEXT, sco);
+
+        verify(listener1, times(2)).accept(INFRA, TEXT, sco);
+        verify(listener1b, times(2)).accept(INFRA, TEXT, sco);
+
+        // should not have been invoked
+        verify(listener2, never()).accept(any(), any(), any());
+        verify(listener3, never()).accept(any(), any(), any());
+
+        // try other listeners now
+        sco = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEB_SUBREQID)));
+        forwarder.onMessage(INFRA, TEXT, sco);
+        verify(listener2).accept(INFRA, TEXT, sco);
+
+        sco = makeMessage(Map.of(KEY1, VALUEC_REQID, KEY2, Map.of(SUBKEY, VALUEC_SUBREQID)));
+        forwarder.onMessage(INFRA, TEXT, sco);
+        verify(listener3).accept(INFRA, TEXT, sco);
+
+        // message has no listeners
+        sco = makeMessage(Map.of(KEY1, "xyzzy", KEY2, Map.of(SUBKEY, VALUEB_SUBREQID)));
+        forwarder.onMessage(INFRA, TEXT, sco);
+
+        // message doesn't have both keys
+        sco = makeMessage(Map.of(KEY1, VALUEA_REQID));
+        forwarder.onMessage(INFRA, TEXT, sco);
+
+        // counts should not have incremented
+        verify(listener1, times(2)).accept(any(), any(), any());
+        verify(listener1b, times(2)).accept(any(), any(), any());
+        verify(listener2).accept(any(), any(), any());
+        verify(listener3).accept(any(), any(), any());
+
+        // listener throws an exception
+        doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any(), any());
+    }
+
+    /*
+     * Tests onMessage() when listener1 throws an exception.
+     */
+    @Test
+    public void testOnMessageListenerException1() {
+        doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any(), any());
+
+        StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
+        forwarder.onMessage(INFRA, TEXT, sco);
+
+        verify(listener1b).accept(INFRA, TEXT, sco);
+    }
+
+    /*
+     * Tests onMessage() when listener1b throws an exception.
+     */
+    @Test
+    public void testOnMessageListenerException1b() {
+        doThrow(new IllegalStateException("expected exception")).when(listener1b).accept(any(), any(), any());
+
+        StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
+        forwarder.onMessage(INFRA, TEXT, sco);
+
+        verify(listener1).accept(INFRA, TEXT, sco);
+    }
+
+    /**
+     * Makes a message from a map.
+     */
+    private StandardCoderObject makeMessage(Map<String, Object> map) {
+        return Util.translate("", map, StandardCoderObject.class);
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKeyTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKeyTest.java
new file mode 100644 (file)
index 0000000..19df9c2
--- /dev/null
@@ -0,0 +1,93 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Map;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+public class SelectorKeyTest {
+    private static final String FIELD1 = "map";
+    private static final String FIELD2 = "abc";
+    private static final String FIELDX = "abd";
+
+    private SelectorKey key;
+
+    @Before
+    public void setUp() {
+        key = new SelectorKey(FIELD1, FIELD2);
+    }
+
+    @Test
+    public void testHashCode_testEquals() {
+        SelectorKey key2 = new SelectorKey(FIELD1, FIELD2);
+        assertEquals(key, key2);
+        assertEquals(key.hashCode(), key2.hashCode());
+
+        key2 = new SelectorKey(FIELD1, FIELDX);
+        assertNotEquals(key, key2);
+        assertNotEquals(key.hashCode(), key2.hashCode());
+
+        // test empty key
+        key = new SelectorKey();
+        key2 = new SelectorKey();
+        assertEquals(key, key2);
+        assertEquals(key.hashCode(), key2.hashCode());
+    }
+
+    @Test
+    public void testExtractField() {
+        Map<String, Object> map = Map.of("hello", "world", FIELD1, Map.of("another", "", FIELD2, "value B"));
+        StandardCoderObject sco = Util.translate("", map, StandardCoderObject.class);
+
+        String result = key.extractField(sco);
+        assertNotNull(result);
+        assertEquals("value B", result);
+
+        // shorter key
+        assertEquals("world", new SelectorKey("hello").extractField(sco));
+        assertNull(new SelectorKey("bye").extractField(sco));
+
+        // not found
+        assertNull(new SelectorKey(FIELD1, "not field 2").extractField(sco));
+
+        // test with empty key
+        assertNull(new SelectorKey().extractField(sco));
+    }
+
+    @Getter
+    @Setter
+    @Builder
+    protected static class Data {
+        private String text;
+        private Map<String, String> map;
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java
new file mode 100644 (file)
index 0000000..a370857
--- /dev/null
@@ -0,0 +1,154 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+
+public class TopicListenerImplTest {
+    private static final StandardCoder coder = new StandardCoder();
+    private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
+    private static final String MY_TOPIC = "my-topic";
+    private static final String KEY1 = "requestId";
+    private static final String KEY2 = "container";
+    private static final String SUBKEY = "subRequestId";
+
+    private static final String VALUEA_REQID = "hello";
+    private static final String VALUEA_SUBREQID = "world";
+
+    private static final String VALUEB_REQID = "bye";
+
+    private Forwarder forwarder1;
+    private Forwarder forwarder2;
+    private TopicListenerImpl topic;
+
+    @Mock
+    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1;
+
+    @Mock
+    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1b;
+
+    @Mock
+    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener2;
+
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        topic = new TopicListenerImpl();
+
+        forwarder1 = topic.addForwarder(new SelectorKey(KEY1));
+        forwarder2 = topic.addForwarder(new SelectorKey(KEY1), new SelectorKey(KEY2, SUBKEY));
+
+        assertNotNull(forwarder1);
+        assertNotNull(forwarder2);
+        assertNotSame(forwarder1, forwarder2);
+
+        forwarder1.register(Arrays.asList(VALUEA_REQID), listener1);
+        forwarder1.register(Arrays.asList(VALUEB_REQID), listener1b);
+        forwarder2.register(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener2);
+    }
+
+    @Test
+    public void testShutdown() {
+        // shut it down, which should clear all forwarders
+        topic.shutdown();
+
+        // should get a new forwarder now
+        Forwarder forwarder = topic.addForwarder(new SelectorKey(KEY1));
+        assertNotSame(forwarder1, forwarder);
+        assertNotSame(forwarder2, forwarder);
+
+        // new forwarder should be unchanged
+        assertSame(forwarder, topic.addForwarder(new SelectorKey(KEY1)));
+    }
+
+    @Test
+    public void testAddForwarder() {
+        assertSame(forwarder1, topic.addForwarder(new SelectorKey(KEY1)));
+        assertSame(forwarder2, topic.addForwarder(new SelectorKey(KEY1), new SelectorKey(KEY2, SUBKEY)));
+    }
+
+    @Test
+    public void testOnTopicEvent() {
+        /*
+         * send a message that should go to listener1 on forwarder1 and listener2 on
+         * forwarder2
+         */
+        String msg = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
+        topic.onTopicEvent(INFRA, MY_TOPIC, msg);
+
+        verify(listener1).accept(eq(INFRA), eq(msg), any());
+        verify(listener2).accept(eq(INFRA), eq(msg), any());
+
+        // not to listener1b
+        verify(listener1b, never()).accept(any(), any(), any());
+
+        /*
+         * now send a message that should only go to listener1b on forwarder1
+         */
+        msg = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
+        topic.onTopicEvent(INFRA, MY_TOPIC, msg);
+
+        // should route to listener1 on forwarder1 and listener2 on forwarder2
+        verify(listener1b).accept(eq(INFRA), eq(msg), any());
+
+        // try one where the coder throws an exception
+        topic.onTopicEvent(INFRA, MY_TOPIC, "{invalid-json");
+
+        // no extra invocations
+        verify(listener1).accept(any(), any(), any());
+        verify(listener1b).accept(any(), any(), any());
+        verify(listener2).accept(any(), any(), any());
+    }
+
+    /**
+     * Makes a message from a map.
+     */
+    private String makeMessage(Map<String, Object> map) {
+        try {
+            return coder.encode(map);
+        } catch (CoderException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java
new file mode 100644 (file)
index 0000000..c6557d0
--- /dev/null
@@ -0,0 +1,158 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+
+public class TopicPairTest {
+    private static final String UNKNOWN = "unknown";
+    private static final String MY_SOURCE = "pair-source";
+    private static final String MY_TARGET = "pair-target";
+    private static final String TEXT = "some text";
+
+    @Mock
+    private TopicSink publisher1;
+
+    @Mock
+    private TopicSink publisher2;
+
+    @Mock
+    private TopicSource subscriber1;
+
+    @Mock
+    private TopicSource subscriber2;
+
+    @Mock
+    private TopicEndpoint mgr;
+
+    private TopicPair pair;
+
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        when(mgr.getTopicSinks(MY_TARGET)).thenReturn(Arrays.asList(publisher1, publisher2));
+        when(mgr.getTopicSources(eq(Arrays.asList(MY_SOURCE)))).thenReturn(Arrays.asList(subscriber1, subscriber2));
+
+        when(publisher1.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.NOOP);
+        when(publisher2.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.UEB);
+
+        pair = new MyTopicPair(MY_SOURCE, MY_TARGET);
+
+        pair.start();
+    }
+
+    @Test
+    public void testTopicPair_testGetSource_testGetTarget() {
+        assertEquals(MY_SOURCE, pair.getSource());
+        assertEquals(MY_TARGET, pair.getTarget());
+
+        verify(mgr).getTopicSinks(anyString());
+        verify(mgr).getTopicSources(any());
+
+        // source not found
+        assertThatIllegalArgumentException().isThrownBy(() -> new MyTopicPair(UNKNOWN, MY_TARGET))
+                        .withMessageContaining("sources").withMessageContaining(UNKNOWN);
+
+        // target not found
+        assertThatIllegalArgumentException().isThrownBy(() -> new MyTopicPair(MY_SOURCE, UNKNOWN))
+                        .withMessageContaining("sinks").withMessageContaining(UNKNOWN);
+    }
+
+    @Test
+    public void testShutdown() {
+        pair.shutdown();
+        verify(subscriber1).unregister(pair);
+        verify(subscriber2).unregister(pair);
+    }
+
+    @Test
+    public void testStart() {
+        verify(subscriber1).register(pair);
+        verify(subscriber2).register(pair);
+    }
+
+    @Test
+    public void testStop() {
+        pair.stop();
+        verify(subscriber1).unregister(pair);
+        verify(subscriber2).unregister(pair);
+    }
+
+    @Test
+    public void testPublish() {
+        List<CommInfrastructure> infrastructures = pair.publish(TEXT);
+        assertEquals(Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB), infrastructures);
+
+        verify(publisher1).send(TEXT);
+        verify(publisher2).send(TEXT);
+
+        // first one throws an exception - should have only published to the second
+        when(publisher1.send(any())).thenThrow(new IllegalStateException("expected exception"));
+
+        infrastructures = pair.publish(TEXT);
+        assertEquals(Arrays.asList(CommInfrastructure.UEB), infrastructures);
+
+        verify(publisher2, times(2)).send(TEXT);
+    }
+
+    @Test
+    public void testGetTopicEndpointManager() {
+        // setting "mgr" to null should cause it to use the superclass' method
+        mgr = null;
+        assertNotNull(pair.getTopicEndpointManager());
+    }
+
+
+    private class MyTopicPair extends TopicPair {
+        public MyTopicPair(String source, String target) {
+            super(source, target);
+        }
+
+        @Override
+        protected TopicEndpoint getTopicEndpointManager() {
+            return (mgr != null ? mgr : super.getTopicEndpointManager());
+        }
+    }
+}
index 8604688..b11983b 100644 (file)
     <logger name="org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperation" level="info" additivity="false">
         <appender-ref ref="STDOUT" />
     </logger>
+
+    <!-- this is required for TopicPairOperationTest -->
+    <logger
+            name="org.onap.policy.controlloop.actorserviceprovider.impl.TopicPairOperation"
+            level="info" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
 </configuration>