111d628474abce3dd7a4efebd1caa152576bd921
[ccsdk/sli/plugins.git] / restconf-client / provider / src / main / java / org / onap / ccsdk / sli / plugins / restconfdiscovery / RestconfDiscoveryNode.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CCSDK
4  * ================================================================================
5  * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
22
23 import org.glassfish.jersey.media.sse.EventSource;
24 import org.glassfish.jersey.media.sse.SseFeature;
25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
27 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
28 import org.slf4j.Logger;
29
30 import javax.ws.rs.client.Client;
31 import javax.ws.rs.client.ClientBuilder;
32 import javax.ws.rs.client.WebTarget;
33 import java.util.Map;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.LinkedBlockingQueue;
38
39 import static org.slf4j.LoggerFactory.getLogger;
40
41 /**
42  * Representation of a plugin to subscribe for notification and then
43  * to handle the received notifications.
44  */
45 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
46
47     private static final Logger log = getLogger(RestconfDiscoveryNode.class);
48
49     private ExecutorService executor = Executors.newCachedThreadPool();
50     private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
51     private RestconfApiCallNode restconfApiCallNode;
52
53     private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
54     private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
55
56     private static final String SUBSCRIBER_ID = "subscriberId";
57     private static final String RESPONSE_CODE = "response-code";
58     private static final String RESPONSE_PREFIX = "responsePrefix";
59     private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
60             "ications:establish-subscription.output.identifier";
61     private static final String RESPONSE_CODE_200 = "200";
62     private static final String SSE_URL = "sseConnectURL";
63
64     /**
65      * Creates an instance of RestconfDiscoveryNode and starts processing of
66      * event.
67      *
68      * @param r restconf api call node
69      */
70     public RestconfDiscoveryNode(RestconfApiCallNode r) {
71         this.restconfApiCallNode = r;
72         ExecutorService e = Executors.newFixedThreadPool(20);
73         EventProcessor p = new EventProcessor(this);
74         for (int i = 0; i < 20; ++i) {
75             e.execute(p);
76         }
77     }
78
79     @Override
80     public void establishSubscription(Map<String, String> paramMap,
81                                       SvcLogicContext ctx) throws SvcLogicException {
82         String subscriberId = paramMap.get(SUBSCRIBER_ID);
83         if (subscriberId == null) {
84             throw new SvcLogicException("Subscriber Id is null");
85         }
86
87         restconfApiCallNode.sendRequest(paramMap, ctx);
88
89         if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
90             // TODO: save subscription id and subscriber in MYSQL
91
92             establishPersistentConnection(paramMap, ctx, subscriberId);
93         } else {
94             log.info("Failed to subscribe {}", subscriberId);
95             throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
96         }
97     }
98
99     @Override
100     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
101         // TODO: to be implemented
102     }
103
104     @Override
105     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
106         String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
107         if (id != null) {
108             PersistentConnection conn = runnableInfo.get(id);
109             conn.terminate();
110             runnableInfo.remove(id);
111             subscriptionInfoMap.remove(id);
112         }
113     }
114
115     class PersistentConnection implements Runnable {
116         private String url;
117         private volatile boolean running = true;
118
119         PersistentConnection(String url) {
120             this.url = url;
121         }
122
123         private void terminate() {
124             running = false;
125         }
126
127         @Override
128         public void run() {
129             Client client = ClientBuilder.newBuilder()
130                     .register(SseFeature.class).build();
131             WebTarget target = client.target(url);
132             EventSource eventSource = EventSource.target(target).build();
133             eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
134             eventSource.open();
135             log.info("Connected to SSE source");
136             while (running) {
137                 try {
138                     Thread.sleep(5000);
139                 } catch (InterruptedException e) {
140                     log.error("Interrupted!", e);
141                     Thread.currentThread().interrupt();
142                 }
143             }
144             eventSource.close();
145             log.info("Closed connection to SSE source");
146         }
147     }
148
149     /**
150      * Establishes a persistent between the client and server.
151      *
152      * @param paramMap input paramter map
153      * @param ctx service logic context
154      * @param subscriberId subscriber identifier
155      */
156     void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
157                                               String subscriberId) {
158         String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
159         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
160                                                              paramMap.get("rpc"),
161                                                              paramMap.get("version"),
162                                                              paramMap.get("mode"));
163         SubscriptionInfo info = new SubscriptionInfo();
164         info.callBackDG(callbackDG);
165         info.subscriptionId(id);
166         info.subscriberId(subscriberId);
167         subscriptionInfoMap.put(id, info);
168
169         String url = paramMap.get(SSE_URL);
170         PersistentConnection connection = new PersistentConnection(url);
171         runnableInfo.put(id, connection);
172         executor.execute(connection);
173     }
174
175     /**
176      * Returns response code.
177      *
178      * @param prefix prefix given in input parameter
179      * @param ctx service logic context
180      * @return response code
181      */
182     String getResponseCode(String prefix, SvcLogicContext ctx) {
183         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
184     }
185
186     /**
187      * Returns subscription id from event.
188      *
189      * @param prefix prefix given in input parameter
190      * @param ctx service logic context
191      * @return subscription id from event
192      */
193     String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
194         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
195     }
196
197     private String getPrefix(String prefix) {
198         return prefix != null ? prefix + "." : "";
199     }
200
201     private String getSubscriptionId(String subscriberId) {
202         for (Map.Entry<String,SubscriptionInfo> entry
203                 : subscriptionInfoMap.entrySet()) {
204             if (entry.getValue().subscriberId()
205                     .equals(subscriberId)) {
206                 return entry.getKey();
207             }
208         }
209         return null;
210     }
211
212     /**
213      * Returns restconfApiCallNode.
214      *
215      * @return restconfApiCallNode
216      */
217     protected RestconfApiCallNode restconfapiCallNode() {
218         return restconfApiCallNode;
219     }
220
221     /**
222      * Sets restconfApiCallNode.
223      *
224      * @param node restconfApiCallNode
225      */
226     void restconfapiCallNode(RestconfApiCallNode node) {
227         restconfApiCallNode = node;
228     }
229
230     Map<String, SubscriptionInfo> subscriptionInfoMap() {
231         return subscriptionInfoMap;
232     }
233
234     void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
235         this.subscriptionInfoMap = subscriptionInfoMap;
236     }
237
238     LinkedBlockingQueue<String> eventQueue() {
239         return eventQueue;
240     }
241
242     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
243         this.eventQueue = eventQueue;
244     }
245 }