Changes to retain executionProperties per event flow for better context handling 61/121161/1
authora.sreekumar <ajith.sreekumar@bell.ca>
Wed, 28 Apr 2021 10:51:23 +0000 (11:51 +0100)
committerAjith Sreekumar <ajith.sreekumar@bell.ca>
Fri, 7 May 2021 14:29:12 +0000 (14:29 +0000)
Change-Id: I11668e9222dd9c61cc3096fa5c754c8702a781bf
Issue-ID: POLICY-3227
Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
(cherry picked from commit 7bcbd221e350e45db5be0c4f1ceb2e4e1a418f63)

plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java

index 410c83c..ce00210 100644 (file)
@@ -128,10 +128,10 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
             LOGGER.error("Sending event \"{}\" by {} to CDS failed.", eventName, this.name);
         }
 
-        consumeEvent(executionId, cdsResponse.get());
+        consumeEvent(executionId, executionProperties, cdsResponse.get());
     }
 
-    private void consumeEvent(long executionId, ExecutionServiceOutput event) {
+    private void consumeEvent(long executionId, Properties executionProperties, ExecutionServiceOutput event) {
         // Find the peered consumer for this producer
         final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR);
         if (peeredRequestorReference == null) {
@@ -148,7 +148,7 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
         // Use the consumer to consume this response event in APEX
         final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer;
         try {
-            grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(),
+            grpcConsumer.getEventReceiver().receiveEvent(executionId, executionProperties,
                 JsonFormat.printer().print(event));
         } catch (ApexEventException | InvalidProtocolBufferException e) {
             throw new ApexEventRuntimeException("Consuming gRPC response failed.", e);
index 452ecf8..fc7c11e 100644 (file)
@@ -2,6 +2,7 @@
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
+ *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -50,9 +51,6 @@ public class ApexRestClientConsumer extends ApexPluginsEventConsumer {
     // The amount of time to wait in milliseconds between checks that the consumer thread has stopped
     private static final long REST_CLIENT_WAIT_SLEEP_TIME = 50;
 
-    // The Key for property
-    private static final String HTTP_CODE_STATUS = "HTTP_CODE_STATUS";
-
     // The REST parameters read from the parameter service
     private RestClientCarrierTechnologyParameters restConsumerProperties;
 
@@ -171,12 +169,8 @@ public class ApexRestClientConsumer extends ApexPluginsEventConsumer {
                     throw new ApexEventRuntimeException(errorMessage);
                 }
 
-                // build a key and value property in excutionProperties
-                Properties executionProperties = new Properties();
-                executionProperties.put(HTTP_CODE_STATUS, response.getStatus());
-
                 // Send the event into Apex
-                eventReceiver.receiveEvent(executionProperties, eventJsonString);
+                eventReceiver.receiveEvent(new Properties(), eventJsonString);
             } catch (final Exception e) {
                 LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
             }
index d7ee0a5..81997e3 100644 (file)
@@ -71,9 +71,6 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
     // stopped
     private static final long REST_REQUESTOR_WAIT_SLEEP_TIME = 50;
 
-    // The Key for property
-    private static final String HTTP_CODE_STATUS = "HTTP_CODE_STATUS";
-
     // The REST parameters read from the parameter service
     private RestRequestorCarrierTechnologyParameters restConsumerProperties;
 
@@ -335,12 +332,8 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
                     throw new ApexEventRuntimeException(errorMessage);
                 }
 
-                // build a key and value property in excutionProperties
-                Properties executionProperties = new Properties();
-                executionProperties.put(HTTP_CODE_STATUS, response.getStatus());
-
                 // Send the event into Apex
-                eventReceiver.receiveEvent(request.getExecutionId(), executionProperties, eventJsonString);
+                eventReceiver.receiveEvent(request.getExecutionId(), inputExecutionProperties, eventJsonString);
 
                 synchronized (eventsReceivedLock) {
                     eventsReceived++;