4c3cbbf89ef45b174c4fb92d77a7a730afb6fbbe
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * UEB Topic Source Factory
37  */
38 public interface UebTopicSourceFactory {
39
40     /**
41      * Creates an UEB Topic Source based on properties files
42      * 
43      * @param properties Properties containing initialization values
44      * 
45      * @return an UEB Topic Source
46      * @throws IllegalArgumentException if invalid parameters are present
47      */
48     public List<UebTopicSource> build(Properties properties);
49
50     /**
51      * Instantiates a new UEB Topic Source
52      * 
53      * @param servers list of servers
54      * @param topic topic name
55      * @param apiKey API Key
56      * @param apiSecret API Secret
57      * @param consumerGroup Consumer Group
58      * @param consumerInstance Consumer Instance
59      * @param fetchTimeout Read Fetch Timeout
60      * @param fetchLimit Fetch Limit
61      * @param managed is this source endpoint managed?
62      * 
63      * @return an UEB Topic Source
64      * @throws IllegalArgumentException if invalid parameters are present
65      */
66     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
67                                 String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
68                                 boolean useHttps, boolean allowSelfSignedCerts);
69
70     /**
71      * Instantiates a new UEB Topic Source
72      * 
73      * @param servers list of servers
74      * @param topic topic name
75      * @param apiKey API Key
76      * @param apiSecret API Secret
77      * 
78      * @return an UEB Topic Source
79      * @throws IllegalArgumentException if invalid parameters are present
80      */
81     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
82
83     /**
84      * Instantiates a new UEB Topic Source
85      * 
86      * @param servers list of servers
87      * @param topic topic name
88      * 
89      * @return an UEB Topic Source
90      * @throws IllegalArgumentException if invalid parameters are present
91      */
92     public UebTopicSource build(List<String> servers, String topic);
93
94     /**
95      * Destroys an UEB Topic Source based on a topic
96      * 
97      * @param topic topic name
98      * @throws IllegalArgumentException if invalid parameters are present
99      */
100     public void destroy(String topic);
101
102     /**
103      * Destroys all UEB Topic Sources
104      */
105     public void destroy();
106
107     /**
108      * gets an UEB Topic Source based on topic name
109      * 
110      * @param topic the topic name
111      * @return an UEB Topic Source with topic name
112      * @throws IllegalArgumentException if an invalid topic is provided
113      * @throws IllegalStateException if the UEB Topic Source is an incorrect state
114      */
115     public UebTopicSource get(String topic);
116
117     /**
118      * Provides a snapshot of the UEB Topic Sources
119      * 
120      * @return a list of the UEB Topic Sources
121      */
122     public List<UebTopicSource> inventory();
123 }
124
125
126 /* ------------- implementation ----------------- */
127
128 /**
129  * Factory of UEB Source Topics indexed by topic name
130  */
131 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
132     private static final String MISSING_TOPIC = "A topic must be provided";
133
134     /**
135      * Logger
136      */
137     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
138
139     /**
140      * UEB Topic Name Index
141      */
142     protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
143
144     /**
145      * {@inheritDoc}
146      */
147     @Override
148     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
149             String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
150             boolean useHttps, boolean allowSelfSignedCerts) {
151         if (servers == null || servers.isEmpty()) {
152             throw new IllegalArgumentException("UEB Server(s) must be provided");
153         }
154
155         if (topic == null || topic.isEmpty()) {
156             throw new IllegalArgumentException(MISSING_TOPIC);
157         }
158
159         synchronized (this) {
160             if (uebTopicSources.containsKey(topic)) {
161                 return uebTopicSources.get(topic);
162             }
163
164             UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
165                     .servers(servers)
166                     .topic(topic)
167                     .apiKey(apiKey)
168                     .apiSecret(apiSecret)
169                     .consumerGroup(consumerGroup)
170                     .consumerInstance(consumerInstance)
171                     .fetchTimeout(fetchTimeout)
172                     .fetchLimit(fetchLimit)
173                     .useHttps(useHttps)
174                     .allowSelfSignedCerts(allowSelfSignedCerts)
175                     .build());
176
177             if (managed) {
178                 uebTopicSources.put(topic, uebTopicSource);
179             }
180
181             return uebTopicSource;
182         }
183     }
184
185     /**
186      * {@inheritDoc}
187      */
188     @Override
189     public List<UebTopicSource> build(Properties properties) {
190
191         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
192         if (readTopics == null || readTopics.isEmpty()) {
193             logger.info("{}: no topic for UEB Source", this);
194             return new ArrayList<>();
195         }
196         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
197
198         List<UebTopicSource> newUebTopicSources = new ArrayList<>();
199         synchronized (this) {
200             for (String topic : readTopicList) {
201                 if (this.uebTopicSources.containsKey(topic)) {
202                     newUebTopicSources.add(this.uebTopicSources.get(topic));
203                     continue;
204                 }
205
206                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
207                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
208
209                 if (servers == null || servers.isEmpty()) {
210                     logger.error("{}: no UEB servers configured for sink {}", this, topic);
211                     continue;
212                 }
213
214                 List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
215
216                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
217                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
218
219                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
220                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
221
222                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
223                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
224
225                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
226                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
227
228                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
229                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
230                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
231                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
232                     try {
233                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
234                     } catch (NumberFormatException nfe) {
235                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
236                                 topic);
237                     }
238                 }
239
240                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
241                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
242                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
243                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
244                     try {
245                         fetchLimit = Integer.parseInt(fetchLimitString);
246                     } catch (NumberFormatException nfe) {
247                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
248                                 topic);
249                     }
250                 }
251
252                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
253                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
254                 boolean managed = true;
255                 if (managedString != null && !managedString.isEmpty()) {
256                     managed = Boolean.parseBoolean(managedString);
257                 }
258
259                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
260                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
261
262                 // default is to use HTTP if no https property exists
263                 boolean useHttps = false;
264                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
265                     useHttps = Boolean.parseBoolean(useHttpsString);
266                 }
267
268                 String allowSelfSignedCertsString =
269                         properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
270                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
271
272                 // default is to disallow self-signed certs
273                 boolean allowSelfSignedCerts = false;
274                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
275                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
276                 }
277
278                 UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup,
279                         consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
280                 newUebTopicSources.add(uebTopicSource);
281             }
282         }
283         return newUebTopicSources;
284     }
285
286     /**
287      * {@inheritDoc}
288      */
289     @Override
290     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
291
292         return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
293                 UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
294     }
295
296     /**
297      * {@inheritDoc}
298      */
299     @Override
300     public UebTopicSource build(List<String> servers, String topic) {
301         return this.build(servers, topic, null, null);
302     }
303
304     /**
305      * {@inheritDoc}
306      */
307     @Override
308     public void destroy(String topic) {
309
310         if (topic == null || topic.isEmpty()) {
311             throw new IllegalArgumentException(MISSING_TOPIC);
312         }
313
314         UebTopicSource uebTopicSource;
315
316         synchronized (this) {
317             if (!uebTopicSources.containsKey(topic)) {
318                 return;
319             }
320
321             uebTopicSource = uebTopicSources.remove(topic);
322         }
323
324         uebTopicSource.shutdown();
325     }
326
327     /**
328      * {@inheritDoc}
329      */
330     @Override
331     public UebTopicSource get(String topic) {
332
333         if (topic == null || topic.isEmpty()) {
334             throw new IllegalArgumentException(MISSING_TOPIC);
335         }
336
337         synchronized (this) {
338             if (uebTopicSources.containsKey(topic)) {
339                 return uebTopicSources.get(topic);
340             } else {
341                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
342             }
343         }
344     }
345
346     @Override
347     public synchronized List<UebTopicSource> inventory() {
348         return new ArrayList<>(this.uebTopicSources.values());
349     }
350
351     @Override
352     public void destroy() {
353         List<UebTopicSource> readers = this.inventory();
354         for (UebTopicSource reader : readers) {
355             reader.shutdown();
356         }
357
358         synchronized (this) {
359             this.uebTopicSources.clear();
360         }
361     }
362
363     @Override
364     public String toString() {
365         StringBuilder builder = new StringBuilder();
366         builder.append("IndexedUebTopicSourceFactory []");
367         return builder.toString();
368     }
369
370 }