2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
23 import com.google.common.collect.ImmutableMap;
24 import org.glassfish.jersey.media.sse.EventInput;
25 import org.glassfish.jersey.media.sse.EventSource;
26 import org.glassfish.jersey.media.sse.InboundEvent;
27 import org.glassfish.jersey.media.sse.SseFeature;
28 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
29 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
30 import org.onap.ccsdk.sli.core.utils.common.AcceptIpAddressHostNameVerifier;
31 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
32 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
33 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
34 import org.slf4j.Logger;
36 import javax.net.ssl.SSLContext;
37 import javax.net.ssl.TrustManager;
38 import javax.net.ssl.X509TrustManager;
39 import javax.ws.rs.client.Client;
40 import javax.ws.rs.client.ClientBuilder;
41 import javax.ws.rs.client.WebTarget;
42 import javax.ws.rs.core.Response;
43 import java.net.MalformedURLException;
45 import java.security.KeyManagementException;
46 import java.security.NoSuchAlgorithmException;
47 import java.security.cert.CertificateException;
48 import java.security.cert.X509Certificate;
50 import java.util.NoSuchElementException;
51 import java.util.concurrent.*;
53 import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties;
54 import static org.slf4j.LoggerFactory.getLogger;
57 * Representation of a plugin to subscribe for notification and then
58 * to handle the received notifications.
60 public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin {
62 private static final Logger log = getLogger(PeriodicDiscoveryNode.class);
64 private static final String ROOT_RESOURCE = "/restconf";
65 private static final String SUBSCRIBER_ID = "subscriberId";
66 private static final String RESPONSE_CODE = "response-code";
67 private static final String RESPONSE_PREFIX = "responsePrefix";
68 private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
69 "ications:establish-subscription.output.identifier";
70 private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier";
71 private static final String RESPONSE_CODE_200 = "200";
72 private static final String SSE_URL = "sseConnectURL";
73 private static final String PERIODIC_PUL_URL = "periodicPullURL";
74 private static final String REST_API_URL = "restapiUrl";
75 private static final String RESOURCE_PATH_PREFIX = "/data/";
76 private static final String NOTIFICATION_PATH_PREFIX = "/streams/";
77 private static final String DEVICE_IP = "deviceIp";
78 private static final String DEVICE_PORT = "devicePort";
79 private static final String DOUBLESLASH = "//";
80 private static final String COLON = ":";
82 private RestconfApiCallNode restconfApiCallNode;
83 private RestapiCallNode restapiCallNode = new RestapiCallNode();
84 private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
85 private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
86 private Map<DeviceId, PeriodicPullRunnable> periodicRunnableTable = new ConcurrentHashMap<>();
87 private Map<DeviceId, String> subscribedDevicesTable = new ConcurrentHashMap<>();
88 private Map<DeviceId, BlockingQueue<String>> eventQMap = new ConcurrentHashMap<>();
89 private Map<DeviceId, InternalPeriodicPullingProcessorRunnable>
90 processorRunnableTable = new ConcurrentHashMap<>();
91 private final Map<DeviceId, RestSBDevice> deviceMap = new ConcurrentHashMap<>();
92 private final Map<DeviceId, Client> clientMap = new ConcurrentHashMap<>();
93 private ExecutorService executor = Executors.newCachedThreadPool();
94 private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
97 * Creates an instance of RestconfDiscoveryNode and starts processing of
100 * @param r restconf api call node
102 public PeriodicDiscoveryNode(RestconfApiCallNode r) {
103 log.info("inside RestconfDiscoveryNode Constructor");
104 this.restconfApiCallNode = r;
106 // ExecutorService e = Executors.newFixedThreadPool(20);
107 // EventProcessor p = new EventProcessor(this);
108 // for (int i = 0; i < 20; ++i) {
113 public void activate() {
114 log.info("RESTCONF SBI Started");
117 public void deactivate() {
118 log.info("RESTCONF SBI Stopped");
120 this.getClientMap().clear();
121 this.getDeviceMap().clear();
124 public Map<DeviceId, RestSBDevice> getDeviceMap() {
128 public Map<DeviceId, Client> getClientMap() {
133 public Map<DeviceId, RestSBDevice> getDevices() {
134 log.trace("RESTCONF SBI::getDevices");
135 return ImmutableMap.copyOf(deviceMap);
139 public RestSBDevice getDevice(DeviceId deviceInfo) {
140 log.trace("RESTCONF SBI::getDevice with deviceId");
141 return deviceMap.get(deviceInfo);
145 public RestSBDevice getDevice(String ip, int port) {
146 log.trace("RESTCONF SBI::getDevice with ip and port");
148 if (!deviceMap.isEmpty()) {
149 return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get();
151 } catch (NoSuchElementException noSuchElementException) {
152 log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port);
158 public void addDevice(RestSBDevice device) {
159 log.trace("RESTCONF SBI::addDevice");
160 if (!deviceMap.containsKey(device.deviceId())) {
161 if (device.username() != null) {
162 String username = device.username();
163 String password = device.password() == null ? "" : device.password();
164 // authenticate(client, username, password);
166 BlockingQueue<String> newBlockingQueue = new LinkedBlockingQueue<>();
167 eventQMap.put(device.deviceId(), newBlockingQueue);
168 InternalPeriodicPullingProcessorRunnable eventProcessorRunnable =
169 new InternalPeriodicPullingProcessorRunnable(device.deviceId());
170 processorRunnableTable.put(device.deviceId(), eventProcessorRunnable);
171 log.trace("addDevice::restconf event processor runnable is created and is going for execute");
172 executor.execute(eventProcessorRunnable);
173 log.trace("addDevice::restconf event processor runnable was sent for execute");
174 deviceMap.put(device.deviceId(), device);
176 log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId());
181 public void removeDevice(DeviceId deviceId) {
182 log.trace("RESTCONF SBI::removeDevice");
183 eventQMap.remove(deviceId);
184 clientMap.remove(deviceId);
185 deviceMap.remove(deviceId);
189 public void establishSubscription(Map<String, String> paramMap,
190 SvcLogicContext ctx) throws SvcLogicException {
191 String subscriberId = paramMap.get(SUBSCRIBER_ID);
192 if (subscriberId == null) {
193 throw new SvcLogicException("Subscriber Id is null");
196 restconfApiCallNode.sendRequest(paramMap, ctx);
198 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
199 // TODO: save subscription id and subscriber in MYSQL
201 establishPersistentConnection(paramMap, ctx, subscriberId);
203 log.info("Failed to subscribe {}", subscriberId);
204 throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
209 public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx)
210 throws SvcLogicException {
211 String subscriberId = paramMap.get(SUBSCRIBER_ID);
212 if (subscriberId == null) {
213 throw new SvcLogicException("Subscriber Id is null");
216 String subscribeUrlString = paramMap.get(REST_API_URL);
217 URL subscribeUrl = null;
218 RestSBDevice dev = null;
220 subscribeUrl = new URL(subscribeUrlString);
221 dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort());
222 } catch (MalformedURLException e) {
223 log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
228 log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now.");
229 //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap
230 dev = new DefaultRestSBDevice(subscribeUrl.getHost(),
231 subscribeUrl.getPort(), "onos", "rocks", "http",
232 subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true);
236 if (!subscribedDevicesTable.containsKey(dev.deviceId())) {
237 log.info("establishSubscriptionOnly::The device {} has not been subscribed yet. " +
238 "Trying to subscribe it now...");
239 restapiCallNode.sendRequest(paramMap, ctx);
240 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
241 // TODO: save subscription id and subscriber in MYSQL
242 String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
243 log.info("establishSubscriptionOnly::Subscription is done successfully and " +
244 "the output.identifier is: {}", id);
245 log.info("establishSubscriptionOnly::The subscriptionID returned by the server " +
246 "does not exist in the map. Adding it now...");
247 subscribedDevicesTable.put(dev.deviceId(), id);
249 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
251 paramMap.get("version"),
252 paramMap.get("mode"));
253 SubscriptionInfo info = new SubscriptionInfo();
254 info.callBackDG(callbackDG);
255 info.subscriptionId(id);
256 info.subscriberId(subscriberId);
257 subscriptionInfoMap.put(id, info);
265 public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
266 // TODO: to be implemented
270 public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
271 String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
273 subscriptionInfoMap.remove(id);
277 protected String getTokenId(String customHttpHeaders) {
278 if (customHttpHeaders.contains("=")) {
279 String[] s = customHttpHeaders.split("=");
282 return customHttpHeaders;
285 protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
286 if (customHttpHeaders == null) {
290 return new AdditionalHeaderWebTarget(
291 target, getTokenId(customHttpHeaders));
295 * Establishes a persistent between the client and server.
297 * @param paramMap input paramter map
298 * @param ctx service logic context
299 * @param subscriberId subscriber identifier
301 void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
302 String subscriberId) {
306 * Returns response code.
308 * @param prefix prefix given in input parameter
309 * @param ctx service logic context
310 * @return response code
312 String getResponseCode(String prefix, SvcLogicContext ctx) {
313 return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
316 String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) {
317 return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX);
321 * Returns subscription id from event.
323 * @param prefix prefix given in input parameter
324 * @param ctx service logic context
325 * @return subscription id from event
327 String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
328 return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
331 private String getPrefix(String prefix) {
332 return prefix != null ? prefix + "." : "";
335 private String getSubscriptionId(String subscriberId) {
336 for (Map.Entry<String,SubscriptionInfo> entry
337 : subscriptionInfoMap.entrySet()) {
338 if (entry.getValue().subscriberId()
339 .equals(subscriberId)) {
340 return entry.getKey();
346 private String getUrlString(DeviceId deviceId, String request) {
347 RestSBDevice restSBDevice = deviceMap.get(deviceId);
348 if (restSBDevice == null) {
349 log.warn("getUrlString::restSbDevice cannot be NULL!");
352 if (restSBDevice.url() != null) {
353 return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request;
355 return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString()
356 + COLON + restSBDevice.port() + request;
360 private String getSubscriptionIdFromDeviceId(DeviceId deviceId) {
361 if (subscribedDevicesTable.containsKey(deviceId)) {
362 return subscribedDevicesTable.get(deviceId);
367 private BlockingQueue<String> getEventQ(DeviceId deviceId) {
368 if (eventQMap.containsKey(deviceId)) {
369 return eventQMap.get(deviceId);
375 * Returns restconfApiCallNode.
377 * @return restconfApiCallNode
379 protected RestconfApiCallNode restconfapiCallNode() {
380 return restconfApiCallNode;
384 * Sets restconfApiCallNode.
386 * @param node restconfApiCallNode
388 void restconfapiCallNode(RestconfApiCallNode node) {
389 restconfApiCallNode = node;
392 Map<String, SubscriptionInfo> subscriptionInfoMap() {
393 return subscriptionInfoMap;
396 void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
397 this.subscriptionInfoMap = subscriptionInfoMap;
400 LinkedBlockingQueue<String> eventQueue() {
404 void eventQueue(LinkedBlockingQueue<String> eventQueue) {
405 this.eventQueue = eventQueue;
409 * Establishes a persistent SSE connection between the client and the server.
411 * @param paramMap input paramter map
412 * @param ctx service logic context
415 public void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException {
420 public void establishPeriodicPullConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException {
421 String subscriberId = paramMap.get(SUBSCRIBER_ID);
422 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
424 paramMap.get("version"),
425 paramMap.get("mode"));
426 SubscriptionInfo info = new SubscriptionInfo();
427 info.callBackDG(callbackDG);
428 info.subscriberId(subscriberId);
430 String periodicPullUrlString = paramMap.get(PERIODIC_PUL_URL);
431 URL periodicPullUrl = null;
432 RestSBDevice dev = null;
434 periodicPullUrl = new URL(periodicPullUrlString);
435 dev = getDevice(periodicPullUrl.getHost(), periodicPullUrl.getPort());
436 } catch (MalformedURLException e) {
437 log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e);
442 log.warn("establishPeriodicPullConnection::device does not exist in the map. Trying to add one now.");
443 dev = new DefaultRestSBDevice(periodicPullUrl.getHost(),
444 periodicPullUrl.getPort(), "onos", "rocks", "http",
445 periodicPullUrl.getHost() + ":" + periodicPullUrl.getPort(), true);
449 if (isNotificationEnabled(dev.deviceId())) {
450 log.warn("establishPeriodicPullConnection::notifications already enabled on device: {}",
455 if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) {
456 log.warn("This device {} has not yet been subscribed to receive notifications.",
461 RestconfNotificationEventListenerImpl myListener =
462 new RestconfNotificationEventListenerImpl(info);
463 enableNotifications(dev.deviceId(), "ietf-service-pm:performance-monitoring", "json", myListener);
467 public void enableNotifications(DeviceId device, String request,
469 RestconfNotificationEventListener listener) {
470 if (isNotificationEnabled(device)) {
471 log.warn("enableNotifications::already enabled on device: {}", device);
475 request = discoverRootResource(device) + RESOURCE_PATH_PREFIX
478 addNotificationListener(device, listener);
480 PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device);
481 periodicRunnableTable.put(device, periodicRunnable);
482 scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 60, TimeUnit.SECONDS);
485 public void stopNotifications(DeviceId device) {
487 periodicRunnableTable.get(device).terminate();
488 processorRunnableTable.get(device).terminate();
489 } catch (Exception ex) {
490 log.error("stopNotifications::Exception happened when terminating, ex: {}", ex);
492 log.info("stopNotifications::Runnable is now terminated");
493 periodicRunnableTable.remove(device);
494 processorRunnableTable.remove(device);
495 log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
499 public void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) {
500 String deleteSubscribeUrlString = paramMap.get(REST_API_URL);
501 URL deleteSubscribeUrl = null;
502 RestSBDevice dev = null;
504 deleteSubscribeUrl = new URL(deleteSubscribeUrlString);
505 dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort());
506 } catch (MalformedURLException e) {
507 log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
511 String deviceIp = deleteSubscribeUrl.getHost();
512 String devicePort = String.valueOf(deleteSubscribeUrl.getPort());
513 log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}",
514 deviceIp, devicePort);
516 log.error("deleteSubscriptionAndSseConnection::device does not exist in the map");
519 String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId());
521 if (subscriptionId != null) {
522 log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId);
523 log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request");
525 ctx.setAttribute("subscriptionId", subscriptionId);
526 restapiCallNode.sendRequest(paramMap, ctx);
527 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
528 log.info("deleteSubscriptionAndSseConnection::Successfully unsubscribed");
529 stopNotifications(dev.deviceId());
530 subscribedDevicesTable.remove(dev.deviceId());
532 String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
534 subscriptionInfoMap.remove(id);
538 log.info("deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull");
540 } catch (SvcLogicException e) {
541 log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e);
544 log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
548 public class PeriodicPullRunnable implements Runnable {
549 private String request;
550 private DeviceId deviceId;
552 private volatile boolean running = true;
554 public void terminate() {
555 log.info("PeriodicPullRunnable.terminate()::threadID: {}",
556 Thread.currentThread().getId());
561 * @param request request
562 * @param deviceId device identifier
564 public PeriodicPullRunnable(String request, DeviceId deviceId) {
565 this.request = request;
566 this.deviceId = deviceId;
571 log.trace("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}",
572 Thread.currentThread().getId(), running);
574 Client client = ClientBuilder.newBuilder().build();
575 WebTarget target = client.target(getUrlString(deviceId, request));
576 log.trace("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString());
577 Response response = null;
579 response = target.request().get();
580 String rcvdData = response.readEntity(String.class);
581 log.trace("PeriodicPullRunnable.run()::after readEntity");
582 BlockingQueue<String> eventQ = getEventQ(deviceId);
583 if (eventQ != null) {
584 eventQ.add(rcvdData);
585 eventQMap.put(deviceId, eventQ);
586 log.trace("PeriodicPullRunnable.run()::eventQ got filled.");
588 log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}",
592 log.trace("PeriodicPullRunnable.run()::running is false! " +
593 "closing the client and the response, threadID: {}", Thread.currentThread().getId());
596 log.info("PeriodicPullRunnable.run()::eventInput is closed in run()");
598 } catch (Exception ex) {
599 log.info("PeriodicPullRunnable.run()::We got some exception: {}, threadID: {} ", ex,
600 Thread.currentThread().getId());
602 log.trace("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ",
603 Thread.currentThread().getId());
607 public class InternalPeriodicPullingProcessorRunnable implements Runnable {
609 private volatile boolean running = true;
610 private DeviceId deviceId;
612 public InternalPeriodicPullingProcessorRunnable(DeviceId deviceId) {
613 this.deviceId = deviceId;
616 public void terminate() {
617 log.info("InternalPeriodicPullingProcessorRunnable.terminate()::threadID: {}",
618 Thread.currentThread().getId());
624 log.trace("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()");
627 if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) {
628 log.trace("InternalPeriodicPullingProcessorRunnable::waiting for take()");
630 String eventJsonString = eventQMap.get(deviceId).take();
631 log.trace("InternalPeriodicPullingProcessorRunnable::after take()");
632 log.info("InternalPeriodicPullingProcessorRunnable::eventJsonString is {}", eventJsonString);
633 Map<String, String> param = convertToProperties(eventJsonString);
634 String idString = param.get("push-change-update.subscription-id");
635 SubscriptionInfo info = subscriptionInfoMap().get(idString);
637 SvcLogicContext ctx = setContext(param);
638 SvcLogicGraphInfo callbackDG = info.callBackDG();
639 callbackDG.executeGraph(ctx);
642 log.info("InternalPeriodicPullingProcessorRunnable.run()::running has changed to false " +
643 "while eventQ was blocked to process new notifications");
644 log.info("InternalPeriodicPullingProcessorRunnable.run()::" +
645 "the client is no longer interested to receive notifications.");
649 } catch (InterruptedException | SvcLogicException e) {
654 private SvcLogicContext setContext(Map<String, String> param) {
655 SvcLogicContext ctx = new SvcLogicContext();
656 for (Map.Entry<String, String> entry : param.entrySet()) {
657 ctx.setAttribute(entry.getKey(), entry.getValue());
663 public String discoverRootResource(DeviceId device) {
664 return ROOT_RESOURCE;
668 public void addNotificationListener(DeviceId deviceId,
669 RestconfNotificationEventListener listener) {
673 public void removeNotificationListener(DeviceId deviceId,
674 RestconfNotificationEventListener listener) {
677 public boolean isNotificationEnabled(DeviceId deviceId) {
678 return periodicRunnableTable.containsKey(deviceId);