Update to consume and publish events in new format
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncResponsePublisher.java
index a9560b0..c3f3859 100644 (file)
@@ -27,13 +27,13 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventPublisher;
 import org.onap.champ.event.GraphEvent;
 import org.onap.champ.event.GraphEvent.GraphEventResult;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
 import org.onap.champ.service.ChampThreadFactory;
 import org.onap.champ.service.logging.ChampMsgs;
 
-import org.onap.aai.event.api.EventPublisher;
-
 public class ChampAsyncResponsePublisher {
 
   private EventPublisher asyncResponsePublisher;
@@ -51,7 +51,7 @@ public class ChampAsyncResponsePublisher {
   /**
    * Internal queue where outgoing events will be buffered.
    **/
-  private BlockingQueue<GraphEvent> responsePublisherEventQueue;
+  private BlockingQueue<GraphEventEnvelope> responsePublisherEventQueue;
 
   /**
    * Pool of worker threads that do the work of publishing the events to the
@@ -72,7 +72,7 @@ public class ChampAsyncResponsePublisher {
 
     this.responsePublisherPoolSize = responsePublisherPoolSize;
 
-    responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
+    responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize);
     responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
         new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
 
@@ -91,7 +91,7 @@ public class ChampAsyncResponsePublisher {
 
     responsePublisherPoolSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE;
 
-    responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
+    responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize);
     responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
         new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
 
@@ -105,8 +105,8 @@ public class ChampAsyncResponsePublisher {
             + asyncResponsePublisher.getClass().getName());
   }
 
-  public void publishResponseEvent(GraphEvent event) {
-    responsePublisherEventQueue.offer(event);
+  public void publishResponseEvent(GraphEventEnvelope eventEnvelope) {
+    responsePublisherEventQueue.offer(eventEnvelope);
 
   }
 
@@ -116,23 +116,20 @@ public class ChampAsyncResponsePublisher {
     public void run() {
 
       while (true) {
-
+        GraphEventEnvelope eventEnvelope = null;
         GraphEvent event = null;
         try {
-
           // Get the next event to be published from the queue.
-          event = responsePublisherEventQueue.take();
-
+          eventEnvelope = responsePublisherEventQueue.take();
+          event = eventEnvelope.getBody();
         } catch (InterruptedException e) {
-
           // Restore the interrupted status.
           Thread.currentThread().interrupt();
         }
         // Publish the response
-
         try {
           event.setTimestamp(System.currentTimeMillis());
-          asyncResponsePublisher.sendSync(event.toJson());
+          asyncResponsePublisher.sendSync(eventEnvelope.toJson());
           if (event.getResult().equals(GraphEventResult.SUCCESS)) {
             logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
                 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()