Fix intermittent unit test failures reseterquestor 19/105119/1
authorliamfallon <liam.fallon@est.tech>
Sat, 4 Apr 2020 14:35:07 +0000 (15:35 +0100)
committerliamfallon <liam.fallon@est.tech>
Sun, 5 Apr 2020 12:10:18 +0000 (13:10 +0100)
When consumers and producers are paired as in the case of the REST
Rquestor, both sides must come up and be wired together in the
initiation phase of apex-pdp before the consumers and producers start
handling envents.

In the ApexActivator class, the consumers were started immediately after
they were initialized meaning that a consumer could return events to a
producer that had not started yet.

This change fixes the ApexActivator so that it waits until all consumers
and producers are initialized before starting event handling.

It also fixes the timings on RestRequestor tests and tidies up the unit
tests.

Issue-ID: POLICY-2469
Change-Id: Ib66d9531bf21f2a879ab33795aded4f48e7bfbc6
Signed-off-by: liamfallon <liam.fallon@est.tech>
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/pom.xml
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducerTest.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParametersTest.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/logback-test.xml [new file with mode: 0644]
services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java

index 0f15071..b71b689 100644 (file)
@@ -1,7 +1,7 @@
 <!--
   ============LICENSE_START=======================================================
    Copyright (C) 2018 Ericsson. All rights reserved.
-   Modifications Copyright (C) 2019 Nordix Foundation.
+   Modifications Copyright (C) 2019-2020 Nordix Foundation.
   ================================================================================
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   SPDX-License-Identifier: Apache-2.0
   ============LICENSE_END=========================================================
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.onap.policy.apex-pdp.plugins.plugins-event.plugins-event-carrier</groupId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
index 57560d2..80f8fa6 100644 (file)
@@ -34,9 +34,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Entity;
@@ -103,39 +103,36 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
 
     @Override
     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
-            final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+        final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
         this.eventReceiver = incomingEventReceiver;
         this.name = consumerName;
 
         // Check and get the REST Properties
         if (!(consumerParameters
-                .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) {
+            .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) {
             final String errorMessage =
-                    "specified consumer properties are not applicable to REST Requestor consumer (" + this.name + ")";
-            LOGGER.warn(errorMessage);
+                "specified consumer properties are not applicable to REST Requestor consumer (" + this.name + ")";
             throw new ApexEventException(errorMessage);
         }
         restConsumerProperties =
-                (RestRequestorCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
+            (RestRequestorCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
 
         // Check if we are in peered mode
         if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
             final String errorMessage = "REST Requestor consumer (" + this.name
-                    + ") must run in peered requestor mode with a REST Requestor producer";
-            LOGGER.warn(errorMessage);
+                + ") must run in peered requestor mode with a REST Requestor producer";
             throw new ApexEventException(errorMessage);
         }
 
         // Check if the HTTP method has been set
         if (restConsumerProperties.getHttpMethod() == null) {
             restConsumerProperties
-                    .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD);
+                .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD);
         }
 
         // Check if the HTTP URL has been set
         if (restConsumerProperties.getUrl() == null) {
             final String errorMessage = "no URL has been specified on REST Requestor consumer (" + this.name + ")";
-            LOGGER.warn(errorMessage);
             throw new ApexEventException(errorMessage);
         }
 
@@ -144,7 +141,6 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
             new URL(restConsumerProperties.getUrl());
         } catch (final Exception e) {
             final String errorMessage = "invalid URL has been specified on REST Requestor consumer (" + this.name + ")";
-            LOGGER.warn(errorMessage);
             throw new ApexEventException(errorMessage, e);
         }
 
@@ -157,8 +153,8 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
 
         // Check if HTTP headers has been set
         if (restConsumerProperties.checkHttpHeadersSet()) {
-            LOGGER.debug("REST Requestor consumer has http headers ({}): {}", this.name,
-                    Arrays.deepToString(restConsumerProperties.getHttpHeaders()));
+            final String httpHeaderString = Arrays.deepToString(restConsumerProperties.getHttpHeaders());
+            LOGGER.debug("REST Requestor consumer has http headers ({}): {}", this.name, httpHeaderString);
         }
 
         // Initialize the HTTP client
@@ -177,8 +173,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
             incomingRestRequestQueue.add(restRequest);
         } catch (final Exception requestException) {
             final String errorMessage =
-                    "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" + this.name + ")";
-            LOGGER.warn(errorMessage, requestException);
+                "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" + this.name + ")";
             throw new ApexEventRuntimeException(errorMessage);
         }
     }
@@ -202,7 +197,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
             try {
                 // Take the next event from the queue
                 final ApexRestRequest restRequest =
-                        incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS);
+                    incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS);
                 if (restRequest == null) {
                     // Poll timed out, check for request timeouts
                     timeoutExpiredRequests();
@@ -215,11 +210,11 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
                     Set<String> names = restConsumerProperties.getKeysFromUrl();
                     Set<String> inputProperty = inputExecutionProperties.stringPropertyNames();
 
-                    names.stream().map(Optional::of).forEach(op ->
-                        op.filter(inputProperty::contains)
-                                .orElseThrow(() -> new ApexEventRuntimeException(
-                                        "key\"" + op.get() + "\"specified on url \"" + restConsumerProperties.getUrl()
-                                                + "\"not found in execution properties passed by the current policy")));
+                    names.stream().map(Optional::of)
+                        .forEach(op -> op.filter(inputProperty::contains)
+                            .orElseThrow(() -> new ApexEventRuntimeException(
+                                "key\"" + op.get() + "\"specified on url \"" + restConsumerProperties.getUrl()
+                                    + "\"not found in execution properties passed by the current policy")));
 
                     untaggedUrl = names.stream().reduce(untaggedUrl,
                         (acc, str) -> acc.replace("{" + str + "}", (String) inputExecutionProperties.get(str)));
@@ -264,7 +259,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
         // Interrupt timed out requests and remove them from the ongoing map
         for (final ApexRestRequest timedoutRequest : timedoutRequestList) {
             final String errorMessage =
-                    "REST Requestor consumer (" + this.name + "), REST request timed out: " + timedoutRequest;
+                "REST Requestor consumer (" + this.name + "), REST request timed out: " + timedoutRequest;
             LOGGER.warn(errorMessage);
 
             ongoingRestRequestMap.remove(timedoutRequest);
@@ -324,8 +319,8 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
                 // Check that the request worked
                 if (!isPass.matches()) {
                     final String errorMessage = "reception of event from URL \"" + restConsumerProperties.getUrl()
-                            + "\" failed with status code " + response.getStatus() + " and message \""
-                            + response.readEntity(String.class) + "\"";
+                        + "\" failed with status code " + response.getStatus() + " and message \""
+                        + response.readEntity(String.class) + "\"";
                     throw new ApexEventRuntimeException(errorMessage);
                 }
 
@@ -335,7 +330,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
                 // Check there is content
                 if (StringUtils.isBlank(eventJsonString)) {
                     final String errorMessage =
-                            "received an empty response to \"" + request + "\" from URL \"" + untaggedUrl + "\"";
+                        "received an empty response to \"" + request + "\" from URL \"" + untaggedUrl + "\"";
                     throw new ApexEventRuntimeException(errorMessage);
                 }
 
@@ -372,7 +367,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
          */
         public Response sendEventAsRestRequest(String untaggedUrl) {
             Builder headers = client.target(untaggedUrl).request(APPLICATION_JSON)
-                    .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap());
+                .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap());
             switch (restConsumerProperties.getHttpMethod()) {
                 case GET:
                     return headers.get();
index e166bdc..fe2f6c5 100644 (file)
@@ -30,8 +30,6 @@ import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer;
 import org.onap.policy.apex.service.engine.event.PeeredReference;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Concrete implementation of an Apex event requestor that manages the producer side of a REST request.
@@ -40,11 +38,6 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class ApexRestRequestorProducer extends ApexPluginsEventProducer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestRequestorProducer.class);
-
-    // The REST carrier properties
-    private RestRequestorCarrierTechnologyParameters restProducerProperties;
-
     // The number of events sent
     private int eventsSent = 0;
 
@@ -53,40 +46,36 @@ public class ApexRestRequestorProducer extends ApexPluginsEventProducer {
      */
     @Override
     public void init(final String producerName, final EventHandlerParameters producerParameters)
-            throws ApexEventException {
+        throws ApexEventException {
         this.name = producerName;
 
         // Check and get the REST Properties
         if (!(producerParameters
-                .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) {
+            .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) {
             final String errorMessage =
-                    "specified producer properties are not applicable to REST requestor producer (" + this.name + ")";
-            LOGGER.warn(errorMessage);
+                "specified producer properties are not applicable to REST requestor producer (" + this.name + ")";
             throw new ApexEventException(errorMessage);
         }
-        restProducerProperties =
-                (RestRequestorCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
+        RestRequestorCarrierTechnologyParameters restProducerProperties =
+            (RestRequestorCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
 
         // Check if we are in peered mode
         if (!producerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
             final String errorMessage = "REST Requestor producer (" + this.name
-                    + ") must run in peered requestor mode with a REST Requestor consumer";
-            LOGGER.warn(errorMessage);
+                + ") must run in peered requestor mode with a REST Requestor consumer";
             throw new ApexEventException(errorMessage);
         }
 
         // Check if the HTTP URL has been set
         if (restProducerProperties.getUrl() != null) {
             final String errorMessage = "URL may not be specified on REST Requestor producer (" + this.name + ")";
-            LOGGER.warn(errorMessage);
             throw new ApexEventException(errorMessage);
         }
 
         // Check if the HTTP method has been set
         if (restProducerProperties.getHttpMethod() != null) {
             final String errorMessage =
-                    "HTTP method may not be specified on REST Requestor producer (" + this.name + ")";
-            LOGGER.warn(errorMessage);
+                "HTTP method may not be specified on REST Requestor producer (" + this.name + ")";
             throw new ApexEventException(errorMessage);
         }
     }
@@ -105,7 +94,7 @@ public class ApexRestRequestorProducer extends ApexPluginsEventProducer {
      */
     @Override
     public void sendEvent(final long executionId, final Properties executionProperties, final String eventName,
-            final Object event) {
+        final Object event) {
         super.sendEvent(executionId, executionProperties, eventName, event);
 
         // Find the peered consumer for this producer
@@ -114,23 +103,20 @@ public class ApexRestRequestorProducer extends ApexPluginsEventProducer {
             // Find the REST Response Consumer that will handle this request
             final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer();
             if (!(consumer instanceof ApexRestRequestorConsumer)) {
-                final String errorMessage = "send of event to URL \"" + restProducerProperties.getUrl() + "\" failed,"
-                        + " REST response consumer is not an instance of ApexRestRequestorConsumer\n" + event;
-                LOGGER.warn(errorMessage);
+                final String errorMessage = "send of event failed,"
+                    + " REST response consumer is not an instance of ApexRestRequestorConsumer\n" + event;
                 throw new ApexEventRuntimeException(errorMessage);
             }
 
             // Use the consumer to handle this event
             final ApexRestRequestorConsumer restRequstConsumer = (ApexRestRequestorConsumer) consumer;
             restRequstConsumer
-                    .processRestRequest(new ApexRestRequest(executionId, executionProperties, eventName, event));
+                .processRestRequest(new ApexRestRequest(executionId, executionProperties, eventName, event));
 
             eventsSent++;
         } else {
             // No peered consumer defined
-            final String errorMessage = "send of event to URL \"" + restProducerProperties.getUrl() + "\" failed,"
-                    + " REST response consumer is not defined\n" + event;
-            LOGGER.warn(errorMessage);
+            final String errorMessage = "send of event failed," + " REST response consumer is not defined\n" + event;
             throw new ApexEventRuntimeException(errorMessage);
         }
     }
index 0c6067a..a1dc0b4 100644 (file)
 
 package org.onap.policy.apex.plugins.event.carrier.restrequestor;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
 
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -54,41 +54,29 @@ public class ApexRestRequestorConsumerTest {
         EventHandlerParameters consumerParameters = new EventHandlerParameters();
         ApexEventReceiver incomingEventReceiver = null;
 
-        try {
+        assertThatThrownBy(() -> {
             consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver);
-            fail("test should throw an exception here");
-        } catch (ApexEventException aee) {
-            assertEquals("specified consumer properties are not applicable to REST Requestor consumer (ConsumerName)",
-                    aee.getMessage());
-        }
+        }).hasMessage("specified consumer properties are not applicable to REST Requestor consumer (ConsumerName)");
 
         RestRequestorCarrierTechnologyParameters rrctp = new RestRequestorCarrierTechnologyParameters();
         consumerParameters.setCarrierTechnologyParameters(rrctp);
-        try {
+        assertThatThrownBy(() -> {
             consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver);
-            fail("test should throw an exception here");
-        } catch (ApexEventException aee) {
-            assertEquals("REST Requestor consumer (ConsumerName) must run in peered requestor mode "
-                    + "with a REST Requestor producer", aee.getMessage());
-        }
+        }).hasMessage("REST Requestor consumer (ConsumerName) must run in peered requestor mode "
+            + "with a REST Requestor producer");
 
         consumerParameters.setPeeredMode(EventHandlerPeeredMode.REQUESTOR, true);
         rrctp.setHttpMethod(null);
-        try {
+
+        assertThatThrownBy(() -> {
             consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver);
-            fail("test should throw an exception here");
-        } catch (ApexEventException aee) {
-            assertEquals("no URL has been specified on REST Requestor consumer (ConsumerName)", aee.getMessage());
-        }
+        }).hasMessage("no URL has been specified on REST Requestor consumer (ConsumerName)");
 
         rrctp.setHttpMethod(RestRequestorCarrierTechnologyParameters.HttpMethod.GET);
         rrctp.setUrl("ZZZZ");
-        try {
+        assertThatThrownBy(() -> {
             consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver);
-            fail("test should throw an exception here");
-        } catch (ApexEventException aee) {
-            assertEquals("invalid URL has been specified on REST Requestor consumer (ConsumerName)", aee.getMessage());
-        }
+        }).hasMessage("invalid URL has been specified on REST Requestor consumer (ConsumerName)");
 
         rrctp.setHttpMethod(RestRequestorCarrierTechnologyParameters.HttpMethod.GET);
         rrctp.setUrl("http://www.onap.org");
@@ -97,12 +85,9 @@ public class ApexRestRequestorConsumerTest {
 
         consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver);
 
-        try {
+        assertThatThrownBy(() -> {
             consumer.processRestRequest(null);
-            fail("test should throw an exception here");
-        } catch (Exception ex) {
-            assertEquals("could not queue request \"null\" on REST Requestor consumer (ConsumerName)", ex.getMessage());
-        }
+        }).hasMessage("could not queue request \"null\" on REST Requestor consumer (ConsumerName)");
 
         assertEquals(CONSUMER_NAME, consumer.getName());
         assertEquals(0, consumer.getEventsReceived());
index d168f24..450a21f 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2018 Ericsson. All rights reserved.
+ *  Modifications Copyright (C) 2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +21,7 @@
 
 package org.onap.policy.apex.plugins.event.carrier.restrequestor;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -47,41 +49,30 @@ public class ApexRestRequestorProducerTest {
 
         EventHandlerParameters producerParameters = new EventHandlerParameters();
 
-        try {
+        assertThatThrownBy(() -> {
             producer.init(PRODUCER_NAME, producerParameters);
-        } catch (ApexEventException aee) {
-            assertEquals("specified producer properties are not applicable to REST requestor producer (ProducerName)",
-                            aee.getMessage());
-        }
+        }).hasMessage("specified producer properties are not applicable to REST requestor producer (ProducerName)");
 
         RestRequestorCarrierTechnologyParameters rrctp = new RestRequestorCarrierTechnologyParameters();
         producerParameters.setCarrierTechnologyParameters(rrctp);
-        try {
+        assertThatThrownBy(() -> {
             producer.init(PRODUCER_NAME, producerParameters);
-            fail("test should throw an exception here");
-        } catch (ApexEventException aee) {
-            assertEquals("REST Requestor producer (ProducerName) must run in peered requestor mode "
-                            + "with a REST Requestor consumer", aee.getMessage());
-        }
+        }).hasMessage("REST Requestor producer (ProducerName) must run in peered requestor mode "
+            + "with a REST Requestor consumer");
 
         producerParameters.setPeeredMode(EventHandlerPeeredMode.REQUESTOR, true);
         rrctp.setUrl("ZZZZ");
-        try {
+        assertThatThrownBy(() -> {
             producer.init(PRODUCER_NAME, producerParameters);
-            fail("test should throw an exception here");
-        } catch (ApexEventException aee) {
-            assertEquals("URL may not be specified on REST Requestor producer (ProducerName)", aee.getMessage());
-        }
+        }).hasMessage("URL may not be specified on REST Requestor producer (ProducerName)");
 
         rrctp.setUrl(null);
         rrctp.setHttpMethod(RestRequestorCarrierTechnologyParameters.HttpMethod.GET);
-        try {
+
+        assertThatThrownBy(() -> {
             producer.init(PRODUCER_NAME, producerParameters);
             fail("test should throw an exception here");
-        } catch (ApexEventException aee) {
-            assertEquals("HTTP method may not be specified on REST Requestor producer (ProducerName)",
-                            aee.getMessage());
-        }
+        }).hasMessage("HTTP method may not be specified on REST Requestor producer (ProducerName)");
 
         rrctp.setHttpMethod(null);
         producer.init(PRODUCER_NAME, producerParameters);
@@ -109,28 +100,20 @@ public class ApexRestRequestorProducerTest {
         String eventName = "EventName";
         String event = "This is the event";
 
-        try {
+        assertThatThrownBy(() -> {
             producer.sendEvent(12345, null, eventName, event);
-            fail("test should throw an exception here");
-        } catch (Exception aee) {
-            assertEquals("send of event to URL \"null\" failed, REST response consumer is not defined\n"
-                            + "This is the event", aee.getMessage());
-        }
+        }).hasMessage("send of event failed, REST response consumer is not defined\n" + "This is the event");
 
         ApexEventConsumer consumer = new ApexFileEventConsumer();
-        SynchronousEventCache eventCache = new SynchronousEventCache(EventHandlerPeeredMode.SYNCHRONOUS, consumer,
-                        producer, 1000);
+        SynchronousEventCache eventCache =
+            new SynchronousEventCache(EventHandlerPeeredMode.SYNCHRONOUS, consumer, producer, 1000);
         producer.setPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS, eventCache);
 
         PeeredReference peeredReference = new PeeredReference(EventHandlerPeeredMode.REQUESTOR, consumer, producer);
         producer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference);
-        try {
+        assertThatThrownBy(() -> {
             producer.sendEvent(12345, null, eventName, event);
-            fail("test should throw an exception here");
-        } catch (Exception aee) {
-            assertEquals("send of event to URL \"null\" failed, REST response consumer "
-                            + "is not an instance of ApexRestRequestorConsumer\n" + "This is the event",
-                            aee.getMessage());
-        }
+        }).hasMessage("send of event failed, REST response consumer "
+            + "is not an instance of ApexRestRequestorConsumer\n" + "This is the event");
     }
 }
index b9b997f..1e00068 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2018 Ericsson. All rights reserved.
- *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.apex.plugins.event.carrier.restrequestor;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.util.Set;
 
@@ -45,13 +45,9 @@ public class RestRequestorCarrierTechnologyParametersTest {
         arguments.setConfigurationFilePath("src/test/resources/prodcons/RESTRequestorWithHTTPHeaderBadList.json");
         arguments.setRelativeFileRoot(".");
 
-        try {
+        assertThatThrownBy(() -> {
             new ApexParameterHandler().getParameters(arguments);
-            fail("test should throw an exception here");
-        } catch (ParameterException pe) {
-            assertTrue(pe.getMessage().contains("HTTP header array entry is null\n    parameter"));
-            assertTrue(pe.getMessage().trim().endsWith("HTTP header array entry is null"));
-        }
+        }).hasMessageContaining("HTTP header array entry is null\n    parameter");
     }
 
     @Test
@@ -60,15 +56,9 @@ public class RestRequestorCarrierTechnologyParametersTest {
         arguments.setConfigurationFilePath("src/test/resources/prodcons/RESTRequestorWithHTTPHeaderNotKvPairs.json");
         arguments.setRelativeFileRoot(".");
 
-        try {
+        assertThatThrownBy(() -> {
             new ApexParameterHandler().getParameters(arguments);
-            fail("test should throw an exception here");
-        } catch (ParameterException pe) {
-            assertTrue(pe.getMessage()
-                    .contains("HTTP header array entries must have one key and one value: [aaa, bbb, ccc]"));
-            assertTrue(pe.getMessage().trim()
-                    .endsWith("HTTP header array entries must have one key and one value: [aaa]"));
-        }
+        }).hasMessageContaining("HTTP header array entries must have one key and one value: [aaa, bbb, ccc]");
     }
 
     @Test
@@ -77,13 +67,9 @@ public class RestRequestorCarrierTechnologyParametersTest {
         arguments.setConfigurationFilePath("src/test/resources/prodcons/RESTRequestorWithHTTPHeaderNulls.json");
         arguments.setRelativeFileRoot(".");
 
-        try {
+        assertThatThrownBy(() -> {
             new ApexParameterHandler().getParameters(arguments);
-            fail("test should throw an exception here");
-        } catch (ParameterException pe) {
-            assertTrue(pe.getMessage().contains("HTTP header key is null or blank: [null, bbb]"));
-            assertTrue(pe.getMessage().trim().endsWith("HTTP header value is null or blank: [ccc, null]"));
-        }
+        }).hasMessageContaining("HTTP header key is null or blank: [null, bbb]");
     }
 
     @Test
@@ -95,11 +81,11 @@ public class RestRequestorCarrierTechnologyParametersTest {
         ApexParameters parameters = new ApexParameterHandler().getParameters(arguments);
 
         RestRequestorCarrierTechnologyParameters rrctp0 = (RestRequestorCarrierTechnologyParameters) parameters
-                .getEventInputParameters().get("RestRequestorConsumer0").getCarrierTechnologyParameters();
+            .getEventInputParameters().get("RestRequestorConsumer0").getCarrierTechnologyParameters();
         assertEquals(0, rrctp0.getHttpHeaders().length);
 
         RestRequestorCarrierTechnologyParameters rrctp1 = (RestRequestorCarrierTechnologyParameters) parameters
-                .getEventInputParameters().get("RestRequestorConsumer1").getCarrierTechnologyParameters();
+            .getEventInputParameters().get("RestRequestorConsumer1").getCarrierTechnologyParameters();
         assertEquals(3, rrctp1.getHttpHeaders().length);
         assertEquals("bbb", rrctp1.getHttpHeadersAsMultivaluedMap().get("aaa").get(0));
         assertEquals("ddd", rrctp1.getHttpHeadersAsMultivaluedMap().get("ccc").get(0));
@@ -112,16 +98,13 @@ public class RestRequestorCarrierTechnologyParametersTest {
         arguments.setConfigurationFilePath("src/test/resources/prodcons/RESTClientWithHTTPFilterInvalid.json");
         arguments.setRelativeFileRoot(".");
 
-        try {
+        assertThatThrownBy(() -> {
             new ApexParameterHandler().getParameters(arguments);
             ApexParameters parameters = new ApexParameterHandler().getParameters(arguments);
 
             parameters.getEventInputParameters().get("RestRequestorConsumer0").getCarrierTechnologyParameters();
-            fail("test should throw an exception here");
-        } catch (ParameterException pe) {
-            assertTrue(pe.getMessage().contains(
-                    "Invalid HTTP code filter, the filter must be specified as a three digit regular expression: "));
-        }
+        }).hasMessageContaining(
+            "Invalid HTTP code filter, the filter must be specified as a three digit regular expression: ");
     }
 
     @Test
@@ -163,8 +146,8 @@ public class RestRequestorCarrierTechnologyParametersTest {
         assertEquals(RestRequestorCarrierTechnologyParameters.HttpMethod.DELETE, rrctp.getHttpMethod());
 
         assertEquals("RESTREQUESTORCarrierTechnologyParameters "
-                + "[url=http://some.where, httpMethod=DELETE, httpHeaders=[[aaa, bbb], [ccc, ddd]],"
-                + " httpCodeFilter=[1-5][0][0-5]]", rrctp.toString());
+            + "[url=http://some.where, httpMethod=DELETE, httpHeaders=[[aaa, bbb], [ccc, ddd]],"
+            + " httpCodeFilter=[1-5][0][0-5]]", rrctp.toString());
     }
 
     @Test
index 326be51..cbb81f9 100644 (file)
@@ -21,6 +21,7 @@
 
 package org.onap.policy.apex.plugins.event.carrier.restrequestor;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -30,6 +31,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
@@ -40,7 +42,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
-import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
 import org.onap.policy.apex.service.engine.main.ApexMain;
 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
@@ -105,47 +106,23 @@ public class RestRequestorTest {
      * Test rest requestor get.
      *
      * @throws MessagingException the messaging exception
-     * @throws ApexException the apex exception
-     * @throws IOException Signals that an I/O exception has occurred.
+     * @throws Exception an exception
      */
     @Test
-    public void testRestRequestorGet() throws MessagingException, ApexException, IOException {
+    public void testRestRequestorGet() throws Exception {
         final Client client = ClientBuilder.newClient();
 
         final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGet.json"};
         final ApexMain apexMain = new ApexMain(args);
+        await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
 
-        Response response = null;
-
-        // Wait for the required amount of events to be received or for 10 seconds
-        Double getsSoFar = 0.0;
-        for (int i = 0; i < 40; i++) {
-            ThreadUtilities.sleep(100);
-
-            response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats")
-                .request("application/json").get();
-
-            if (Response.Status.OK.getStatusCode() != response.getStatus()) {
-                break;
-            }
-
-            final String responseString = response.readEntity(String.class);
-
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class);
-            getsSoFar = Double.valueOf(jsonMap.get("GET").toString());
-
-            if (getsSoFar >= 50.0) {
-                break;
-            }
-        }
+        await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS)
+            .until(() -> getStatsFromServer(client, "GET") >= 50.0);
 
         apexMain.shutdown();
-        client.close();
-
-        assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+        await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
 
-        assertEquals(Double.valueOf(50.0), getsSoFar);
+        client.close();
     }
 
     /**
@@ -161,14 +138,13 @@ public class RestRequestorTest {
 
         final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetEmpty.json"};
         final ApexMain apexMain = new ApexMain(args);
+        await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
 
         Response response = null;
 
         // Wait for the required amount of events to be received or for 10 seconds
         Double getsSoFar = 0.0;
         for (int i = 0; i < 40; i++) {
-            ThreadUtilities.sleep(100);
-
             response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats")
                 .request("application/json").get();
 
@@ -188,6 +164,8 @@ public class RestRequestorTest {
         }
 
         apexMain.shutdown();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
+
         client.close();
 
         assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
@@ -206,37 +184,15 @@ public class RestRequestorTest {
 
         final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FilePut.json"};
         final ApexMain apexMain = new ApexMain(args);
+        await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
 
-        // Wait for the required amount of events to be received or for 10 seconds
-        Double putsSoFar = 0.0;
-
-        Response response = null;
-        for (int i = 0; i < 40; i++) {
-            ThreadUtilities.sleep(100);
-
-            response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats")
-                .request("application/json").get();
-
-            if (Response.Status.OK.getStatusCode() != response.getStatus()) {
-                break;
-            }
-
-            final String responseString = response.readEntity(String.class);
-
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class);
-            putsSoFar = Double.valueOf(jsonMap.get("PUT").toString());
-
-            if (putsSoFar >= 50.0) {
-                break;
-            }
-        }
+        await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS)
+            .until(() -> getStatsFromServer(client, "PUT") >= 50.0);
 
         apexMain.shutdown();
-        client.close();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
 
-        assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(Double.valueOf(50.0), putsSoFar);
+        client.close();
     }
 
     /**
@@ -252,31 +208,15 @@ public class RestRequestorTest {
 
         final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FilePost.json"};
         final ApexMain apexMain = new ApexMain(args);
+        await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
 
-        // Wait for the required amount of events to be received or for 10 seconds
-        Double postsSoFar = 0.0;
-        for (int i = 0; i < 40; i++) {
-            ThreadUtilities.sleep(100);
-
-            final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats")
-                .request("application/json").get();
-
-            assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
-            final String responseString = response.readEntity(String.class);
-
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class);
-            postsSoFar = Double.valueOf(jsonMap.get("POST").toString());
-
-            if (postsSoFar >= 50.0) {
-                break;
-            }
-        }
+        await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS)
+            .until(() -> getStatsFromServer(client, "POST") >= 50.0);
 
         apexMain.shutdown();
-        client.close();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
 
-        assertEquals(Double.valueOf(50.0), postsSoFar);
+        client.close();
     }
 
     /**
@@ -292,31 +232,16 @@ public class RestRequestorTest {
 
         final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileDelete.json"};
         final ApexMain apexMain = new ApexMain(args);
+        await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
 
-        // Wait for the required amount of events to be received or for 10 seconds
-        Double deletesSoFar = 0.0;
-        for (int i = 0; i < 40; i++) {
-            ThreadUtilities.sleep(100);
-
-            final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats")
-                .request("application/json").get();
-
-            assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
-            final String responseString = response.readEntity(String.class);
-
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class);
-            deletesSoFar = Double.valueOf(jsonMap.get("DELETE").toString());
-
-            if (deletesSoFar >= 50.0) {
-                break;
-            }
-        }
+        // Wait for the required amount of events to be received
+        await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS)
+            .until(() -> getStatsFromServer(client, "DELETE") >= 50.0);
 
         apexMain.shutdown();
-        client.close();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
 
-        assertEquals(Double.valueOf(50.0), deletesSoFar);
+        client.close();
     }
 
     /**
@@ -332,31 +257,15 @@ public class RestRequestorTest {
 
         final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json"};
         final ApexMain apexMain = new ApexMain(args);
+        await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
 
-        // Wait for the required amount of events to be received or for 10 seconds
-        Double getsSoFar = 0.0;
-        for (int i = 0; i < 40; i++) {
-            ThreadUtilities.sleep(100);
-
-            final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats")
-                .request("application/json").get();
-
-            assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
-            final String responseString = response.readEntity(String.class);
-
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class);
-            getsSoFar = Double.valueOf(jsonMap.get("GET").toString());
-
-            if (getsSoFar >= 8.0) {
-                break;
-            }
-        }
+        await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS)
+            .until(() -> getStatsFromServer(client, "GET") >= 8.0);
 
         apexMain.shutdown();
-        client.close();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
 
-        assertEquals(Double.valueOf(8.0), getsSoFar);
+        client.close();
     }
 
     /**
@@ -373,8 +282,7 @@ public class RestRequestorTest {
 
         final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json"};
 
-        final ApexMain apexMain = new ApexMain(args);
-        ThreadUtilities.sleep(200);
+        ApexMain apexMain = new ApexMain(args);
         apexMain.shutdown();
 
         final String outString = outContent.toString();
@@ -400,8 +308,7 @@ public class RestRequestorTest {
 
         final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json"};
 
-        final ApexMain apexMain = new ApexMain(args);
-        ThreadUtilities.sleep(200);
+        ApexMain apexMain = new ApexMain(args);
         apexMain.shutdown();
 
         final String outString = outContent.toString();
@@ -412,4 +319,16 @@ public class RestRequestorTest {
         assertTrue(outString.contains("peer \"RestRequestorProducer for peered mode REQUESTOR "
             + "does not exist or is not defined with the same peered mode"));
     }
+
+    private Double getStatsFromServer(final Client client, final String statToGet) {
+        final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats")
+            .request("application/json").get();
+
+        assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+        final String responseString = response.readEntity(String.class);
+
+        @SuppressWarnings("unchecked")
+        final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class);
+        return Double.valueOf(jsonMap.get(statToGet).toString());
+    }
 }
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/logback-test.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/logback-test.xml
new file mode 100644 (file)
index 0000000..f0fd0b1
--- /dev/null
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ============LICENSE_START=======================================================
+   Copyright (C) 2016-2018 Ericsson. All rights reserved.
+   Modifications Copyright (C) 2020 Nordix Foundation.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+  SPDX-License-Identifier: Apache-2.0
+  ============LICENSE_END=========================================================
+-->
+
+<configuration>
+    <contextName>Apex</contextName>
+    <statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" />
+
+    <!-- USE FOR STD OUT ONLY -->
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <Pattern>%d %contextName [%t] %level %logger{36} - %msg%n</Pattern>
+        </encoder>
+    </appender>
+
+    <root level="ERROR">
+        <appender-ref ref="STDOUT" />
+    </root>
+
+    <logger name="org.onap.policy.apex.plugins.event.carrier.restrequestor" level="ERROR" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+</configuration>
index 205b865..fddbcb7 100644 (file)
@@ -106,7 +106,7 @@ public class ApexActivator {
             ApexParameters apexParameters = apexParametersMap.values().iterator().next();
             // totalInstanceCount is the sum of instance counts required as per each policy
             int totalInstanceCount = apexParametersMap.values().stream()
-                    .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
+                .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
             apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
             instantiateEngine(apexParameters);
             setUpModelMarhsallerAndUnmarshaller(apexParameters);
@@ -126,12 +126,12 @@ public class ApexActivator {
         for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
             ApexParameters apexParams = apexParamsEntry.getValue();
             boolean duplicateInputParameterExist =
-                    apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
+                apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
             boolean duplicateOutputParameterExist =
-                    apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
+                apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
             if (duplicateInputParameterExist || duplicateOutputParameterExist) {
                 LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
-                        apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
+                    apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
                 apexParametersMap.remove(apexParamsEntry.getKey());
                 continue;
             }
@@ -140,22 +140,29 @@ public class ApexActivator {
             // Check if a policy model file has been specified
             if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
                 LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
-                        apexParams.getEngineServiceParameters().getPolicyModelFileName());
+                    apexParams.getEngineServiceParameters().getPolicyModelFileName());
 
-                final String policyModelString = TextFileUtils
-                        .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
+                final String policyModelString =
+                    TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
                 AxPolicyModel policyModel = EngineServiceImpl
-                        .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
+                    .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
                 policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
             }
         }
         AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
+
         // Set the policy model in the engine
         apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
-                true);
+            true);
+
         setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
-                outputParametersMap);
+            outputParametersMap);
+
+        // Wire up pairings between marhsallers and unmarshallers
         setUpMarshalerPairings(inputParametersMap);
+
+        // Start event processing
+        startUnmarshallers(inputParametersMap);
     }
 
     private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
@@ -163,42 +170,43 @@ public class ApexActivator {
         ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
         AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
         Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
-                policyModelsMap.entrySet().stream().skip(1);
+            policyModelsMap.entrySet().stream().skip(1);
         Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
-                policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
-                    try {
-                        entry1.setValue(PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(),
-                                true, true));
-                    } catch (ApexModelException exc) {
-                        LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
-                                entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
-                        apexParametersMap.remove(entry2.getKey());
-                        policyModelsMap.remove(entry2.getKey());
-                    }
-                    return entry1;
-                }));
+            policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
+                try {
+                    entry1.setValue(
+                        PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
+                } catch (ApexModelException exc) {
+                    LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
+                        entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
+                    apexParametersMap.remove(entry2.getKey());
+                    policyModelsMap.remove(entry2.getKey());
+                }
+                return entry1;
+            }));
         AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
         policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
         return finalPolicyModel;
     }
 
     private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
-            Map<String, EventHandlerParameters> inputParametersMap,
-            Map<String, EventHandlerParameters> outputParametersMap) throws ApexEventException {
+        Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
+        throws ApexEventException {
         // Producer parameters specify what event marshalers to handle events leaving Apex are
         // set up and how they are set up
         for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
             final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
-                    engineServiceParameters, outputParameters.getValue());
+                engineServiceParameters, outputParameters.getValue());
             marshaller.init();
             apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
             marshallerMap.put(outputParameters.getKey(), marshaller);
         }
+
         // Consumer parameters specify what event unmarshalers to handle events coming into Apex
         // are set up and how they are set up
         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
             final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
-                    engineServiceParameters, inputParameters.getValue());
+                engineServiceParameters, inputParameters.getValue());
             unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
             unmarshaller.init(engineServiceHandler);
         }
@@ -206,7 +214,7 @@ public class ApexActivator {
 
     private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
         if (null != apexEngineService
-                && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
+            && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
             throw new ApexException("Apex Engine already initialized.");
         }
         // Create engine with specified thread count
@@ -216,7 +224,7 @@ public class ApexActivator {
         // Instantiate and start the messaging service for Deployment
         LOGGER.debug("starting apex deployment service . . .");
         final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService,
-                apexParameters.getEngineServiceParameters().getDeploymentPort());
+            apexParameters.getEngineServiceParameters().getDeploymentPort());
         engDepService.start();
 
         // Create the engine holder to hold the engine's references and act as an event receiver
@@ -240,14 +248,23 @@ public class ApexActivator {
                 if (inputParameters.getValue().isPeeredMode(peeredMode)) {
                     // Find the unmarshaler and marshaler
                     final ApexEventMarshaller peeredMarshaler =
-                            marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
+                        marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
 
                     // Connect the unmarshaler and marshaler
                     unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
                 }
             }
-            // Now let's get events flowing
-            unmarshaller.start();
+        }
+    }
+
+    /**
+     * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done.
+     *
+     * @param inputParametersMap the apex parameters
+     */
+    private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
+        for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
+            unmarshallerMap.get(inputParameters.getKey()).start();
         }
     }