Exception not propagated by processResponse 99/104099/1
authorJim Hahn <jrh3@att.com>
Fri, 20 Mar 2020 22:29:21 +0000 (18:29 -0400)
committerJim Hahn <jrh3@att.com>
Fri, 20 Mar 2020 22:29:21 +0000 (18:29 -0400)
If the topic processResponse() method throws an exception, then
the actor/operation is left in an incomplete state.

Issue-ID: POLICY-2434
Signed-off-by: Jim Hahn <jrh3@att.com>
Change-Id: I6c5d149d4046fbfb970c8dd831fc3938516d1115

models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java
models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperationTest.java

index ec522a4..f598d62 100644 (file)
@@ -106,7 +106,7 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
     @Override
     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
-        final Pair<String,Q> pair = makeRequest(attempt);
+        final Pair<String, Q> pair = makeRequest(attempt);
         final Q request = pair.getRight();
         outcome.setSubRequestId(pair.getLeft());
 
@@ -118,10 +118,15 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
         // register a listener BEFORE publishing
 
         BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
-            OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
-            if (latestOutcome != null) {
-                // final response - complete the controller
-                controller.completeAsync(() -> latestOutcome, executor);
+            try {
+                OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+                if (latestOutcome != null) {
+                    // final response - complete the controller
+                    controller.completeAsync(() -> latestOutcome, executor);
+                }
+            } catch (RuntimeException e) {
+                logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
+                controller.completeExceptionally(e);
             }
         };
 
index 587564a..48669f7 100644 (file)
@@ -167,11 +167,41 @@ public class BidirectionalTopicOperationTest {
         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
     }
 
+    /**
+     * Tests startOperationAsync() when processResponse() throws an exception.
+     */
+    @Test
+    public void testStartOperationAsyncProcException() throws Exception {
+        oper = new MyOperation() {
+            @Override
+            protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
+                            StandardCoderObject scoResponse) {
+                throw EXPECTED_EXCEPTION;
+            }
+        };
+
+        CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
+        assertFalse(future.isDone());
+
+        assertEquals(SUB_REQID, outcome.getSubRequestId());
+
+        verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
+
+        verify(forwarder, never()).unregister(any(), any());
+
+        // provide a response
+        listenerCaptor.getValue().accept(responseText, stdResponse);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(future.isCompletedExceptionally());
+
+        verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
+    }
+
     /**
      * Tests startOperationAsync() when the publisher throws an exception.
      */
     @Test
-    public void testStartOperationAsyncException() throws Exception {
+    public void testStartOperationAsyncPubException() throws Exception {
         // indicate that nothing was published
         when(handler.send(any())).thenReturn(false);