2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
23 import org.glassfish.jersey.media.sse.EventSource;
24 import org.glassfish.jersey.media.sse.SseFeature;
25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
27 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
28 import org.slf4j.Logger;
30 import javax.ws.rs.client.Client;
31 import javax.ws.rs.client.ClientBuilder;
32 import javax.ws.rs.client.WebTarget;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.LinkedBlockingQueue;
39 import static org.slf4j.LoggerFactory.getLogger;
42 * Representation of a plugin to subscribe for notification and then
43 * to handle the received notifications.
45 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
47 private static final Logger log = getLogger(RestconfDiscoveryNode.class);
49 private ExecutorService executor = Executors.newCachedThreadPool();
50 private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
51 private RestconfApiCallNode restconfApiCallNode;
53 private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
54 private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
56 private static final String SUBSCRIBER_ID = "subscriberId";
57 private static final String RESPONSE_CODE = "response-code";
58 private static final String RESPONSE_PREFIX = "responsePrefix";
59 private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
60 "ications:establish-subscription.output.identifier";
61 private static final String RESPONSE_CODE_200 = "200";
62 private static final String SSE_URL = "sseConnectURL";
65 * Creates an instance of RestconfDiscoveryNode and starts processing of
68 * @param r restconf api call node
70 public RestconfDiscoveryNode(RestconfApiCallNode r) {
71 this.restconfApiCallNode = r;
72 ExecutorService e = Executors.newFixedThreadPool(20);
73 EventProcessor p = new EventProcessor(this);
74 for (int i = 0; i < 20; ++i) {
80 public void establishSubscription(Map<String, String> paramMap,
81 SvcLogicContext ctx) throws SvcLogicException {
82 String subscriberId = paramMap.get(SUBSCRIBER_ID);
83 if (subscriberId == null) {
84 throw new SvcLogicException("Subscriber Id is null");
87 restconfApiCallNode.sendRequest(paramMap, ctx);
89 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
90 // TODO: save subscription id and subscriber in MYSQL
92 establishPersistentConnection(paramMap, ctx, subscriberId);
94 log.info("Failed to subscribe {}", subscriberId);
95 throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
100 public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
101 // TODO: to be implemented
105 public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
106 String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
108 PersistentConnection conn = runnableInfo.get(id);
110 runnableInfo.remove(id);
111 subscriptionInfoMap.remove(id);
115 class PersistentConnection implements Runnable {
117 private volatile boolean running = true;
119 PersistentConnection(String url) {
123 private void terminate() {
129 Client client = ClientBuilder.newBuilder()
130 .register(SseFeature.class).build();
131 WebTarget target = client.target(url);
132 EventSource eventSource = EventSource.target(target).build();
133 eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
135 log.info("Connected to SSE source");
139 } catch (InterruptedException e) {
140 log.error("Interrupted!", e);
141 Thread.currentThread().interrupt();
145 log.info("Closed connection to SSE source");
150 * Establishes a persistent between the client and server.
152 * @param paramMap input paramter map
153 * @param ctx service logic context
154 * @param subscriberId subscriber identifier
156 void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
157 String subscriberId) {
158 String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
159 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
161 paramMap.get("version"),
162 paramMap.get("mode"));
163 SubscriptionInfo info = new SubscriptionInfo();
164 info.callBackDG(callbackDG);
165 info.subscriptionId(id);
166 info.subscriberId(subscriberId);
167 subscriptionInfoMap.put(id, info);
169 String url = paramMap.get(SSE_URL);
170 PersistentConnection connection = new PersistentConnection(url);
171 runnableInfo.put(id, connection);
172 executor.execute(connection);
176 * Returns response code.
178 * @param prefix prefix given in input parameter
179 * @param ctx service logic context
180 * @return response code
182 String getResponseCode(String prefix, SvcLogicContext ctx) {
183 return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
187 * Returns subscription id from event.
189 * @param prefix prefix given in input parameter
190 * @param ctx service logic context
191 * @return subscription id from event
193 String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
194 return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
197 private String getPrefix(String prefix) {
198 return prefix != null ? prefix + "." : "";
201 private String getSubscriptionId(String subscriberId) {
202 for (Map.Entry<String,SubscriptionInfo> entry
203 : subscriptionInfoMap.entrySet()) {
204 if (entry.getValue().subscriberId()
205 .equals(subscriberId)) {
206 return entry.getKey();
213 * Returns restconfApiCallNode.
215 * @return restconfApiCallNode
217 protected RestconfApiCallNode restconfapiCallNode() {
218 return restconfApiCallNode;
222 * Sets restconfApiCallNode.
224 * @param node restconfApiCallNode
226 void restconfapiCallNode(RestconfApiCallNode node) {
227 restconfApiCallNode = node;
230 Map<String, SubscriptionInfo> subscriptionInfoMap() {
231 return subscriptionInfoMap;
234 void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
235 this.subscriptionInfoMap = subscriptionInfoMap;
238 LinkedBlockingQueue<String> eventQueue() {
242 void eventQueue(LinkedBlockingQueue<String> eventQueue) {
243 this.eventQueue = eventQueue;