import org.glassfish.jersey.media.sse.SseFeature;
import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
import org.onap.ccsdk.sli.core.sli.SvcLogicException;
+import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
+import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Representation of a plugin to subscribe for notification and then
* to handle the received notifications.
*/
public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
- private static final Logger log = LoggerFactory.getLogger(RestconfDiscoveryNode.class);
+
+ private static final Logger log = getLogger(RestconfDiscoveryNode.class);
private ExecutorService executor = Executors.newCachedThreadPool();
private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
private static final String SUBSCRIBER_ID = "subscriberId";
private static final String RESPONSE_CODE = "response-code";
private static final String RESPONSE_PREFIX = "responsePrefix";
- private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notifications:output.identifier";
+ private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
+ "ications:establish-subscription.output.identifier";
private static final String RESPONSE_CODE_200 = "200";
private static final String SSE_URL = "sseConnectURL";
/**
- * Creates an instance of RestconfDiscoveryNode and
- * starts processing of event.
+ * Creates an instance of RestconfDiscoveryNode and starts processing of
+ * event.
+ *
+ * @param r restconf api call node
*/
- public RestconfDiscoveryNode() {
+ public RestconfDiscoveryNode(RestconfApiCallNode r) {
+ this.restconfApiCallNode = r;
ExecutorService e = Executors.newFixedThreadPool(20);
EventProcessor p = new EventProcessor(this);
for (int i = 0; i < 20; ++i) {
class PersistentConnection implements Runnable {
private String url;
private volatile boolean running = true;
+ private Map<String, String> paramMap;
- PersistentConnection(String url) {
+ PersistentConnection(String url, Map<String, String> paramMap) {
this.url = url;
+ this.paramMap = paramMap;
}
private void terminate() {
@Override
public void run() {
- Client client = ClientBuilder.newBuilder()
- .register(SseFeature.class).build();
- WebTarget target = client.target(url);
+ Parameters p;
+ WebTarget target = null;
+ try {
+ RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
+ p = restapi.getParameters(paramMap, new Parameters());
+ Client client = ignoreSslClient().register(SseFeature.class);
+ target = restapi.addAuthType(client, p).target(url);
+ } catch (SvcLogicException e) {
+ log.error("Exception occured!", e);
+ Thread.currentThread().interrupt();
+ }
+
+ target = addToken(target, paramMap.get("customHttpHeaders"));
EventSource eventSource = EventSource.target(target).build();
eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
eventSource.open();
log.info("Connected to SSE source");
while (running) {
try {
+ log.info("SSE state " + eventSource.isOpen());
Thread.sleep(5000);
} catch (InterruptedException e) {
log.error("Interrupted!", e);
}
}
+ private Client ignoreSslClient() {
+ SSLContext sslcontext = null;
+
+ try {
+ sslcontext = SSLContext.getInstance("TLS");
+ sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
+ @Override
+ public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ } }, new java.security.SecureRandom());
+ } catch (NoSuchAlgorithmException | KeyManagementException e) {
+ throw new IllegalStateException(e);
+ }
+
+ return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier((s1, s2) -> true).build();
+ }
+
+ protected String getTokenId(String customHttpHeaders) {
+ if (customHttpHeaders.contains("=")) {
+ String s[] = customHttpHeaders.split("=");
+ return s[1];
+ }
+ return customHttpHeaders;
+ }
+
+ protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
+ if (customHttpHeaders == null) {
+ return target;
+ }
+
+ return new AdditionalHeaderWebTarget(
+ target, getTokenId(customHttpHeaders));
+ }
+
/**
* Establishes a persistent between the client and server.
*
subscriptionInfoMap.put(id, info);
String url = paramMap.get(SSE_URL);
- PersistentConnection connection = new PersistentConnection(url);
+ PersistentConnection connection = new PersistentConnection(url, paramMap);
runnableInfo.put(id, connection);
executor.execute(connection);
}