2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
23 import org.glassfish.jersey.media.sse.EventSource;
24 import org.glassfish.jersey.media.sse.SseFeature;
25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
27 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import javax.ws.rs.client.Client;
32 import javax.ws.rs.client.ClientBuilder;
33 import javax.ws.rs.client.WebTarget;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.LinkedBlockingQueue;
41 * Representation of a plugin to subscribe for notification and then
42 * to handle the received notifications.
44 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
45 private static final Logger log = LoggerFactory.getLogger(RestconfDiscoveryNode.class);
47 private ExecutorService executor = Executors.newCachedThreadPool();
48 private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
49 private RestconfApiCallNode restconfApiCallNode;
51 private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
52 private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
54 private static final String SUBSCRIBER_ID = "subscriberId";
55 private static final String RESPONSE_CODE = "response-code";
56 private static final String RESPONSE_PREFIX = "responsePrefix";
57 private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notifications:output.identifier";
58 private static final String RESPONSE_CODE_200 = "200";
59 private static final String SSE_URL = "sseConnectURL";
62 * Creates an instance of RestconfDiscoveryNode and
63 * starts processing of event.
65 public RestconfDiscoveryNode() {
66 ExecutorService e = Executors.newFixedThreadPool(20);
67 EventProcessor p = new EventProcessor(this);
68 for (int i = 0; i < 20; ++i) {
74 public void establishSubscription(Map<String, String> paramMap,
75 SvcLogicContext ctx) throws SvcLogicException {
76 String subscriberId = paramMap.get(SUBSCRIBER_ID);
77 if (subscriberId == null) {
78 throw new SvcLogicException("Subscriber Id is null");
81 restconfApiCallNode.sendRequest(paramMap, ctx);
83 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
84 // TODO: save subscription id and subscriber in MYSQL
86 establishPersistentConnection(paramMap, ctx, subscriberId);
88 log.info("Failed to subscribe {}", subscriberId);
89 throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
94 public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
95 // TODO: to be implemented
99 public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
100 String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
102 PersistentConnection conn = runnableInfo.get(id);
104 runnableInfo.remove(id);
105 subscriptionInfoMap.remove(id);
109 class PersistentConnection implements Runnable {
111 private volatile boolean running = true;
113 PersistentConnection(String url) {
117 private void terminate() {
123 Client client = ClientBuilder.newBuilder()
124 .register(SseFeature.class).build();
125 WebTarget target = client.target(url);
126 EventSource eventSource = EventSource.target(target).build();
127 eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
129 log.info("Connected to SSE source");
133 } catch (InterruptedException e) {
134 log.error("Interrupted!", e);
135 Thread.currentThread().interrupt();
139 log.info("Closed connection to SSE source");
144 * Establishes a persistent between the client and server.
146 * @param paramMap input paramter map
147 * @param ctx service logic context
148 * @param subscriberId subscriber identifier
150 void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
151 String subscriberId) {
152 String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
153 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
155 paramMap.get("version"),
156 paramMap.get("mode"));
157 SubscriptionInfo info = new SubscriptionInfo();
158 info.callBackDG(callbackDG);
159 info.subscriptionId(id);
160 info.subscriberId(subscriberId);
161 subscriptionInfoMap.put(id, info);
163 String url = paramMap.get(SSE_URL);
164 PersistentConnection connection = new PersistentConnection(url);
165 runnableInfo.put(id, connection);
166 executor.execute(connection);
170 * Returns response code.
172 * @param prefix prefix given in input parameter
173 * @param ctx service logic context
174 * @return response code
176 String getResponseCode(String prefix, SvcLogicContext ctx) {
177 return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
181 * Returns subscription id from event.
183 * @param prefix prefix given in input parameter
184 * @param ctx service logic context
185 * @return subscription id from event
187 String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
188 return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
191 private String getPrefix(String prefix) {
192 return prefix != null ? prefix + "." : "";
195 private String getSubscriptionId(String subscriberId) {
196 for (Map.Entry<String,SubscriptionInfo> entry
197 : subscriptionInfoMap.entrySet()) {
198 if (entry.getValue().subscriberId()
199 .equals(subscriberId)) {
200 return entry.getKey();
207 * Returns restconfApiCallNode.
209 * @return restconfApiCallNode
211 protected RestconfApiCallNode restconfapiCallNode() {
212 return restconfApiCallNode;
216 * Sets restconfApiCallNode.
218 * @param node restconfApiCallNode
220 void restconfapiCallNode(RestconfApiCallNode node) {
221 restconfApiCallNode = node;
224 Map<String, SubscriptionInfo> subscriptionInfoMap() {
225 return subscriptionInfoMap;
228 void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
229 this.subscriptionInfoMap = subscriptionInfoMap;
232 LinkedBlockingQueue<String> eventQueue() {
236 void eventQueue(LinkedBlockingQueue<String> eventQueue) {
237 this.eventQueue = eventQueue;