c4a69831fd27bee2ea22a2bc8e0ec1c7aa1cac2c
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * UEB Topic Source Factory.
38  */
39 public interface UebTopicSourceFactory {
40
41     /**
42      * Creates an UEB Topic Source based on properties files.
43      * 
44      * @param properties Properties containing initialization values
45      * 
46      * @return an UEB Topic Source
47      * @throws IllegalArgumentException if invalid parameters are present
48      */
49     public List<UebTopicSource> build(Properties properties);
50
51     /**
52      * Instantiates a new UEB Topic Source.
53      * 
54      * @param servers list of servers
55      * @param topic topic name
56      * @param apiKey API Key
57      * @param apiSecret API Secret
58      * @param consumerGroup Consumer Group
59      * @param consumerInstance Consumer Instance
60      * @param fetchTimeout Read Fetch Timeout
61      * @param fetchLimit Fetch Limit
62      * @param managed is this source endpoint managed?
63      * 
64      * @return an UEB Topic Source
65      * @throws IllegalArgumentException if invalid parameters are present
66      */
67     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
68                                 String consumerGroup, String consumerInstance, 
69                                 int fetchTimeout, int fetchLimit, boolean managed,
70                                 boolean useHttps, boolean allowSelfSignedCerts);
71
72     /**
73      * Instantiates a new UEB Topic Source.
74      * 
75      * @param servers list of servers
76      * @param topic topic name
77      * @param apiKey API Key
78      * @param apiSecret API Secret
79      * 
80      * @return an UEB Topic Source
81      * @throws IllegalArgumentException if invalid parameters are present
82      */
83     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
84
85     /**
86      * Instantiates a new UEB Topic Source.
87      * 
88      * @param servers list of servers
89      * @param topic topic name
90      * 
91      * @return an UEB Topic Source
92      * @throws IllegalArgumentException if invalid parameters are present
93      */
94     public UebTopicSource build(List<String> servers, String topic);
95
96     /**
97      * Destroys an UEB Topic Source based on a topic.
98      * 
99      * @param topic topic name
100      * @throws IllegalArgumentException if invalid parameters are present
101      */
102     public void destroy(String topic);
103
104     /**
105      * Destroys all UEB Topic Sources.
106      */
107     public void destroy();
108
109     /**
110      * Gets an UEB Topic Source based on topic name.
111      * 
112      * @param topic the topic name
113      * @return an UEB Topic Source with topic name
114      * @throws IllegalArgumentException if an invalid topic is provided
115      * @throws IllegalStateException if the UEB Topic Source is an incorrect state
116      */
117     public UebTopicSource get(String topic);
118
119     /**
120      * Provides a snapshot of the UEB Topic Sources.
121      * 
122      * @return a list of the UEB Topic Sources
123      */
124     public List<UebTopicSource> inventory();
125 }
126
127
128 /* ------------- implementation ----------------- */
129
130 /**
131  * Factory of UEB Source Topics indexed by topic name.
132  */
133 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
134     private static final String MISSING_TOPIC = "A topic must be provided";
135
136     /**
137      * Logger.
138      */
139     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
140
141     /**
142      * UEB Topic Name Index.
143      */
144     protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
145
146     /**
147      * {@inheritDoc}
148      */
149     @Override
150     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
151             String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
152             boolean useHttps, boolean allowSelfSignedCerts) {
153         if (servers == null || servers.isEmpty()) {
154             throw new IllegalArgumentException("UEB Server(s) must be provided");
155         }
156
157         if (topic == null || topic.isEmpty()) {
158             throw new IllegalArgumentException(MISSING_TOPIC);
159         }
160
161         synchronized (this) {
162             if (uebTopicSources.containsKey(topic)) {
163                 return uebTopicSources.get(topic);
164             }
165
166             UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(BusTopicParams.builder()
167                     .servers(servers)
168                     .topic(topic)
169                     .apiKey(apiKey)
170                     .apiSecret(apiSecret)
171                     .consumerGroup(consumerGroup)
172                     .consumerInstance(consumerInstance)
173                     .fetchTimeout(fetchTimeout)
174                     .fetchLimit(fetchLimit)
175                     .useHttps(useHttps)
176                     .allowSelfSignedCerts(allowSelfSignedCerts)
177                     .build());
178
179             if (managed) {
180                 uebTopicSources.put(topic, uebTopicSource);
181             }
182
183             return uebTopicSource;
184         }
185     }
186
187     /**
188      * {@inheritDoc}
189      */
190     @Override
191     public List<UebTopicSource> build(Properties properties) {
192
193         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
194         if (readTopics == null || readTopics.isEmpty()) {
195             logger.info("{}: no topic for UEB Source", this);
196             return new ArrayList<>();
197         }
198         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
199
200         List<UebTopicSource> newUebTopicSources = new ArrayList<>();
201         synchronized (this) {
202             for (String topic : readTopicList) {
203                 if (this.uebTopicSources.containsKey(topic)) {
204                     newUebTopicSources.add(this.uebTopicSources.get(topic));
205                     continue;
206                 }
207
208                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
209                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
210
211                 if (servers == null || servers.isEmpty()) {
212                     logger.error("{}: no UEB servers configured for sink {}", this, topic);
213                     continue;
214                 }
215
216                 List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
217
218                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
219                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
220
221                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
222                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
223
224                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
225                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
226
227                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
228                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
229
230                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
231                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
232                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
233                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
234                     try {
235                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
236                     } catch (NumberFormatException nfe) {
237                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
238                                 topic);
239                     }
240                 }
241
242                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
243                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
244                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
245                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
246                     try {
247                         fetchLimit = Integer.parseInt(fetchLimitString);
248                     } catch (NumberFormatException nfe) {
249                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
250                                 topic);
251                     }
252                 }
253
254                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
255                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
256                 boolean managed = true;
257                 if (managedString != null && !managedString.isEmpty()) {
258                     managed = Boolean.parseBoolean(managedString);
259                 }
260
261                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
262                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
263
264                 // default is to use HTTP if no https property exists
265                 boolean useHttps = false;
266                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
267                     useHttps = Boolean.parseBoolean(useHttpsString);
268                 }
269
270                 String allowSelfSignedCertsString =
271                         properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
272                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
273
274                 // default is to disallow self-signed certs
275                 boolean allowSelfSignedCerts = false;
276                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
277                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
278                 }
279
280                 UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup,
281                         consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
282                 newUebTopicSources.add(uebTopicSource);
283             }
284         }
285         return newUebTopicSources;
286     }
287
288     /**
289      * {@inheritDoc}
290      */
291     @Override
292     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
293
294         return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
295                 UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
296     }
297
298     /**
299      * {@inheritDoc}
300      */
301     @Override
302     public UebTopicSource build(List<String> servers, String topic) {
303         return this.build(servers, topic, null, null);
304     }
305
306     /**
307      * {@inheritDoc}
308      */
309     @Override
310     public void destroy(String topic) {
311
312         if (topic == null || topic.isEmpty()) {
313             throw new IllegalArgumentException(MISSING_TOPIC);
314         }
315
316         UebTopicSource uebTopicSource;
317
318         synchronized (this) {
319             if (!uebTopicSources.containsKey(topic)) {
320                 return;
321             }
322
323             uebTopicSource = uebTopicSources.remove(topic);
324         }
325
326         uebTopicSource.shutdown();
327     }
328
329     @Override
330     public void destroy() {
331         List<UebTopicSource> readers = this.inventory();
332         for (UebTopicSource reader : readers) {
333             reader.shutdown();
334         }
335
336         synchronized (this) {
337             this.uebTopicSources.clear();
338         }
339     }
340
341     /**
342      * {@inheritDoc}
343      */
344     @Override
345     public UebTopicSource get(String topic) {
346
347         if (topic == null || topic.isEmpty()) {
348             throw new IllegalArgumentException(MISSING_TOPIC);
349         }
350
351         synchronized (this) {
352             if (uebTopicSources.containsKey(topic)) {
353                 return uebTopicSources.get(topic);
354             } else {
355                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
356             }
357         }
358     }
359
360     @Override
361     public synchronized List<UebTopicSource> inventory() {
362         return new ArrayList<>(this.uebTopicSources.values());
363     }
364
365     @Override
366     public String toString() {
367         StringBuilder builder = new StringBuilder();
368         builder.append("IndexedUebTopicSourceFactory []");
369         return builder.toString();
370     }
371 }