Merge of new rebased code
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / openecomp / appc / listener / LCM / impl / WorkerImpl.java
index af8334e..41a77c7 100644 (file)
@@ -24,6 +24,7 @@ package org.openecomp.appc.listener.LCM.impl;
 import org.openecomp.appc.exceptions.APPCException;
 import org.openecomp.appc.listener.EventHandler;
 import org.openecomp.appc.listener.LCM.conv.Converter;
+import org.openecomp.appc.listener.LCM.model.DmaapMessage;
 import org.openecomp.appc.listener.LCM.model.DmaapOutgoingMessage;
 import org.openecomp.appc.listener.LCM.operation.ProviderOperations;
 
@@ -38,28 +39,31 @@ public class WorkerImpl implements Runnable {
     private final EELFLogger LOG = EELFManager.getInstance().getLogger(WorkerImpl.class);
 
     // Should have all of the data we need for processing
-    private JsonNode event;
+    private DmaapMessage event;
 
     // So we can post messages from inside the worker.
     private EventHandler dmaap;
-    private String rpcName;
 
-    public WorkerImpl(String rpcName,JsonNode message, EventHandler dmaap) {
-        this.rpcName = rpcName;
+    //so we know were to post the messages
+    private final ProviderOperations providerOperations;
+
+
+    public WorkerImpl(DmaapMessage message, EventHandler dmaap, ProviderOperations providerOperations) {
         this.event = message;
         this.dmaap = dmaap;
+        this.providerOperations = providerOperations;
     }
 
     @Override
     public void run() {
-        String requestIdWithSubId = extractRequestIdWithSubId(event);
+        String requestIdWithSubId = extractRequestIdWithSubId(event.getBody());
         LOG.debug(String.format("Started working on %s", requestIdWithSubId));
 
         // Run the dg in a try catch to handle all exceptions and update the
         // message at the end
         try {
-            JsonNode outputJsonNode = doDG(rpcName, event);
-            DmaapOutgoingMessage dmaapOutgoingMessage= Converter.convJsonNodeToDmaapOutgoingMessage(outputJsonNode,rpcName);
+            JsonNode outputJsonNode = doDG(event.getRpcName(), event.getBody());
+            DmaapOutgoingMessage dmaapOutgoingMessage= Converter.convJsonNodeToDmaapOutgoingMessage(event, outputJsonNode);
             postMessageToDMaaP(dmaapOutgoingMessage,requestIdWithSubId);
             Integer statusCode = extractStatusCode(dmaapOutgoingMessage.getBody());
             if (ProviderOperations.isSucceeded(statusCode)) {
@@ -73,7 +77,7 @@ public class WorkerImpl implements Runnable {
             // along
             String msg = "Exception: " + e.getMessage();
             LOG.error(String.format("Event %s finished with failure. %s", requestIdWithSubId, msg));
-            DmaapOutgoingMessage dmaapOutgoingMessage= Converter.buildDmaapOutgoingMessageWithUnexpectedError(event,rpcName,e);
+            DmaapOutgoingMessage dmaapOutgoingMessage= Converter.buildDmaapOutgoingMessageWithUnexpectedError(event, e);
             postMessageToDMaaP(dmaapOutgoingMessage,requestIdWithSubId);
         }
 
@@ -115,6 +119,6 @@ public class WorkerImpl implements Runnable {
     }
 
     private JsonNode doDG(String rpcName, JsonNode msg) throws APPCException {
-        return ProviderOperations.topologyDG(rpcName,msg);
+        return providerOperations.topologyDG(rpcName,msg);
     }
 }