Use BidirectionalTopicClient from policy-common 70/101770/5
authorJim Hahn <jrh3@att.com>
Fri, 14 Feb 2020 19:22:48 +0000 (14:22 -0500)
committerJim Hahn <jrh3@att.com>
Mon, 17 Feb 2020 16:30:45 +0000 (11:30 -0500)
Also modified "target" to sink in various places, and renamed
various uses of "pair" to "bidirectional" (e.g., TopicPairParams
=> BidirectionalTopicParams).
Also replaced MyExec with PseudoExecutor, from policy-common.
As part of this, extracted the logRequest and logResponse methods
from the Http and Topic classes, moving them into the common
OperationPartial class.
Modified A&AI, SDNC junit tests to use PseudoExecutor.
Added support for incomplete responses on Topics, where multiple
responses may be received for one request
Fixed a duplicate entry in actor.aai pom.

As the changes were already big enough, went ahead and also did the
following to support the APPC Actor:
- Reorganized parameter classes and content.
- Modified anyOf, allOf to take functions instead of futures and handle
  exceptions thrown by any of the functions.  Also added sequence() method.
- Deleted doTask.
- Modified ActorService.config to take a map of maps, not just a map.
- Decided NOT to move anyOf, allOf, and sequence from OperationPartial
  to a utility class, because they depend on "params".

Issue-ID: POLICY-2363
Signed-off-by: Jim Hahn <jrh3@att.com>
Change-Id: I5a8bae05dfef22fe71c57c58f265b9dac20df5c5

47 files changed:
models-interactions/model-actors/actor.aai/pom.xml
models-interactions/model-actors/actor.aai/src/main/java/org/onap/policy/controlloop/actor/aai/AaiCustomQueryOperation.java
models-interactions/model-actors/actor.aai/src/main/java/org/onap/policy/controlloop/actor/aai/AaiGetOperation.java
models-interactions/model-actors/actor.aai/src/test/java/org/onap/policy/controlloop/actor/aai/AaiCustomQueryOperationTest.java
models-interactions/model-actors/actor.aai/src/test/java/org/onap/policy/controlloop/actor/aai/AaiGetOperatorTest.java
models-interactions/model-actors/actor.appc/pom.xml
models-interactions/model-actors/actor.appc/src/main/java/org/onap/policy/controlloop/actor/appc/AppcActorServiceProvider.java
models-interactions/model-actors/actor.sdnc/src/main/java/org/onap/policy/controlloop/actor/sdnc/SdncOperation.java
models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/BasicSdncOperator.java
models-interactions/model-actors/actor.test/pom.xml
models-interactions/model-actors/actor.test/src/main/java/org/onap/policy/controlloop/actor/test/BasicHttpOperation.java
models-interactions/model-actors/actor.test/src/test/java/org/onap/policy/controlloop/actor/test/BasicHttpOperationTest.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java [moved from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java with 56% similarity]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java [moved from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java with 67% similarity]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java [deleted file]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java [moved from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java with 77% similarity]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java [moved from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java with 54% similarity]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java [moved from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java with 72% similarity]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java [deleted file]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActorTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperationTest.java [moved from models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java with 57% similarity]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperatorTest.java [moved from models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java with 70% similarity]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java [deleted file]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParamsTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParamsTest.java [moved from models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParamsTest.java with 77% similarity]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParamsTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java [deleted file]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandlerTest.java [moved from models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java with 50% similarity]
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java
models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml

index deb0181..4e932a1 100644 (file)
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
-            <artifactId>aai</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.onap.policy.common</groupId>
             <artifactId>policy-endpoints</artifactId>
index 5f43270..bc2dde9 100644 (file)
@@ -27,6 +27,8 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.onap.policy.aai.AaiConstants;
 import org.onap.policy.aai.AaiCqResponse;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperation;
@@ -85,7 +87,7 @@ public class AaiCustomQueryOperation extends HttpOperation<String> {
         headers.put("Accept", MediaType.APPLICATION_JSON);
         String url = makeUrl();
 
-        logRestRequest(url, request);
+        logMessage(EventType.OUT, CommInfrastructure.REST, url, request);
 
         // @formatter:off
         return handleResponse(outcome, url,
index 3bc359a..60a2820 100644 (file)
@@ -26,6 +26,8 @@ import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.onap.policy.aai.AaiConstants;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperation;
@@ -92,7 +94,7 @@ public class AaiGetOperation extends HttpOperation<StandardCoderObject> {
         headers.put("Accept", MediaType.APPLICATION_JSON);
         String url = makeUrl();
 
-        logRestRequest(url, null);
+        logMessage(EventType.OUT, CommInfrastructure.REST, url, null);
 
         // @formatter:off
         return handleResponse(outcome, url,
index eca5062..c95425e 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.policy.controlloop.actor.aai;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
@@ -30,7 +31,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -92,7 +94,7 @@ public class AaiCustomQueryOperationTest extends BasicAaiOperator<Map<String, St
 
         CompletableFuture<OperationOutcome> future2 = oper.start();
 
-        assertEquals(PolicyResult.SUCCESS, future2.get(5, TimeUnit.SECONDS).getResult());
+        assertEquals(PolicyResult.SUCCESS, getResult(future2));
 
         // tenant response should have been cached within the context
         assertNotNull(context.getProperty(AaiGetOperation.getTenantKey(TARGET_ENTITY)));
@@ -116,7 +118,7 @@ public class AaiCustomQueryOperationTest extends BasicAaiOperator<Map<String, St
 
         CompletableFuture<OperationOutcome> future2 = oper.start();
 
-        assertEquals(PolicyResult.SUCCESS, future2.get(5, TimeUnit.SECONDS).getResult());
+        assertEquals(PolicyResult.SUCCESS, getResult(future2));
 
         // should not have replaced tenant response
         assertSame(data, context.getProperty(AaiGetOperation.getTenantKey(TARGET_ENTITY)));
@@ -141,7 +143,7 @@ public class AaiCustomQueryOperationTest extends BasicAaiOperator<Map<String, St
 
         CompletableFuture<OperationOutcome> future2 = oper.start();
 
-        assertEquals(PolicyResult.FAILURE_EXCEPTION, future2.get(5, TimeUnit.SECONDS).getResult());
+        assertEquals(PolicyResult.FAILURE_EXCEPTION, getResult(future2));
     }
 
     private String makeTenantReply() throws Exception {
@@ -166,11 +168,20 @@ public class AaiCustomQueryOperationTest extends BasicAaiOperator<Map<String, St
         context.setProperty(AaiGetOperation.getTenantKey(TARGET_ENTITY), data);
     }
 
+    private PolicyResult getResult(CompletableFuture<OperationOutcome> future2)
+                    throws InterruptedException, ExecutionException, TimeoutException {
+
+        executor.runAll(100);
+        assertTrue(future2.isDone());
+
+        return future2.get().getResult();
+    }
+
     protected class MyTenantOperator extends HttpOperator {
         public MyTenantOperator() {
             super(AaiConstants.ACTOR_NAME, AaiGetOperation.TENANT);
 
-            HttpParams http = HttpParams.builder().clientName(MY_CLIENT).path(PATH).build();
+            HttpParams http = HttpParams.builder().clientName(MY_CLIENT).path(PATH).timeoutSec(1).build();
 
             configure(Util.translateToMap(AaiGetOperation.TENANT, http));
             start();
index ca90ce4..ebe9535 100644 (file)
@@ -24,12 +24,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.aai.AaiConstants;
@@ -80,7 +80,10 @@ public class AaiGetOperatorTest extends BasicAaiOperator<Void> {
         CompletableFuture<OperationOutcome> future2 = oper.startOperationAsync(1, outcome);
         assertFalse(future2.isDone());
 
-        assertEquals(PolicyResult.SUCCESS, future2.get(5, TimeUnit.SECONDS).getResult());
+        executor.runAll(100);
+        assertTrue(future2.isDone());
+
+        assertEquals(PolicyResult.SUCCESS, future2.get().getResult());
 
         // data should have been cached within the context
         StandardCoderObject data = context.getProperty(AaiGetOperation.getTenantKey(TARGET_ENTITY));
@@ -102,7 +105,10 @@ public class AaiGetOperatorTest extends BasicAaiOperator<Void> {
         CompletableFuture<OperationOutcome> future2 = oper.startOperationAsync(1, outcome);
         assertFalse(future2.isDone());
 
-        assertEquals(PolicyResult.FAILURE, future2.get(5, TimeUnit.SECONDS).getResult());
+        executor.runAll(100);
+        assertTrue(future2.isDone());
+
+        assertEquals(PolicyResult.FAILURE, future2.get().getResult());
 
         // data should NOT have been cached within the context
         assertNull(context.getProperty(AaiGetOperation.getTenantKey(TARGET_ENTITY)));
index 74bff9a..26eb7c1 100644 (file)
   ============LICENSE_END=========================================================
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
 
-  <parent>
-   <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
-    <artifactId>model-actors</artifactId>
-    <version>2.2.1-SNAPSHOT</version>
-  </parent>
+    <parent>
+        <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
+        <artifactId>model-actors</artifactId>
+        <version>2.2.1-SNAPSHOT</version>
+    </parent>
 
-  <artifactId>actor.appc</artifactId>
+    <artifactId>actor.appc</artifactId>
 
-  <dependencies>
-    <dependency>
-     <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
-      <artifactId>actorServiceProvider</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
-      <artifactId>appc</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
-      <artifactId>events</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.onap.policy.models.policy-models-interactions</groupId>
-      <artifactId>simulators</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.onap.policy.common</groupId>
-      <artifactId>policy-endpoints</artifactId>
-      <version>${policy.common.version}</version>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
+    <dependencies>
+        <dependency>
+            <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
+            <artifactId>actorServiceProvider</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
+            <artifactId>appc</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
+            <artifactId>aai</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
+            <artifactId>events</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
+            <artifactId>actor.aai</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.policy.models.policy-models-interactions</groupId>
+            <artifactId>simulators</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.policy.common</groupId>
+            <artifactId>policy-endpoints</artifactId>
+            <version>${policy.common.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
 </project>
index 0da1e2a..2491c33 100644 (file)
@@ -33,17 +33,19 @@ import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.controlloop.ControlLoopOperation;
 import org.onap.policy.controlloop.VirtualControlLoopEvent;
-import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
+import org.onap.policy.controlloop.actorserviceprovider.impl.BidirectionalTopicActor;
 import org.onap.policy.controlloop.policy.Policy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class AppcActorServiceProvider extends ActorImpl {
+public class AppcActorServiceProvider extends BidirectionalTopicActor {
     private static final String NAME = "APPC";
 
     private static final Logger logger = LoggerFactory.getLogger(AppcActorServiceProvider.class);
 
+    // TODO old code: remove lines down to **HERE**
+
     private static final StandardCoder coder = new StandardCoder();
 
     // Strings for targets
@@ -57,17 +59,26 @@ public class AppcActorServiceProvider extends ActorImpl {
     private static final String RECIPE_MODIFY = "ModifyConfig";
 
     private static final ImmutableList<String> recipes =
-            ImmutableList.of(RECIPE_RESTART, RECIPE_REBUILD, RECIPE_MIGRATE, RECIPE_MODIFY);
+                    ImmutableList.of(RECIPE_RESTART, RECIPE_REBUILD, RECIPE_MIGRATE, RECIPE_MODIFY);
     private static final ImmutableMap<String, List<String>> targets = new ImmutableMap.Builder<String, List<String>>()
-            .put(RECIPE_RESTART, ImmutableList.of(TARGET_VM)).put(RECIPE_REBUILD, ImmutableList.of(TARGET_VM))
-            .put(RECIPE_MIGRATE, ImmutableList.of(TARGET_VM)).put(RECIPE_MODIFY, ImmutableList.of(TARGET_VNF)).build();
+                    .put(RECIPE_RESTART, ImmutableList.of(TARGET_VM)).put(RECIPE_REBUILD, ImmutableList.of(TARGET_VM))
+                    .put(RECIPE_MIGRATE, ImmutableList.of(TARGET_VM)).put(RECIPE_MODIFY, ImmutableList.of(TARGET_VNF))
+                    .build();
     private static final ImmutableMap<String, List<String>> payloads = new ImmutableMap.Builder<String, List<String>>()
-            .put(RECIPE_MODIFY, ImmutableList.of("generic-vnf.vnf-id")).build();
+                    .put(RECIPE_MODIFY, ImmutableList.of("generic-vnf.vnf-id")).build();
+
+    // **HERE**
 
+    /**
+     * Constructs the object.
+     */
     public AppcActorServiceProvider() {
         super(NAME);
     }
 
+
+    // TODO old code: remove lines down to **HERE**
+
     @Override
     public String actor() {
         return NAME;
@@ -89,17 +100,19 @@ public class AppcActorServiceProvider extends ActorImpl {
     }
 
     /**
-     * Constructs an APPC request conforming to the legacy API. The legacy API will be deprecated in
-     * future releases as all legacy functionality is moved into the LCM API.
+     * Constructs an APPC request conforming to the legacy API. The legacy API will be
+     * deprecated in future releases as all legacy functionality is moved into the LCM
+     * API.
      *
      * @param onset the event that is reporting the alert for policy to perform an action
-     * @param operation the control loop operation specifying the actor, operation, target, etc.
-     * @param policy the policy the was specified from the yaml generated by CLAMP or through the
-     *        Policy GUI/API
+     * @param operation the control loop operation specifying the actor, operation,
+     *        target, etc.
+     * @param policy the policy the was specified from the yaml generated by CLAMP or
+     *        through the Policy GUI/API
      * @return an APPC request conforming to the legacy API
      */
     public static Request constructRequest(VirtualControlLoopEvent onset, ControlLoopOperation operation, Policy policy,
-            String targetVnf) {
+                    String targetVnf) {
         /*
          * Construct an APPC request
          */
@@ -144,4 +157,5 @@ public class AppcActorServiceProvider extends ActorImpl {
         }
     }
 
+    // **HERE**
 }
index 9d42c49..406722e 100644 (file)
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperation;
 import org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperator;
@@ -47,6 +49,14 @@ public abstract class SdncOperation extends HttpOperation<SdncResponse> {
         super(params, operator, SdncResponse.class);
     }
 
+    /**
+     * Starts the GUARD.
+     */
+    @Override
+    protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
+        return startGuardAsync();
+    }
+
     @Override
     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
@@ -59,7 +69,7 @@ public abstract class SdncOperation extends HttpOperation<SdncResponse> {
         headers.put("Accept", MediaType.APPLICATION_JSON);
         String url = makeUrl();
 
-        logRestRequest(url, request);
+        logMessage(EventType.OUT, CommInfrastructure.REST, url, request);
 
         // @formatter:off
         return handleResponse(outcome, url,
index d8c707c..deafc4e 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.policy.controlloop.actor.sdnc;
 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -30,7 +31,6 @@ import static org.mockito.Mockito.when;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiFunction;
 import org.onap.policy.common.utils.coder.CoderException;
@@ -100,7 +100,10 @@ public abstract class BasicSdncOperator extends BasicHttpOperation<SdncRequest>
         verify(client).post(callbackCaptor.capture(), any(), requestCaptor.capture(), any());
         callbackCaptor.getValue().completed(rawResponse);
 
-        assertEquals(PolicyResult.SUCCESS, future2.get(5, TimeUnit.SECONDS).getResult());
+        executor.runAll(100);
+        assertTrue(future2.isDone());
+
+        assertEquals(PolicyResult.SUCCESS, future2.get().getResult());
 
         return requestCaptor.getValue().getEntity();
     }
index 6b05807..3a10fa3 100644 (file)
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.onap.policy.common</groupId>
+            <artifactId>utils-test</artifactId>
+            <version>${policy.common.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-api-mockito2</artifactId>
index cd9681c..e160479 100644 (file)
@@ -36,6 +36,7 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.stubbing.Answer;
 import org.onap.policy.common.endpoints.http.client.HttpClient;
 import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
+import org.onap.policy.common.utils.time.PseudoExecutor;
 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
@@ -90,6 +91,7 @@ public class BasicHttpOperation<Q> {
     protected VirtualControlLoopEvent event;
     protected ControlLoopEventContext context;
     protected OperationOutcome outcome;
+    protected PseudoExecutor executor;
 
     /**
      * Constructs the object using a default actor and operation name.
@@ -123,6 +125,8 @@ public class BasicHttpOperation<Q> {
         future = new CompletableFuture<>();
         when(client.getBaseUrl()).thenReturn(BASE_URI);
 
+        executor = new PseudoExecutor();
+
         makeContext();
 
         outcome = params.makeOutcome();
@@ -133,6 +137,8 @@ public class BasicHttpOperation<Q> {
     /**
      * Reinitializes {@link #enrichment}, {@link #event}, {@link #context}, and
      * {@link #params}.
+     * <p/>
+     * Note: {@link #params} is configured to use {@link #executor}.
      */
     protected void makeContext() {
         enrichment = new TreeMap<>(makeEnrichment());
@@ -143,8 +149,8 @@ public class BasicHttpOperation<Q> {
 
         context = new ControlLoopEventContext(event);
 
-        params = ControlLoopOperationParams.builder().context(context).actorService(service).actor(actorName)
-                        .operation(operationName).targetEntity(TARGET_ENTITY).build();
+        params = ControlLoopOperationParams.builder().executor(executor).context(context).actorService(service)
+                        .actor(actorName).operation(operationName).targetEntity(TARGET_ENTITY).build();
     }
 
     /**
index 1de2f92..096b8b8 100644 (file)
@@ -67,6 +67,7 @@ public class BasicHttpOperationTest {
         assertEquals(BasicHttpOperation.BASE_URI, oper.client.getBaseUrl());
         assertNotNull(oper.context);
         assertNotNull(oper.outcome);
+        assertNotNull(oper.executor);
         assertTrue(oper.operator.isAlive());
     }
 
@@ -83,6 +84,7 @@ public class BasicHttpOperationTest {
 
         assertSame(oper.context, oper.params.getContext());
         assertSame(oper.service, oper.params.getActorService());
+        assertSame(oper.executor, oper.params.getExecutor());
         assertEquals(ACTOR, oper.params.getActor());
         assertEquals(OPERATION, oper.params.getOperation());
         assertEquals(BasicHttpOperation.TARGET_ENTITY, oper.params.getTargetEntity());
index 2886b1f..24c2cfc 100644 (file)
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * {@link #start()} to start all of the actors. When finished using the actor service,
  * invoke {@link #stop()} or {@link #shutdown()}.
  */
-public class ActorService extends StartConfigPartial<Map<String, Object>> {
+public class ActorService extends StartConfigPartial<Map<String, Map<String, Object>>> {
     private static final Logger logger = LoggerFactory.getLogger(ActorService.class);
 
     private final Map<String, Actor> name2actor;
@@ -116,14 +116,14 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
     }
 
     @Override
-    protected void doConfigure(Map<String, Object> parameters) {
+    protected void doConfigure(Map<String, Map<String, Object>> parameters) {
         logger.info("configuring actors");
 
         BeanValidationResult valres = new BeanValidationResult("ActorService", parameters);
 
         for (Actor actor : name2actor.values()) {
             String actorName = actor.getName();
-            Map<String, Object> subparams = Util.translateToMap(actorName, parameters.get(actorName));
+            Map<String, Object> subparams = parameters.get(actorName);
 
             if (subparams != null) {
 
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java
new file mode 100644 (file)
index 0000000..1e44a17
--- /dev/null
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
+
+/**
+ * Actor that uses a bidirectional topic. The actor's parameters must be a
+ * {@link BidirectionalTopicActorParams}.
+ */
+public class BidirectionalTopicActor extends ActorImpl implements BidirectionalTopicManager {
+
+    /**
+     * Maps a pair of sink and source topic names to their bidirectional topic.
+     */
+    private final Map<Pair<String, String>, BidirectionalTopicHandler> params2topic = new ConcurrentHashMap<>();
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param name actor's name
+     */
+    public BidirectionalTopicActor(String name) {
+        super(name);
+    }
+
+    @Override
+    protected void doStart() {
+        params2topic.values().forEach(BidirectionalTopicHandler::start);
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() {
+        params2topic.values().forEach(BidirectionalTopicHandler::stop);
+        super.doStop();
+    }
+
+    @Override
+    protected void doShutdown() {
+        params2topic.values().forEach(BidirectionalTopicHandler::shutdown);
+        params2topic.clear();
+        super.doShutdown();
+    }
+
+    @Override
+    public BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic) {
+        Pair<String, String> key = Pair.of(sinkTopic, sourceTopic);
+
+        return params2topic.computeIfAbsent(key, pair -> {
+            try {
+                return makeTopicHandler(sinkTopic, sourceTopic);
+            } catch (BidirectionalTopicClientException e) {
+                throw new IllegalArgumentException(e);
+            }
+        });
+    }
+
+    /**
+     * Translates the parameters to a {@link BidirectionalTopicActorParams} and then
+     * creates a function that will extract operator-specific parameters.
+     */
+    @Override
+    protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+        String actorName = getName();
+
+        // @formatter:off
+        return Util.translate(actorName, actorParameters, BidirectionalTopicActorParams.class)
+                        .doValidation(actorName)
+                        .makeOperationParameters(actorName);
+        // @formatter:on
+    }
+
+    // may be overridden by junit tests
+
+    protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic)
+                    throws BidirectionalTopicClientException {
+
+        return new BidirectionalTopicHandler(sinkTopic, sourceTopic);
+    }
+}
@@ -24,40 +24,42 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import lombok.Getter;
-import org.apache.commons.lang3.tuple.Triple;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
-import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
 import org.onap.policy.controlloop.policy.PolicyResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Operation that uses a Topic pair.
+ * Operation that uses a bidirectional topic.
  *
  * @param <S> response type
  */
 @Getter
-public abstract class TopicPairOperation<Q, S> extends OperationPartial {
-    private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
-    private static final Coder coder = new StandardCoder();
+public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
+    private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
+
+    /**
+     * Response status.
+     */
+    public enum Status {
+        SUCCESS, FAILURE, STILL_WAITING
+    }
 
     // fields extracted from the operator
 
-    private final TopicPair topicPair;
+    private final BidirectionalTopicHandler topicHandler;
     private final Forwarder forwarder;
-    private final TopicPairParams pairParams;
+    private final BidirectionalTopicParams topicParams;
     private final long timeoutMs;
 
     /**
@@ -73,13 +75,14 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
      * @param operator operator that created this operation
      * @param clazz response class
      */
-    public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
+    public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator,
+                    Class<S> clazz) {
         super(params, operator);
-        this.topicPair = operator.getTopicPair();
+        this.topicHandler = operator.getTopicHandler();
         this.forwarder = operator.getForwarder();
-        this.pairParams = operator.getParams();
+        this.topicParams = operator.getParams();
         this.responseClass = clazz;
-        this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS);
+        this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS);
     }
 
     /**
@@ -101,18 +104,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
         final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
 
         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
-                        new CompletableFuture<>();
         final Executor executor = params.getExecutor();
 
         // register a listener BEFORE publishing
 
-        // @formatter:off
-        TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
-            (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
-        // @formatter:on
-
-        // TODO this currently only allows a single matching response
+        BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
+            OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+            if (latestOutcome != null) {
+                // final response - complete the controller
+                controller.completeAsync(() -> latestOutcome, executor);
+            }
+        };
 
         forwarder.register(expectedKeyValues, listener);
 
@@ -128,16 +130,6 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
             throw e;
         }
 
-
-        // once "future" completes, process the response, and then complete the controller
-
-        // @formatter:off
-        future.thenApplyAsync(
-            triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
-                            executor)
-                        .whenCompleteAsync(controller.delayedComplete(), executor);
-        // @formatter:on
-
         return controller;
     }
 
@@ -175,12 +167,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
             throw new IllegalArgumentException("cannot encode request", e);
         }
 
-        List<CommInfrastructure> list = topicPair.publish(json);
-        if (list.isEmpty()) {
+        if (!topicHandler.send(json)) {
             throw new IllegalStateException("nothing published");
         }
 
-        logTopicRequest(list, request);
+        logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
     }
 
     /**
@@ -190,15 +181,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
      * @param outcome outcome to be populated
      * @param response raw response to process
      * @param scoResponse response, as a {@link StandardCoderObject}
-     * @return the outcome
+     * @return the outcome, or {@code null} if still waiting for completion
      */
-    protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
+    protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
                     StandardCoderObject scoResponse) {
 
         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
 
-        logTopicResponse(infra, rawResponse);
+        logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
+                        rawResponse);
 
+        // decode the response
         S response;
         if (responseClass == String.class) {
             response = responseClass.cast(rawResponse);
@@ -216,17 +209,26 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
             }
         }
 
-        if (!isSuccess(rawResponse, response)) {
-            logger.info("{}.{} request failed  for {}", params.getActor(), params.getOperation(),
-                            params.getRequestId());
-            return setOutcome(outcome, PolicyResult.FAILURE);
-        }
+        // check its status
+        switch (detmStatus(rawResponse, response)) {
+            case SUCCESS:
+                logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
+                                params.getRequestId());
+                setOutcome(outcome, PolicyResult.SUCCESS);
+                postProcessResponse(outcome, rawResponse, response);
+                return outcome;
 
-        logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
-        setOutcome(outcome, PolicyResult.SUCCESS);
-        postProcessResponse(outcome, rawResponse, response);
+            case FAILURE:
+                logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
+                                params.getRequestId());
+                return setOutcome(outcome, PolicyResult.FAILURE);
 
-        return outcome;
+            case STILL_WAITING:
+            default:
+                logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
+                                params.getRequestId());
+                return null;
+        }
     }
 
     /**
@@ -241,76 +243,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
     }
 
     /**
-     * Determines if the response indicates success.
+     * Determines the status of the response.
      *
      * @param rawResponse raw response
      * @param response decoded response
-     * @return {@code true} if the response indicates success, {@code false} otherwise
-     */
-    protected abstract boolean isSuccess(String rawResponse, S response);
-
-    /**
-     * Logs a TOPIC request. If the request is not of type, String, then it attempts to
-     * pretty-print it into JSON before logging.
-     *
-     * @param infrastructures list of communication infrastructures on which it was
-     *        published
-     * @param request request to be logged
-     */
-    protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
-        if (infrastructures.isEmpty()) {
-            return;
-        }
-
-        String json;
-        try {
-            if (request == null) {
-                json = null;
-            } else if (request instanceof String) {
-                json = request.toString();
-            } else {
-                json = makeCoder().encode(request, true);
-            }
-
-        } catch (CoderException e) {
-            logger.warn("cannot pretty-print request", e);
-            json = request.toString();
-        }
-
-        for (CommInfrastructure infra : infrastructures) {
-            logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
-        }
-    }
-
-    /**
-     * Logs a TOPIC response. If the response is not of type, String, then it attempts to
-     * pretty-print it into JSON before logging.
-     *
-     * @param infra communication infrastructure on which the response was received
-     * @param response response to be logged
+     * @return the status of the response
      */
-    protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
-        String json;
-        try {
-            if (response == null) {
-                json = null;
-            } else if (response instanceof String) {
-                json = response.toString();
-            } else {
-                json = makeCoder().encode(response, true);
-            }
-
-        } catch (CoderException e) {
-            logger.warn("cannot pretty-print response", e);
-            json = response.toString();
-        }
-
-        logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
-    }
-
-    // these may be overridden by junit tests
-
-    protected Coder makeCoder() {
-        return coder;
-    }
+    protected abstract Status detmStatus(String rawResponse, S response);
 }
@@ -28,24 +28,24 @@ import lombok.Getter;
 import org.onap.policy.common.parameters.ValidationResult;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
 import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
 
 /**
- * Operator that uses a pair of topics, one for publishing the request, and another for
- * receiving the response. Topic operators may share a {@link TopicPair}.
+ * Operator that uses a bidirectional topic. Topic operators may share a
+ * {@link BidirectionalTopicHandler}.
  */
-public abstract class TopicPairOperator extends OperatorPartial {
+public abstract class BidirectionalTopicOperator extends OperatorPartial {
 
     /**
-     * Manager from which to get the topic pair.
+     * Manager from which to get the topic handlers.
      */
-    private final TopicPairManager pairManager;
+    private final BidirectionalTopicManager topicManager;
 
     /**
      * Keys used to extract the fields used to select responses for this operator.
@@ -62,13 +62,13 @@ public abstract class TopicPairOperator extends OperatorPartial {
      * will not, thus operations may copy it.
      */
     @Getter
-    private TopicPairParams params;
+    private BidirectionalTopicParams params;
 
     /**
-     * Topic pair associated with the parameters.
+     * Topic handler associated with the parameters.
      */
     @Getter
-    private TopicPair topicPair;
+    private BidirectionalTopicHandler topicHandler;
 
     /**
      * Forwarder associated with the parameters.
@@ -82,27 +82,27 @@ public abstract class TopicPairOperator extends OperatorPartial {
      *
      * @param actorName name of the actor with which this operator is associated
      * @param name operation name
-     * @param pairManager manager from which to get the topic pair
+     * @param topicManager manager from which to get the topic handler
      * @param selectorKeys keys used to extract the fields used to select responses for
      *        this operator
      */
-    public TopicPairOperator(String actorName, String name, TopicPairManager pairManager,
+    public BidirectionalTopicOperator(String actorName, String name, BidirectionalTopicManager topicManager,
                     List<SelectorKey> selectorKeys) {
         super(actorName, name);
-        this.pairManager = pairManager;
+        this.topicManager = topicManager;
         this.selectorKeys = selectorKeys;
     }
 
     @Override
     protected void doConfigure(Map<String, Object> parameters) {
-        params = Util.translate(getFullName(), parameters, TopicPairParams.class);
+        params = Util.translate(getFullName(), parameters, BidirectionalTopicParams.class);
         ValidationResult result = params.validate(getFullName());
         if (!result.isValid()) {
             throw new ParameterValidationRuntimeException("invalid parameters", result);
         }
 
-        topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget());
-        forwarder = topicPair.addForwarder(selectorKeys);
+        topicHandler = topicManager.getTopicHandler(params.getSinkTopic(), params.getSourceTopic());
+        forwarder = topicHandler.addForwarder(selectorKeys);
     }
 
     /**
@@ -112,19 +112,21 @@ public abstract class TopicPairOperator extends OperatorPartial {
      * @param <S> response type
      * @param actorName actor name
      * @param operation operation name
-     * @param pairManager manager from which to get the topic pair
+     * @param topicManager manager from which to get the topic handler
      * @param operationMaker function to make an operation
      * @param keys keys used to extract the fields used to select responses for this
      *        operator
      * @return a new operator
      */
     // @formatter:off
-    public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
-                    BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker,
+    public static <Q, S> BidirectionalTopicOperator makeOperator(String actorName, String operation,
+                    BidirectionalTopicManager topicManager,
+                    BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
+                        BidirectionalTopicOperation<Q, S>> operationMaker,
                     SelectorKey... keys) {
         // @formatter:off
 
-        return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker);
+        return makeOperator(actorName, operation, topicManager, Arrays.asList(keys), operationMaker);
     }
 
     /**
@@ -134,19 +136,21 @@ public abstract class TopicPairOperator extends OperatorPartial {
      * @param <S> response type
      * @param actorName actor name
      * @param operation operation name
-     * @param pairManager manager from which to get the topic pair
+     * @param topicManager manager from which to get the topic handler
      * @param keys keys used to extract the fields used to select responses for
      *        this operator
      * @param operationMaker function to make an operation
      * @return a new operator
      */
     // @formatter:off
-    public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
+    public static <Q,S> BidirectionalTopicOperator makeOperator(String actorName, String operation,
+                    BidirectionalTopicManager topicManager,
                     List<SelectorKey> keys,
-                    BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) {
+                    BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
+                        BidirectionalTopicOperation<Q,S>> operationMaker) {
         // @formatter:on
 
-        return new TopicPairOperator(actorName, operation, pairManager, keys) {
+        return new BidirectionalTopicOperator(actorName, operation, topicManager, keys) {
             @Override
             public synchronized Operation buildOperation(ControlLoopOperationParams params) {
                 return operationMaker.apply(params, this);
index ba75f0b..f1829d7 100644 (file)
@@ -33,9 +33,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.http.client.HttpClient;
 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
-import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
@@ -52,7 +50,6 @@ import org.slf4j.LoggerFactory;
 @Getter
 public abstract class HttpOperation<T> extends OperationPartial {
     private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class);
-    private static final Coder coder = new StandardCoder();
 
     /**
      * Operator that created this operation.
@@ -171,7 +168,7 @@ public abstract class HttpOperation<T> extends OperationPartial {
 
         String strResponse = HttpClient.getBody(rawResponse, String.class);
 
-        logRestResponse(url, strResponse);
+        logMessage(EventType.IN, CommInfrastructure.REST, url, strResponse);
 
         T response;
         if (responseClass == String.class) {
@@ -224,63 +221,10 @@ public abstract class HttpOperation<T> extends OperationPartial {
         return (rawResponse.getStatus() == 200);
     }
 
-    /**
-     * Logs a REST request. If the request is not of type, String, then it attempts to
-     * pretty-print it into JSON before logging.
-     *
-     * @param url request URL
-     * @param request request to be logged
-     */
-    public <Q> void logRestRequest(String url, Q request) {
-        String json;
-        try {
-            if (request == null) {
-                json = null;
-            } else if (request instanceof String) {
-                json = request.toString();
-            } else {
-                json = makeCoder().encode(request, true);
-            }
-
-        } catch (CoderException e) {
-            logger.warn("cannot pretty-print request", e);
-            json = request.toString();
-        }
-
-        NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json);
-        logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
-    }
-
-    /**
-     * Logs a REST response. If the response is not of type, String, then it attempts to
-     * pretty-print it into JSON before logging.
-     *
-     * @param url request URL
-     * @param response response to be logged
-     */
-    public <S> void logRestResponse(String url, S response) {
-        String json;
-        try {
-            if (response == null) {
-                json = null;
-            } else if (response instanceof String) {
-                json = response.toString();
-            } else {
-                json = makeCoder().encode(response, true);
-            }
-
-        } catch (CoderException e) {
-            logger.warn("cannot pretty-print response", e);
-            json = response.toString();
-        }
-
-        NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json);
-        logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
-    }
-
-    // these may be overridden by junit tests
-
-    protected Coder makeCoder() {
-        return coder;
+    @Override
+    public <Q> String logMessage(EventType direction, CommInfrastructure infra, String sink, Q request) {
+        String json = super.logMessage(direction, infra, sink, request);
+        NetLoggerUtil.log(direction, infra, sink, json);
+        return json;
     }
 }
index d00b88b..0b34971 100644 (file)
 
 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -28,6 +33,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.controlloop.ControlLoopOperation;
 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
@@ -53,8 +66,8 @@ import org.slf4j.LoggerFactory;
  * be done to cancel that particular operation.
  */
 public abstract class OperationPartial implements Operation {
-
     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
+    private static final Coder coder = new StandardCoder();
     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
 
     // values extracted from the operator
@@ -470,103 +483,110 @@ public abstract class OperationPartial implements Operation {
      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
      * any outstanding futures when one completes.
      *
-     * @param futures futures for which to wait
-     * @return a future to cancel or await an outcome. If this future is canceled, then
-     *         all of the futures will be canceled
+     * @param futureMakers function to make a future. If the function returns
+     *        {@code null}, then no future is created for that function. On the other
+     *        hand, if the function throws an exception, then the previously created
+     *        functions are canceled and the exception is re-thrown
+     * @return a future to cancel or await an outcome, or {@code null} if no futures were
+     *         created. If this future is canceled, then all of the futures will be
+     *         canceled
      */
-    protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) {
-
-        // convert list to an array
-        @SuppressWarnings("rawtypes")
-        CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+    protected CompletableFuture<OperationOutcome> anyOf(
+                    @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
-        @SuppressWarnings("unchecked")
-        CompletableFuture<OperationOutcome> result = anyOf(arrFutures);
-        return result;
+        return anyOf(Arrays.asList(futureMakers));
     }
 
     /**
-     * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
-     * outstanding futures when one completes.
+     * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+     * any outstanding futures when one completes.
      *
-     * @param futures futures for which to wait
-     * @return a future to cancel or await an outcome. If this future is canceled, then
-     *         all of the futures will be canceled
+     * @param futureMakers function to make a future. If the function returns
+     *        {@code null}, then no future is created for that function. On the other
+     *        hand, if the function throws an exception, then the previously created
+     *        functions are canceled and the exception is re-thrown
+     * @return a future to cancel or await an outcome, or {@code null} if no futures were
+     *         created. If this future is canceled, then all of the futures will be
+     *         canceled. Similarly, when this future completes, any incomplete futures
+     *         will be canceled
      */
     protected CompletableFuture<OperationOutcome> anyOf(
-                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+                    List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+
+        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+        CompletableFuture<OperationOutcome>[] futures =
+                        attachFutures(controller, futureMakers, UnaryOperator.identity());
+
+        if (futures.length == 0) {
+            // no futures were started
+            return null;
+        }
 
         if (futures.length == 1) {
             return futures[0];
         }
 
-        final Executor executor = params.getExecutor();
-        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        attachFutures(controller, futures);
-
-        // @formatter:off
-        CompletableFuture.anyOf(futures)
-                            .thenApply(object -> (OperationOutcome) object)
-                            .whenCompleteAsync(controller.delayedComplete(), executor);
-        // @formatter:on
+        CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
+                        .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
         return controller;
     }
 
     /**
-     * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
-     * the futures if returned future is canceled. The future returns the "worst" outcome,
-     * based on priority (see {@link #detmPriority(OperationOutcome)}).
+     * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
      *
-     * @param futures futures for which to wait
-     * @return a future to cancel or await an outcome. If this future is canceled, then
-     *         all of the futures will be canceled
+     * @param futureMakers function to make a future. If the function returns
+     *        {@code null}, then no future is created for that function. On the other
+     *        hand, if the function throws an exception, then the previously created
+     *        functions are canceled and the exception is re-thrown
+     * @return a future to cancel or await an outcome, or {@code null} if no futures were
+     *         created. If this future is canceled, then all of the futures will be
+     *         canceled
      */
-    protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) {
-
-        // convert list to an array
-        @SuppressWarnings("rawtypes")
-        CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+    protected CompletableFuture<OperationOutcome> allOf(
+                    @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
-        @SuppressWarnings("unchecked")
-        CompletableFuture<OperationOutcome> result = allOf(arrFutures);
-        return result;
+        return allOf(Arrays.asList(futureMakers));
     }
 
     /**
-     * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
-     * futures if returned future is canceled. The future returns the "worst" outcome,
-     * based on priority (see {@link #detmPriority(OperationOutcome)}).
+     * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
      *
-     * @param futures futures for which to wait
-     * @return a future to cancel or await an outcome. If this future is canceled, then
-     *         all of the futures will be canceled
+     * @param futureMakers function to make a future. If the function returns
+     *        {@code null}, then no future is created for that function. On the other
+     *        hand, if the function throws an exception, then the previously created
+     *        functions are canceled and the exception is re-thrown
+     * @return a future to cancel or await an outcome, or {@code null} if no futures were
+     *         created. If this future is canceled, then all of the futures will be
+     *         canceled. Similarly, when this future completes, any incomplete futures
+     *         will be canceled
      */
     protected CompletableFuture<OperationOutcome> allOf(
-                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
-
-        if (futures.length == 1) {
-            return futures[0];
-        }
+                    List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
-        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        attachFutures(controller, futures);
+        Queue<OperationOutcome> outcomes = new LinkedList<>();
 
-        OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+        CompletableFuture<OperationOutcome>[] futures =
+                        attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
+                            synchronized (outcomes) {
+                                outcomes.add(outcome);
+                            }
+                            return outcome;
+                        }));
 
-        @SuppressWarnings("rawtypes")
-        CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+        if (futures.length == 0) {
+            // no futures were started
+            return null;
+        }
 
-        // record the outcomes of each future when it completes
-        for (int count = 0; count < futures2.length; ++count) {
-            final int count2 = count;
-            futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
+        if (futures.length == 1) {
+            return futures[0];
         }
 
         // @formatter:off
-        CompletableFuture.allOf(futures2)
+        CompletableFuture.allOf(futures)
                         .thenApply(unused -> combineOutcomes(outcomes))
                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
         // @formatter:on
@@ -575,22 +595,62 @@ public abstract class OperationPartial implements Operation {
     }
 
     /**
-     * Attaches the given futures to the controller.
+     * Invokes the functions to create the futures and attaches them to the controller.
      *
      * @param controller master controller for all of the futures
-     * @param futures futures to be attached to the controller
-     */
-    private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
-                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+     * @param futureMakers futures to be attached to the controller
+     * @param adorn function that "adorns" the future, possible adding onto its pipeline.
+     *        Returns the adorned future
+     * @return an array of futures, possibly zero-length. If the array is of size one,
+     *         then that one item should be returned instead of the controller
+     */
+    private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+                    List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
+                    UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
+
+        if (futureMakers.isEmpty()) {
+            @SuppressWarnings("unchecked")
+            CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
+            return result;
+        }
 
-        if (futures.length == 0) {
-            throw new IllegalArgumentException("empty list of futures");
+        // the last, unadorned future that is created
+        CompletableFuture<OperationOutcome> lastFuture = null;
+
+        List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
+
+        // make each future
+        for (var maker : futureMakers) {
+            try {
+                CompletableFuture<OperationOutcome> future = maker.get();
+                if (future == null) {
+                    continue;
+                }
+
+                // propagate "stop" to the future
+                controller.add(future);
+
+                futures.add(adorn.apply(future));
+
+                lastFuture = future;
+
+            } catch (RuntimeException e) {
+                logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
+                controller.cancel(false);
+                throw e;
+            }
         }
 
-        // attach each task
-        for (CompletableFuture<OperationOutcome> future : futures) {
-            controller.add(future);
+        @SuppressWarnings("unchecked")
+        CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
+
+        if (result.length == 1) {
+            // special case - return the unadorned future
+            result[0] = lastFuture;
+            return result;
         }
+
+        return futures.toArray(result);
     }
 
     /**
@@ -599,15 +659,13 @@ public abstract class OperationPartial implements Operation {
      * @param outcomes outcomes to be examined
      * @return the combined outcome
      */
-    private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
+    private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
 
         // identify the outcome with the highest priority
-        OperationOutcome outcome = outcomes[0];
+        OperationOutcome outcome = outcomes.remove();
         int priority = detmPriority(outcome);
 
-        // start with "1", as we've already dealt with "0"
-        for (int count = 1; count < outcomes.length; ++count) {
-            OperationOutcome outcome2 = outcomes[count];
+        for (OperationOutcome outcome2 : outcomes) {
             int priority2 = detmPriority(outcome2);
 
             if (priority2 > priority) {
@@ -656,71 +714,113 @@ public abstract class OperationPartial implements Operation {
     }
 
     /**
-     * Performs a task, after verifying that the controller is still running. Also checks
-     * that the previous outcome was successful, if specified.
+     * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+     * not created until the previous task completes. The pipeline returns the outcome of
+     * the last task executed.
      *
-     * @param controller overall pipeline controller
-     * @param checkSuccess {@code true} to check the previous outcome, {@code false}
-     *        otherwise
-     * @param outcome outcome of the previous task
-     * @param task task to be performed
-     * @return the task, if everything checks out. Otherwise, it returns an incomplete
-     *         future and completes the controller instead
+     * @param futureMakers functions to make the futures
+     * @return a future to cancel the sequence or await the outcome
      */
-    // @formatter:off
-    protected CompletableFuture<OperationOutcome> doTask(
-                    PipelineControllerFuture<OperationOutcome> controller,
-                    boolean checkSuccess, OperationOutcome outcome,
-                    CompletableFuture<OperationOutcome> task) {
-        // @formatter:on
+    protected CompletableFuture<OperationOutcome> sequence(
+                    @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
-        if (checkSuccess && !isSuccess(outcome)) {
-            /*
-             * must complete before canceling so that cancel() doesn't cause controller to
-             * complete
-             */
-            controller.complete(outcome);
-            task.cancel(false);
-            return new CompletableFuture<>();
+        return sequence(Arrays.asList(futureMakers));
+    }
+
+    /**
+     * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+     * not created until the previous task completes. The pipeline returns the outcome of
+     * the last task executed.
+     *
+     * @param futureMakers functions to make the futures
+     * @return a future to cancel the sequence or await the outcome, or {@code null} if
+     *         there were no tasks to perform
+     */
+    protected CompletableFuture<OperationOutcome> sequence(
+                    List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+
+        Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
+
+        CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
+        if (nextTask == null) {
+            // no tasks
+            return null;
+        }
+
+        if (queue.isEmpty()) {
+            // only one task - just return it rather than wrapping it in a controller
+            return nextTask;
         }
 
-        return controller.wrap(task);
+        /*
+         * multiple tasks - need a controller to stop whichever task is currently
+         * executing
+         */
+        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+        final Executor executor = params.getExecutor();
+
+        // @formatter:off
+        controller.wrap(nextTask)
+                    .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
+                    .whenCompleteAsync(controller.delayedComplete(), executor);
+        // @formatter:on
+
+        return controller;
     }
 
     /**
-     * Performs a task, after verifying that the controller is still running. Also checks
-     * that the previous outcome was successful, if specified.
+     * Executes the next task in the queue, if the previous outcome was successful.
      *
-     * @param controller overall pipeline controller
-     * @param checkSuccess {@code true} to check the previous outcome, {@code false}
-     *        otherwise
-     * @param task function to start the task to be performed
-     * @return a function to perform the task. If everything checks out, then it returns
-     *         the task. Otherwise, it returns an incomplete future and completes the
-     *         controller instead
+     * @param controller pipeline controller
+     * @param taskQueue queue of tasks to be performed
+     * @return a future to execute the remaining tasks, or the current outcome, if it's a
+     *         failure, or if there are no more tasks
      */
-    // @formatter:off
-    protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(
+    private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
                     PipelineControllerFuture<OperationOutcome> controller,
-                    boolean checkSuccess,
-                    Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
-        // @formatter:on
+                    Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
 
         return outcome -> {
-
-            if (!controller.isRunning()) {
-                return new CompletableFuture<>();
+            if (!isSuccess(outcome)) {
+                // return the failure
+                return CompletableFuture.completedFuture(outcome);
             }
 
-            if (checkSuccess && !isSuccess(outcome)) {
-                controller.complete(outcome);
-                return new CompletableFuture<>();
+            CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
+            if (nextTask == null) {
+                // no tasks - just return the success
+                return CompletableFuture.completedFuture(outcome);
             }
 
-            return controller.wrap(task.apply(outcome));
+            // @formatter:off
+            return controller
+                        .wrap(nextTask)
+                        .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
+            // @formatter:on
         };
     }
 
+    /**
+     * Gets the next task from the queue, skipping those that are {@code null}.
+     *
+     * @param taskQueue task queue
+     * @return the next task, or {@code null} if the queue is now empty
+     */
+    private CompletableFuture<OperationOutcome> getNextTask(
+                    Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
+
+        Supplier<CompletableFuture<OperationOutcome>> maker;
+
+        while ((maker = taskQueue.poll()) != null) {
+            CompletableFuture<OperationOutcome> future = maker.get();
+            if (future != null) {
+                return future;
+            }
+        }
+
+        return null;
+    }
+
     /**
      * Sets the start time of the operation and invokes the callback to indicate that the
      * operation has started. Does nothing if the pipeline has been stopped.
@@ -809,6 +909,38 @@ public abstract class OperationPartial implements Operation {
         return (thrown instanceof TimeoutException);
     }
 
+    /**
+     * Logs a response. If the response is not of type, String, then it attempts to
+     * pretty-print it into JSON before logging.
+     *
+     * @param direction IN or OUT
+     * @param infra communication infrastructure on which it was published
+     * @param source source name (e.g., the URL or Topic name)
+     * @param response response to be logged
+     * @return the JSON text that was logged
+     */
+    public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
+        String json;
+        try {
+            if (response == null) {
+                json = null;
+            } else if (response instanceof String) {
+                json = response.toString();
+            } else {
+                json = makeCoder().encode(response, true);
+            }
+
+        } catch (CoderException e) {
+            String type = (direction == EventType.IN ? "response" : "request");
+            logger.warn("cannot pretty-print {}", type, e);
+            json = response.toString();
+        }
+
+        logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
+
+        return json;
+    }
+
     // these may be overridden by subclasses or junit tests
 
     /**
@@ -841,4 +973,10 @@ public abstract class OperationPartial implements Operation {
     protected long getTimeoutMs(Integer timeoutSec) {
         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
     }
+
+    // these may be overridden by junit tests
+
+    protected Coder makeCoder() {
+        return coder;
+    }
 }
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java
deleted file mode 100644 (file)
index c3e1e5c..0000000
+++ /dev/null
@@ -1,112 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.controlloop.actorserviceprovider.impl;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onap.policy.common.parameters.ValidationResult;
-import org.onap.policy.controlloop.actorserviceprovider.Util;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
-
-/**
- * Actor that uses a topic pair. The actor's parameters must be a
- * {@link TopicPairActorParams}.
- */
-public class TopicPairActor extends ActorImpl implements TopicPairManager {
-
-    /**
-     * Maps a topic source and target name to its topic pair.
-     */
-    private final Map<Pair<String, String>, TopicPair> params2topic = new ConcurrentHashMap<>();
-
-
-    /**
-     * Constructs the object.
-     *
-     * @param name actor's name
-     */
-    public TopicPairActor(String name) {
-        super(name);
-    }
-
-    @Override
-    protected void doStart() {
-        params2topic.values().forEach(TopicPair::start);
-        super.doStart();
-    }
-
-    @Override
-    protected void doStop() {
-        params2topic.values().forEach(TopicPair::stop);
-        super.doStop();
-    }
-
-    @Override
-    protected void doShutdown() {
-        params2topic.values().forEach(TopicPair::shutdown);
-        params2topic.clear();
-        super.doShutdown();
-    }
-
-    @Override
-    public TopicPair getTopicPair(String source, String target) {
-        Pair<String, String> key = Pair.of(source, target);
-        return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target));
-    }
-
-    /**
-     * Translates the parameters to a {@link TopicPairActorParams} and then creates a
-     * function that will extract operator-specific parameters.
-     */
-    @Override
-    protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
-        String actorName = getName();
-
-        TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class);
-        ValidationResult result = params.validate(getName());
-        if (!result.isValid()) {
-            throw new ParameterValidationRuntimeException("invalid parameters", result);
-        }
-
-        // create a map of the default parameters
-        Map<String, Object> defaultParams = Util.translateToMap(getName(), params.getDefaults());
-        Map<String, Map<String, Object>> operations = params.getOperation();
-
-        return operationName -> {
-            Map<String, Object> specificParams = operations.get(operationName);
-            if (specificParams == null) {
-                return null;
-            }
-
-            // start with a copy of defaults and overlay with specific
-            Map<String, Object> subparams = new TreeMap<>(defaultParams);
-            subparams.putAll(specificParams);
-
-            return Util.translateToMap(getName() + "." + operationName, subparams);
-        };
-    }
-}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java
new file mode 100644 (file)
index 0000000..291aeeb
--- /dev/null
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.parameters.annotations.Min;
+
+/**
+ * Parameters used by Actors whose Operators use bidirectional topic.
+ */
+@Getter
+@Setter
+@EqualsAndHashCode(callSuper = true)
+public class BidirectionalTopicActorParams extends CommonActorParams {
+
+    /*
+     * Optional, default values that are used if missing from the operation-specific
+     * parameters.
+     */
+
+    /**
+     * Sink topic name to which requests should be published.
+     */
+    private String sinkTopic;
+
+    /**
+     * Source topic name, from which to read responses.
+     */
+    private String sourceTopic;
+
+    /**
+     * Amount of time, in seconds, to wait for the HTTP request to complete. The default
+     * is 90 seconds.
+     */
+    @Min(1)
+    private int timeoutSec = 90;
+}
@@ -29,31 +29,33 @@ import org.onap.policy.common.parameters.annotations.NotBlank;
 import org.onap.policy.common.parameters.annotations.NotNull;
 
 /**
- * Parameters used by Operators that use a pair of Topics, one to publish requests and the
- * other to receive responses.
+ * Parameters used by Operators that use a bidirectional topic.
  */
 @NotNull
 @NotBlank
 @Data
 @Builder(toBuilder = true)
-public class TopicPairParams {
+public class BidirectionalTopicParams {
 
     /**
-     * Source topic end point, from which to read responses.
+     * Sink topic name to which requests should be published.
      */
-    private String source;
+    private String sinkTopic;
 
     /**
-     * Name of the target topic end point to which requests should be published.
+     * Source topic name, from which to read responses.
      */
-    private String target;
+    private String sourceTopic;
 
     /**
-     * Amount of time, in seconds to wait for the response. The default is five minutes.
+     * Amount of time, in seconds to wait for the response.
+     * <p/>
+     * Note: this should NOT have a default value, as it receives its default value from
+     * {@link BidirectionalTopicActorParams}.
      */
     @Min(1)
-    @Builder.Default
-    private int timeoutSec = 300;
+    private int timeoutSec;
+
 
     /**
      * Validates the parameters.
 package org.onap.policy.controlloop.actorserviceprovider.parameters;
 
 import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-import org.onap.policy.common.parameters.BeanValidationResult;
+import java.util.TreeMap;
+import java.util.function.Function;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
 import org.onap.policy.common.parameters.BeanValidator;
 import org.onap.policy.common.parameters.ValidationResult;
-import org.onap.policy.common.parameters.annotations.NotBlank;
 import org.onap.policy.common.parameters.annotations.NotNull;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
 
 /**
- * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests
- * and the other to receive responses.
+ * Superclass for Actor parameters that have default values in "this" object, and
+ * operation-specific values in {@link #operation}.
  */
-@NotNull
-@NotBlank
-@Data
-@Builder
-public class TopicPairActorParams {
+@Getter
+@Setter
+@EqualsAndHashCode
+public class CommonActorParams {
 
     /**
-     * This contains the default parameters that are used when an operation doesn't
-     * specify them. Note: each operation to be used must still have an entry in
-     * {@link #operation}, even if it's empty. Otherwise, the given operation will not be
-     * started.
+     * Maps the operation name to its parameters.
      */
-    private TopicPairParams defaults;
+    @NotNull
+    protected Map<String, Map<String, Object>> operation;
+
 
     /**
-     * Maps an operation name to its individual parameters.
+     * Extracts a specific operation's parameters from "this".
+     *
+     * @param name name of the item containing "this"
+     * @return a function to extract an operation's parameters from "this". Note: the
+     *         returned function is not thread-safe
      */
-    private Map<String, Map<String, Object>> operation;
+    public Function<String, Map<String, Object>> makeOperationParameters(String name) {
+
+        Map<String, Object> defaultParams = Util.translateToMap(name, this);
+        defaultParams.remove("operation");
+
+        return operationName -> {
+            Map<String, Object> specificParams = operation.get(operationName);
+            if (specificParams == null) {
+                return null;
+            }
+
+            // start with a copy of defaults and overlay with specific
+            Map<String, Object> subparams = new TreeMap<>(defaultParams);
+            subparams.putAll(specificParams);
 
+            return Util.translateToMap(name + "." + operationName, subparams);
+        };
+    }
 
     /**
      * Validates the parameters.
@@ -60,7 +80,7 @@ public class TopicPairActorParams {
      * @return "this"
      * @throws IllegalArgumentException if the parameters are invalid
      */
-    public TopicPairActorParams doValidation(String name) {
+    public CommonActorParams doValidation(String name) {
         ValidationResult result = validate(name);
         if (!result.isValid()) {
             throw new ParameterValidationRuntimeException("invalid parameters", result);
@@ -77,17 +97,6 @@ public class TopicPairActorParams {
      * @return the validation result
      */
     public ValidationResult validate(String resultName) {
-        BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
-
-        if (defaults != null) {
-            result.addResult(defaults.validate("defaults"));
-        }
-
-        // @formatter:off
-        result.validateMap("operation", operation,
-            (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
-        // @formatter:on
-
-        return result;
+        return new BeanValidator().validateTop(resultName, this);
     }
 }
index 275c8bc..d589e1d 100644 (file)
 
 package org.onap.policy.controlloop.actorserviceprovider.parameters;
 
-import java.util.Map;
-import java.util.function.Function;
-import lombok.Data;
-import org.onap.policy.common.parameters.BeanValidationResult;
-import org.onap.policy.common.parameters.BeanValidator;
-import org.onap.policy.common.parameters.ValidationResult;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
 import org.onap.policy.common.parameters.annotations.Min;
-import org.onap.policy.common.parameters.annotations.NotBlank;
-import org.onap.policy.common.parameters.annotations.NotNull;
-import org.onap.policy.controlloop.actorserviceprovider.Util;
 
 /**
- * Parameters used by Actors that connect to a server via HTTP. This contains the
- * parameters that are common to all of the operations. Only the path changes for each
- * operation, thus it includes a mapping from operation name to path.
+ * Parameters used by Actors that connect to a server via HTTP.
  */
-@Data
-@NotNull
-@NotBlank
-public class HttpActorParams {
+@Getter
+@Setter
+@EqualsAndHashCode(callSuper = true)
+public class HttpActorParams extends CommonActorParams {
+
+    /*
+     * Optional, default values that are used if missing from the operation-specific
+     * parameters.
+     */
 
     /**
      * Name of the HttpClient, as found in the HttpClientFactory.
@@ -47,66 +44,9 @@ public class HttpActorParams {
     private String clientName;
 
     /**
-     * Amount of time, in seconds to wait for the HTTP request to complete, where zero
-     * indicates that it should wait forever. The default is zero.
-     */
-    @Min(0)
-    private int timeoutSec = 0;
-
-    /**
-     * Maps the operation name to its URI path.
-     */
-    private Map<String, String> path;
-
-    /**
-     * Extracts a specific operation's parameters from "this".
-     *
-     * @param name name of the item containing "this"
-     * @return a function to extract an operation's parameters from "this". Note: the
-     *         returned function is not thread-safe
-     */
-    public Function<String, Map<String, Object>> makeOperationParameters(String name) {
-        HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build();
-
-        return operation -> {
-            String subpath = path.get(operation);
-            if (subpath == null) {
-                return null;
-            }
-
-            subparams.setPath(subpath);
-            return Util.translateToMap(name + "." + operation, subparams);
-        };
-    }
-
-    /**
-     * Validates the parameters.
-     *
-     * @param name name of the object containing these parameters
-     * @return "this"
-     * @throws IllegalArgumentException if the parameters are invalid
+     * Amount of time, in seconds, to wait for the HTTP request to complete. The default
+     * is 90 seconds.
      */
-    public HttpActorParams doValidation(String name) {
-        ValidationResult result = validate(name);
-        if (!result.isValid()) {
-            throw new ParameterValidationRuntimeException("invalid parameters", result);
-        }
-
-        return this;
-    }
-
-    /**
-     * Validates the parameters.
-     *
-     * @param resultName name of the result
-     *
-     * @return the validation result
-     */
-    public ValidationResult validate(String resultName) {
-        BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
-
-        result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
-
-        return result;
-    }
+    @Min(1)
+    private int timeoutSec = 90;
 }
index 93711c0..2d3ab8b 100644 (file)
@@ -48,12 +48,13 @@ public class HttpParams {
     private String path;
 
     /**
-     * Amount of time, in seconds to wait for the HTTP request to complete, where zero
-     * indicates that it should wait forever. The default is zero.
+     * Amount of time, in seconds, to wait for the HTTP request to complete.
+     * <p/>
+     * Note: this should NOT have a default value, as it receives its default value from
+     * {@link HttpActorParams}.
      */
-    @Min(0)
-    @Builder.Default
-    private int timeoutSec = 0;
+    @Min(1)
+    private int timeoutSec;
 
 
     /**
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java
new file mode 100644 (file)
index 0000000..30ee1e2
--- /dev/null
@@ -0,0 +1,79 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import java.util.List;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
+
+/**
+ * Handler for a bidirectional topic, supporting both publishing and forwarding of
+ * incoming messages.
+ */
+public class BidirectionalTopicHandler extends BidirectionalTopicClient {
+
+    /**
+     * Listener that will be attached to the topic to receive responses.
+     */
+    private final TopicListenerImpl listener = new TopicListenerImpl();
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param sinkTopic sink topic name
+     * @param sourceTopic source topic name
+     * @throws BidirectionalTopicClientException if an error occurs
+     */
+    public BidirectionalTopicHandler(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
+        super(sinkTopic, sourceTopic);
+    }
+
+    /**
+     * Starts listening on the source topic(s).
+     */
+    public void start() {
+        getSource().register(listener);
+    }
+
+    /**
+     * Stops listening on the source topic(s).
+     */
+    public void stop() {
+        getSource().unregister(listener);
+    }
+
+    /**
+     * Stops listening on the source topic(s) and clears all of the forwarders.
+     */
+    public void shutdown() {
+        stop();
+        listener.shutdown();
+    }
+
+    public Forwarder addForwarder(SelectorKey... keys) {
+        return listener.addForwarder(keys);
+    }
+
+    public Forwarder addForwarder(List<SelectorKey> keys) {
+        return listener.addForwarder(keys);
+    }
+}
 package org.onap.policy.controlloop.actorserviceprovider.topic;
 
 /**
- * Manages topic pairs.
+ * Manages bidirectional topics.
  */
 @FunctionalInterface
-public interface TopicPairManager {
+public interface BidirectionalTopicManager {
 
     /**
-     * Gets the topic pair for the given parameters, creating one if it does not exist.
+     * Gets the topic handler for the given parameters, creating one if it does not exist.
      *
-     * @param source source topic name
-     * @param target target topic name
-     * @return the topic pair associated with the given source and target topics
+     * @param sinkTopic sink topic name
+     * @param sourceTopic source topic name
+     * @return the topic handler associated with the given sink and source topic names
      */
-    TopicPair getTopicPair(String source, String target);
+    BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic);
 }
index 8e9109c..2d98b66 100644 (file)
@@ -24,8 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import java.util.function.BiConsumer;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
 import org.slf4j.Logger;
@@ -43,7 +42,7 @@ public class Forwarder {
      * Maps a set of field values to one or more listeners.
      */
     // @formatter:off
-    private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>>
+    private final Map<List<String>, Map<BiConsumer<String, StandardCoderObject>, String>>
                 values2listeners = new ConcurrentHashMap<>();
     // @formatter:on
 
@@ -68,13 +67,13 @@ public class Forwarder {
      * @param values field values of interest, in one-to-one correspondence with the keys
      * @param listener listener to register
      */
-    public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+    public void register(List<String> values, BiConsumer<String, StandardCoderObject> listener) {
         if (keys.size() != values.size()) {
             throw new IllegalArgumentException("key/value mismatch");
         }
 
         values2listeners.compute(values, (key, listeners) -> {
-            Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners;
+            Map<BiConsumer<String, StandardCoderObject>, String> map = listeners;
             if (map == null) {
                 map = new ConcurrentHashMap<>();
             }
@@ -90,7 +89,7 @@ public class Forwarder {
      * @param values field values of interest, in one-to-one correspondence with the keys
      * @param listener listener to unregister
      */
-    public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+    public void unregister(List<String> values, BiConsumer<String, StandardCoderObject> listener) {
         values2listeners.computeIfPresent(values, (key, listeners) -> {
             listeners.remove(listener);
             return (listeners.isEmpty() ? null : listeners);
@@ -100,11 +99,10 @@ public class Forwarder {
     /**
      * Processes a message, forwarding it to the appropriate listeners, if any.
      *
-     * @param infra communication infrastructure on which the response was received
      * @param textMessage original text message that was received
      * @param scoMessage decoded text message
      */
-    public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) {
+    public void onMessage(String textMessage, StandardCoderObject scoMessage) {
         // extract the key values from the message
         List<String> values = new ArrayList<>(keys.size());
         for (SelectorKey key : keys) {
@@ -121,8 +119,7 @@ public class Forwarder {
         }
 
         // get the listeners for this set of values
-        Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners =
-                        values2listeners.get(values);
+        Map<BiConsumer<String, StandardCoderObject>, String> listeners = values2listeners.get(values);
         if (listeners == null) {
             // no listeners for this particular list of values
             return;
@@ -130,9 +127,9 @@ public class Forwarder {
 
 
         // forward the message to each listener
-        for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) {
+        for (BiConsumer<String, StandardCoderObject> listener : listeners.keySet()) {
             try {
-                listener.accept(infra, textMessage, scoMessage);
+                listener.accept(textMessage, scoMessage);
             } catch (RuntimeException e) {
                 logger.warn("exception thrown by listener {}", Util.ident(listener), e);
             }
index eb805ca..fcb4635 100644 (file)
@@ -98,7 +98,7 @@ public class TopicListenerImpl implements TopicListener {
          * them all take a crack at it.
          */
         for (Forwarder forwarder : selector2forwarder.values()) {
-            forwarder.onMessage(infra, message, object);
+            forwarder.onMessage(message, object);
         }
     }
 }
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java
deleted file mode 100644 (file)
index c0cfe25..0000000
+++ /dev/null
@@ -1,122 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.controlloop.actorserviceprovider.topic;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A pair of topics, one of which is used to publish requests and the other to receive
- * responses.
- */
-public class TopicPair extends TopicListenerImpl {
-    private static final Logger logger = LoggerFactory.getLogger(TopicPair.class);
-
-    @Getter
-    private final String source;
-
-    @Getter
-    private final String target;
-
-    private final List<TopicSink> publishers;
-    private final List<TopicSource> subscribers;
-
-    /**
-     * Constructs the object.
-     *
-     * @param source source topic name
-     * @param target target topic name
-     */
-    public TopicPair(String source, String target) {
-        this.source = source;
-        this.target = target;
-
-        publishers = getTopicEndpointManager().getTopicSinks(target);
-        if (publishers.isEmpty()) {
-            throw new IllegalArgumentException("no sinks for topic: " + target);
-        }
-
-        subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source));
-        if (subscribers.isEmpty()) {
-            throw new IllegalArgumentException("no sources for topic: " + source);
-        }
-    }
-
-    /**
-     * Starts listening on the source topic(s).
-     */
-    public void start() {
-        subscribers.forEach(topic -> topic.register(this));
-    }
-
-    /**
-     * Stops listening on the source topic(s).
-     */
-    public void stop() {
-        subscribers.forEach(topic -> topic.unregister(this));
-    }
-
-    /**
-     * Stops listening on the source topic(s) and clears all of the forwarders.
-     */
-    @Override
-    public void shutdown() {
-        stop();
-        super.shutdown();
-    }
-
-    /**
-     * Publishes a message to the target topic.
-     *
-     * @param message message to be published
-     * @return a list of the infrastructures on which it was published
-     */
-    public List<CommInfrastructure> publish(String message) {
-        List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size());
-
-        for (TopicSink topic : publishers) {
-            try {
-                topic.send(message);
-                infrastructures.add(topic.getTopicCommInfrastructure());
-
-            } catch (RuntimeException e) {
-                logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e);
-            }
-        }
-
-        return infrastructures;
-    }
-
-    // these may be overridden by junit tests
-
-    protected TopicEndpoint getTopicEndpointManager() {
-        return TopicEndpointManager.getManager();
-    }
-}
index 851a791..efc7bb8 100644 (file)
@@ -65,7 +65,7 @@ public class ActorServiceTest {
     private Map<String, Object> sub2;
     private Map<String, Object> sub3;
     private Map<String, Object> sub4;
-    private Map<String, Object> params;
+    private Map<String, Map<String, Object>> params;
 
     private ActorService service;
 
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActorTest.java
new file mode 100644 (file)
index 0000000..e1606ae
--- /dev/null
@@ -0,0 +1,242 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.function.Function;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+
+public class BidirectionalTopicActorTest {
+
+    private static final String ACTOR = "my-actor";
+    private static final String UNKNOWN = "unknown";
+    private static final String MY_SINK = "my-sink";
+    private static final String MY_SOURCE1 = "my-source-A";
+    private static final String MY_SOURCE2 = "my-source-B";
+    private static final int TIMEOUT = 10;
+
+    @Mock
+    private BidirectionalTopicHandler handler1;
+    @Mock
+    private BidirectionalTopicHandler handler2;
+
+    private BidirectionalTopicActor actor;
+
+
+    /**
+     * Configures the endpoints.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() {
+        Properties props = new Properties();
+        props.setProperty("noop.sink.topics", MY_SINK);
+        props.setProperty("noop.source.topics", MY_SOURCE1 + "," + MY_SOURCE2);
+
+        // clear all topics and then configure one sink and two sources
+        TopicEndpointManager.getManager().shutdown();
+        TopicEndpointManager.getManager().addTopicSinks(props);
+        TopicEndpointManager.getManager().addTopicSources(props);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        // clear all topics after the tests
+        TopicEndpointManager.getManager().shutdown();
+    }
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        actor = new MyActor();
+        actor.configure(Util.translateToMap(ACTOR, makeParams()));
+    }
+
+    @Test
+    public void testDoStart() throws BidirectionalTopicClientException {
+        // allocate some handlers
+        actor.getTopicHandler(MY_SINK, MY_SOURCE1);
+        actor.getTopicHandler(MY_SINK, MY_SOURCE2);
+
+        // start it
+        actor.start();
+
+        verify(handler1).start();
+        verify(handler2).start();
+
+        verify(handler1, never()).stop();
+        verify(handler2, never()).stop();
+
+        verify(handler1, never()).shutdown();
+        verify(handler2, never()).shutdown();
+    }
+
+    @Test
+    public void testDoStop() throws BidirectionalTopicClientException {
+        // allocate some handlers
+        actor.getTopicHandler(MY_SINK, MY_SOURCE1);
+        actor.getTopicHandler(MY_SINK, MY_SOURCE2);
+
+        // start it
+        actor.start();
+
+        // stop it
+        actor.stop();
+
+        verify(handler1).stop();
+        verify(handler2).stop();
+
+        verify(handler1, never()).shutdown();
+        verify(handler2, never()).shutdown();
+    }
+
+    @Test
+    public void testDoShutdown() {
+        // allocate some handlers
+        actor.getTopicHandler(MY_SINK, MY_SOURCE1);
+        actor.getTopicHandler(MY_SINK, MY_SOURCE2);
+
+        // start it
+        actor.start();
+
+        // stop it
+        actor.shutdown();
+
+        verify(handler1).shutdown();
+        verify(handler2).shutdown();
+
+        verify(handler1, never()).stop();
+        verify(handler2, never()).stop();
+    }
+
+    @Test
+    public void testMakeOperatorParameters() {
+        BidirectionalTopicActorParams params = makeParams();
+
+        final BidirectionalTopicActor prov = new BidirectionalTopicActor(ACTOR);
+        Function<String, Map<String, Object>> maker =
+                        prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params));
+
+        assertNull(maker.apply(UNKNOWN));
+
+        // use a TreeMap to ensure the properties are sorted
+        assertEquals("{sinkTopic=my-sink, sourceTopic=my-source-A, timeoutSec=10}",
+                        new TreeMap<>(maker.apply("operA")).toString());
+
+        assertEquals("{sinkTopic=my-sink, sourceTopic=topicB, timeoutSec=10}",
+                        new TreeMap<>(maker.apply("operB")).toString());
+
+        // with invalid actor parameters
+        params.setOperation(null);
+        assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params)))
+                        .isInstanceOf(ParameterValidationRuntimeException.class);
+    }
+
+    @Test
+    public void testBidirectionalTopicActor() {
+        assertEquals(ACTOR, actor.getName());
+        assertEquals(ACTOR, actor.getFullName());
+    }
+
+    @Test
+    public void testGetTopicHandler() {
+        assertSame(handler1, actor.getTopicHandler(MY_SINK, MY_SOURCE1));
+        assertSame(handler2, actor.getTopicHandler(MY_SINK, MY_SOURCE2));
+
+        assertThatIllegalArgumentException().isThrownBy(() -> actor.getTopicHandler(UNKNOWN, MY_SOURCE1));
+    }
+
+    @Test
+    public void testMakeTopicHandler() {
+        // use a real actor
+        actor = new BidirectionalTopicActor(ACTOR);
+
+        handler1 = actor.getTopicHandler(MY_SINK, MY_SOURCE1);
+        handler2 = actor.getTopicHandler(MY_SINK, MY_SOURCE2);
+
+        assertNotNull(handler1);
+        assertNotNull(handler2);
+        assertNotSame(handler1, handler2);
+    }
+
+
+    private BidirectionalTopicActorParams makeParams() {
+        BidirectionalTopicActorParams params = new BidirectionalTopicActorParams();
+        params.setSinkTopic(MY_SINK);
+        params.setSourceTopic(MY_SOURCE1);
+        params.setTimeoutSec(TIMEOUT);
+
+        // @formatter:off
+        params.setOperation(Map.of(
+                        "operA", Map.of(),
+                        "operB", Map.of("sourceTopic", "topicB")));
+        // @formatter:on
+        return params;
+    }
+
+    private class MyActor extends BidirectionalTopicActor {
+
+        public MyActor() {
+            super(ACTOR);
+        }
+
+        @Override
+        protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic)
+                        throws BidirectionalTopicClientException {
+
+            if (MY_SINK.equals(sinkTopic)) {
+                if (MY_SOURCE1.equals(sourceTopic)) {
+                    return handler1;
+                } else if (MY_SOURCE2.equals(sourceTopic)) {
+                    return handler2;
+                }
+            }
+
+            throw new BidirectionalTopicClientException("no topic " + sinkTopic + "/" + sourceTopic);
+        }
+    }
+}
@@ -20,7 +20,6 @@
 
 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
@@ -35,96 +34,64 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.Logger;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import lombok.Getter;
 import lombok.Setter;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
 import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.common.utils.time.PseudoExecutor;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
 import org.onap.policy.controlloop.policy.PolicyResult;
-import org.slf4j.LoggerFactory;
 
-public class TopicPairOperationTest {
-    private static final List<CommInfrastructure> INFRA_LIST =
-                    Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB);
+public class BidirectionalTopicOperationTest {
+    private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
     private static final String ACTOR = "my-actor";
     private static final String OPERATION = "my-operation";
     private static final String REQ_ID = "my-request-id";
+    private static final String MY_SINK = "my-sink";
     private static final String MY_SOURCE = "my-source";
-    private static final String MY_TARGET = "my-target";
     private static final String TEXT = "some text";
     private static final int TIMEOUT_SEC = 10;
     private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
+    private static final int MAX_REQUESTS = 100;
 
     private static final StandardCoder coder = new StandardCoder();
 
-    /**
-     * Used to attach an appender to the class' logger.
-     */
-    private static final Logger logger = (Logger) LoggerFactory.getLogger(TopicPairOperation.class);
-    private static final ExtractAppender appender = new ExtractAppender();
-
     @Mock
-    private TopicPairOperator operator;
+    private BidirectionalTopicOperator operator;
     @Mock
-    private TopicPair pair;
+    private BidirectionalTopicHandler handler;
     @Mock
     private Forwarder forwarder;
 
     @Captor
-    private ArgumentCaptor<TriConsumer<CommInfrastructure, String, StandardCoderObject>> listenerCaptor;
+    private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
 
     private ControlLoopOperationParams params;
-    private TopicPairParams topicParams;
+    private BidirectionalTopicParams topicParams;
     private OperationOutcome outcome;
     private StandardCoderObject stdResponse;
     private String responseText;
-    private MyExec executor;
-    private TopicPairOperation<MyRequest, MyResponse> oper;
-
-    /**
-     * Attaches the appender to the logger.
-     */
-    @BeforeClass
-    public static void setUpBeforeClass() throws Exception {
-        /**
-         * Attach appender to the logger.
-         */
-        appender.setContext(logger.getLoggerContext());
-        appender.start();
-
-        logger.addAppender(appender);
-    }
-
-    /**
-     * Stops the appender.
-     */
-    @AfterClass
-    public static void tearDownAfterClass() {
-        appender.stop();
-    }
+    private PseudoExecutor executor;
+    private int ntimes;
+    private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
 
     /**
      * Sets up.
@@ -133,20 +100,20 @@ public class TopicPairOperationTest {
     public void setUp() throws CoderException {
         MockitoAnnotations.initMocks(this);
 
-        appender.clearExtractions();
-
-        topicParams = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build();
+        topicParams = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK)
+                        .timeoutSec(TIMEOUT_SEC).build();
 
         when(operator.getActorName()).thenReturn(ACTOR);
         when(operator.getName()).thenReturn(OPERATION);
-        when(operator.getTopicPair()).thenReturn(pair);
+        when(operator.getTopicHandler()).thenReturn(handler);
         when(operator.getForwarder()).thenReturn(forwarder);
         when(operator.getParams()).thenReturn(topicParams);
         when(operator.isAlive()).thenReturn(true);
 
-        when(pair.publish(any())).thenReturn(INFRA_LIST);
+        when(handler.send(any())).thenReturn(true);
+        when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
 
-        executor = new MyExec(100);
+        executor = new PseudoExecutor();
 
         params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
         outcome = params.makeOutcome();
@@ -154,22 +121,28 @@ public class TopicPairOperationTest {
         responseText = coder.encode(new MyResponse());
         stdResponse = coder.decode(responseText, StandardCoderObject.class);
 
+        ntimes = 1;
+
         oper = new MyOperation();
     }
 
     @Test
-    public void testTopicPairOperation_testGetTopicPair_testGetForwarder_testGetPairParams() {
+    public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
         assertEquals(ACTOR, oper.getActorName());
         assertEquals(OPERATION, oper.getName());
-        assertSame(pair, oper.getTopicPair());
+        assertSame(handler, oper.getTopicHandler());
         assertSame(forwarder, oper.getForwarder());
-        assertSame(topicParams, oper.getPairParams());
+        assertSame(topicParams, oper.getTopicParams());
         assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
         assertSame(MyResponse.class, oper.getResponseClass());
     }
 
     @Test
     public void testStartOperationAsync() throws Exception {
+
+        // tell it to expect three responses
+        ntimes = 3;
+
         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
         assertFalse(future.isDone());
 
@@ -177,17 +150,24 @@ public class TopicPairOperationTest {
 
         verify(forwarder, never()).unregister(any(), any());
 
-        verify(pair).publish(any());
+        verify(handler).send(any());
 
-        // provide the response
-        listenerCaptor.getValue().accept(CommInfrastructure.NOOP, responseText, stdResponse);
+        // provide first response
+        listenerCaptor.getValue().accept(responseText, stdResponse);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(future.isDone());
 
-        // run the tasks
-        assertTrue(executor.runAll());
+        // provide second response
+        listenerCaptor.getValue().accept(responseText, stdResponse);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(future.isDone());
 
+        // provide final response
+        listenerCaptor.getValue().accept(responseText, stdResponse);
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(future.isDone());
 
-        assertSame(outcome, future.get(5, TimeUnit.SECONDS));
+        assertSame(outcome, future.get());
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
@@ -199,7 +179,7 @@ public class TopicPairOperationTest {
     @Test
     public void testStartOperationAsyncException() throws Exception {
         // indicate that nothing was published
-        when(pair.publish(any())).thenReturn(Arrays.asList());
+        when(handler.send(any())).thenReturn(false);
 
         assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
 
@@ -221,8 +201,7 @@ public class TopicPairOperationTest {
 
     @Test
     public void testPublishRequest() {
-        oper.publishRequest(new MyRequest());
-        assertEquals(INFRA_LIST.size(), appender.getExtracted().size());
+        assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
     }
 
     /**
@@ -230,7 +209,7 @@ public class TopicPairOperationTest {
      */
     @Test
     public void testPublishRequestUnpublished() {
-        when(pair.publish(any())).thenReturn(Arrays.asList());
+        when(handler.send(any())).thenReturn(false);
         assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
     }
 
@@ -240,8 +219,7 @@ public class TopicPairOperationTest {
     @Test
     public void testPublishRequestString() {
         MyStringOperation oper2 = new MyStringOperation();
-        oper2.publishRequest(TEXT);
-        assertEquals(INFRA_LIST.size(), appender.getExtracted().size());
+        assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
     }
 
     /**
@@ -260,7 +238,7 @@ public class TopicPairOperationTest {
     public void testProcessResponseSuccessString() {
         MyStringOperation oper2 = new MyStringOperation();
 
-        assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, TEXT, null));
+        assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }
 
@@ -272,7 +250,7 @@ public class TopicPairOperationTest {
     public void testProcessResponseSuccessSco() {
         MyScoOperation oper2 = new MyScoOperation();
 
-        assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }
 
@@ -288,7 +266,7 @@ public class TopicPairOperationTest {
         responseText = coder.encode(resp);
         stdResponse = coder.decode(responseText, StandardCoderObject.class);
 
-        assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
         assertEquals(PolicyResult.FAILURE, outcome.getResult());
     }
 
@@ -297,7 +275,7 @@ public class TopicPairOperationTest {
      */
     @Test
     public void testProcessResponseDecodeOk() throws CoderException {
-        assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }
 
@@ -308,7 +286,7 @@ public class TopicPairOperationTest {
     public void testProcessResponseDecodeExcept() throws CoderException {
         // @formatter:off
         assertThatIllegalArgumentException().isThrownBy(
-            () -> oper.processResponse(CommInfrastructure.NOOP, outcome, "{invalid json", stdResponse));
+            () -> oper.processResponse(outcome, "{invalid json", stdResponse));
         // @formatter:on
     }
 
@@ -317,88 +295,6 @@ public class TopicPairOperationTest {
         assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
     }
 
-    @Test
-    public void testLogTopicRequest() {
-        // nothing to log
-        appender.clearExtractions();
-        oper.logTopicRequest(Arrays.asList(), new MyRequest());
-        assertEquals(0, appender.getExtracted().size());
-
-        // log structured data
-        appender.clearExtractions();
-        oper.logTopicRequest(INFRA_LIST, new MyRequest());
-        List<String> output = appender.getExtracted();
-        assertEquals(2, output.size());
-
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString())
-                        .contains("{\n  \"theRequestId\": \"my-request-id\"\n}");
-
-        assertThat(output.get(1)).contains(CommInfrastructure.UEB.toString())
-                        .contains("{\n  \"theRequestId\": \"my-request-id\"\n}");
-
-        // log a plain string
-        appender.clearExtractions();
-        new MyStringOperation().logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), TEXT);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT);
-
-        // log a null request
-        appender.clearExtractions();
-        oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), null);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null");
-
-        // exception from coder
-        setOperCoderException();
-
-        appender.clearExtractions();
-        oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), new MyRequest());
-        output = appender.getExtracted();
-        assertEquals(2, output.size());
-        assertThat(output.get(0)).contains("cannot pretty-print request");
-        assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString());
-    }
-
-    @Test
-    public void testLogTopicResponse() {
-        // log structured data
-        appender.clearExtractions();
-        oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse());
-        List<String> output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString())
-                        .contains("{\n  \"requestId\": \"my-request-id\"\n}");
-
-        // log a plain string
-        appender.clearExtractions();
-        new MyStringOperation().logTopicResponse(CommInfrastructure.NOOP, TEXT);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT);
-
-        // log a null response
-        appender.clearExtractions();
-        oper.logTopicResponse(CommInfrastructure.NOOP, null);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null");
-
-        // exception from coder
-        setOperCoderException();
-
-        appender.clearExtractions();
-        oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse());
-        output = appender.getExtracted();
-        assertEquals(2, output.size());
-        assertThat(output.get(0)).contains("cannot pretty-print response");
-        assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString());
-    }
-
     @Test
     public void testMakeCoder() {
         assertNotNull(oper.makeCoder());
@@ -436,9 +332,9 @@ public class TopicPairOperationTest {
     }
 
 
-    private class MyStringOperation extends TopicPairOperation<String, String> {
+    private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
         public MyStringOperation() {
-            super(TopicPairOperationTest.this.params, operator, String.class);
+            super(BidirectionalTopicOperationTest.this.params, operator, String.class);
         }
 
         @Override
@@ -452,15 +348,15 @@ public class TopicPairOperationTest {
         }
 
         @Override
-        protected boolean isSuccess(String rawResponse, String response) {
-            return (response != null);
+        protected Status detmStatus(String rawResponse, String response) {
+            return (response != null ? Status.SUCCESS : Status.FAILURE);
         }
     }
 
 
-    private class MyScoOperation extends TopicPairOperation<MyRequest, StandardCoderObject> {
+    private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
         public MyScoOperation() {
-            super(TopicPairOperationTest.this.params, operator, StandardCoderObject.class);
+            super(BidirectionalTopicOperationTest.this.params, operator, StandardCoderObject.class);
         }
 
         @Override
@@ -474,15 +370,15 @@ public class TopicPairOperationTest {
         }
 
         @Override
-        protected boolean isSuccess(String rawResponse, StandardCoderObject response) {
-            return (response.getString("output") == null);
+        protected Status detmStatus(String rawResponse, StandardCoderObject response) {
+            return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
         }
     }
 
 
-    private class MyOperation extends TopicPairOperation<MyRequest, MyResponse> {
+    private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
         public MyOperation() {
-            super(TopicPairOperationTest.this.params, operator, MyResponse.class);
+            super(BidirectionalTopicOperationTest.this.params, operator, MyResponse.class);
         }
 
         @Override
@@ -496,8 +392,12 @@ public class TopicPairOperationTest {
         }
 
         @Override
-        protected boolean isSuccess(String rawResponse, MyResponse response) {
-            return (response.getOutput() == null);
+        protected Status detmStatus(String rawResponse, MyResponse response) {
+            if (--ntimes <= 0) {
+                return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
+            }
+
+            return Status.STILL_WAITING;
         }
     }
 }
@@ -34,32 +34,32 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
 import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
 
-public class TopicPairOperatorTest {
+public class BidirectionalTopicOperatorTest {
     private static final String ACTOR = "my-actor";
     private static final String OPERATION = "my-operation";
     private static final String MY_SOURCE = "my-source";
-    private static final String MY_TARGET = "my-target";
+    private static final String MY_SINK = "my-target";
     private static final int TIMEOUT_SEC = 10;
 
     @Mock
-    private TopicPairManager mgr;
+    private BidirectionalTopicManager mgr;
     @Mock
-    private TopicPair pair;
+    private BidirectionalTopicHandler handler;
     @Mock
     private Forwarder forwarder;
     @Mock
-    private TopicPairOperation<String, Integer> operation;
+    private BidirectionalTopicOperation<String, Integer> operation;
 
     private List<SelectorKey> keys;
-    private TopicPairParams params;
+    private BidirectionalTopicParams params;
     private MyOperator oper;
 
     /**
@@ -71,22 +71,23 @@ public class TopicPairOperatorTest {
 
         keys = List.of(new SelectorKey(""));
 
-        when(mgr.getTopicPair(MY_SOURCE, MY_TARGET)).thenReturn(pair);
-        when(pair.addForwarder(keys)).thenReturn(forwarder);
+        when(mgr.getTopicHandler(MY_SINK, MY_SOURCE)).thenReturn(handler);
+        when(handler.addForwarder(keys)).thenReturn(forwarder);
 
         oper = new MyOperator(keys);
 
-        params = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build();
+        params = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK).timeoutSec(TIMEOUT_SEC)
+                        .build();
         oper.configure(Util.translateToMap(OPERATION, params));
         oper.start();
     }
 
     @Test
-    public void testTopicPairOperator_testGetParams_testGetTopicPair_testGetForwarder() {
+    public void testConstructor_testGetParams_testGetTopicHandler_testGetForwarder() {
         assertEquals(ACTOR, oper.getActorName());
         assertEquals(OPERATION, oper.getName());
         assertEquals(params, oper.getParams());
-        assertSame(pair, oper.getTopicPair());
+        assertSame(handler, oper.getTopicHandler());
         assertSame(forwarder, oper.getForwarder());
     }
 
@@ -95,7 +96,7 @@ public class TopicPairOperatorTest {
         oper.stop();
 
         // invalid parameters
-        params.setSource(null);
+        params.setSourceTopic(null);
         assertThatThrownBy(() -> oper.configure(Util.translateToMap(OPERATION, params)))
                         .isInstanceOf(ParameterValidationRuntimeException.class);
     }
@@ -103,18 +104,20 @@ public class TopicPairOperatorTest {
     @Test
     public void testMakeOperator() {
         AtomicReference<ControlLoopOperationParams> paramsRef = new AtomicReference<>();
-        AtomicReference<TopicPairOperator> operRef = new AtomicReference<>();
+        AtomicReference<BidirectionalTopicOperator> operRef = new AtomicReference<>();
 
         // @formatter:off
-        BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<String, Integer>> maker =
-            (params, operator) -> {
-                paramsRef.set(params);
-                operRef.set(operator);
-                return operation;
-            };
+        BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
+                    BidirectionalTopicOperation<String, Integer>> maker =
+                        (params, operator) -> {
+                            paramsRef.set(params);
+                            operRef.set(operator);
+                            return operation;
+                        };
         // @formatter:on
 
-        TopicPairOperator oper2 = TopicPairOperator.makeOperator(ACTOR, OPERATION, mgr, maker, new SelectorKey(""));
+        BidirectionalTopicOperator oper2 =
+                        BidirectionalTopicOperator.makeOperator(ACTOR, OPERATION, mgr, maker, new SelectorKey(""));
 
         assertEquals(ACTOR, oper2.getActorName());
         assertEquals(OPERATION, oper2.getName());
@@ -127,7 +130,7 @@ public class TopicPairOperatorTest {
     }
 
 
-    private class MyOperator extends TopicPairOperator {
+    private class MyOperator extends BidirectionalTopicOperator {
         public MyOperator(List<SelectorKey> selectorKeys) {
             super(ACTOR, OPERATION, mgr, selectorKeys);
         }
index 8ce3b32..80b1d42 100644 (file)
@@ -52,7 +52,12 @@ public class HttpActorTest {
         HttpActorParams params = new HttpActorParams();
         params.setClientName(CLIENT);
         params.setTimeoutSec(TIMEOUT);
-        params.setPath(Map.of("operA", "urlA", "operB", "urlB"));
+
+        // @formatter:off
+        params.setOperation(Map.of(
+                        "operA", Map.of("path", "urlA"),
+                        "operB", Map.of("path", "urlB")));
+        // @formatter:on
 
         final HttpActor prov = new HttpActor(ACTOR);
         Function<String, Map<String, Object>> maker =
@@ -68,7 +73,7 @@ public class HttpActorTest {
                         new TreeMap<>(maker.apply("operB")).toString());
 
         // with invalid actor parameters
-        params.setClientName(null);
+        params.setOperation(null);
         assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params)))
                         .isInstanceOf(ParameterValidationRuntimeException.class);
     }
index 39d6fd4..50cb8fa 100644 (file)
@@ -32,9 +32,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.Logger;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
@@ -65,6 +63,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
 import org.onap.policy.common.endpoints.http.client.HttpClient;
@@ -72,12 +71,10 @@ import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.common.gson.GsonMessageBodyHandler;
-import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.network.NetworkUtil;
-import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
@@ -86,7 +83,6 @@ import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopE
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
 import org.onap.policy.controlloop.policy.PolicyResult;
-import org.slf4j.LoggerFactory;
 
 public class HttpOperationTest {
 
@@ -96,18 +92,11 @@ public class HttpOperationTest {
     private static final String HTTP_CLIENT = "my-client";
     private static final String HTTP_NO_SERVER = "my-http-no-server-client";
     private static final String MEDIA_TYPE_APPLICATION_JSON = "application/json";
-    private static final String MY_REQUEST = "my-request";
     private static final String BASE_URI = "oper";
     private static final String PATH = "/my-path";
     private static final String TEXT = "my-text";
     private static final UUID REQ_ID = UUID.randomUUID();
 
-    /**
-     * Used to attach an appender to the class' logger.
-     */
-    private static final Logger logger = (Logger) LoggerFactory.getLogger(HttpOperation.class);
-    private static final ExtractAppender appender = new ExtractAppender();
-
     /**
      * {@code True} if the server should reject the request, {@code false} otherwise.
      */
@@ -164,14 +153,6 @@ public class HttpOperationTest {
 
         HttpClientFactoryInstance.getClientFactory()
                         .build(builder.clientName(HTTP_NO_SERVER).port(NetworkUtil.allocPort()).build());
-
-        /**
-         * Attach appender to the logger.
-         */
-        appender.setContext(logger.getLoggerContext());
-        appender.start();
-
-        logger.addAppender(appender);
     }
 
     /**
@@ -179,8 +160,6 @@ public class HttpOperationTest {
      */
     @AfterClass
     public static void tearDownAfterClass() {
-        appender.stop();
-
         HttpClientFactoryInstance.getClientFactory().destroy();
         HttpServletServerFactoryInstance.getServerFactory().destroy();
     }
@@ -193,8 +172,6 @@ public class HttpOperationTest {
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
-        appender.clearExtractions();
-
         rejectRequest = false;
         nget = 0;
         npost = 0;
@@ -260,9 +237,9 @@ public class HttpOperationTest {
     @Test
     public void testDoConfigureMapOfStringObject_testGetClient_testGetPath_testGetTimeoutMs() {
 
-        // no default yet
-        assertEquals(0L, oper.getTimeoutMs(null));
-        assertEquals(0L, oper.getTimeoutMs(0));
+        // use value from operator
+        assertEquals(1000L, oper.getTimeoutMs(null));
+        assertEquals(1000L, oper.getTimeoutMs(0));
 
         // should use given value
         assertEquals(20 * 1000L, oper.getTimeoutMs(20));
@@ -441,96 +418,6 @@ public class HttpOperationTest {
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }
 
-    @Test
-    public void testLogRestRequest() throws CoderException {
-        // log structured data
-        appender.clearExtractions();
-        oper.logRestRequest(PATH, new MyRequest());
-        List<String> output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(PATH).contains("{\n  \"input\": \"some input\"\n}");
-
-        // log a plain string
-        appender.clearExtractions();
-        oper.logRestRequest(PATH, MY_REQUEST);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(PATH).contains(MY_REQUEST);
-
-        // log a null request
-        appender.clearExtractions();
-        oper.logRestRequest(PATH, null);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        // exception from coder
-        oper = new MyGetOperation<>(String.class) {
-            @Override
-            protected Coder makeCoder() {
-                return new StandardCoder() {
-                    @Override
-                    public String encode(Object object, boolean pretty) throws CoderException {
-                        throw new CoderException(EXPECTED_EXCEPTION);
-                    }
-                };
-            }
-        };
-
-        appender.clearExtractions();
-        oper.logRestRequest(PATH, new MyRequest());
-        output = appender.getExtracted();
-        assertEquals(2, output.size());
-        assertThat(output.get(0)).contains("cannot pretty-print request");
-        assertThat(output.get(1)).contains(PATH);
-    }
-
-    @Test
-    public void testLogRestResponse() throws CoderException {
-        // log structured data
-        appender.clearExtractions();
-        oper.logRestResponse(PATH, new MyResponse());
-        List<String> output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(PATH).contains("{\n  \"output\": \"some output\"\n}");
-
-        // log a plain string
-        appender.clearExtractions();
-        oper.logRestResponse(PATH, MY_REQUEST);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        // log a null response
-        appender.clearExtractions();
-        oper.logRestResponse(PATH, null);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(PATH).contains("null");
-
-        // exception from coder
-        oper = new MyGetOperation<>(String.class) {
-            @Override
-            protected Coder makeCoder() {
-                return new StandardCoder() {
-                    @Override
-                    public String encode(Object object, boolean pretty) throws CoderException {
-                        throw new CoderException(EXPECTED_EXCEPTION);
-                    }
-                };
-            }
-        };
-
-        appender.clearExtractions();
-        oper.logRestResponse(PATH, new MyResponse());
-        output = appender.getExtracted();
-        assertEquals(2, output.size());
-        assertThat(output.get(0)).contains("cannot pretty-print response");
-        assertThat(output.get(1)).contains(PATH);
-    }
-
     @Test
     public void testMakeDecoder() {
         assertNotNull(oper.makeCoder());
@@ -569,7 +456,7 @@ public class HttpOperationTest {
     private void initOper(HttpOperator operator, String clientName) {
         operator.stop();
 
-        HttpParams params = HttpParams.builder().clientName(clientName).path(PATH).build();
+        HttpParams params = HttpParams.builder().clientName(clientName).path(PATH).timeoutSec(1).build();
         Map<String, Object> mapParams = Util.translateToMap(OPERATION, params);
         operator.configure(mapParams);
         operator.start();
@@ -614,7 +501,7 @@ public class HttpOperationTest {
             headers.put("Accept", MediaType.APPLICATION_JSON);
             String url = makeUrl();
 
-            logRestRequest(url, null);
+            logMessage(EventType.OUT, CommInfrastructure.REST, url, null);
 
             // @formatter:off
             return handleResponse(outcome, url,
@@ -640,7 +527,7 @@ public class HttpOperationTest {
             headers.put("Accept", MediaType.APPLICATION_JSON);
             String url = makeUrl();
 
-            logRestRequest(url, request);
+            logMessage(EventType.OUT, CommInfrastructure.REST, url, request);
 
             // @formatter:off
             return handleResponse(outcome, url,
@@ -666,7 +553,7 @@ public class HttpOperationTest {
             headers.put("Accept", MediaType.APPLICATION_JSON);
             String url = makeUrl();
 
-            logRestRequest(url, request);
+            logMessage(EventType.OUT, CommInfrastructure.REST, url, request);
 
             // @formatter:off
             return handleResponse(outcome, url,
@@ -687,7 +574,7 @@ public class HttpOperationTest {
             headers.put("Accept", MediaType.APPLICATION_JSON);
             String url = makeUrl();
 
-            logRestRequest(url, null);
+            logMessage(EventType.OUT, CommInfrastructure.REST, url, null);
 
             // @formatter:off
             return handleResponse(outcome, url,
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java
deleted file mode 100644 (file)
index 6515eb3..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.controlloop.actorserviceprovider.impl;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-
-/**
- * Executor that will run tasks until the queue is empty or a maximum number of tasks have
- * been executed. Doesn't actually run anything until {@link #runAll()} is invoked.
- */
-public class MyExec implements Executor {
-
-    // TODO move this to policy-common/utils-test
-
-    private final int maxTasks;
-    private final Queue<Runnable> commands = new LinkedList<>();
-
-    public MyExec(int maxTasks) {
-        this.maxTasks = maxTasks;
-    }
-
-    public int getQueueLength() {
-        return commands.size();
-    }
-
-    @Override
-    public void execute(Runnable command) {
-        commands.add(command);
-    }
-
-    /**
-     * Runs all tasks until the queue is empty or the maximum number of tasks have been
-     * reached.
-     *
-     * @return {@code true} if the queue is empty, {@code false} if the maximum number of
-     *         tasks have been reached before the queue was completed
-     */
-    public boolean runAll() {
-        for (int count = 0; count < maxTasks && !commands.isEmpty(); ++count) {
-            commands.remove().run();
-        }
-
-        return commands.isEmpty();
-    }
-}
index f28c1f6..67ac27c 100644 (file)
@@ -20,8 +20,8 @@
 
 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import ch.qos.logback.classic.Logger;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -45,42 +46,59 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.Setter;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.common.utils.time.PseudoExecutor;
 import org.onap.policy.controlloop.ControlLoopOperation;
 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.LoggerFactory;
 
 public class OperationPartialTest {
-    private static final int MAX_PARALLEL_REQUESTS = 10;
+    private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
+    private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
+    private static final int MAX_REQUESTS = 100;
+    private static final int MAX_PARALLEL = 10;
     private static final String EXPECTED_EXCEPTION = "expected exception";
     private static final String ACTOR = "my-actor";
     private static final String OPERATION = "my-operation";
-    private static final String TARGET = "my-target";
+    private static final String MY_SINK = "my-sink";
+    private static final String MY_SOURCE = "my-source";
+    private static final String TEXT = "my-text";
     private static final int TIMEOUT = 1000;
     private static final UUID REQ_ID = UUID.randomUUID();
 
     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
 
+    /**
+     * Used to attach an appender to the class' logger.
+     */
+    private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
+    private static final ExtractAppender appender = new ExtractAppender();
+
     private VirtualControlLoopEvent event;
     private ControlLoopEventContext context;
-    private MyExec executor;
+    private PseudoExecutor executor;
     private ControlLoopOperationParams params;
 
     private MyOper oper;
@@ -95,6 +113,28 @@ public class OperationPartialTest {
 
     private OperatorPartial operator;
 
+    /**
+     * Attaches the appender to the logger.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        /**
+         * Attach appender to the logger.
+         */
+        appender.setContext(logger.getLoggerContext());
+        appender.start();
+
+        logger.addAppender(appender);
+    }
+
+    /**
+     * Stops the appender.
+     */
+    @AfterClass
+    public static void tearDownAfterClass() {
+        appender.stop();
+    }
+
     /**
      * Initializes the fields, including {@link #oper}.
      */
@@ -104,11 +144,11 @@ public class OperationPartialTest {
         event.setRequestId(REQ_ID);
 
         context = new ControlLoopEventContext(event);
-        executor = new MyExec(100 * MAX_PARALLEL_REQUESTS);
+        executor = new PseudoExecutor();
 
         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
                         .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
-                        .startCallback(this::starter).targetEntity(TARGET).build();
+                        .startCallback(this::starter).targetEntity(MY_SINK).build();
 
         operator = new OperatorPartial(ACTOR, OPERATION) {
             @Override
@@ -209,19 +249,19 @@ public class OperationPartialTest {
      */
     @Test
     public void testStartMultiple() {
-        for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
+        for (int count = 0; count < MAX_PARALLEL; ++count) {
             oper.start();
         }
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
 
         assertNotNull(opstart);
         assertNotNull(opend);
         assertEquals(PolicyResult.SUCCESS, opend.getResult());
 
-        assertEquals(MAX_PARALLEL_REQUESTS, numStart);
-        assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
-        assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
+        assertEquals(MAX_PARALLEL, numStart);
+        assertEquals(MAX_PARALLEL, oper.getCount());
+        assertEquals(MAX_PARALLEL, numEnd);
     }
 
     /**
@@ -254,7 +294,7 @@ public class OperationPartialTest {
         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
 
         oper.start().cancel(false);
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertNull(opstart);
         assertNull(opend);
@@ -295,7 +335,7 @@ public class OperationPartialTest {
     @Test
     public void testStartOperationAsync() {
         oper.start();
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertEquals(1, oper.getCount());
     }
@@ -330,14 +370,14 @@ public class OperationPartialTest {
         outcome.setResult(PolicyResult.FAILURE);
 
         // incorrect actor
-        outcome.setActor(TARGET);
+        outcome.setActor(MY_SINK);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setActor(null);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setActor(ACTOR);
 
         // incorrect operation
-        outcome.setOperation(TARGET);
+        outcome.setOperation(MY_SINK);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setOperation(null);
         assertFalse(oper.isActorFailed(outcome));
@@ -355,7 +395,7 @@ public class OperationPartialTest {
         OperationPartial oper2 = new OperationPartial(params, operator) {};
 
         oper2.start();
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertNotNull(opend);
         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
@@ -519,14 +559,14 @@ public class OperationPartialTest {
         // wrong actor - should be false
         outcome.setActor(null);
         assertFalse(oper.isSameOperation(outcome));
-        outcome.setActor(TARGET);
+        outcome.setActor(MY_SINK);
         assertFalse(oper.isSameOperation(outcome));
         outcome.setActor(ACTOR);
 
         // wrong operation - should be null
         outcome.setOperation(null);
         assertFalse(oper.isSameOperation(outcome));
-        outcome.setOperation(TARGET);
+        outcome.setOperation(MY_SINK);
         assertFalse(oper.isSameOperation(outcome));
         outcome.setOperation(OPERATION);
 
@@ -584,43 +624,47 @@ public class OperationPartialTest {
     @Test
     public void testAnyOf() throws Exception {
         // first task completes, others do not
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         final OperationOutcome outcome = params.makeOutcome();
 
-        tasks.add(CompletableFuture.completedFuture(outcome));
-        tasks.add(new CompletableFuture<>());
-        tasks.add(new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> null);
+        tasks.add(() -> new CompletableFuture<>());
 
         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
 
+        // repeat using array form
+        @SuppressWarnings("unchecked")
+        Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+        result = oper.anyOf(tasks.toArray(taskArray));
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
 
         // second task completes, others do not
-        tasks = new LinkedList<>();
-
-        tasks.add(new CompletableFuture<>());
-        tasks.add(CompletableFuture.completedFuture(outcome));
-        tasks.add(new CompletableFuture<>());
+        tasks.clear();
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> new CompletableFuture<>());
 
         result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
-
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
 
         // third task completes, others do not
-        tasks = new LinkedList<>();
-
-        tasks.add(new CompletableFuture<>());
-        tasks.add(new CompletableFuture<>());
-        tasks.add(CompletableFuture.completedFuture(outcome));
+        tasks.clear();
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
         result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
-
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
     }
@@ -631,54 +675,82 @@ public class OperationPartialTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testAnyOfEdge() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         // zero items: check both using a list and using an array
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
+        assertNull(oper.anyOf(tasks));
+        assertNull(oper.anyOf());
 
         // one item: : check both using a list and using an array
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
-        tasks.add(future1);
+        tasks.add(() -> future1);
 
         assertSame(future1, oper.anyOf(tasks));
-        assertSame(future1, oper.anyOf(future1));
+        assertSame(future1, oper.anyOf(() -> future1));
     }
 
-    /**
-     * Tests both flavors of allOf(), because one invokes the other.
-     */
     @Test
-    public void testAllOf() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+    public void testAllOfArray() throws Exception {
+        final OperationOutcome outcome = params.makeOutcome();
 
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+
+        @SuppressWarnings("unchecked")
+        CompletableFuture<OperationOutcome> result =
+                        oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
+
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future1.complete(outcome);
+
+        // complete 3 before 2
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future3.complete(outcome);
+
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future2.complete(outcome);
+
+        // all of them are now done
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+    }
+
+    @Test
+    public void testAllOfList() throws Exception {
         final OperationOutcome outcome = params.makeOutcome();
 
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
-        tasks.add(future1);
-        tasks.add(future2);
-        tasks.add(future3);
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> future1);
+        tasks.add(() -> future2);
+        tasks.add(() -> null);
+        tasks.add(() -> future3);
 
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future1.complete(outcome);
 
         // complete 3 before 2
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future3.complete(outcome);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future2.complete(outcome);
 
         // all of them are now done
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
     }
@@ -689,18 +761,41 @@ public class OperationPartialTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testAllOfEdge() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         // zero items: check both using a list and using an array
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
+        assertNull(oper.allOf(tasks));
+        assertNull(oper.allOf());
 
         // one item: : check both using a list and using an array
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
-        tasks.add(future1);
+        tasks.add(() -> future1);
 
         assertSame(future1, oper.allOf(tasks));
-        assertSame(future1, oper.allOf(future1));
+        assertSame(future1, oper.allOf(() -> future1));
+    }
+
+    @Test
+    public void testAttachFutures() throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+        // third task throws an exception during construction
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+        tasks.add(() -> future1);
+        tasks.add(() -> future2);
+        tasks.add(() -> {
+            throw new IllegalStateException(EXPECTED_EXCEPTION);
+        });
+        tasks.add(() -> future3);
+
+        assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
+
+        // should have canceled the first two, but not the last
+        assertTrue(future1.isCancelled());
+        assertTrue(future2.isCancelled());
+        assertFalse(future3.isCancelled());
     }
 
     @Test
@@ -714,12 +809,14 @@ public class OperationPartialTest {
         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
 
-        // null outcome
-        final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
-        tasks.add(CompletableFuture.completedFuture(null));
+        // null outcome - takes precedence over a success
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
+        tasks.add(() -> CompletableFuture.completedFuture(null));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertNull(result.get());
 
@@ -727,26 +824,85 @@ public class OperationPartialTest {
         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
 
         tasks.clear();
-        tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
-        tasks.add(CompletableFuture.failedFuture(except));
-        tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
+        tasks.add(() -> CompletableFuture.failedFuture(except));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
         result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isCompletedExceptionally());
         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
     }
 
-    private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+    /**
+     * Tests both flavors of sequence(), because one invokes the other.
+     */
+    @Test
+    public void testSequence() throws Exception {
+        final OperationOutcome outcome = params.makeOutcome();
+
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> null);
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+        CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+
+        // repeat using array form
+        @SuppressWarnings("unchecked")
+        Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+        result = oper.sequence(tasks.toArray(taskArray));
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+
+        // second task fails, third should not run
+        OperationOutcome failure = params.makeOutcome();
+        failure.setResult(PolicyResult.FAILURE);
+        tasks.clear();
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> CompletableFuture.completedFuture(failure));
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+        result = oper.sequence(tasks);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(failure, result.get());
+    }
+
+    /**
+     * Tests both flavors of sequence(), for edge cases: zero items, and one item.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testSequenceEdge() throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+        // zero items: check both using a list and using an array
+        assertNull(oper.sequence(tasks));
+        assertNull(oper.sequence());
 
+        // one item: : check both using a list and using an array
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        tasks.add(() -> future1);
+
+        assertSame(future1, oper.sequence(tasks));
+        assertSame(future1, oper.sequence(() -> future1));
+    }
+
+    private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         OperationOutcome expectedOutcome = null;
 
         for (int count = 0; count < results.length; ++count) {
             OperationOutcome outcome = params.makeOutcome();
             outcome.setResult(results[count]);
-            tasks.add(CompletableFuture.completedFuture(outcome));
+            tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
             if (count == expected) {
                 expectedOutcome = outcome;
@@ -755,17 +911,11 @@ public class OperationPartialTest {
 
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(expectedOutcome, result.get());
     }
 
-    private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
-                    final OperationOutcome taskOutcome) {
-
-        return outcome -> CompletableFuture.completedFuture(taskOutcome);
-    }
-
     @Test
     public void testDetmPriority() throws CoderException {
         assertEquals(1, oper.detmPriority(null));
@@ -789,210 +939,6 @@ public class OperationPartialTest {
         assertEquals(1, oper.detmPriority(outcome));
     }
 
-    /**
-     * Tests doTask(Future) when the controller is not running.
-     */
-    @Test
-    public void testDoTaskFutureNotRunning() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        controller.complete(params.makeOutcome());
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should have canceled the task future
-        assertTrue(taskFuture.isCancelled());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was successful.
-     */
-    @Test
-    public void testDoTaskFutureSuccess() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
-
-        taskFuture.complete(taskOutcome);
-        assertTrue(executor.runAll());
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was failed.
-     */
-    @Test
-    public void testDoTaskFutureFailure() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should have canceled the task future
-        assertTrue(taskFuture.isCancelled());
-
-        // controller SHOULD be done now
-        assertTrue(controller.isDone());
-        assertSame(failedOutcome, controller.get());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was failed, but not checking
-     * success.
-     */
-    @Test
-    public void testDoTaskFutureUncheckedFailure() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
-        assertFalse(future.isDone());
-
-        // complete the task
-        OperationOutcome taskOutcome = params.makeOutcome();
-        taskFuture.complete(taskOutcome);
-
-        assertTrue(executor.runAll());
-
-        // should have run the task
-        assertTrue(future.isDone());
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Function) when the controller is not running.
-     */
-    @Test
-    public void testDoTaskFunctionNotRunning() throws Exception {
-        AtomicBoolean invoked = new AtomicBoolean();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
-            invoked.set(true);
-            return CompletableFuture.completedFuture(params.makeOutcome());
-        };
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        controller.complete(params.makeOutcome());
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should not have even invoked the task
-        assertFalse(invoked.get());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was successful.
-     */
-    @Test
-    public void testDoTaskFunctionSuccess() throws Exception {
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        final OperationOutcome failedOutcome = params.makeOutcome();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was failed.
-     */
-    @Test
-    public void testDoTaskFunctionFailure() throws Exception {
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        AtomicBoolean invoked = new AtomicBoolean();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
-            invoked.set(true);
-            return CompletableFuture.completedFuture(params.makeOutcome());
-        };
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should not have even invoked the task
-        assertFalse(invoked.get());
-
-        // controller should have the failed task
-        assertTrue(controller.isDone());
-        assertSame(failedOutcome, controller.get());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was failed, but not checking
-     * success.
-     */
-    @Test
-    public void testDoTaskFunctionUncheckedFailure() throws Exception {
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
     /**
      * Tests callbackStarted() when the pipeline has already been stopped.
      */
@@ -1013,7 +959,7 @@ public class OperationPartialTest {
         oper = new MyOper();
 
         future.set(oper.start());
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         // should have only run once
         assertEquals(1, numStart);
@@ -1035,7 +981,7 @@ public class OperationPartialTest {
         oper = new MyOper();
 
         future.set(oper.start());
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         // should not have been set
         assertNull(opend);
@@ -1090,6 +1036,62 @@ public class OperationPartialTest {
         assertTrue(oper.isTimeout(new CompletionException(timex)));
     }
 
+    @Test
+    public void testLogMessage() {
+        final String infraStr = SINK_INFRA.toString();
+
+        // log structured data
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+        List<String> output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
+                        .contains("{\n  \"text\": \"my-text\"\n}");
+
+        // repeat with a response
+        appender.clearExtractions();
+        oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
+                        .contains("{\n  \"text\": \"my-text\"\n}");
+
+        // log a plain string
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
+
+        // log a null request
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
+
+        // generate exception from coder
+        setOperCoderException();
+
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+        output = appender.getExtracted();
+        assertEquals(2, output.size());
+        assertThat(output.get(0)).contains("cannot pretty-print request");
+        assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
+
+        // repeat with a response
+        appender.clearExtractions();
+        oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+        output = appender.getExtracted();
+        assertEquals(2, output.size());
+        assertThat(output.get(0)).contains("cannot pretty-print response");
+        assertThat(output.get(1)).contains(MY_SOURCE);
+    }
+
     @Test
     public void testGetRetry() {
         assertEquals(0, oper.getRetry(null));
@@ -1187,7 +1189,7 @@ public class OperationPartialTest {
 
         manipulator.accept(future);
 
-        assertTrue(testName, executor.runAll());
+        assertTrue(testName, executor.runAll(MAX_REQUESTS));
 
         assertEquals(testName, expectedCallbacks, numStart);
         assertEquals(testName, expectedCallbacks, numEnd);
@@ -1216,6 +1218,30 @@ public class OperationPartialTest {
         assertEquals(testName, expectedOperations, oper.getCount());
     }
 
+    /**
+     * Creates a new {@link #oper} whose coder will throw an exception.
+     */
+    private void setOperCoderException() {
+        oper = new MyOper() {
+            @Override
+            protected Coder makeCoder() {
+                return new StandardCoder() {
+                    @Override
+                    public String encode(Object object, boolean pretty) throws CoderException {
+                        throw new CoderException(EXPECTED_EXCEPTION);
+                    }
+                };
+            }
+        };
+    }
+
+
+    @Getter
+    public static class MyData {
+        private String text = TEXT;
+    }
+
+
     private class MyOper extends OperationPartial {
         @Getter
         private int count = 0;
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParamsTest.java
new file mode 100644 (file)
index 0000000..1f38ad3
--- /dev/null
@@ -0,0 +1,118 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+public class BidirectionalTopicActorParamsTest {
+    private static final String CONTAINER = "my-container";
+
+    private static final String DFLT_SOURCE = "default-source";
+    private static final String DFLT_SINK = "default-target";
+    private static final int DFLT_TIMEOUT = 10;
+
+    private static final String OPER1_NAME = "oper A";
+    private static final String OPER1_SOURCE = "source A";
+    private static final String OPER1_SINK = "target A";
+    private static final int OPER1_TIMEOUT = 20;
+
+    // oper2 uses some default values
+    private static final String OPER2_NAME = "oper B";
+    private static final String OPER2_SOURCE = "source B";
+
+    // oper3 uses default values for everything
+    private static final String OPER3_NAME = "oper C";
+
+    private Map<String, Map<String, Object>> operMap;
+    private BidirectionalTopicActorParams params;
+
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        BidirectionalTopicParams oper1 = BidirectionalTopicParams.builder().sourceTopic(OPER1_SOURCE)
+                        .sinkTopic(OPER1_SINK).timeoutSec(OPER1_TIMEOUT).build();
+
+        Map<String, Object> oper1Map = Util.translateToMap(OPER1_NAME, oper1);
+        Map<String, Object> oper2Map = Map.of("source", OPER2_SOURCE);
+        Map<String, Object> oper3Map = Collections.emptyMap();
+        operMap = Map.of(OPER1_NAME, oper1Map, OPER2_NAME, oper2Map, OPER3_NAME, oper3Map);
+
+        params = makeBidirectionalTopicActorParams();
+    }
+
+    @Test
+    public void testValidate() {
+        assertTrue(params.validate(CONTAINER).isValid());
+
+        // only a few fields are required
+        BidirectionalTopicActorParams sparse = Util.translate(CONTAINER, Map.of("operation", operMap, "timeoutSec", 1),
+                        BidirectionalTopicActorParams.class);
+        assertTrue(sparse.validate(CONTAINER).isValid());
+
+        testValidateField("operation", "null", params2 -> params2.setOperation(null));
+        testValidateField("timeoutSec", "minimum", params2 -> params2.setTimeoutSec(-1));
+
+        // check edge cases
+        params.setTimeoutSec(0);
+        assertFalse(params.validate(CONTAINER).isValid());
+
+        params.setTimeoutSec(1);
+        assertTrue(params.validate(CONTAINER).isValid());
+    }
+
+    private void testValidateField(String fieldName, String expected,
+                    Consumer<BidirectionalTopicActorParams> makeInvalid) {
+
+        // original params should be valid
+        ValidationResult result = params.validate(CONTAINER);
+        assertTrue(fieldName, result.isValid());
+
+        // make invalid params
+        BidirectionalTopicActorParams params2 = makeBidirectionalTopicActorParams();
+        makeInvalid.accept(params2);
+        result = params2.validate(CONTAINER);
+        assertFalse(fieldName, result.isValid());
+        assertThat(result.getResult()).contains(CONTAINER).contains(fieldName).contains(expected);
+    }
+
+    private BidirectionalTopicActorParams makeBidirectionalTopicActorParams() {
+        BidirectionalTopicActorParams params2 = new BidirectionalTopicActorParams();
+        params2.setSinkTopic(DFLT_SINK);
+        params2.setSourceTopic(DFLT_SOURCE);
+        params2.setTimeoutSec(DFLT_TIMEOUT);
+        params2.setOperation(operMap);
+
+        return params2;
+    }
+}
@@ -29,47 +29,46 @@ import java.util.function.Function;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.common.parameters.ValidationResult;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams.TopicPairParamsBuilder;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams.BidirectionalTopicParamsBuilder;
 
-public class TopicPairParamsTest {
+public class BidirectionalTopicParamsTest {
 
     private static final String CONTAINER = "my-container";
-    private static final String TARGET = "my-target";
+    private static final String SINK = "my-sink";
     private static final String SOURCE = "my-source";
     private static final int TIMEOUT = 10;
 
-    private TopicPairParams params;
+    private BidirectionalTopicParams params;
 
     @Before
     public void setUp() {
-        params = TopicPairParams.builder().target(TARGET).source(SOURCE).timeoutSec(TIMEOUT).build();
+        params = BidirectionalTopicParams.builder().sinkTopic(SINK).sourceTopic(SOURCE).timeoutSec(TIMEOUT).build();
     }
 
     @Test
     public void testValidate() {
-        testValidateField("target", "null", bldr -> bldr.target(null));
-        testValidateField("source", "null", bldr -> bldr.source(null));
+        assertTrue(params.validate(CONTAINER).isValid());
+
+        testValidateField("sink", "null", bldr -> bldr.sinkTopic(null));
+        testValidateField("source", "null", bldr -> bldr.sourceTopic(null));
         testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1));
 
         // check edge cases
         assertFalse(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid());
         assertTrue(params.toBuilder().timeoutSec(1).build().validate(CONTAINER).isValid());
-
-        // some default values should be valid
-        assertTrue(TopicPairParams.builder().target(TARGET).source(SOURCE).build().validate(CONTAINER).isValid());
     }
 
     @Test
     public void testBuilder_testToBuilder() {
-        assertEquals(TARGET, params.getTarget());
-        assertEquals(SOURCE, params.getSource());
+        assertEquals(SINK, params.getSinkTopic());
+        assertEquals(SOURCE, params.getSourceTopic());
         assertEquals(TIMEOUT, params.getTimeoutSec());
 
         assertEquals(params, params.toBuilder().build());
     }
 
     private void testValidateField(String fieldName, String expected,
-                    Function<TopicPairParamsBuilder, TopicPairParamsBuilder> makeInvalid) {
+                    Function<BidirectionalTopicParamsBuilder, BidirectionalTopicParamsBuilder> makeInvalid) {
 
         // original params should be valid
         ValidationResult result = params.validate(CONTAINER);
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParamsTest.java
new file mode 100644 (file)
index 0000000..9014203
--- /dev/null
@@ -0,0 +1,137 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import lombok.Setter;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+public class CommonActorParamsTest {
+
+    private static final String CONTAINER = "my-container";
+
+    private static final String PATH1 = "path #1";
+    private static final String PATH2 = "path #2";
+    private static final String URI1 = "uri #1";
+    private static final String URI2 = "uri #2";
+    private static final String TEXT1 = "hello";
+    private static final String TEXT2 = "world";
+    private static final String TEXT2B = "bye";
+
+    private Map<String, Map<String, Object>> operations;
+    private CommonActorParams params;
+
+    /**
+     * Initializes {@link #operations} with two items and {@link params} with a fully
+     * populated object.
+     */
+    @Before
+    public void setUp() {
+        operations = new TreeMap<>();
+        operations.put(PATH1, Map.of("path", URI1));
+        operations.put(PATH2, Map.of("path", URI2, "text2", TEXT2B));
+
+        params = makeCommonActorParams();
+    }
+
+    @Test
+    public void testMakeOperationParameters() {
+        Function<String, Map<String, Object>> maker = params.makeOperationParameters(CONTAINER);
+        assertNull(maker.apply("unknown-operation"));
+
+        Map<String, Object> subparam = maker.apply(PATH1);
+        assertNotNull(subparam);
+        assertEquals("{path=uri #1, text1=hello, text2=world}", new TreeMap<>(subparam).toString());
+
+        subparam = maker.apply(PATH2);
+        assertNotNull(subparam);
+        assertEquals("{path=uri #2, text1=hello, text2=bye}", new TreeMap<>(subparam).toString());
+    }
+
+    @Test
+    public void testDoValidation() {
+        assertThatCode(() -> params.doValidation(CONTAINER)).doesNotThrowAnyException();
+
+        // invalid param
+        params.setOperation(null);
+        assertThatThrownBy(() -> params.doValidation(CONTAINER))
+                        .isInstanceOf(ParameterValidationRuntimeException.class);
+    }
+
+    @Test
+    public void testValidate() {
+        assertTrue(params.validate(CONTAINER).isValid());
+
+        // only a few fields are required
+        CommonActorParams sparse = Util.translate(CONTAINER, Map.of("operation", operations, "timeoutSec", 1),
+                        CommonActorParams.class);
+        assertTrue(sparse.validate(CONTAINER).isValid());
+
+        testValidateField("operation", "null", params2 -> params2.setOperation(null));
+    }
+
+    private void testValidateField(String fieldName, String expected, Consumer<CommonActorParams> makeInvalid) {
+
+        // original params should be valid
+        ValidationResult result = params.validate(CONTAINER);
+        assertTrue(fieldName, result.isValid());
+
+        // make invalid params
+        CommonActorParams params2 = makeCommonActorParams();
+        makeInvalid.accept(params2);
+        result = params2.validate(CONTAINER);
+        assertFalse(fieldName, result.isValid());
+        assertThat(result.getResult()).contains(CONTAINER).contains(fieldName).contains(expected);
+    }
+
+    private CommonActorParams makeCommonActorParams() {
+        MyParams params2 = new MyParams();
+        params2.setOperation(operations);
+        params2.setText1(TEXT1);
+        params2.setText2(TEXT2);
+
+        return params2;
+    }
+
+    @Setter
+    public static class MyParams extends CommonActorParams {
+        @SuppressWarnings("unused")
+        private String text1;
+
+        @SuppressWarnings("unused")
+        private String text2;
+    }
+}
index daa0aff..9e70853 100644 (file)
 package org.onap.policy.controlloop.actorserviceprovider.parameters;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
 
 public class HttpActorParamsTest {
 
@@ -48,63 +43,40 @@ public class HttpActorParamsTest {
     private static final String URI1 = "uri #1";
     private static final String URI2 = "uri #2";
 
-    private Map<String, String> paths;
+    private Map<String, Map<String, Object>> operations;
     private HttpActorParams params;
 
     /**
-     * Initializes {@link #paths} with two items and {@link params} with a fully populated
-     * object.
+     * Initializes {@link #operations} with two items and {@link params} with a fully
+     * populated object.
      */
     @Before
     public void setUp() {
-        paths = new TreeMap<>();
-        paths.put(PATH1, URI1);
-        paths.put(PATH2, URI2);
+        operations = new TreeMap<>();
+        operations.put(PATH1, Map.of("path", URI1));
+        operations.put(PATH2, Map.of("path", URI2));
 
         params = makeHttpActorParams();
     }
 
-    @Test
-    public void testMakeOperationParameters() {
-        Function<String, Map<String, Object>> maker = params.makeOperationParameters(CONTAINER);
-        assertNull(maker.apply("unknown-operation"));
-
-        Map<String, Object> subparam = maker.apply(PATH1);
-        assertNotNull(subparam);
-        assertEquals("{clientName=my-client, path=uri #1, timeoutSec=10}", new TreeMap<>(subparam).toString());
-
-        subparam = maker.apply(PATH2);
-        assertNotNull(subparam);
-        assertEquals("{clientName=my-client, path=uri #2, timeoutSec=10}", new TreeMap<>(subparam).toString());
-    }
-
-    @Test
-    public void testDoValidation() {
-        assertThatCode(() -> params.doValidation(CONTAINER)).doesNotThrowAnyException();
-
-        // invalid param
-        params.setClientName(null);
-        assertThatThrownBy(() -> params.doValidation(CONTAINER))
-                        .isInstanceOf(ParameterValidationRuntimeException.class);
-    }
-
     @Test
     public void testValidate() {
         assertTrue(params.validate(CONTAINER).isValid());
 
-        testValidateField("clientName", "null", params2 -> params2.setClientName(null));
-        testValidateField("path", "null", params2 -> params2.setPath(null));
+        // only a few fields are required
+        HttpActorParams sparse = Util.translate(CONTAINER, Map.of("operation", operations, "timeoutSec", 1),
+                        HttpActorParams.class);
+        assertTrue(sparse.validate(CONTAINER).isValid());
+
+        testValidateField("operation", "null", params2 -> params2.setOperation(null));
         testValidateField("timeoutSec", "minimum", params2 -> params2.setTimeoutSec(-1));
 
         // check edge cases
         params.setTimeoutSec(0);
-        assertTrue(params.validate(CONTAINER).isValid());
+        assertFalse(params.validate(CONTAINER).isValid());
 
         params.setTimeoutSec(1);
         assertTrue(params.validate(CONTAINER).isValid());
-
-        // one path value is null
-        testValidateField(PATH2, "null", params2 -> paths.put(PATH2, null));
     }
 
     private void testValidateField(String fieldName, String expected, Consumer<HttpActorParams> makeInvalid) {
@@ -125,7 +97,7 @@ public class HttpActorParamsTest {
         HttpActorParams params2 = new HttpActorParams();
         params2.setClientName(CLIENT);
         params2.setTimeoutSec(TIMEOUT);
-        params2.setPath(paths);
+        params2.setOperation(operations);
 
         return params2;
     }
index ae4a79f..fdfb4b4 100644 (file)
@@ -54,7 +54,7 @@ public class HttpParamsTest {
         testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1));
 
         // check edge cases
-        assertTrue(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid());
+        assertFalse(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid());
         assertTrue(params.toBuilder().timeoutSec(1).build().validate(CONTAINER).isValid());
     }
 
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java
deleted file mode 100644 (file)
index 4322c5f..0000000
+++ /dev/null
@@ -1,132 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.controlloop.actorserviceprovider.parameters;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.TreeMap;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.parameters.ValidationResult;
-import org.onap.policy.controlloop.actorserviceprovider.Util;
-
-public class TopicPairActorParamsTest {
-    private static final String MY_NAME = "my-name";
-
-    private static final String DFLT_SOURCE = "default-source";
-    private static final String DFLT_TARGET = "default-target";
-    private static final int DFLT_TIMEOUT = 10;
-
-    private static final String OPER1_NAME = "oper A";
-    private static final String OPER1_SOURCE = "source A";
-    private static final String OPER1_TARGET = "target A";
-    private static final int OPER1_TIMEOUT = 20;
-
-    // oper2 uses some default values
-    private static final String OPER2_NAME = "oper B";
-    private static final String OPER2_SOURCE = "source B";
-
-    // oper3 uses default values for everything
-    private static final String OPER3_NAME = "oper C";
-
-    private TopicPairParams defaults;
-    private Map<String, Map<String, Object>> operMap;
-    private TopicPairActorParams params;
-
-
-    /**
-     * Sets up.
-     */
-    @Before
-    public void setUp() {
-        defaults = TopicPairParams.builder().source(DFLT_SOURCE).target(DFLT_TARGET).timeoutSec(DFLT_TIMEOUT).build();
-
-        TopicPairParams oper1 = TopicPairParams.builder().source(OPER1_SOURCE).target(OPER1_TARGET)
-                        .timeoutSec(OPER1_TIMEOUT).build();
-
-        Map<String, Object> oper1Map = Util.translateToMap(OPER1_NAME, oper1);
-        Map<String, Object> oper2Map = Map.of("source", OPER2_SOURCE);
-        Map<String, Object> oper3Map = Collections.emptyMap();
-        operMap = Map.of(OPER1_NAME, oper1Map, OPER2_NAME, oper2Map, OPER3_NAME, oper3Map);
-
-        params = TopicPairActorParams.builder().defaults(defaults).operation(operMap).build();
-
-    }
-
-    @Test
-    public void testTopicPairActorParams() {
-        assertSame(defaults, params.getDefaults());
-        assertSame(operMap, params.getOperation());
-    }
-
-    @Test
-    public void testDoValidation() {
-        assertSame(params, params.doValidation(MY_NAME));
-
-        // test with invalid parameters
-        defaults.setTimeoutSec(-1);
-        assertThatThrownBy(() -> params.doValidation(MY_NAME)).isInstanceOf(ParameterValidationRuntimeException.class);
-    }
-
-    @Test
-    public void testValidate() {
-        ValidationResult result;
-
-        // null defaults
-        params.setDefaults(null);
-        result = params.validate(MY_NAME);
-        assertFalse(result.isValid());
-        assertThat(result.getResult()).contains("defaults").contains("null");
-        params.setDefaults(defaults);
-
-        // invalid value in defaults
-        defaults.setTimeoutSec(-1);
-        result = params.validate(MY_NAME);
-        assertFalse(result.isValid());
-        assertThat(result.getResult()).contains("defaults").contains("timeoutSec");
-        defaults.setTimeoutSec(DFLT_TIMEOUT);
-
-        // null map
-        params.setOperation(null);
-        result = params.validate(MY_NAME);
-        assertFalse(result.isValid());
-        assertThat(result.getResult()).contains("operation");
-        params.setOperation(operMap);
-
-        // null entry in the map
-        Map<String, Map<String, Object>> map2 = new TreeMap<>(operMap);
-        map2.put(OPER2_NAME, null);
-        params.setOperation(map2);
-        result = params.validate(MY_NAME);
-        assertFalse(result.isValid());
-        assertThat(result.getResult()).contains("operation").contains("null");
-        params.setOperation(operMap);
-
-        // test success case
-        assertTrue(params.validate(MY_NAME).isValid());
-    }
-}
 
 package org.onap.policy.controlloop.actorserviceprovider.topic;
 
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
-import java.util.List;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -40,114 +39,101 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
 
-public class TopicPairTest {
+public class BidirectionalTopicHandlerTest {
     private static final String UNKNOWN = "unknown";
-    private static final String MY_SOURCE = "pair-source";
-    private static final String MY_TARGET = "pair-target";
-    private static final String TEXT = "some text";
+    private static final String MY_SOURCE = "my-source";
+    private static final String MY_SINK = "my-sink";
+    private static final String KEY1 = "requestId";
+    private static final String KEY2 = "subRequestId";
 
     @Mock
-    private TopicSink publisher1;
+    private TopicSink publisher;
 
     @Mock
-    private TopicSink publisher2;
-
-    @Mock
-    private TopicSource subscriber1;
-
-    @Mock
-    private TopicSource subscriber2;
+    private TopicSource subscriber;
 
     @Mock
     private TopicEndpoint mgr;
 
-    private TopicPair pair;
+    private MyTopicHandler handler;
 
 
     /**
      * Sets up.
      */
     @Before
-    public void setUp() {
+    public void setUp() throws BidirectionalTopicClientException {
         MockitoAnnotations.initMocks(this);
 
-        when(mgr.getTopicSinks(MY_TARGET)).thenReturn(Arrays.asList(publisher1, publisher2));
-        when(mgr.getTopicSources(eq(Arrays.asList(MY_SOURCE)))).thenReturn(Arrays.asList(subscriber1, subscriber2));
+        when(mgr.getTopicSinks(MY_SINK)).thenReturn(Arrays.asList(publisher));
+        when(mgr.getTopicSources(eq(Arrays.asList(MY_SOURCE)))).thenReturn(Arrays.asList(subscriber));
 
-        when(publisher1.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.NOOP);
-        when(publisher2.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.UEB);
+        when(publisher.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.NOOP);
 
-        pair = new MyTopicPair(MY_SOURCE, MY_TARGET);
+        handler = new MyTopicHandler(MY_SINK, MY_SOURCE);
 
-        pair.start();
+        handler.start();
     }
 
     @Test
-    public void testTopicPair_testGetSource_testGetTarget() {
-        assertEquals(MY_SOURCE, pair.getSource());
-        assertEquals(MY_TARGET, pair.getTarget());
+    public void testBidirectionalTopicHandler_testGetSource_testGetTarget() {
+        assertEquals(MY_SOURCE, handler.getSourceTopic());
+        assertEquals(MY_SINK, handler.getSinkTopic());
 
         verify(mgr).getTopicSinks(anyString());
         verify(mgr).getTopicSources(any());
 
         // source not found
-        assertThatIllegalArgumentException().isThrownBy(() -> new MyTopicPair(UNKNOWN, MY_TARGET))
-                        .withMessageContaining("sources").withMessageContaining(UNKNOWN);
+        assertThatThrownBy(() -> new MyTopicHandler(MY_SINK, UNKNOWN))
+                        .isInstanceOf(BidirectionalTopicClientException.class).hasMessageContaining("sources")
+                        .hasMessageContaining(UNKNOWN);
 
         // target not found
-        assertThatIllegalArgumentException().isThrownBy(() -> new MyTopicPair(MY_SOURCE, UNKNOWN))
-                        .withMessageContaining("sinks").withMessageContaining(UNKNOWN);
+        assertThatThrownBy(() -> new MyTopicHandler(UNKNOWN, MY_SOURCE))
+                        .isInstanceOf(BidirectionalTopicClientException.class).hasMessageContaining("sinks")
+                        .hasMessageContaining(UNKNOWN);
     }
 
     @Test
     public void testShutdown() {
-        pair.shutdown();
-        verify(subscriber1).unregister(pair);
-        verify(subscriber2).unregister(pair);
+        handler.shutdown();
+        verify(subscriber).unregister(any());
     }
 
     @Test
     public void testStart() {
-        verify(subscriber1).register(pair);
-        verify(subscriber2).register(pair);
+        verify(subscriber).register(any());
     }
 
     @Test
     public void testStop() {
-        pair.stop();
-        verify(subscriber1).unregister(pair);
-        verify(subscriber2).unregister(pair);
+        handler.stop();
+        verify(subscriber).unregister(any());
     }
 
     @Test
-    public void testPublish() {
-        List<CommInfrastructure> infrastructures = pair.publish(TEXT);
-        assertEquals(Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB), infrastructures);
-
-        verify(publisher1).send(TEXT);
-        verify(publisher2).send(TEXT);
-
-        // first one throws an exception - should have only published to the second
-        when(publisher1.send(any())).thenThrow(new IllegalStateException("expected exception"));
-
-        infrastructures = pair.publish(TEXT);
-        assertEquals(Arrays.asList(CommInfrastructure.UEB), infrastructures);
+    public void testAddForwarder() {
+        // array form
+        Forwarder forwarder = handler.addForwarder(new SelectorKey(KEY1), new SelectorKey(KEY2));
+        assertNotNull(forwarder);
 
-        verify(publisher2, times(2)).send(TEXT);
+        // repeat using list form
+        assertSame(forwarder, handler.addForwarder(Arrays.asList(new SelectorKey(KEY1), new SelectorKey(KEY2))));
     }
 
     @Test
     public void testGetTopicEndpointManager() {
         // setting "mgr" to null should cause it to use the superclass' method
         mgr = null;
-        assertNotNull(pair.getTopicEndpointManager());
+        assertNotNull(handler.getTopicEndpointManager());
     }
 
 
-    private class MyTopicPair extends TopicPair {
-        public MyTopicPair(String source, String target) {
-            super(source, target);
+    private class MyTopicHandler extends BidirectionalTopicHandler {
+        public MyTopicHandler(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
+            super(sinkTopic, sourceTopic);
         }
 
         @Override
index 24f8b70..a01159b 100644 (file)
@@ -29,17 +29,15 @@ import static org.mockito.Mockito.verify;
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.function.BiConsumer;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
 
 public class ForwarderTest {
-    private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
     private static final String TEXT = "some text";
 
     private static final String KEY1 = "requestId";
@@ -58,16 +56,16 @@ public class ForwarderTest {
     private static final String VALUEC_SUBREQID = "bye-bye";
 
     @Mock
-    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1;
+    private BiConsumer<String, StandardCoderObject> listener1;
 
     @Mock
-    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1b;
+    private BiConsumer<String, StandardCoderObject> listener1b;
 
     @Mock
-    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener2;
+    private BiConsumer<String, StandardCoderObject> listener2;
 
     @Mock
-    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener3;
+    private BiConsumer<String, StandardCoderObject> listener3;
 
     private Forwarder forwarder;
 
@@ -102,68 +100,68 @@ public class ForwarderTest {
         forwarder.unregister(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1b);
 
         StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
-        forwarder.onMessage(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
 
-        verify(listener1).accept(INFRA, TEXT, sco);
-        verify(listener1b, never()).accept(any(), any(), any());
+        verify(listener1).accept(TEXT, sco);
+        verify(listener1b, never()).accept(any(), any());
 
         // remove listener1
         forwarder.unregister(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1);
-        forwarder.onMessage(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
 
         // route a message to listener2
         sco = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEB_SUBREQID)));
-        forwarder.onMessage(INFRA, TEXT, sco);
-        verify(listener2).accept(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
+        verify(listener2).accept(TEXT, sco);
 
         // no more messages to listener1 or 1b
-        verify(listener1).accept(any(), any(), any());
-        verify(listener1b, never()).accept(any(), any(), any());
+        verify(listener1).accept(any(), any());
+        verify(listener1b, never()).accept(any(), any());
     }
 
     @Test
     public void testOnMessage() {
         StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
-        forwarder.onMessage(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
 
-        verify(listener1).accept(INFRA, TEXT, sco);
-        verify(listener1b).accept(INFRA, TEXT, sco);
+        verify(listener1).accept(TEXT, sco);
+        verify(listener1b).accept(TEXT, sco);
 
         // repeat - counts should increment
-        forwarder.onMessage(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
 
-        verify(listener1, times(2)).accept(INFRA, TEXT, sco);
-        verify(listener1b, times(2)).accept(INFRA, TEXT, sco);
+        verify(listener1, times(2)).accept(TEXT, sco);
+        verify(listener1b, times(2)).accept(TEXT, sco);
 
         // should not have been invoked
-        verify(listener2, never()).accept(any(), any(), any());
-        verify(listener3, never()).accept(any(), any(), any());
+        verify(listener2, never()).accept(any(), any());
+        verify(listener3, never()).accept(any(), any());
 
         // try other listeners now
         sco = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEB_SUBREQID)));
-        forwarder.onMessage(INFRA, TEXT, sco);
-        verify(listener2).accept(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
+        verify(listener2).accept(TEXT, sco);
 
         sco = makeMessage(Map.of(KEY1, VALUEC_REQID, KEY2, Map.of(SUBKEY, VALUEC_SUBREQID)));
-        forwarder.onMessage(INFRA, TEXT, sco);
-        verify(listener3).accept(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
+        verify(listener3).accept(TEXT, sco);
 
         // message has no listeners
         sco = makeMessage(Map.of(KEY1, "xyzzy", KEY2, Map.of(SUBKEY, VALUEB_SUBREQID)));
-        forwarder.onMessage(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
 
         // message doesn't have both keys
         sco = makeMessage(Map.of(KEY1, VALUEA_REQID));
-        forwarder.onMessage(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
 
         // counts should not have incremented
-        verify(listener1, times(2)).accept(any(), any(), any());
-        verify(listener1b, times(2)).accept(any(), any(), any());
-        verify(listener2).accept(any(), any(), any());
-        verify(listener3).accept(any(), any(), any());
+        verify(listener1, times(2)).accept(any(), any());
+        verify(listener1b, times(2)).accept(any(), any());
+        verify(listener2).accept(any(), any());
+        verify(listener3).accept(any(), any());
 
         // listener throws an exception
-        doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any(), any());
+        doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any());
     }
 
     /*
@@ -171,12 +169,12 @@ public class ForwarderTest {
      */
     @Test
     public void testOnMessageListenerException1() {
-        doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any(), any());
+        doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any());
 
         StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
-        forwarder.onMessage(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
 
-        verify(listener1b).accept(INFRA, TEXT, sco);
+        verify(listener1b).accept(TEXT, sco);
     }
 
     /*
@@ -184,12 +182,12 @@ public class ForwarderTest {
      */
     @Test
     public void testOnMessageListenerException1b() {
-        doThrow(new IllegalStateException("expected exception")).when(listener1b).accept(any(), any(), any());
+        doThrow(new IllegalStateException("expected exception")).when(listener1b).accept(any(), any());
 
         StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
-        forwarder.onMessage(INFRA, TEXT, sco);
+        forwarder.onMessage(TEXT, sco);
 
-        verify(listener1).accept(INFRA, TEXT, sco);
+        verify(listener1).accept(TEXT, sco);
     }
 
     /**
index a370857..3012ff6 100644 (file)
@@ -30,12 +30,12 @@ import static org.mockito.Mockito.verify;
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.function.BiConsumer;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
@@ -58,13 +58,13 @@ public class TopicListenerImplTest {
     private TopicListenerImpl topic;
 
     @Mock
-    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1;
+    private BiConsumer<String, StandardCoderObject> listener1;
 
     @Mock
-    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1b;
+    private BiConsumer<String, StandardCoderObject> listener1b;
 
     @Mock
-    private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener2;
+    private BiConsumer<String, StandardCoderObject> listener2;
 
 
     /**
@@ -117,11 +117,11 @@ public class TopicListenerImplTest {
         String msg = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID)));
         topic.onTopicEvent(INFRA, MY_TOPIC, msg);
 
-        verify(listener1).accept(eq(INFRA), eq(msg), any());
-        verify(listener2).accept(eq(INFRA), eq(msg), any());
+        verify(listener1).accept(eq(msg), any());
+        verify(listener2).accept(eq(msg), any());
 
         // not to listener1b
-        verify(listener1b, never()).accept(any(), any(), any());
+        verify(listener1b, never()).accept(any(), any());
 
         /*
          * now send a message that should only go to listener1b on forwarder1
@@ -130,15 +130,15 @@ public class TopicListenerImplTest {
         topic.onTopicEvent(INFRA, MY_TOPIC, msg);
 
         // should route to listener1 on forwarder1 and listener2 on forwarder2
-        verify(listener1b).accept(eq(INFRA), eq(msg), any());
+        verify(listener1b).accept(eq(msg), any());
 
         // try one where the coder throws an exception
         topic.onTopicEvent(INFRA, MY_TOPIC, "{invalid-json");
 
         // no extra invocations
-        verify(listener1).accept(any(), any(), any());
-        verify(listener1b).accept(any(), any(), any());
-        verify(listener2).accept(any(), any(), any());
+        verify(listener1).accept(any(), any());
+        verify(listener1b).accept(any(), any());
+        verify(listener2).accept(any(), any());
     }
 
     /**
index b11983b..7b5b9fc 100644 (file)
         <appender-ref ref="STDOUT" />
     </logger>
 
-    <!-- this is required for HttpOperationTest -->
-    <logger name="org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperation" level="info" additivity="false">
-        <appender-ref ref="STDOUT" />
-    </logger>
-
-    <!-- this is required for TopicPairOperationTest -->
+    <!-- this is required for OperationPartialTest -->
     <logger
-            name="org.onap.policy.controlloop.actorserviceprovider.impl.TopicPairOperation"
+            name="org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial"
             level="info" additivity="false">
         <appender-ref ref="STDOUT" />
     </logger>