Fix for sonar critical issues.
[ccsdk/sli/plugins.git] / restconf-client / provider / src / main / java / org / onap / ccsdk / sli / plugins / restconfdiscovery / RestconfDiscoveryNode.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CCSDK
4  * ================================================================================
5  * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
22
23 import org.glassfish.jersey.media.sse.EventSource;
24 import org.glassfish.jersey.media.sse.SseFeature;
25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
27 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import javax.ws.rs.client.Client;
32 import javax.ws.rs.client.ClientBuilder;
33 import javax.ws.rs.client.WebTarget;
34 import java.util.Map;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.LinkedBlockingQueue;
39
40 /**
41  * Representation of a plugin to subscribe for notification and then
42  * to handle the received notifications.
43  */
44 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
45     private static final Logger log = LoggerFactory.getLogger(RestconfDiscoveryNode.class);
46
47     private ExecutorService executor = Executors.newCachedThreadPool();
48     private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
49     private RestconfApiCallNode restconfApiCallNode;
50
51     private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
52     private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
53
54     private static final String SUBSCRIBER_ID = "subscriberId";
55     private static final String RESPONSE_CODE = "response-code";
56     private static final String RESPONSE_PREFIX = "responsePrefix";
57     private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notifications:output.identifier";
58     private static final String RESPONSE_CODE_200 = "200";
59     private static final String SSE_URL = "sseConnectURL";
60
61     /**
62      * Creates an instance of RestconfDiscoveryNode and
63      * starts processing of event.
64      */
65     public RestconfDiscoveryNode() {
66         ExecutorService e = Executors.newFixedThreadPool(20);
67         EventProcessor p = new EventProcessor(this);
68         for (int i = 0; i < 20; ++i) {
69             e.execute(p);
70         }
71     }
72
73     @Override
74     public void establishSubscription(Map<String, String> paramMap,
75                                       SvcLogicContext ctx) throws SvcLogicException {
76         String subscriberId = paramMap.get(SUBSCRIBER_ID);
77         if (subscriberId == null) {
78             throw new SvcLogicException("Subscriber Id is null");
79         }
80
81         restconfApiCallNode.sendRequest(paramMap, ctx);
82
83         if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
84             // TODO: save subscription id and subscriber in MYSQL
85
86             establishPersistentConnection(paramMap, ctx, subscriberId);
87         } else {
88             log.info("Failed to subscribe {}", subscriberId);
89             throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
90         }
91     }
92
93     @Override
94     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
95         // TODO: to be implemented
96     }
97
98     @Override
99     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
100         String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
101         if (id != null) {
102             PersistentConnection conn = runnableInfo.get(id);
103             conn.terminate();
104             runnableInfo.remove(id);
105             subscriptionInfoMap.remove(id);
106         }
107     }
108
109     class PersistentConnection implements Runnable {
110         private String url;
111         private volatile boolean running = true;
112
113         PersistentConnection(String url) {
114             this.url = url;
115         }
116
117         private void terminate() {
118             running = false;
119         }
120
121         @Override
122         public void run() {
123             Client client = ClientBuilder.newBuilder()
124                     .register(SseFeature.class).build();
125             WebTarget target = client.target(url);
126             EventSource eventSource = EventSource.target(target).build();
127             eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
128             eventSource.open();
129             log.info("Connected to SSE source");
130             while (running) {
131                 try {
132                     Thread.sleep(5000);
133                 } catch (InterruptedException e) {
134                     log.error("Interrupted!", e);
135                     Thread.currentThread().interrupt();
136                 }
137             }
138             eventSource.close();
139             log.info("Closed connection to SSE source");
140         }
141     }
142
143     /**
144      * Establishes a persistent between the client and server.
145      *
146      * @param paramMap input paramter map
147      * @param ctx service logic context
148      * @param subscriberId subscriber identifier
149      */
150     void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
151                                               String subscriberId) {
152         String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
153         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
154                                                              paramMap.get("rpc"),
155                                                              paramMap.get("version"),
156                                                              paramMap.get("mode"));
157         SubscriptionInfo info = new SubscriptionInfo();
158         info.callBackDG(callbackDG);
159         info.subscriptionId(id);
160         info.subscriberId(subscriberId);
161         subscriptionInfoMap.put(id, info);
162
163         String url = paramMap.get(SSE_URL);
164         PersistentConnection connection = new PersistentConnection(url);
165         runnableInfo.put(id, connection);
166         executor.execute(connection);
167     }
168
169     /**
170      * Returns response code.
171      *
172      * @param prefix prefix given in input parameter
173      * @param ctx service logic context
174      * @return response code
175      */
176     String getResponseCode(String prefix, SvcLogicContext ctx) {
177         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
178     }
179
180     /**
181      * Returns subscription id from event.
182      *
183      * @param prefix prefix given in input parameter
184      * @param ctx service logic context
185      * @return subscription id from event
186      */
187     String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
188         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
189     }
190
191     private String getPrefix(String prefix) {
192         return prefix != null ? prefix + "." : "";
193     }
194
195     private String getSubscriptionId(String subscriberId) {
196         for (Map.Entry<String,SubscriptionInfo> entry
197                 : subscriptionInfoMap.entrySet()) {
198             if (entry.getValue().subscriberId()
199                     .equals(subscriberId)) {
200                 return entry.getKey();
201             }
202         }
203         return null;
204     }
205
206     /**
207      * Returns restconfApiCallNode.
208      *
209      * @return restconfApiCallNode
210      */
211     protected RestconfApiCallNode restconfapiCallNode() {
212         return restconfApiCallNode;
213     }
214
215     /**
216      * Sets restconfApiCallNode.
217      *
218      * @param node restconfApiCallNode
219      */
220     void restconfapiCallNode(RestconfApiCallNode node) {
221         restconfApiCallNode = node;
222     }
223
224     Map<String, SubscriptionInfo> subscriptionInfoMap() {
225         return subscriptionInfoMap;
226     }
227
228     void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
229         this.subscriptionInfoMap = subscriptionInfoMap;
230     }
231
232     LinkedBlockingQueue<String> eventQueue() {
233         return eventQueue;
234     }
235
236     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
237         this.eventQueue = eventQueue;
238     }
239 }