8d3f28e9acb873c9e1fe8d49e3de991903b24c37
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * UEB Topic Source Factory
38  */
39 public interface UebTopicSourceFactory {
40
41     /**
42      * Creates an UEB Topic Source based on properties files
43      * 
44      * @param properties Properties containing initialization values
45      * 
46      * @return an UEB Topic Source
47      * @throws IllegalArgumentException if invalid parameters are present
48      */
49     public List<UebTopicSource> build(Properties properties);
50
51     /**
52      * Instantiates a new UEB Topic Source
53      * 
54      * @param servers list of servers
55      * @param topic topic name
56      * @param apiKey API Key
57      * @param apiSecret API Secret
58      * @param consumerGroup Consumer Group
59      * @param consumerInstance Consumer Instance
60      * @param fetchTimeout Read Fetch Timeout
61      * @param fetchLimit Fetch Limit
62      * @param managed is this source endpoint managed?
63      * 
64      * @return an UEB Topic Source
65      * @throws IllegalArgumentException if invalid parameters are present
66      */
67     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
68                                 String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
69                                 boolean useHttps, boolean allowSelfSignedCerts);
70
71     /**
72      * Instantiates a new UEB Topic Source
73      * 
74      * @param servers list of servers
75      * @param topic topic name
76      * @param apiKey API Key
77      * @param apiSecret API Secret
78      * 
79      * @return an UEB Topic Source
80      * @throws IllegalArgumentException if invalid parameters are present
81      */
82     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
83
84     /**
85      * Instantiates a new UEB Topic Source
86      * 
87      * @param servers list of servers
88      * @param topic topic name
89      * 
90      * @return an UEB Topic Source
91      * @throws IllegalArgumentException if invalid parameters are present
92      */
93     public UebTopicSource build(List<String> servers, String topic);
94
95     /**
96      * Destroys an UEB Topic Source based on a topic
97      * 
98      * @param topic topic name
99      * @throws IllegalArgumentException if invalid parameters are present
100      */
101     public void destroy(String topic);
102
103     /**
104      * Destroys all UEB Topic Sources
105      */
106     public void destroy();
107
108     /**
109      * gets an UEB Topic Source based on topic name
110      * 
111      * @param topic the topic name
112      * @return an UEB Topic Source with topic name
113      * @throws IllegalArgumentException if an invalid topic is provided
114      * @throws IllegalStateException if the UEB Topic Source is an incorrect state
115      */
116     public UebTopicSource get(String topic);
117
118     /**
119      * Provides a snapshot of the UEB Topic Sources
120      * 
121      * @return a list of the UEB Topic Sources
122      */
123     public List<UebTopicSource> inventory();
124 }
125
126
127 /* ------------- implementation ----------------- */
128
129 /**
130  * Factory of UEB Source Topics indexed by topic name
131  */
132 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
133     private static final String MISSING_TOPIC = "A topic must be provided";
134
135     /**
136      * Logger
137      */
138     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
139
140     /**
141      * UEB Topic Name Index
142      */
143     protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
144
145     /**
146      * {@inheritDoc}
147      */
148     @Override
149     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
150             String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
151             boolean useHttps, boolean allowSelfSignedCerts) {
152         if (servers == null || servers.isEmpty()) {
153             throw new IllegalArgumentException("UEB Server(s) must be provided");
154         }
155
156         if (topic == null || topic.isEmpty()) {
157             throw new IllegalArgumentException(MISSING_TOPIC);
158         }
159
160         synchronized (this) {
161             if (uebTopicSources.containsKey(topic)) {
162                 return uebTopicSources.get(topic);
163             }
164
165             UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(BusTopicParams.builder()
166                     .servers(servers)
167                     .topic(topic)
168                     .apiKey(apiKey)
169                     .apiSecret(apiSecret)
170                     .consumerGroup(consumerGroup)
171                     .consumerInstance(consumerInstance)
172                     .fetchTimeout(fetchTimeout)
173                     .fetchLimit(fetchLimit)
174                     .useHttps(useHttps)
175                     .allowSelfSignedCerts(allowSelfSignedCerts)
176                     .build());
177
178             if (managed) {
179                 uebTopicSources.put(topic, uebTopicSource);
180             }
181
182             return uebTopicSource;
183         }
184     }
185
186     /**
187      * {@inheritDoc}
188      */
189     @Override
190     public List<UebTopicSource> build(Properties properties) {
191
192         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
193         if (readTopics == null || readTopics.isEmpty()) {
194             logger.info("{}: no topic for UEB Source", this);
195             return new ArrayList<>();
196         }
197         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
198
199         List<UebTopicSource> newUebTopicSources = new ArrayList<>();
200         synchronized (this) {
201             for (String topic : readTopicList) {
202                 if (this.uebTopicSources.containsKey(topic)) {
203                     newUebTopicSources.add(this.uebTopicSources.get(topic));
204                     continue;
205                 }
206
207                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
208                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
209
210                 if (servers == null || servers.isEmpty()) {
211                     logger.error("{}: no UEB servers configured for sink {}", this, topic);
212                     continue;
213                 }
214
215                 List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
216
217                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
218                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
219
220                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
221                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
222
223                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
224                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
225
226                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
227                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
228
229                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
230                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
231                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
232                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
233                     try {
234                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
235                     } catch (NumberFormatException nfe) {
236                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
237                                 topic);
238                     }
239                 }
240
241                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
242                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
243                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
244                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
245                     try {
246                         fetchLimit = Integer.parseInt(fetchLimitString);
247                     } catch (NumberFormatException nfe) {
248                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
249                                 topic);
250                     }
251                 }
252
253                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
254                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
255                 boolean managed = true;
256                 if (managedString != null && !managedString.isEmpty()) {
257                     managed = Boolean.parseBoolean(managedString);
258                 }
259
260                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
261                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
262
263                 // default is to use HTTP if no https property exists
264                 boolean useHttps = false;
265                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
266                     useHttps = Boolean.parseBoolean(useHttpsString);
267                 }
268
269                 String allowSelfSignedCertsString =
270                         properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
271                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
272
273                 // default is to disallow self-signed certs
274                 boolean allowSelfSignedCerts = false;
275                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
276                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
277                 }
278
279                 UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup,
280                         consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
281                 newUebTopicSources.add(uebTopicSource);
282             }
283         }
284         return newUebTopicSources;
285     }
286
287     /**
288      * {@inheritDoc}
289      */
290     @Override
291     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
292
293         return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
294                 UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
295     }
296
297     /**
298      * {@inheritDoc}
299      */
300     @Override
301     public UebTopicSource build(List<String> servers, String topic) {
302         return this.build(servers, topic, null, null);
303     }
304
305     /**
306      * {@inheritDoc}
307      */
308     @Override
309     public void destroy(String topic) {
310
311         if (topic == null || topic.isEmpty()) {
312             throw new IllegalArgumentException(MISSING_TOPIC);
313         }
314
315         UebTopicSource uebTopicSource;
316
317         synchronized (this) {
318             if (!uebTopicSources.containsKey(topic)) {
319                 return;
320             }
321
322             uebTopicSource = uebTopicSources.remove(topic);
323         }
324
325         uebTopicSource.shutdown();
326     }
327
328     /**
329      * {@inheritDoc}
330      */
331     @Override
332     public UebTopicSource get(String topic) {
333
334         if (topic == null || topic.isEmpty()) {
335             throw new IllegalArgumentException(MISSING_TOPIC);
336         }
337
338         synchronized (this) {
339             if (uebTopicSources.containsKey(topic)) {
340                 return uebTopicSources.get(topic);
341             } else {
342                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
343             }
344         }
345     }
346
347     @Override
348     public synchronized List<UebTopicSource> inventory() {
349         return new ArrayList<>(this.uebTopicSources.values());
350     }
351
352     @Override
353     public void destroy() {
354         List<UebTopicSource> readers = this.inventory();
355         for (UebTopicSource reader : readers) {
356             reader.shutdown();
357         }
358
359         synchronized (this) {
360             this.uebTopicSources.clear();
361         }
362     }
363
364     @Override
365     public String toString() {
366         StringBuilder builder = new StringBuilder();
367         builder.append("IndexedUebTopicSourceFactory []");
368         return builder.toString();
369     }
370 }