Add authorization header in SSE request
[ccsdk/sli/plugins.git] / restconf-client / provider / src / main / java / org / onap / ccsdk / sli / plugins / restconfdiscovery / EventProcessor.java
index a85876c..e46e47a 100644 (file)
@@ -28,17 +28,18 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 import static org.onap.ccsdk.sli.plugins.prop.JsonParser.convertToProperties;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Processes the events from event queue and executes callback DG.
  */
 class EventProcessor implements Runnable {
-    private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+
+    private static final Logger log = getLogger(EventProcessor.class);
     private RestconfDiscoveryNode node;
 
-    private static final String EVENT_SUBSCRIPTION_ID = "ietf-notification:notification" +
-            ".ietf-yang-push:push-change-update" +
-            ".subscription-id";
+    private static final String EVENT_SUBSCRIPTION_ID = "ietf-restconf:" +
+            "notification.ietf-yang-push:push-change-update.subscription-id";
 
     public EventProcessor(RestconfDiscoveryNode node) {
         this.node = node;
@@ -53,17 +54,22 @@ class EventProcessor implements Runnable {
                 String id = param.get(EVENT_SUBSCRIPTION_ID);
                 SubscriptionInfo info = node.subscriptionInfoMap().get(id);
                 if (info != null) {
-                    SvcLogicContext ctx = new SvcLogicContext();
-                    for (Map.Entry<String, String> entry : param.entrySet()) {
-                        ctx.setAttribute(entry.getKey(), entry.getValue());
-                    }
+                    SvcLogicContext ctx = setContext(param);
                     SvcLogicGraphInfo callbackDG = info.callBackDG();
                     callbackDG.executeGraph(ctx);
                 }
             } catch (InterruptedException | SvcLogicException e) {
-                log.error(e.getMessage());
-                throw new RuntimeException(e.getMessage());
+                log.error("Interrupted!", e);
+                Thread.currentThread().interrupt();
             }
         }
     }
+
+    private SvcLogicContext setContext(Map<String, String> param) {
+        SvcLogicContext ctx = new SvcLogicContext();
+        for (Map.Entry<String, String> entry : param.entrySet()) {
+            ctx.setAttribute(entry.getKey(), entry.getValue());
+        }
+        return ctx;
+    }
 }