d5e04d5043711d70737b5f7ca0e2a70b56dfcfc4
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Source Factory.
39  */
40 public interface DmaapTopicSourceFactory {
41     String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42     String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43     String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44     String DME2_VERSION_PROPERTY = "Version";
45     String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46     String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47     String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48     String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49
50     /**
51      * Creates an DMAAP Topic Source based on properties files.
52      * 
53      * @param properties Properties containing initialization values
54      * 
55      * @return an DMAAP Topic Source
56      * @throws IllegalArgumentException if invalid parameters are present
57      */
58     List<DmaapTopicSource> build(Properties properties);
59
60     /**
61      * Instantiates a new DMAAP Topic Source.
62      * 
63      * @param busTopicParams parameters object
64      * @return a DMAAP Topic Source
65      */
66     DmaapTopicSource build(BusTopicParams busTopicParams);
67
68     /**
69      * Instantiates a new DMAAP Topic Source.
70      * 
71      * @param servers list of servers
72      * @param topic topic name
73      * @param apiKey API Key
74      * @param apiSecret API Secret
75      * 
76      * @return an DMAAP Topic Source
77      * @throws IllegalArgumentException if invalid parameters are present
78      */
79     DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
80
81     /**
82      * Instantiates a new DMAAP Topic Source.
83      * 
84      * @param servers list of servers
85      * @param topic topic name
86      * 
87      * @return an DMAAP Topic Source
88      * @throws IllegalArgumentException if invalid parameters are present
89      */
90     DmaapTopicSource build(List<String> servers, String topic);
91
92     /**
93      * Destroys an DMAAP Topic Source based on a topic.
94      * 
95      * @param topic topic name
96      * @throws IllegalArgumentException if invalid parameters are present
97      */
98     void destroy(String topic);
99
100     /**
101      * Destroys all DMAAP Topic Sources.
102      */
103     void destroy();
104
105     /**
106      * Gets an DMAAP Topic Source based on topic name.
107      * 
108      * @param topic the topic name
109      * @return an DMAAP Topic Source with topic name
110      * @throws IllegalArgumentException if an invalid topic is provided
111      * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
112      */
113     DmaapTopicSource get(String topic);
114
115     /**
116      * Provides a snapshot of the DMAAP Topic Sources.
117      * 
118      * @return a list of the DMAAP Topic Sources
119      */
120     List<DmaapTopicSource> inventory();
121 }
122
123
124 /* ------------- implementation ----------------- */
125
126 /**
127  * Factory of DMAAP Source Topics indexed by topic name.
128  */
129
130 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
131     private static final String MISSING_TOPIC = "A topic must be provided";
132
133     /**
134      * Logger.
135      */
136     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
137
138     /**
139      * DMaaP Topic Name Index.
140      */
141     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
142
143     @Override
144     public DmaapTopicSource build(BusTopicParams busTopicParams) {
145
146         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
147             throw new IllegalArgumentException(MISSING_TOPIC);
148         }
149
150         synchronized (this) {
151             if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
152                 return dmaapTopicSources.get(busTopicParams.getTopic());
153             }
154
155             DmaapTopicSource dmaapTopicSource =
156                     new SingleThreadedDmaapTopicSource(busTopicParams);
157
158             if (busTopicParams.isManaged()) {
159                 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
160             }
161             return dmaapTopicSource;
162         }
163     }
164
165     @Override
166     public List<DmaapTopicSource> build(Properties properties) {
167
168         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
169         if (readTopics == null || readTopics.isEmpty()) {
170             logger.info("{}: no topic for DMaaP Source", this);
171             return new ArrayList<>();
172         }
173         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
174
175         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
176         synchronized (this) {
177             for (String topic : readTopicList) {
178                 if (this.dmaapTopicSources.containsKey(topic)) {
179                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
180                     continue;
181                 }
182
183                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
184                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
185
186                 List<String> serverList;
187                 if (servers != null && !servers.isEmpty()) {
188                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
189                 } else {
190                     serverList = new ArrayList<>();
191                 }
192
193                 final String apiKey = properties.getProperty(
194                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
195                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
196
197                 final String apiSecret = properties.getProperty(
198                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
199                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
200
201                 final String aafMechId = properties.getProperty(
202                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
203                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
204
205                 final String aafPassword = properties.getProperty(
206                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
207                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
208
209                 final String consumerGroup = properties.getProperty(
210                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
211                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
212
213                 final String consumerInstance = properties.getProperty(
214                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
215                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
216
217                 final String fetchTimeoutString = properties.getProperty(
218                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
219                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
220
221                 /* DME2 Properties */
222
223                 final String dme2Environment = properties.getProperty(
224                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
225                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
226
227                 final String dme2AftEnvironment = properties.getProperty(
228                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
229                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
230
231                 final String dme2Partner = properties.getProperty(
232                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
233                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
234
235                 final String dme2RouteOffer = properties.getProperty(
236                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
237                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
238
239                 final String dme2Latitude = properties.getProperty(
240                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
241                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
242
243                 final String dme2Longitude = properties.getProperty(
244                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
245                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
246
247                 final String dme2EpReadTimeoutMs =
248                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
249                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
250
251                 final String dme2EpConnTimeout = properties.getProperty(
252                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
253                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
254
255                 final String dme2RoundtripTimeoutMs =
256                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
257                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
258
259                 final String dme2Version = properties.getProperty(
260                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
261                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
262
263                 final String dme2SubContextPath = properties.getProperty(
264                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
265                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
266
267                 final String dme2SessionStickinessRequired =
268                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
269                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
270
271                 Map<String, String> dme2AdditionalProps = new HashMap<>();
272
273                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
274                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
275                 }
276                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
277                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
278                 }
279                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
280                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
281                 }
282                 if (dme2Version != null && !dme2Version.isEmpty()) {
283                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
284                 }
285                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
286                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
287                 }
288                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
289                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
290                 }
291                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
292                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
293                 }
294
295
296                 if (servers == null || servers.isEmpty()) {
297
298                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
299                     continue;
300                 }
301
302                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
303                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
304                     try {
305                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
306                     } catch (NumberFormatException nfe) {
307                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
308                                 topic);
309                     }
310                 }
311
312                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
313                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
314                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
315                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
316                     try {
317                         fetchLimit = Integer.parseInt(fetchLimitString);
318                     } catch (NumberFormatException nfe) {
319                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
320                                 topic);
321                     }
322                 }
323
324                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
325                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
326                 boolean managed = true;
327                 if (managedString != null && !managedString.isEmpty()) {
328                     managed = Boolean.parseBoolean(managedString);
329                 }
330
331                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
332                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
333
334                 // default is to use HTTP if no https property exists
335                 boolean useHttps = false;
336                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
337                     useHttps = Boolean.parseBoolean(useHttpsString);
338                 }
339
340                 String allowSelfSignedCertsString =
341                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
342                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
343
344                 // default is to disallow self-signed certs
345                 boolean allowSelfSignedCerts = false;
346                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
347                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
348                 }
349
350
351                 DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
352                         .servers(serverList)
353                         .topic(topic)
354                         .apiKey(apiKey)
355                         .apiSecret(apiSecret)
356                         .userName(aafMechId)
357                         .password(aafPassword)
358                         .consumerGroup(consumerGroup)
359                         .consumerInstance(consumerInstance)
360                         .fetchTimeout(fetchTimeout)
361                         .fetchLimit(fetchLimit)
362                         .environment(dme2Environment)
363                         .aftEnvironment(dme2AftEnvironment)
364                         .partner(dme2Partner)
365                         .latitude(dme2Latitude)
366                         .longitude(dme2Longitude)
367                         .additionalProps(dme2AdditionalProps)
368                         .managed(managed)
369                         .useHttps(useHttps)
370                         .allowSelfSignedCerts(allowSelfSignedCerts)
371                         .build());
372
373                 dmaapTopicSourceLst.add(uebTopicSource);
374             }
375         }
376         return dmaapTopicSourceLst;
377     }
378
379     @Override
380     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
381         return this.build(BusTopicParams.builder()
382                 .servers(servers)
383                 .topic(topic)
384                 .apiKey(apiKey)
385                 .apiSecret(apiSecret)
386                 .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
387                 .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
388                 .managed(true)
389                 .useHttps(false)
390                 .allowSelfSignedCerts(false)
391                 .build());
392     }
393
394     @Override
395     public DmaapTopicSource build(List<String> servers, String topic) {
396         return this.build(servers, topic, null, null);
397     }
398
399     @Override
400     public void destroy(String topic) {
401
402         if (topic == null || topic.isEmpty()) {
403             throw new IllegalArgumentException(MISSING_TOPIC);
404         }
405
406         DmaapTopicSource uebTopicSource;
407
408         synchronized (this) {
409             if (!dmaapTopicSources.containsKey(topic)) {
410                 return;
411             }
412
413             uebTopicSource = dmaapTopicSources.remove(topic);
414         }
415
416         uebTopicSource.shutdown();
417     }
418
419     @Override
420     public void destroy() {
421         List<DmaapTopicSource> readers = this.inventory();
422         for (DmaapTopicSource reader : readers) {
423             reader.shutdown();
424         }
425
426         synchronized (this) {
427             this.dmaapTopicSources.clear();
428         }
429     }
430
431     @Override
432     public DmaapTopicSource get(String topic) {
433
434         if (topic == null || topic.isEmpty()) {
435             throw new IllegalArgumentException(MISSING_TOPIC);
436         }
437
438         synchronized (this) {
439             if (dmaapTopicSources.containsKey(topic)) {
440                 return dmaapTopicSources.get(topic);
441             } else {
442                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
443             }
444         }
445     }
446
447     @Override
448     public synchronized List<DmaapTopicSource> inventory() {
449         return new ArrayList<>(this.dmaapTopicSources.values());
450     }
451
452     @Override
453     public String toString() {
454         StringBuilder builder = new StringBuilder();
455         builder.append("IndexedDmaapTopicSourceFactory []");
456         return builder.toString();
457     }
458
459 }
460