Fixing the gRPC consumer side 52/103452/2
authora.sreekumar <ajith.sreekumar@est.tech>
Mon, 9 Mar 2020 10:56:39 +0000 (10:56 +0000)
committera.sreekumar <ajith.sreekumar@est.tech>
Wed, 11 Mar 2020 15:49:31 +0000 (15:49 +0000)
Change-Id: I21d9253f41eee9b958e8fb723f6c19f266502cef
Issue-ID: POLICY-1656
Signed-off-by: a.sreekumar <ajith.sreekumar@est.tech>
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java

index 7333c8a..0f6bdf6 100644 (file)
 
 package org.onap.policy.apex.plugins.event.carrier.grpc;
 
+import lombok.Getter;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
-import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
 import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
 
 /**
- * This class implements an Apex gRPC consumer. It is not expected to receive events using gRPC.
- * So, initializing a gRPC consumer will result in error.
+ * This class implements an Apex gRPC consumer. The consumer is used by it's peer gRPC producer to consume response
+ * events.
  *
  * @author Ajith Sreekumar (ajith.sreekumar@est.tech)
  */
 public class ApexGrpcConsumer extends ApexPluginsEventConsumer {
 
-    private static final String GRPC_CONSUMER_ERROR_MSG =
-        "A gRPC Consumer may not be specified. Only sending events is possible using gRPC";
+    // The event receiver that will receive events from this consumer
+    @Getter
+    private ApexEventReceiver eventReceiver;
 
     @Override
     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
         final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
-        throw new ApexEventException(GRPC_CONSUMER_ERROR_MSG);
-    }
+        this.eventReceiver = incomingEventReceiver;
+        this.name = consumerName;
 
-    @Override
-    public void run() {
-        throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
-    }
+        // Check and get the gRPC Properties
+        if (!(consumerParameters.getCarrierTechnologyParameters() instanceof GrpcCarrierTechnologyParameters)) {
+            final String errorMessage =
+                "specified consumer properties are not applicable to the gRPC consumer (" + this.name + ")";
+            throw new ApexEventException(errorMessage);
+        }
 
-    @Override
-    public void start() {
-        throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+        // Check if we are in peered mode
+        if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
+            final String errorMessage =
+                "gRPC consumer (" + this.name + ") must run in peered requestor mode with a gRPC producer";
+            throw new ApexEventException(errorMessage);
+        }
     }
 
     @Override
     public void stop() {
-        throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+        // For gRPC requests, all the implementation is in the producer
     }
 
     @Override
-    public PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode) {
-        throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+    public void run() {
+        // For gRPC requests, all the implementation is in the producer
     }
 
-    @Override
-    public void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference) {
-        throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
-    }
 }
index 380ae12..c98fa41 100644 (file)
@@ -31,10 +31,13 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
 import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
 import org.onap.policy.cds.api.CdsProcessorListener;
 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
 import org.onap.policy.cds.properties.CdsServerProperties;
@@ -73,7 +76,7 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
         }
         GrpcCarrierTechnologyParameters grpcProducerProperties =
             (GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
-
+        grpcProducerProperties.validateGrpcParameters(true);
         client = makeGrpcClient(grpcProducerProperties);
     }
 
@@ -125,6 +128,32 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
                 + "response from CDS:\n" + cdsResponse.get();
             throw new ApexEventRuntimeException(errorMessage);
         }
+
+        consumeEvent(executionId, cdsResponse.get());
+    }
+
+    private void consumeEvent(long executionId, ExecutionServiceOutput event) {
+        // Find the peered consumer for this producer
+        final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR);
+        if (peeredRequestorReference == null) {
+            return;
+        }
+        // Find the gRPC Response Consumer that will take in the response to APEX Engine
+        final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer();
+        if (!(consumer instanceof ApexGrpcConsumer)) {
+            final String errorMessage = "Recieve of gRPC response by APEX failed,"
+                + "The consumer is not an instance of ApexGrpcConsumer\n. The received gRPC response:" + event;
+            throw new ApexEventRuntimeException(errorMessage);
+        }
+
+        // Use the consumer to consume this response event in APEX
+        final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer;
+        try {
+            grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(),
+                JsonFormat.printer().print(event));
+        } catch (ApexEventException | InvalidProtocolBufferException e) {
+            throw new ApexEventRuntimeException("Consuming gRPC response failed.", e);
+        }
     }
 
     /**
index 59db167..f13248e 100644 (file)
@@ -22,10 +22,9 @@ package org.onap.policy.apex.plugins.event.carrier.grpc;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
-import org.onap.policy.common.parameters.annotations.Max;
-import org.onap.policy.common.parameters.annotations.Min;
-import org.onap.policy.common.parameters.annotations.NotNull;
 
 // @formatter:off
 /**
@@ -59,20 +58,10 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters
     /** The consumer plugin class for the gRPC carrier technology. */
     public static final String GRPC_EVENT_CONSUMER_PLUGIN_CLASS = ApexGrpcConsumer.class.getName();
 
-    @Min(value = 1)
     private int timeout;
-
-    @Min(value = MIN_USER_PORT)
-    @Max(value = MAX_USER_PORT)
     private int port;
-
-    @NotNull
     private String host;
-
-    @NotNull
     private String username;
-
-    @NotNull
     private String password;
 
 
@@ -87,4 +76,36 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters
         this.setEventProducerPluginClass(GRPC_EVENT_PRODUCER_PLUGIN_CLASS);
         this.setEventConsumerPluginClass(GRPC_EVENT_CONSUMER_PLUGIN_CLASS);
     }
+
+    /**
+     * The method validates the gRPC parameters. Host details are specified as parameters only for a gRPC producer.
+     *
+     * @param isProducer if the parameters specified are for the gRPC producer or consumer
+     * @throws ApexEventException exception thrown when invalid parameters are provided
+     */
+    public void validateGrpcParameters(boolean isProducer) throws ApexEventException {
+        StringBuilder errorMessage = new StringBuilder();
+        if (isProducer) {
+            if (timeout < 1) {
+                errorMessage.append("timeout should have a positive value.\n");
+            }
+            if (MIN_USER_PORT > port || MAX_USER_PORT < port) {
+                errorMessage.append("port range should be between ").append(MIN_USER_PORT).append(" and ")
+                .append(MAX_USER_PORT).append("\n");
+            }
+            if (StringUtils.isEmpty(host)) {
+                errorMessage.append("host should be specified.\n");
+            }
+            if (StringUtils.isEmpty(username)) {
+                errorMessage.append("username should be specified.\n");
+            }
+            if (StringUtils.isEmpty(password)) {
+                errorMessage.append("password should be specified.\n");
+            }
+        }
+        if (errorMessage.length() > 0) {
+            errorMessage.insert(0, "Issues in specifying gRPC Producer parameters:\n");
+            throw new ApexEventException(errorMessage.toString());
+        }
+    }
 }
index dc5cc38..48fce1b 100644 (file)
@@ -22,21 +22,24 @@ package org.onap.policy.apex.plugins.event.carrier.grpc;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
 
 public class ApexGrpcConsumerTest {
-    ApexGrpcConsumer grpcConsumer = null;
-    EventHandlerParameters consumerParameters = null;
-    ApexEventReceiver incomingEventReceiver = null;
-
-    private static final String GRPC_CONSUMER_ERROR_MSG =
-        "A gRPC Consumer may not be specified. Only sending events is possible using gRPC";
+    private static final String CONSUMER_NAME = "TestApexGrpcConsumer";
+    private ApexGrpcConsumer grpcConsumer = null;
+    private ApexGrpcProducer grpcProducer = null;
+    private EventHandlerParameters consumerParameters = null;
+    private ApexEventReceiver incomingEventReceiver = null;
 
     /**
      * Set up testing.
@@ -46,22 +49,22 @@ public class ApexGrpcConsumerTest {
     @Before
     public void setUp() throws ApexEventException {
         grpcConsumer = new ApexGrpcConsumer();
-        consumerParameters = new EventHandlerParameters();
-        consumerParameters.setCarrierTechnologyParameters(new GrpcCarrierTechnologyParameters() {});
+        grpcProducer = new ApexGrpcProducer();
     }
 
     @Test
     public void testInit() {
-        assertThatThrownBy(() -> {
-            grpcConsumer.init("TestApexGrpcConsumer", consumerParameters, incomingEventReceiver);
-        }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
+        consumerParameters = populateConsumerParameters(true, true);
+        Assertions.assertThatCode(() -> grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver))
+            .doesNotThrowAnyException();
     }
 
     @Test
-    public void testStart() {
-        assertThatThrownBy(() -> {
-            grpcConsumer.start();
-        }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
+    public void testInit_invalidPeeredMode() {
+        consumerParameters = populateConsumerParameters(true, false);
+        assertThatThrownBy(() -> grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver))
+            .hasMessageContaining(
+                "gRPC consumer (" + CONSUMER_NAME + ") must run in peered requestor mode with a gRPC producer");
     }
 
     @Test
@@ -70,24 +73,35 @@ public class ApexGrpcConsumerTest {
     }
 
     @Test
-    public void testGetPeeredReference() {
-        assertThatThrownBy(() -> {
-            grpcConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR);
-        }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
+    public void testPeeredReference() throws ApexEventException {
+        consumerParameters = populateConsumerParameters(true, true);
+        grpcConsumer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR,
+            new PeeredReference(EventHandlerPeeredMode.REQUESTOR, grpcConsumer, grpcProducer));
+        grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver);
+        PeeredReference peeredReference = grpcConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR);
+        assertNotNull(peeredReference);
+        assertTrue(peeredReference.getPeeredConsumer().equals(grpcConsumer));
+        assertTrue(peeredReference.getPeeredProducer().equals(grpcProducer));
     }
 
-    @Test
-    public void testSetPeeredReference() {
-        assertThatThrownBy(() -> {
-            grpcConsumer.setPeeredReference(null, null);
-        }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
-    }
-
-    @Test()
-    public void testStop() {
-        assertThatThrownBy(() -> {
-            new ApexGrpcConsumer().stop();
-        }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
+    private EventHandlerParameters populateConsumerParameters(boolean isConsumer, boolean isPeeredMode) {
+        consumerParameters = new EventHandlerParameters();
+        GrpcCarrierTechnologyParameters params = new GrpcCarrierTechnologyParameters();
+        params.setLabel("GRPC");
+        params.setEventProducerPluginClass(ApexGrpcProducer.class.getName());
+        params.setEventConsumerPluginClass(ApexGrpcConsumer.class.getName());
+        if (!isConsumer) {
+            params.setHost("hostname");
+            params.setPort(3214);
+            params.setUsername("dummyUser");
+            params.setPassword("dummyPassword");
+            params.setTimeout(1);
+        }
+        consumerParameters.setCarrierTechnologyParameters(params);
+        if (isPeeredMode) {
+            consumerParameters.setPeeredMode(EventHandlerPeeredMode.REQUESTOR, true);
+            consumerParameters.setPeer(EventHandlerPeeredMode.REQUESTOR, "requestorPeerName");
+        }
+        return consumerParameters;
     }
-
 }
index a3994c2..480d842 100644 (file)
 
 package org.onap.policy.apex.plugins.event.carrier.grpc;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.common.parameters.GroupValidationResult;
 
 public class GrpcCarrierTechnologyParametersTest {
@@ -42,19 +44,13 @@ public class GrpcCarrierTechnologyParametersTest {
     }
 
     @Test
-    public void testGrpcCarrierTechnologyParameters_invalid() {
+    public void testGrpcCarrierTechnologyParameters_invalid_producer_params() throws ApexEventException {
         GroupValidationResult result = params.validate();
-        assertFalse(result.isValid());
-        assertTrue(result.getResult().contains("field \"timeout\" type \"int\" value \"0\" INVALID, must be >= 1"));
-        assertTrue(result.getResult().contains("field \"port\" type \"int\" value \"0\" INVALID, must be >= 1024"));
-        assertTrue(
-            result.getResult().contains("field \"host\" type \"java.lang.String\" value \"null\" INVALID, is null"));
-        assertTrue(result.getResult()
-            .contains("field \"username\" type \"java.lang.String\" value \"null\" INVALID, is null"));
-        assertTrue(result.getResult()
-            .contains("field \"password\" type \"java.lang.String\" value \"null\" INVALID, is null"));
-        assertTrue(result.getResult().contains(""));
-        assertTrue(result.getResult().contains(""));
+        assertTrue(result.isValid());
+        assertThatThrownBy(() -> params.validateGrpcParameters(true))
+            .hasMessage("Issues in specifying gRPC Producer parameters:\ntimeout should have a positive value.\n"
+                + "port range should be between 1024 and 65535\n" + "host should be specified.\n"
+                + "username should be specified.\n" + "password should be specified.\n");
     }
 
     @Test
@@ -70,6 +66,7 @@ public class GrpcCarrierTechnologyParametersTest {
         params.setUsername(USERNAME);
         GroupValidationResult result = params.validate();
         assertTrue(result.isValid());
+        Assertions.assertThatCode(() -> params.validateGrpcParameters(true)).doesNotThrowAnyException();
     }
 
     @Test
@@ -81,7 +78,8 @@ public class GrpcCarrierTechnologyParametersTest {
 
         params.setPort(23); // invalid value
         GroupValidationResult result = params.validate();
-        assertFalse(result.isValid());
-        assertTrue(result.getResult().contains("field \"port\" type \"int\" value \"23\" INVALID, must be >= 1024"));
+        assertTrue(result.isValid());
+        assertThatThrownBy(() -> params.validateGrpcParameters(true))
+            .hasMessageContaining("port range should be between 1024 and 65535");
     }
 }