96ab6c63e93e028274def19a676d5e5ae0269abf
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * DMAAP Topic Source Factory
37  */
38 public interface DmaapTopicSourceFactory {
39     public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40     public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41     public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42     public final String DME2_VERSION_PROPERTY = "Version";
43     public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44     public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45     public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46     public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
47
48     /**
49      * Creates an DMAAP Topic Source based on properties files
50      * 
51      * @param properties Properties containing initialization values
52      * 
53      * @return an DMAAP Topic Source
54      * @throws IllegalArgumentException if invalid parameters are present
55      */
56     public List<DmaapTopicSource> build(Properties properties);
57
58     /**
59      * Instantiates a new DMAAP Topic Source
60      * 
61      * @param servers list of servers
62      * @param topic topic name
63      * @param apiKey API Key
64      * @param apiSecret API Secret
65      * @param userName user name
66      * @param password password
67      * @param consumerGroup Consumer Group
68      * @param consumerInstance Consumer Instance
69      * @param fetchTimeout Read Fetch Timeout
70      * @param fetchLimit Fetch Limit
71      * @param managed is this endpoind managed?
72      * @param useHttps does the connection use HTTPS?
73      * @param allowSelfSignedCerts does connection allow self-signed certificates?
74      * 
75      * @return an DMAAP Topic Source
76      * @throws IllegalArgumentException if invalid parameters are present
77      */
78     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
79             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
80             boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
81
82     /**
83      * Instantiates a new DMAAP Topic Source
84      * 
85      * @param servers list of servers
86      * @param topic topic name
87      * @param apiKey API Key
88      * @param apiSecret API Secret
89      * @param userName user name
90      * @param password password
91      * @param consumerGroup Consumer Group
92      * @param consumerInstance Consumer Instance
93      * @param fetchTimeout Read Fetch Timeout
94      * @param fetchLimit Fetch Limit
95      * @param environment DME2 environment
96      * @param aftEnvironment DME2 AFT environment
97      * @param partner DME2 Partner
98      * @param latitude DME2 latitude
99      * @param longitude DME2 longitude
100      * @param additionalProps additional properties to pass to DME2
101      * @param managed is this endpoind managed?
102      * @param useHttps does the connection use HTTPS?
103      * @param allowSelfSignedCerts does connection allow self-signed certificates?
104      * 
105      * @return an DMAAP Topic Source
106      * @throws IllegalArgumentException if invalid parameters are present
107      */
108     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
109             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
110             String environment, String aftEnvironment, String partner, String latitude, String longitude,
111             Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
112
113     /**
114      * Instantiates a new DMAAP Topic Source
115      * 
116      * @param servers list of servers
117      * @param topic topic name
118      * @param apiKey API Key
119      * @param apiSecret API Secret
120      * 
121      * @return an DMAAP Topic Source
122      * @throws IllegalArgumentException if invalid parameters are present
123      */
124     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
125
126     /**
127      * Instantiates a new DMAAP Topic Source
128      * 
129      * @param servers list of servers
130      * @param topic topic name
131      * 
132      * @return an DMAAP Topic Source
133      * @throws IllegalArgumentException if invalid parameters are present
134      */
135     public DmaapTopicSource build(List<String> servers, String topic);
136
137     /**
138      * Destroys an DMAAP Topic Source based on a topic
139      * 
140      * @param topic topic name
141      * @throws IllegalArgumentException if invalid parameters are present
142      */
143     public void destroy(String topic);
144
145     /**
146      * Destroys all DMAAP Topic Sources
147      */
148     public void destroy();
149
150     /**
151      * gets an DMAAP Topic Source based on topic name
152      * 
153      * @param topic the topic name
154      * @return an DMAAP Topic Source with topic name
155      * @throws IllegalArgumentException if an invalid topic is provided
156      * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
157      */
158     public DmaapTopicSource get(String topic);
159
160     /**
161      * Provides a snapshot of the DMAAP Topic Sources
162      * 
163      * @return a list of the DMAAP Topic Sources
164      */
165     public List<DmaapTopicSource> inventory();
166 }
167
168
169 /* ------------- implementation ----------------- */
170
171 /**
172  * Factory of DMAAP Source Topics indexed by topic name
173  */
174
175 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
176     private static final String MISSING_TOPIC = "A topic must be provided";
177
178     /**
179      * Logger
180      */
181     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
182
183     /**
184      * DMaaP Topic Name Index
185      */
186     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
187
188     /**
189      * {@inheritDoc}
190      */
191     @Override
192     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
193             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
194             String environment, String aftEnvironment, String partner, String latitude, String longitude,
195             Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
196
197         if (topic == null || topic.isEmpty()) {
198             throw new IllegalArgumentException(MISSING_TOPIC);
199         }
200
201         synchronized (this) {
202             if (dmaapTopicSources.containsKey(topic)) {
203                 return dmaapTopicSources.get(topic);
204             }
205
206             DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret,
207                     userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment,
208                     aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
209
210             if (managed) {
211                 dmaapTopicSources.put(topic, dmaapTopicSource);
212             }
213
214             return dmaapTopicSource;
215         }
216     }
217
218     /**
219      * {@inheritDoc}
220      */
221     @Override
222     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
223             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
224             boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
225
226         if (servers == null || servers.isEmpty()) {
227             throw new IllegalArgumentException("DMaaP Server(s) must be provided");
228         }
229
230         if (topic == null || topic.isEmpty()) {
231             throw new IllegalArgumentException(MISSING_TOPIC);
232         }
233
234         synchronized (this) {
235             if (dmaapTopicSources.containsKey(topic)) {
236                 return dmaapTopicSources.get(topic);
237             }
238
239             DmaapTopicSource dmaapTopicSource =
240                     new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password,
241                             consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
242
243             if (managed) {
244                 dmaapTopicSources.put(topic, dmaapTopicSource);
245             }
246
247             return dmaapTopicSource;
248         }
249     }
250
251     /**
252      * {@inheritDoc}
253      */
254     @Override
255     public List<DmaapTopicSource> build(Properties properties) {
256
257         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
258         if (readTopics == null || readTopics.isEmpty()) {
259             logger.info("{}: no topic for DMaaP Source", this);
260             return new ArrayList<>();
261         }
262         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
263
264         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
265         synchronized (this) {
266             for (String topic : readTopicList) {
267                 if (this.dmaapTopicSources.containsKey(topic)) {
268                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
269                     continue;
270                 }
271
272                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
273                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
274
275                 List<String> serverList;
276                 if (servers != null && !servers.isEmpty()) {
277                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
278                 } else {
279                     serverList = new ArrayList<>();
280                 }
281
282                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
283                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
284
285                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
286                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
287
288                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
289                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
290
291                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
292                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
293
294                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
295                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
296
297                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
298                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
299
300                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
301                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
302
303                 /* DME2 Properties */
304
305                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
306                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
307
308                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
309                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
310
311                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
312                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
313
314                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
315                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
316
317                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
318                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
319
320                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
321                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
322
323                 String dme2EpReadTimeoutMs =
324                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
325                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
326
327                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
328                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
329
330                 String dme2RoundtripTimeoutMs =
331                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
332                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
333
334                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
335                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
336
337                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
338                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
339
340                 String dme2SessionStickinessRequired =
341                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
342                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
343
344                 Map<String, String> dme2AdditionalProps = new HashMap<>();
345
346                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
347                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
348                 }
349                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
350                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
351                 }
352                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
353                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
354                 }
355                 if (dme2Version != null && !dme2Version.isEmpty()) {
356                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
357                 }
358                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
359                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
360                 }
361                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
362                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
363                 }
364                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
365                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
366                 }
367
368
369                 if (servers == null || servers.isEmpty()) {
370
371                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
372                     continue;
373                 }
374
375                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
376                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
377                     try {
378                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
379                     } catch (NumberFormatException nfe) {
380                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
381                                 topic);
382                     }
383                 }
384
385                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
386                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
387                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
388                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
389                     try {
390                         fetchLimit = Integer.parseInt(fetchLimitString);
391                     } catch (NumberFormatException nfe) {
392                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
393                                 topic);
394                     }
395                 }
396
397                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
398                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
399                 boolean managed = true;
400                 if (managedString != null && !managedString.isEmpty()) {
401                     managed = Boolean.parseBoolean(managedString);
402                 }
403
404                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
405                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
406
407                 // default is to use HTTP if no https property exists
408                 boolean useHttps = false;
409                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
410                     useHttps = Boolean.parseBoolean(useHttpsString);
411                 }
412
413                 String allowSelfSignedCertsString =
414                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
415                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
416
417                 // default is to disallow self-signed certs
418                 boolean allowSelfSignedCerts = false;
419                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
420                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
421                 }
422
423
424                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
425                         aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
426                         dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
427                         useHttps, allowSelfSignedCerts);
428
429                 dmaapTopicSourceLst.add(uebTopicSource);
430             }
431         }
432         return dmaapTopicSourceLst;
433     }
434
435     /**
436      * {@inheritDoc}
437      * 
438      * @throws IllegalArgumentException
439      */
440     @Override
441     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
442         return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
443                 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
444     }
445
446     /**
447      * {@inheritDoc}
448      * 
449      * @throws IllegalArgumentException
450      */
451     @Override
452     public DmaapTopicSource build(List<String> servers, String topic) {
453         return this.build(servers, topic, null, null);
454     }
455
456     /**
457      * {@inheritDoc}
458      */
459     @Override
460     public void destroy(String topic) {
461
462         if (topic == null || topic.isEmpty()) {
463             throw new IllegalArgumentException(MISSING_TOPIC);
464         }
465
466         DmaapTopicSource uebTopicSource;
467
468         synchronized (this) {
469             if (!dmaapTopicSources.containsKey(topic)) {
470                 return;
471             }
472
473             uebTopicSource = dmaapTopicSources.remove(topic);
474         }
475
476         uebTopicSource.shutdown();
477     }
478
479     /**
480      * {@inheritDoc}
481      */
482     @Override
483     public DmaapTopicSource get(String topic) {
484
485         if (topic == null || topic.isEmpty()) {
486             throw new IllegalArgumentException(MISSING_TOPIC);
487         }
488
489         synchronized (this) {
490             if (dmaapTopicSources.containsKey(topic)) {
491                 return dmaapTopicSources.get(topic);
492             } else {
493                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
494             }
495         }
496     }
497
498     @Override
499     public synchronized List<DmaapTopicSource> inventory() {
500         return new ArrayList<>(this.dmaapTopicSources.values());
501     }
502
503     @Override
504     public void destroy() {
505         List<DmaapTopicSource> readers = this.inventory();
506         for (DmaapTopicSource reader : readers) {
507             reader.shutdown();
508         }
509
510         synchronized (this) {
511             this.dmaapTopicSources.clear();
512         }
513     }
514
515     @Override
516     public String toString() {
517         StringBuilder builder = new StringBuilder();
518         builder.append("IndexedDmaapTopicSourceFactory []");
519         return builder.toString();
520     }
521
522 }
523