More changes to actor code 59/101859/5
authorJim Hahn <jrh3@att.com>
Mon, 17 Feb 2020 18:57:56 +0000 (13:57 -0500)
committerJim Hahn <jrh3@att.com>
Tue, 18 Feb 2020 01:41:09 +0000 (20:41 -0500)
Use Coder.convert() from policy-common.
Passed response to setOutcome().
Changed class names from XxxOperator to XxxOperation.
Modified SDNC junits to invoke start() instead of startOperationAsync().
Changed context obtain() to re-run if the future was canceled.
Added junit support class, BasicBidirectionalTopicOperation.

Modified HttpOperation to allow subsequent requests to be issued.
Some actors, like SO, send an initial HTTP request and then follow
it with HTTP "are you done?" requests.

Issue-ID: POLICY-2363-prop
Change-Id: I12b5c2d4f07254e0cb79fabfe1ccf844b70a0654
Signed-off-by: Jim Hahn <jrh3@att.com>
22 files changed:
models-interactions/model-actors/actor.aai/src/main/java/org/onap/policy/controlloop/actor/aai/AaiCustomQueryOperation.java
models-interactions/model-actors/actor.aai/src/main/java/org/onap/policy/controlloop/actor/aai/AaiGetOperation.java
models-interactions/model-actors/actor.aai/src/test/java/org/onap/policy/controlloop/actor/aai/AaiCustomQueryOperationTest.java
models-interactions/model-actors/actor.aai/src/test/java/org/onap/policy/controlloop/actor/aai/AaiGetOperationTest.java [moved from models-interactions/model-actors/actor.aai/src/test/java/org/onap/policy/controlloop/actor/aai/AaiGetOperatorTest.java with 97% similarity]
models-interactions/model-actors/actor.aai/src/test/java/org/onap/policy/controlloop/actor/aai/AaiUtilTest.java
models-interactions/model-actors/actor.aai/src/test/java/org/onap/policy/controlloop/actor/aai/BasicAaiOperation.java [moved from models-interactions/model-actors/actor.aai/src/test/java/org/onap/policy/controlloop/actor/aai/BasicAaiOperator.java with 90% similarity]
models-interactions/model-actors/actor.appc/pom.xml
models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/BandwidthOnDemandOperationTest.java [moved from models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/BandwidthOnDemandOperatorTest.java with 95% similarity]
models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/BasicSdncOperation.java [moved from models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/BasicSdncOperator.java with 96% similarity]
models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/RerouteOperationTest.java [moved from models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/RerouteOperatorTest.java with 96% similarity]
models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/SdncOperationTest.java [moved from models-interactions/model-actors/actor.sdnc/src/test/java/org/onap/policy/controlloop/actor/sdnc/SdncOperatorTest.java with 97% similarity]
models-interactions/model-actors/actor.test/src/main/java/org/onap/policy/controlloop/actor/test/BasicBidirectionalTopicOperation.java [new file with mode: 0644]
models-interactions/model-actors/actor.test/src/main/java/org/onap/policy/controlloop/actor/test/BasicHttpOperation.java
models-interactions/model-actors/actor.test/src/test/java/org/onap/policy/controlloop/actor/test/BasicBidirectionalTopicOperationTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java

index bc2dde9..e32734b 100644 (file)
@@ -121,9 +121,12 @@ public class AaiCustomQueryOperation extends HttpOperation<String> {
      * Injects the response into the context.
      */
     @Override
-    protected void postProcessResponse(OperationOutcome outcome, String url, Response rawResponse, String response) {
+    protected CompletableFuture<OperationOutcome> postProcessResponse(OperationOutcome outcome, String url,
+                    Response rawResponse, String response) {
 
         logger.info("{}: caching response for {}", getFullName(), params.getRequestId());
         params.getContext().setProperty(AaiCqResponse.CONTEXT_KEY, new AaiCqResponse(response));
+
+        return super.postProcessResponse(outcome, url, rawResponse, response);
     }
 }
index 60a2820..ee1c461 100644 (file)
@@ -116,13 +116,15 @@ public class AaiGetOperation extends HttpOperation<StandardCoderObject> {
      * Injects the response into the context.
      */
     @Override
-    protected void postProcessResponse(OperationOutcome outcome, String url, Response rawResponse,
-                    StandardCoderObject response) {
+    protected CompletableFuture<OperationOutcome> postProcessResponse(OperationOutcome outcome, String url,
+                    Response rawResponse, StandardCoderObject response) {
         String entity = params.getTargetEntity();
 
         logger.info("{}: caching response of {} for {}", getFullName(), entity, params.getRequestId());
 
         params.getContext().setProperty(propertyPrefix + entity, response);
+
+        return super.postProcessResponse(outcome, url, rawResponse, response);
     }
 
     /**
index c95425e..a935087 100644 (file)
@@ -50,7 +50,7 @@ import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
 import org.onap.policy.controlloop.policy.PolicyResult;
 
-public class AaiCustomQueryOperationTest extends BasicAaiOperator<Map<String, String>> {
+public class AaiCustomQueryOperationTest extends BasicAaiOperation<Map<String, String>> {
     private static final StandardCoder coder = new StandardCoder();
 
     private static final String MY_LINK = "my-link";
@@ -38,14 +38,14 @@ import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.policy.PolicyResult;
 
-public class AaiGetOperatorTest extends BasicAaiOperator<Void> {
+public class AaiGetOperationTest extends BasicAaiOperation<Void> {
 
     private static final String INPUT_FIELD = "input";
     private static final String TEXT = "my-text";
 
     private AaiGetOperation oper;
 
-    public AaiGetOperatorTest() {
+    public AaiGetOperationTest() {
         super(AaiConstants.ACTOR_NAME, AaiGetOperation.TENANT);
     }
 
index 39ed6fe..ae38cca 100644 (file)
@@ -23,7 +23,7 @@ package org.onap.policy.controlloop.actor.aai;
 import java.util.Map;
 import org.junit.Test;
 
-public class AaiUtilTest extends BasicAaiOperator<Void> {
+public class AaiUtilTest extends BasicAaiOperation<Void> {
 
     @Test
     public void testMakeHeaders() {
@@ -28,12 +28,12 @@ import org.onap.policy.controlloop.actor.test.BasicHttpOperation;
 /**
  * Superclass for various operator tests.
  */
-public abstract class BasicAaiOperator<Q> extends BasicHttpOperation<Q> {
+public abstract class BasicAaiOperation<Q> extends BasicHttpOperation<Q> {
 
     /**
      * Constructs the object using a default actor and operation name.
      */
-    public BasicAaiOperator() {
+    public BasicAaiOperation() {
         super();
     }
 
@@ -43,7 +43,7 @@ public abstract class BasicAaiOperator<Q> extends BasicHttpOperation<Q> {
      * @param actor actor name
      * @param operation operation name
      */
-    public BasicAaiOperator(String actor, String operation) {
+    public BasicAaiOperation(String actor, String operation) {
         super(actor, operation);
     }
 
index 26eb7c1..0cc243c 100644 (file)
             <version>${policy.common.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
+            <artifactId>actor.test</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
@@ -28,11 +28,11 @@ import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.sdnc.SdncRequest;
 
-public class BandwidthOnDemandOperatorTest extends BasicSdncOperator {
+public class BandwidthOnDemandOperationTest extends BasicSdncOperation {
 
     private BandwidthOnDemandOperation oper;
 
-    public BandwidthOnDemandOperatorTest() {
+    public BandwidthOnDemandOperationTest() {
         super(DEFAULT_ACTOR, BandwidthOnDemandOperation.NAME);
     }
 
@@ -49,14 +49,14 @@ import org.powermock.reflect.Whitebox;
 /**
  * Superclass for various operator tests.
  */
-public abstract class BasicSdncOperator extends BasicHttpOperation<SdncRequest> {
+public abstract class BasicSdncOperation extends BasicHttpOperation<SdncRequest> {
 
     protected SdncResponse response;
 
     /**
      * Constructs the object using a default actor and operation name.
      */
-    public BasicSdncOperator() {
+    public BasicSdncOperation() {
         super();
     }
 
@@ -66,7 +66,7 @@ public abstract class BasicSdncOperator extends BasicHttpOperation<SdncRequest>
      * @param actor actor name
      * @param operation operation name
      */
-    public BasicSdncOperator(String actor, String operation) {
+    public BasicSdncOperation(String actor, String operation) {
         super(actor, operation);
     }
 
@@ -94,7 +94,8 @@ public abstract class BasicSdncOperator extends BasicHttpOperation<SdncRequest>
     protected SdncRequest verifyOperation(SdncOperation operation)
                     throws InterruptedException, ExecutionException, TimeoutException {
 
-        CompletableFuture<OperationOutcome> future2 = operation.startOperationAsync(1, outcome);
+        CompletableFuture<OperationOutcome> future2 = operation.start();
+        executor.runAll(100);
         assertFalse(future2.isDone());
 
         verify(client).post(callbackCaptor.capture(), any(), requestCaptor.capture(), any());
@@ -28,11 +28,11 @@ import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.sdnc.SdncRequest;
 
-public class RerouteOperatorTest extends BasicSdncOperator {
+public class RerouteOperationTest extends BasicSdncOperation {
 
     private RerouteOperation oper;
 
-    public RerouteOperatorTest() {
+    public RerouteOperationTest() {
         super(DEFAULT_ACTOR, RerouteOperation.NAME);
     }
 
diff --git a/models-interactions/model-actors/actor.test/src/main/java/org/onap/policy/controlloop/actor/test/BasicBidirectionalTopicOperation.java b/models-interactions/model-actors/actor.test/src/main/java/org/onap/policy/controlloop/actor/test/BasicBidirectionalTopicOperation.java
new file mode 100644 (file)
index 0000000..14c7ef5
--- /dev/null
@@ -0,0 +1,181 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.test;
+
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.common.utils.time.PseudoExecutor;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.actorserviceprovider.impl.BidirectionalTopicOperator;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+
+/**
+ * Superclass for various BidirectionalTopicOperation tests.
+ */
+public class BasicBidirectionalTopicOperation {
+    protected static final UUID REQ_ID = UUID.randomUUID();
+    protected static final String DEFAULT_ACTOR = "default-actor";
+    protected static final String DEFAULT_OPERATION = "default-operation";
+    protected static final String MY_SINK = "my-sink";
+    protected static final String MY_SOURCE = "my-source";
+    protected static final String TARGET_ENTITY = "my-target";
+    protected static final Coder coder = new StandardCoder();
+    protected static final int TIMEOUT = 10;
+
+    protected final String actorName;
+    protected final String operationName;
+
+    @Captor
+    protected ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
+
+    @Mock
+    protected ActorService service;
+    @Mock
+    protected BidirectionalTopicHandler topicHandler;
+    @Mock
+    protected Forwarder forwarder;
+    @Mock
+    protected BidirectionalTopicOperator operator;
+
+    protected BidirectionalTopicParams topicParams;
+    protected ControlLoopOperationParams params;
+    protected Map<String, String> enrichment;
+    protected VirtualControlLoopEvent event;
+    protected ControlLoopEventContext context;
+    protected OperationOutcome outcome;
+    protected PseudoExecutor executor;
+
+    /**
+     * Constructs the object using a default actor and operation name.
+     */
+    public BasicBidirectionalTopicOperation() {
+        this.actorName = DEFAULT_ACTOR;
+        this.operationName = DEFAULT_OPERATION;
+    }
+
+    /**
+     * Constructs the object.
+     *
+     * @param actor actor name
+     * @param operation operation name
+     */
+    public BasicBidirectionalTopicOperation(String actor, String operation) {
+        this.actorName = actor;
+        this.operationName = operation;
+    }
+
+    /**
+     * Initializes mocks and sets up.
+     */
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        executor = new PseudoExecutor();
+
+        makeContext();
+
+        outcome = params.makeOutcome();
+        topicParams = BidirectionalTopicParams.builder().sinkTopic(MY_SINK).sourceTopic(MY_SOURCE).timeoutSec(TIMEOUT)
+                        .build();
+
+        initOperator();
+    }
+
+    /**
+     * Reinitializes {@link #enrichment}, {@link #event}, {@link #context}, and
+     * {@link #params}.
+     * <p/>
+     * Note: {@link #params} is configured to use {@link #executor}.
+     */
+    protected void makeContext() {
+        enrichment = new TreeMap<>(makeEnrichment());
+
+        event = new VirtualControlLoopEvent();
+        event.setRequestId(REQ_ID);
+        event.setAai(enrichment);
+
+        context = new ControlLoopEventContext(event);
+
+        params = ControlLoopOperationParams.builder().executor(executor).context(context).actorService(service)
+                        .actor(actorName).operation(operationName).targetEntity(TARGET_ENTITY).payload(makePayload())
+                        .build();
+    }
+
+    protected Map<String, String> makePayload() {
+        return null;
+    }
+
+    /**
+     * Initializes an operator so that it is "alive" and has the given names.
+     */
+    protected void initOperator() {
+        when(operator.isAlive()).thenReturn(true);
+        when(operator.getFullName()).thenReturn(actorName + "." + operationName);
+        when(operator.getActorName()).thenReturn(actorName);
+        when(operator.getName()).thenReturn(operationName);
+        when(operator.getTopicHandler()).thenReturn(topicHandler);
+        when(operator.getForwarder()).thenReturn(forwarder);
+        when(operator.getParams()).thenReturn(topicParams);
+    }
+
+    /**
+     * Makes enrichment data.
+     *
+     * @return enrichment data
+     */
+    protected Map<String, String> makeEnrichment() {
+        return new TreeMap<>();
+    }
+
+    /**
+     * Provides a response to the topic {@link #listenerCaptor}.
+     *
+     * @param listener listener to which to provide the response
+     * @param response response to be provided
+     */
+    protected void provideResponse(BiConsumer<String, StandardCoderObject> listener, String response) {
+        try {
+            StandardCoderObject sco = coder.decode(response, StandardCoderObject.class);
+            listener.accept(response, sco);
+
+        } catch (CoderException e) {
+            throw new IllegalArgumentException("response is not a Map", e);
+        }
+    }
+}
index e160479..4929292 100644 (file)
@@ -45,7 +45,7 @@ import org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperator;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
 /**
- * Superclass for various operator tests.
+ * Superclass for various HttpOperation tests.
  *
  * @param <Q> request type
  */
diff --git a/models-interactions/model-actors/actor.test/src/test/java/org/onap/policy/controlloop/actor/test/BasicBidirectionalTopicOperationTest.java b/models-interactions/model-actors/actor.test/src/test/java/org/onap/policy/controlloop/actor/test/BasicBidirectionalTopicOperationTest.java
new file mode 100644 (file)
index 0000000..4fd5591
--- /dev/null
@@ -0,0 +1,140 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.test;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+
+import java.util.function.BiConsumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+
+public class BasicBidirectionalTopicOperationTest {
+    private static final String ACTOR = "my-actor";
+    private static final String OPERATION = "my-operation";
+
+    @Mock
+    private BiConsumer<String, StandardCoderObject> listener;
+
+    private BasicBidirectionalTopicOperation oper;
+
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        oper = new BasicBidirectionalTopicOperation(ACTOR, OPERATION);
+        oper.setUp();
+    }
+
+    @Test
+    public void testBasicBidirectionalTopicOperation() {
+        oper = new BasicBidirectionalTopicOperation();
+        assertEquals(BasicHttpOperation.DEFAULT_ACTOR, oper.actorName);
+        assertEquals(BasicHttpOperation.DEFAULT_OPERATION, oper.operationName);
+    }
+
+    @Test
+    public void testBasicBidirectionalTopicOperationStringString() {
+        assertEquals(ACTOR, oper.actorName);
+        assertEquals(OPERATION, oper.operationName);
+    }
+
+    @Test
+    public void testSetUp() {
+        assertNotNull(oper.topicParams);
+        assertNotNull(oper.context);
+        assertNotNull(oper.outcome);
+        assertNotNull(oper.executor);
+        assertTrue(oper.operator.isAlive());
+    }
+
+    @Test
+    public void testMakeContext() {
+        oper.makeContext();
+
+        assertTrue(oper.enrichment.isEmpty());
+
+        assertSame(BasicBidirectionalTopicOperation.REQ_ID, oper.event.getRequestId());
+        assertSame(oper.enrichment, oper.event.getAai());
+
+        assertSame(oper.event, oper.context.getEvent());
+
+        assertSame(oper.context, oper.params.getContext());
+        assertSame(oper.service, oper.params.getActorService());
+        assertSame(oper.executor, oper.params.getExecutor());
+        assertEquals(ACTOR, oper.params.getActor());
+        assertEquals(OPERATION, oper.params.getOperation());
+        assertEquals(BasicBidirectionalTopicOperation.TARGET_ENTITY, oper.params.getTargetEntity());
+    }
+
+    @Test
+    public void testMakePayload() {
+        assertNull(oper.makePayload());
+    }
+
+    @Test
+    public void testInitOperator() {
+        oper.initOperator();
+
+        assertTrue(oper.operator.isAlive());
+        assertEquals(ACTOR + "." + OPERATION, oper.operator.getFullName());
+        assertEquals(ACTOR, oper.operator.getActorName());
+        assertEquals(OPERATION, oper.operator.getName());
+        assertSame(oper.topicHandler, oper.operator.getTopicHandler());
+        assertSame(oper.forwarder, oper.operator.getForwarder());
+        assertSame(oper.topicParams, oper.operator.getParams());
+    }
+
+    @Test
+    public void testMakeEnrichment() {
+        assertTrue(oper.makeEnrichment().isEmpty());
+    }
+
+    @Test
+    public void testProvideResponse() {
+        String response = "{\"input\": 10}";
+
+        oper.provideResponse(listener, response);
+
+        ArgumentCaptor<StandardCoderObject> scoCaptor = ArgumentCaptor.forClass(StandardCoderObject.class);
+        verify(listener).accept(eq(response), scoCaptor.capture());
+
+        assertEquals("10", scoCaptor.getValue().getString("input"));
+
+        // try with an invalid response
+        assertThatIllegalArgumentException().isThrownBy(() -> oper.provideResponse(listener, "{invalid json"))
+                        .withMessage("response is not a Map");
+    }
+}
index b885b5c..ba47859 100644 (file)
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
  */
 public class Util {
     private static final Logger logger = LoggerFactory.getLogger(Util.class);
+    private static final Coder coder = new StandardCoder();
 
     private Util() {
         // do nothing
@@ -84,11 +85,8 @@ public class Util {
      * @return the translated object
      */
     public static <T> T translate(String identifier, Object source, Class<T> clazz) {
-        Coder coder = new StandardCoder();
-
         try {
-            String json = coder.encode(source);
-            return coder.decode(json, clazz);
+            return coder.convert(source, clazz);
 
         } catch (CoderException | RuntimeException e) {
             throw new IllegalArgumentException("cannot translate parameters for " + identifier, e);
@@ -105,10 +103,6 @@ public class Util {
      */
     @SuppressWarnings("unchecked")
     public static Map<String, Object> translateToMap(String identifier, Object source) {
-        if (source == null) {
-            return null;
-        }
-
         return translate(identifier, source, LinkedHashMap.class);
     }
 }
index 1c37a8e..3e02da6 100644 (file)
@@ -130,15 +130,27 @@ public class ControlLoopEventContext implements Serializable {
             return null;
         }
 
-        CompletableFuture<OperationOutcome> future = retrievers.get(name);
-        if (future != null) {
-            return future;
-        }
+        /*
+         * Return any existing future, if it wasn't canceled. Otherwise, start a new
+         * request.
+         */
 
-        future = params.start();
+        // @formatter:off
+        CompletableFuture<OperationOutcome> oldFuture =
+            retrievers.compute(name, (key, future) -> (future == null || future.isCancelled() ? null : future));
+        // @formatter:on
 
-        CompletableFuture<OperationOutcome> oldFuture = retrievers.putIfAbsent(name, future);
         if (oldFuture != null) {
+            return oldFuture;
+        }
+
+        /*
+         * Note: must NOT invoke params.start() within retrievers.compute(), as start()
+         * may invoke obtain() which would cause a recursive update to the retrievers map.
+         */
+        CompletableFuture<OperationOutcome> future = params.start();
+
+        if ((oldFuture = retrievers.putIfAbsent(name, future)) != null) {
             future.cancel(false);
             return oldFuture;
         }
index f82015d..d1e21f8 100644 (file)
@@ -214,14 +214,14 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
             case SUCCESS:
                 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
                                 params.getRequestId());
-                setOutcome(outcome, PolicyResult.SUCCESS);
+                setOutcome(outcome, PolicyResult.SUCCESS, response);
                 postProcessResponse(outcome, rawResponse, response);
                 return outcome;
 
             case FAILURE:
                 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
                                 params.getRequestId());
-                return setOutcome(outcome, PolicyResult.FAILURE);
+                return setOutcome(outcome, PolicyResult.FAILURE, response);
 
             case STILL_WAITING:
             default:
@@ -231,6 +231,18 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
         }
     }
 
+    /**
+     * Sets an operation's outcome and default message based on the result.
+     *
+     * @param outcome operation to be updated
+     * @param result result of the operation
+     * @param response response used to populate the outcome
+     * @return the updated operation
+     */
+    public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, S response) {
+        return setOutcome(outcome, result);
+    }
+
     /**
      * Processes a successful response.
      *
index f1829d7..c3c0f6d 100644 (file)
@@ -148,21 +148,23 @@ public abstract class HttpOperation<T> extends OperationPartial {
         controller.add(requester.apply(callback));
 
         // once "future" completes, process the response, and then complete the controller
-        future.thenApplyAsync(response -> processResponse(outcome, url, response), executor)
+        future.thenComposeAsync(response -> processResponse(outcome, url, response), executor)
                         .whenCompleteAsync(controller.delayedComplete(), executor);
 
         return controller;
     }
 
     /**
-     * Processes a response. This method simply sets the outcome to SUCCESS.
+     * Processes a response. This method decodes the response, sets the outcome based on
+     * the response, and then returns a completed future.
      *
      * @param outcome outcome to be populate
      * @param url URL to which to request was sent
      * @param response raw response to process
-     * @return the outcome
+     * @return a future to cancel or await the outcome
      */
-    protected OperationOutcome processResponse(OperationOutcome outcome, String url, Response rawResponse) {
+    protected CompletableFuture<OperationOutcome> processResponse(OperationOutcome outcome, String url,
+                    Response rawResponse) {
 
         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
 
@@ -173,7 +175,6 @@ public abstract class HttpOperation<T> extends OperationPartial {
         T response;
         if (responseClass == String.class) {
             response = responseClass.cast(strResponse);
-
         } else {
             try {
                 response = makeCoder().decode(strResponse, responseClass);
@@ -187,26 +188,40 @@ public abstract class HttpOperation<T> extends OperationPartial {
         if (!isSuccess(rawResponse, response)) {
             logger.info("{}.{} request failed with http error code {} for {}", params.getActor(), params.getOperation(),
                             rawResponse.getStatus(), params.getRequestId());
-            return setOutcome(outcome, PolicyResult.FAILURE);
+            return CompletableFuture.completedFuture(setOutcome(outcome, PolicyResult.FAILURE, response));
         }
 
         logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
-        setOutcome(outcome, PolicyResult.SUCCESS);
-        postProcessResponse(outcome, url, rawResponse, response);
+        setOutcome(outcome, PolicyResult.SUCCESS, response);
+        return postProcessResponse(outcome, url, rawResponse, response);
+    }
 
-        return outcome;
+    /**
+     * Sets an operation's outcome and default message based on the result.
+     *
+     * @param outcome operation to be updated
+     * @param result result of the operation
+     * @param response response used to populate the outcome
+     * @return the updated operation
+     */
+    public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, T response) {
+        return setOutcome(outcome, result);
     }
 
     /**
-     * Processes a successful response.
+     * Processes a successful response. This method simply returns the outcome wrapped in
+     * a completed future.
      *
      * @param outcome outcome to be populate
      * @param url URL to which to request was sent
      * @param rawResponse raw response
      * @param response decoded response
+     * @return a future to cancel or await the outcome
      */
-    protected void postProcessResponse(OperationOutcome outcome, String url, Response rawResponse, T response) {
-        // do nothing
+    protected CompletableFuture<OperationOutcome> postProcessResponse(OperationOutcome outcome, String url,
+                    Response rawResponse, T response) {
+
+        return CompletableFuture.completedFuture(outcome);
     }
 
     /**
index 0b34971..680a56f 100644 (file)
@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;
 public abstract class OperationPartial implements Operation {
     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
     private static final Coder coder = new StandardCoder();
+
     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
 
     // values extracted from the operator
index fcb4635..93beab1 100644 (file)
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
  */
 public class TopicListenerImpl implements TopicListener {
     private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class);
-    private static StandardCoder coder = new StandardCoder();
+    private static final StandardCoder coder = new StandardCoder();
 
     /**
      * Maps selector to a forwarder.
index b462043..cf24262 100644 (file)
@@ -42,6 +42,7 @@ import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOp
 
 public class ControlLoopEventContextTest {
     private static final UUID REQ_ID = UUID.randomUUID();
+    private static final String ITEM_KEY = "obtain-C";
 
     private Map<String, String> enrichment;
     private VirtualControlLoopEvent event;
@@ -118,13 +119,28 @@ public class ControlLoopEventContextTest {
             ControlLoopOperationParams params2 = mock(ControlLoopOperationParams.class);
             when(params2.start()).thenReturn(future2);
 
-            assertSame(future2, context.obtain("obtain-C", params2));
+            assertSame(future2, context.obtain(ITEM_KEY, params2));
             return future;
         });
 
-        assertSame(future2, context.obtain("obtain-C", params));
+        assertSame(future2, context.obtain(ITEM_KEY, params));
 
         // should have canceled the interrupted future
         assertTrue(future.isCancelled());
+
+        // return a new future next time start() is called
+        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+        when(params.start()).thenReturn(future3);
+
+        // repeat - should get the same future
+        assertSame(future2, context.obtain(ITEM_KEY, params));
+        assertSame(future2, context.obtain(ITEM_KEY, params));
+
+        // future2 should still be active
+        assertFalse(future2.isCancelled());
+
+        // cancel it - now we should get the new future
+        future2.cancel(false);
+        assertSame(future3, context.obtain(ITEM_KEY, params));
     }
 }
index 50cb8fa..8189c74 100644 (file)
@@ -302,8 +302,10 @@ public class HttpOperationTest {
      * Tests processResponse() when it's a success and the response type is a String.
      */
     @Test
-    public void testProcessResponseSuccessString() {
-        assertSame(outcome, oper.processResponse(outcome, PATH, response));
+    public void testProcessResponseSuccessString() throws Exception {
+        CompletableFuture<OperationOutcome> result = oper.processResponse(outcome, PATH, response);
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }
 
@@ -311,9 +313,11 @@ public class HttpOperationTest {
      * Tests processResponse() when it's a failure.
      */
     @Test
-    public void testProcessResponseFailure() {
+    public void testProcessResponseFailure() throws Exception {
         when(response.getStatus()).thenReturn(555);
-        assertSame(outcome, oper.processResponse(outcome, PATH, response));
+        CompletableFuture<OperationOutcome> result = oper.processResponse(outcome, PATH, response);
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
         assertEquals(PolicyResult.FAILURE, outcome.getResult());
     }
 
@@ -321,12 +325,14 @@ public class HttpOperationTest {
      * Tests processResponse() when the decoder succeeds.
      */
     @Test
-    public void testProcessResponseDecodeOk() throws CoderException {
+    public void testProcessResponseDecodeOk() throws Exception {
         when(response.readEntity(String.class)).thenReturn("10");
 
         MyGetOperation<Integer> oper2 = new MyGetOperation<>(Integer.class);
 
-        assertSame(outcome, oper2.processResponse(outcome, PATH, response));
+        CompletableFuture<OperationOutcome> result = oper2.processResponse(outcome, PATH, response);
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }