+ String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+ if (id != null) {
+ PersistentConnection conn = runnableInfo.get(id);
+ conn.terminate();
+ runnableInfo.remove(id);
+ subscriptionInfoMap.remove(id);
+ }
+ }
+
+ class PersistentConnection implements Runnable {
+ private String url;
+ private volatile boolean running = true;
+
+ PersistentConnection(String url) {
+ this.url = url;
+ }
+
+ private void terminate() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ Client client = ClientBuilder.newBuilder()
+ .register(SseFeature.class).build();
+ WebTarget target = client.target(url);
+ EventSource eventSource = EventSource.target(target).build();
+ eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
+ eventSource.open();
+ log.info("Connected to SSE source");
+ while (running) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ log.error("Interrupted!", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ eventSource.close();
+ log.info("Closed connection to SSE source");
+ }
+ }
+
+ /**
+ * Establishes a persistent between the client and server.
+ *
+ * @param paramMap input paramter map
+ * @param ctx service logic context
+ * @param subscriberId subscriber identifier
+ */
+ void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
+ String subscriberId) {
+ String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
+ SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
+ paramMap.get("rpc"),
+ paramMap.get("version"),
+ paramMap.get("mode"));
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.callBackDG(callbackDG);
+ info.subscriptionId(id);
+ info.subscriberId(subscriberId);
+ subscriptionInfoMap.put(id, info);
+
+ String url = paramMap.get(SSE_URL);
+ PersistentConnection connection = new PersistentConnection(url);
+ runnableInfo.put(id, connection);
+ executor.execute(connection);
+ }
+
+ /**
+ * Returns response code.
+ *
+ * @param prefix prefix given in input parameter
+ * @param ctx service logic context
+ * @return response code
+ */
+ String getResponseCode(String prefix, SvcLogicContext ctx) {
+ return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
+ }
+
+ /**
+ * Returns subscription id from event.
+ *
+ * @param prefix prefix given in input parameter
+ * @param ctx service logic context
+ * @return subscription id from event
+ */
+ String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
+ return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
+ }
+
+ private String getPrefix(String prefix) {
+ return prefix != null ? prefix + "." : "";
+ }
+
+ private String getSubscriptionId(String subscriberId) {
+ for (Map.Entry<String,SubscriptionInfo> entry
+ : subscriptionInfoMap.entrySet()) {
+ if (entry.getValue().subscriberId()
+ .equals(subscriberId)) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns restconfApiCallNode.
+ *
+ * @return restconfApiCallNode
+ */
+ protected RestconfApiCallNode restconfapiCallNode() {
+ return restconfApiCallNode;
+ }
+
+ /**
+ * Sets restconfApiCallNode.
+ *
+ * @param node restconfApiCallNode
+ */
+ void restconfapiCallNode(RestconfApiCallNode node) {
+ restconfApiCallNode = node;
+ }
+
+ Map<String, SubscriptionInfo> subscriptionInfoMap() {
+ return subscriptionInfoMap;
+ }
+
+ void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
+ this.subscriptionInfoMap = subscriptionInfoMap;
+ }
+
+ LinkedBlockingQueue<String> eventQueue() {
+ return eventQueue;
+ }