f45164f875ea42cbb26cf0e900cffd9868a2682b
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
/**
 * DMAAP Topic Source Factory.
 *
 * <p>Declares the operations for building, looking up, listing, and destroying
 * {@link DmaapTopicSource} instances.
 */
public interface DmaapTopicSourceFactory {
    /*
     * Names of DME2 properties. IndexedDmaapTopicSourceFactory (in this file) uses them as the
     * keys of the additional-properties map it passes along when building a source from a
     * Properties object.
     */
    String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
    String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
    String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
    String DME2_VERSION_PROPERTY = "Version";
    String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
    // NOTE(review): not referenced anywhere in this file - presumably used by other classes.
    String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
    String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
    String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";

    /**
     * Creates the DMAAP Topic Sources described by a set of properties.
     * 
     * @param properties Properties containing initialization values
     * 
     * @return the list of DMAAP Topic Sources
     * @throws IllegalArgumentException if invalid parameters are present
     */
    List<DmaapTopicSource> build(Properties properties);

    /**
     * Instantiates a new DMAAP Topic Source.
     * 
     * @param busTopicParams parameters object
     * @return a DMAAP Topic Source
     */
    DmaapTopicSource build(BusTopicParams busTopicParams);

    /**
     * Instantiates a new DMAAP Topic Source.
     * 
     * @param servers list of servers
     * @param topic topic name
     * @param apiKey API Key
     * @param apiSecret API Secret
     * 
     * @return a DMAAP Topic Source
     * @throws IllegalArgumentException if invalid parameters are present
     */
    DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);

    /**
     * Instantiates a new DMAAP Topic Source.
     * 
     * @param servers list of servers
     * @param topic topic name
     * 
     * @return a DMAAP Topic Source
     * @throws IllegalArgumentException if invalid parameters are present
     */
    DmaapTopicSource build(List<String> servers, String topic);

    /**
     * Destroys a DMAAP Topic Source based on a topic.
     * 
     * @param topic topic name
     * @throws IllegalArgumentException if invalid parameters are present
     */
    void destroy(String topic);

    /**
     * Destroys all DMAAP Topic Sources.
     */
    void destroy();

    /**
     * Gets a DMAAP Topic Source based on topic name.
     * 
     * @param topic the topic name
     * @return the DMAAP Topic Source with the given topic name
     * @throws IllegalArgumentException if an invalid topic is provided
     * @throws IllegalStateException if the DMAAP Topic Source is in an incorrect state
     */
    DmaapTopicSource get(String topic);

    /**
     * Provides a snapshot of the DMAAP Topic Sources.
     * 
     * @return a list of the DMAAP Topic Sources
     */
    List<DmaapTopicSource> inventory();
}
122
123
124 /* ------------- implementation ----------------- */
125
126 /**
127  * Factory of DMAAP Source Topics indexed by topic name.
128  */
129
130 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
131     private static final String MISSING_TOPIC = "A topic must be provided";
132
133     /**
134      * Logger.
135      */
136     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
137
138     /**
139      * DMaaP Topic Name Index.
140      */
141     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
142
143     @Override
144     public DmaapTopicSource build(BusTopicParams busTopicParams) {
145
146         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
147             throw new IllegalArgumentException(MISSING_TOPIC);
148         }
149
150         synchronized (this) {
151             if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
152                 return dmaapTopicSources.get(busTopicParams.getTopic());
153             }
154
155             DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams);
156
157             if (busTopicParams.isManaged()) {
158                 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
159             }
160             return dmaapTopicSource;
161         }
162     }
163
164     @Override
165     public List<DmaapTopicSource> build(Properties properties) {
166
167         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
168         if (readTopics == null || readTopics.isEmpty()) {
169             logger.info("{}: no topic for DMaaP Source", this);
170             return new ArrayList<>();
171         }
172         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
173
174         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
175         synchronized (this) {
176             for (String topic : readTopicList) {
177                 if (this.dmaapTopicSources.containsKey(topic)) {
178                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
179                     continue;
180                 }
181
182                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
183                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
184
185                 List<String> serverList;
186                 if (servers != null && !servers.isEmpty()) {
187                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
188                 } else {
189                     serverList = new ArrayList<>();
190                 }
191
192                 final String apiKey = properties.getProperty(
193                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
194                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
195
196                 final String apiSecret = properties.getProperty(
197                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
198                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
199
200                 final String aafMechId = properties.getProperty(
201                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
202                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
203
204                 final String aafPassword = properties.getProperty(
205                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
206                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
207
208                 final String consumerGroup = properties.getProperty(
209                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
210                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
211
212                 final String consumerInstance = properties.getProperty(
213                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
214                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
215
216                 final String fetchTimeoutString = properties.getProperty(
217                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
218                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
219
220                 /* DME2 Properties */
221
222                 final String dme2Environment = properties.getProperty(
223                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
224                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
225
226                 final String dme2AftEnvironment = properties.getProperty(
227                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
228                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
229
230                 final String dme2Partner = properties.getProperty(
231                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
232                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
233
234                 final String dme2RouteOffer = properties.getProperty(
235                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
236                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
237
238                 final String dme2Latitude = properties.getProperty(
239                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
240                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
241
242                 final String dme2Longitude = properties.getProperty(
243                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
244                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
245
246                 final String dme2EpReadTimeoutMs =
247                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
248                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
249
250                 final String dme2EpConnTimeout = properties.getProperty(
251                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
252                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
253
254                 final String dme2RoundtripTimeoutMs =
255                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
256                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
257
258                 final String dme2Version = properties.getProperty(
259                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
260                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
261
262                 final String dme2SubContextPath = properties.getProperty(
263                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
264                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
265
266                 final String dme2SessionStickinessRequired =
267                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
268                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
269
270                 Map<String, String> dme2AdditionalProps = new HashMap<>();
271
272                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
273                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
274                 }
275                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
276                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
277                 }
278                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
279                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
280                 }
281                 if (dme2Version != null && !dme2Version.isEmpty()) {
282                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
283                 }
284                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
285                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
286                 }
287                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
288                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
289                 }
290                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
291                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
292                 }
293
294
295                 if (servers == null || servers.isEmpty()) {
296
297                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
298                     continue;
299                 }
300
301                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
302                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
303                     try {
304                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
305                     } catch (NumberFormatException nfe) {
306                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
307                                 topic);
308                     }
309                 }
310
311                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
312                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
313                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
314                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
315                     try {
316                         fetchLimit = Integer.parseInt(fetchLimitString);
317                     } catch (NumberFormatException nfe) {
318                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
319                                 topic);
320                     }
321                 }
322
323                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
324                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
325                 boolean managed = true;
326                 if (managedString != null && !managedString.isEmpty()) {
327                     managed = Boolean.parseBoolean(managedString);
328                 }
329
330                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
331                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
332
333                 // default is to use HTTP if no https property exists
334                 boolean useHttps = false;
335                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
336                     useHttps = Boolean.parseBoolean(useHttpsString);
337                 }
338
339                 String allowSelfSignedCertsString =
340                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
341                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
342
343                 // default is to disallow self-signed certs
344                 boolean allowSelfSignedCerts = false;
345                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
346                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
347                 }
348
349
350                 DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
351                         .servers(serverList)
352                         .topic(topic)
353                         .apiKey(apiKey)
354                         .apiSecret(apiSecret)
355                         .userName(aafMechId)
356                         .password(aafPassword)
357                         .consumerGroup(consumerGroup)
358                         .consumerInstance(consumerInstance)
359                         .fetchTimeout(fetchTimeout)
360                         .fetchLimit(fetchLimit)
361                         .environment(dme2Environment)
362                         .aftEnvironment(dme2AftEnvironment)
363                         .partner(dme2Partner)
364                         .latitude(dme2Latitude)
365                         .longitude(dme2Longitude)
366                         .additionalProps(dme2AdditionalProps)
367                         .managed(managed)
368                         .useHttps(useHttps)
369                         .allowSelfSignedCerts(allowSelfSignedCerts)
370                         .build());
371
372                 dmaapTopicSourceLst.add(uebTopicSource);
373             }
374         }
375         return dmaapTopicSourceLst;
376     }
377
378     @Override
379     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
380         return this.build(BusTopicParams.builder()
381                 .servers(servers)
382                 .topic(topic)
383                 .apiKey(apiKey)
384                 .apiSecret(apiSecret)
385                 .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
386                 .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
387                 .managed(true)
388                 .useHttps(false)
389                 .allowSelfSignedCerts(false)
390                 .build());
391     }
392
393     @Override
394     public DmaapTopicSource build(List<String> servers, String topic) {
395         return this.build(servers, topic, null, null);
396     }
397
398     /**
399      * Makes a new source.
400      * 
401      * @param busTopicParams parameters to use to configure the source
402      * @return a new source
403      */
404     protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
405         return new SingleThreadedDmaapTopicSource(busTopicParams);
406     }
407
408     @Override
409     public void destroy(String topic) {
410
411         if (topic == null || topic.isEmpty()) {
412             throw new IllegalArgumentException(MISSING_TOPIC);
413         }
414
415         DmaapTopicSource uebTopicSource;
416
417         synchronized (this) {
418             if (!dmaapTopicSources.containsKey(topic)) {
419                 return;
420             }
421
422             uebTopicSource = dmaapTopicSources.remove(topic);
423         }
424
425         uebTopicSource.shutdown();
426     }
427
428     @Override
429     public void destroy() {
430         List<DmaapTopicSource> readers = this.inventory();
431         for (DmaapTopicSource reader : readers) {
432             reader.shutdown();
433         }
434
435         synchronized (this) {
436             this.dmaapTopicSources.clear();
437         }
438     }
439
440     @Override
441     public DmaapTopicSource get(String topic) {
442
443         if (topic == null || topic.isEmpty()) {
444             throw new IllegalArgumentException(MISSING_TOPIC);
445         }
446
447         synchronized (this) {
448             if (dmaapTopicSources.containsKey(topic)) {
449                 return dmaapTopicSources.get(topic);
450             } else {
451                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
452             }
453         }
454     }
455
456     @Override
457     public synchronized List<DmaapTopicSource> inventory() {
458         return new ArrayList<>(this.dmaapTopicSources.values());
459     }
460
461     @Override
462     public String toString() {
463         return "IndexedDmaapTopicSourceFactory []";
464     }
465
466 }
467