0e469a1d80676945ac3cef851f37a0147cfc3517
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.impl;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
31 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSourceFactory;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Factory of DMAAP Source Topics indexed by topic name
39  */
40
41 public class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
42     private static final String MISSING_TOPIC = "A topic must be provided";
43
44     private static final IndexedDmaapTopicSourceFactory instance = new IndexedDmaapTopicSourceFactory();
45
46     /**
47      * Logger
48      */
49     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
50
51     /**
52      * DMaaP Topic Name Index
53      */
54     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
55
56     /**
57      * Get the singleton instance.
58      * 
59      * @return the instance
60      */
61     public static IndexedDmaapTopicSourceFactory getInstance() {
62         return instance;
63     }
64
65     private IndexedDmaapTopicSourceFactory() {}
66
67     /**
68      * {@inheritDoc}
69      */
70     @Override
71     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
72             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
73             String environment, String aftEnvironment, String partner, String latitude, String longitude,
74             Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
75
76         if (topic == null || topic.isEmpty()) {
77             throw new IllegalArgumentException(MISSING_TOPIC);
78         }
79
80         synchronized (this) {
81             if (dmaapTopicSources.containsKey(topic)) {
82                 return dmaapTopicSources.get(topic);
83             }
84
85             DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret,
86                     userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment,
87                     aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
88
89             if (managed) {
90                 dmaapTopicSources.put(topic, dmaapTopicSource);
91             }
92
93             return dmaapTopicSource;
94         }
95     }
96
97     /**
98      * {@inheritDoc}
99      */
100     @Override
101     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
102             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
103             boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
104
105         if (servers == null || servers.isEmpty()) {
106             throw new IllegalArgumentException("DMaaP Server(s) must be provided");
107         }
108
109         if (topic == null || topic.isEmpty()) {
110             throw new IllegalArgumentException(MISSING_TOPIC);
111         }
112
113         synchronized (this) {
114             if (dmaapTopicSources.containsKey(topic)) {
115                 return dmaapTopicSources.get(topic);
116             }
117
118             DmaapTopicSource dmaapTopicSource =
119                     new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password,
120                             consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
121
122             if (managed) {
123                 dmaapTopicSources.put(topic, dmaapTopicSource);
124             }
125
126             return dmaapTopicSource;
127         }
128     }
129
130     /**
131      * {@inheritDoc}
132      */
133     @Override
134     public List<DmaapTopicSource> build(Properties properties) {
135
136         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
137         if (readTopics == null || readTopics.isEmpty()) {
138             logger.info("{}: no topic for DMaaP Source", this);
139             return new ArrayList<>();
140         }
141         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
142
143         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
144         synchronized (this) {
145             for (String topic : readTopicList) {
146                 if (this.dmaapTopicSources.containsKey(topic)) {
147                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
148                     continue;
149                 }
150
151                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
152                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
153
154                 List<String> serverList;
155                 if (servers != null && !servers.isEmpty()) {
156                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
157                 } else {
158                     serverList = new ArrayList<>();
159                 }
160
161                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
162                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
163
164                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
165                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
166
167                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
168                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
169
170                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
171                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
172
173                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
174                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
175
176                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
177                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
178
179                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
180                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
181
182                 /* DME2 Properties */
183
184                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
185                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
186
187                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
188                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
189
190                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
191                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
192
193                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
194                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
195
196                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
197                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
198
199                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
200                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
201
202                 String dme2EpReadTimeoutMs =
203                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
204                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
205
206                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
207                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
208
209                 String dme2RoundtripTimeoutMs =
210                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
211                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
212
213                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
214                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
215
216                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
217                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
218
219                 String dme2SessionStickinessRequired =
220                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
221                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
222
223                 Map<String, String> dme2AdditionalProps = new HashMap<>();
224
225                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
226                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
227                 }
228                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
229                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
230                 }
231                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
232                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
233                 }
234                 if (dme2Version != null && !dme2Version.isEmpty()) {
235                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
236                 }
237                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
238                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
239                 }
240                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
241                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
242                 }
243                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
244                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
245                 }
246
247
248                 if (servers == null || servers.isEmpty()) {
249
250                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
251                     continue;
252                 }
253
254                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
255                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
256                     try {
257                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
258                     } catch (NumberFormatException nfe) {
259                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
260                                 topic);
261                     }
262                 }
263
264                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
265                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
266                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
267                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
268                     try {
269                         fetchLimit = Integer.parseInt(fetchLimitString);
270                     } catch (NumberFormatException nfe) {
271                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
272                                 topic);
273                     }
274                 }
275
276                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
277                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
278                 boolean managed = true;
279                 if (managedString != null && !managedString.isEmpty()) {
280                     managed = Boolean.parseBoolean(managedString);
281                 }
282
283                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
284                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
285
286                 // default is to use HTTP if no https property exists
287                 boolean useHttps = false;
288                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
289                     useHttps = Boolean.parseBoolean(useHttpsString);
290                 }
291
292                 String allowSelfSignedCertsString =
293                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
294                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
295
296                 // default is to disallow self-signed certs
297                 boolean allowSelfSignedCerts = false;
298                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
299                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
300                 }
301
302
303                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
304                         aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
305                         dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
306                         useHttps, allowSelfSignedCerts);
307
308                 dmaapTopicSourceLst.add(uebTopicSource);
309             }
310         }
311         return dmaapTopicSourceLst;
312     }
313
314     /**
315      * {@inheritDoc}
316      * 
317      * @throws IllegalArgumentException
318      */
319     @Override
320     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
321         return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
322                 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
323     }
324
325     /**
326      * {@inheritDoc}
327      * 
328      * @throws IllegalArgumentException
329      */
330     @Override
331     public DmaapTopicSource build(List<String> servers, String topic) {
332         return this.build(servers, topic, null, null);
333     }
334
335     /**
336      * {@inheritDoc}
337      */
338     @Override
339     public void destroy(String topic) {
340
341         if (topic == null || topic.isEmpty()) {
342             throw new IllegalArgumentException(MISSING_TOPIC);
343         }
344
345         DmaapTopicSource uebTopicSource;
346
347         synchronized (this) {
348             if (!dmaapTopicSources.containsKey(topic)) {
349                 return;
350             }
351
352             uebTopicSource = dmaapTopicSources.remove(topic);
353         }
354
355         uebTopicSource.shutdown();
356     }
357
358     /**
359      * {@inheritDoc}
360      */
361     @Override
362     public DmaapTopicSource get(String topic) {
363
364         if (topic == null || topic.isEmpty()) {
365             throw new IllegalArgumentException(MISSING_TOPIC);
366         }
367
368         synchronized (this) {
369             if (dmaapTopicSources.containsKey(topic)) {
370                 return dmaapTopicSources.get(topic);
371             } else {
372                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
373             }
374         }
375     }
376
377     @Override
378     public synchronized List<DmaapTopicSource> inventory() {
379         return new ArrayList<>(this.dmaapTopicSources.values());
380     }
381
382     @Override
383     public void destroy() {
384         List<DmaapTopicSource> readers = this.inventory();
385         for (DmaapTopicSource reader : readers) {
386             reader.shutdown();
387         }
388
389         synchronized (this) {
390             this.dmaapTopicSources.clear();
391         }
392     }
393
394     @Override
395     public String toString() {
396         StringBuilder builder = new StringBuilder();
397         builder.append("IndexedDmaapTopicSourceFactory []");
398         return builder.toString();
399     }
400
401 }