30cc923b55fce7072fcc351549c1e2ec4a7867a7
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Source Factory.
39  */
40 public interface DmaapTopicSourceFactory {
41     String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42     String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43     String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44     String DME2_VERSION_PROPERTY = "Version";
45     String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46     String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47     String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48     String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49
50     /**
51      * Creates an DMAAP Topic Source based on properties files.
52      * 
53      * @param properties Properties containing initialization values
54      * 
55      * @return an DMAAP Topic Source
56      * @throws IllegalArgumentException if invalid parameters are present
57      */
58     List<DmaapTopicSource> build(Properties properties);
59
60     /**
61      * Instantiates a new DMAAP Topic Source.
62      * 
63      * @param busTopicParams parameters object
64      * @return a DMAAP Topic Source
65      */
66     DmaapTopicSource build(BusTopicParams busTopicParams);
67
68     /**
69      * Instantiates a new DMAAP Topic Source.
70      * 
71      * @param servers list of servers
72      * @param topic topic name
73      * @param apiKey API Key
74      * @param apiSecret API Secret
75      * 
76      * @return an DMAAP Topic Source
77      * @throws IllegalArgumentException if invalid parameters are present
78      */
79     DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
80
81     /**
82      * Instantiates a new DMAAP Topic Source.
83      * 
84      * @param servers list of servers
85      * @param topic topic name
86      * 
87      * @return an DMAAP Topic Source
88      * @throws IllegalArgumentException if invalid parameters are present
89      */
90     DmaapTopicSource build(List<String> servers, String topic);
91
92     /**
93      * Destroys an DMAAP Topic Source based on a topic.
94      * 
95      * @param topic topic name
96      * @throws IllegalArgumentException if invalid parameters are present
97      */
98     void destroy(String topic);
99
100     /**
101      * Destroys all DMAAP Topic Sources.
102      */
103     void destroy();
104
105     /**
106      * Gets an DMAAP Topic Source based on topic name.
107      * 
108      * @param topic the topic name
109      * @return an DMAAP Topic Source with topic name
110      * @throws IllegalArgumentException if an invalid topic is provided
111      * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
112      */
113     DmaapTopicSource get(String topic);
114
115     /**
116      * Provides a snapshot of the DMAAP Topic Sources.
117      * 
118      * @return a list of the DMAAP Topic Sources
119      */
120     List<DmaapTopicSource> inventory();
121 }
122
123
124 /* ------------- implementation ----------------- */
125
126 /**
127  * Factory of DMAAP Source Topics indexed by topic name.
128  */
129
130 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
131     private static final String MISSING_TOPIC = "A topic must be provided";
132
133     /**
134      * Logger.
135      */
136     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
137
138     /**
139      * DMaaP Topic Name Index.
140      */
141     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
142
143
144     /**
145      * {@inheritDoc}
146      */
147     @Override
148     public DmaapTopicSource build(BusTopicParams busTopicParams) {
149
150         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
151             throw new IllegalArgumentException(MISSING_TOPIC);
152         }
153
154         synchronized (this) {
155             if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
156                 return dmaapTopicSources.get(busTopicParams.getTopic());
157             }
158
159             DmaapTopicSource dmaapTopicSource =
160                     new SingleThreadedDmaapTopicSource(busTopicParams);
161
162             if (busTopicParams.isManaged()) {
163                 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
164             }
165             return dmaapTopicSource;
166         }
167     }
168
169     /**
170      * {@inheritDoc}
171      */
172     @Override
173     public List<DmaapTopicSource> build(Properties properties) {
174
175         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
176         if (readTopics == null || readTopics.isEmpty()) {
177             logger.info("{}: no topic for DMaaP Source", this);
178             return new ArrayList<>();
179         }
180         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
181
182         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
183         synchronized (this) {
184             for (String topic : readTopicList) {
185                 if (this.dmaapTopicSources.containsKey(topic)) {
186                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
187                     continue;
188                 }
189
190                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
191                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
192
193                 List<String> serverList;
194                 if (servers != null && !servers.isEmpty()) {
195                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
196                 } else {
197                     serverList = new ArrayList<>();
198                 }
199
200                 final String apiKey = properties.getProperty(
201                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
202                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
203
204                 final String apiSecret = properties.getProperty(
205                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
206                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
207
208                 final String aafMechId = properties.getProperty(
209                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
210                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
211
212                 final String aafPassword = properties.getProperty(
213                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
214                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
215
216                 final String consumerGroup = properties.getProperty(
217                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
218                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
219
220                 final String consumerInstance = properties.getProperty(
221                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
222                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
223
224                 final String fetchTimeoutString = properties.getProperty(
225                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
226                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
227
228                 /* DME2 Properties */
229
230                 final String dme2Environment = properties.getProperty(
231                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
232                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
233
234                 final String dme2AftEnvironment = properties.getProperty(
235                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
236                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
237
238                 final String dme2Partner = properties.getProperty(
239                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
240                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
241
242                 final String dme2RouteOffer = properties.getProperty(
243                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
244                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
245
246                 final String dme2Latitude = properties.getProperty(
247                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
248                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
249
250                 final String dme2Longitude = properties.getProperty(
251                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
252                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
253
254                 final String dme2EpReadTimeoutMs =
255                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
256                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
257
258                 final String dme2EpConnTimeout = properties.getProperty(
259                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
260                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
261
262                 final String dme2RoundtripTimeoutMs =
263                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
264                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
265
266                 final String dme2Version = properties.getProperty(
267                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
268                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
269
270                 final String dme2SubContextPath = properties.getProperty(
271                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
272                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
273
274                 final String dme2SessionStickinessRequired =
275                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
276                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
277
278                 Map<String, String> dme2AdditionalProps = new HashMap<>();
279
280                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
281                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
282                 }
283                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
284                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
285                 }
286                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
287                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
288                 }
289                 if (dme2Version != null && !dme2Version.isEmpty()) {
290                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
291                 }
292                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
293                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
294                 }
295                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
296                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
297                 }
298                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
299                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
300                 }
301
302
303                 if (servers == null || servers.isEmpty()) {
304
305                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
306                     continue;
307                 }
308
309                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
310                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
311                     try {
312                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
313                     } catch (NumberFormatException nfe) {
314                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
315                                 topic);
316                     }
317                 }
318
319                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
320                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
321                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
322                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
323                     try {
324                         fetchLimit = Integer.parseInt(fetchLimitString);
325                     } catch (NumberFormatException nfe) {
326                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
327                                 topic);
328                     }
329                 }
330
331                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
332                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
333                 boolean managed = true;
334                 if (managedString != null && !managedString.isEmpty()) {
335                     managed = Boolean.parseBoolean(managedString);
336                 }
337
338                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
339                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
340
341                 // default is to use HTTP if no https property exists
342                 boolean useHttps = false;
343                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
344                     useHttps = Boolean.parseBoolean(useHttpsString);
345                 }
346
347                 String allowSelfSignedCertsString =
348                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
349                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
350
351                 // default is to disallow self-signed certs
352                 boolean allowSelfSignedCerts = false;
353                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
354                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
355                 }
356
357
358                 DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
359                         .servers(serverList)
360                         .topic(topic)
361                         .apiKey(apiKey)
362                         .apiSecret(apiSecret)
363                         .userName(aafMechId)
364                         .password(aafPassword)
365                         .consumerGroup(consumerGroup)
366                         .consumerInstance(consumerInstance)
367                         .fetchTimeout(fetchTimeout)
368                         .fetchLimit(fetchLimit)
369                         .environment(dme2Environment)
370                         .aftEnvironment(dme2AftEnvironment)
371                         .partner(dme2Partner)
372                         .latitude(dme2Latitude)
373                         .longitude(dme2Longitude)
374                         .additionalProps(dme2AdditionalProps)
375                         .managed(managed)
376                         .useHttps(useHttps)
377                         .allowSelfSignedCerts(allowSelfSignedCerts)
378                         .build());
379
380                 dmaapTopicSourceLst.add(uebTopicSource);
381             }
382         }
383         return dmaapTopicSourceLst;
384     }
385
386     /**
387      * {@inheritDoc}
388      * 
389      * @throws IllegalArgumentException throws illegal argument exception
390      */
391     @Override
392     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
393         return this.build(BusTopicParams.builder()
394                 .servers(servers)
395                 .topic(topic)
396                 .apiKey(apiKey)
397                 .apiSecret(apiSecret)
398                 .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
399                 .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
400                 .managed(true)
401                 .useHttps(false)
402                 .allowSelfSignedCerts(false)
403                 .build());
404     }
405
406     /**
407      * {@inheritDoc}
408      * 
409      * @throws IllegalArgumentException throws illegal argument exception
410      */
411     @Override
412     public DmaapTopicSource build(List<String> servers, String topic) {
413         return this.build(servers, topic, null, null);
414     }
415
416     /**
417      * {@inheritDoc}
418      */
419     @Override
420     public void destroy(String topic) {
421
422         if (topic == null || topic.isEmpty()) {
423             throw new IllegalArgumentException(MISSING_TOPIC);
424         }
425
426         DmaapTopicSource uebTopicSource;
427
428         synchronized (this) {
429             if (!dmaapTopicSources.containsKey(topic)) {
430                 return;
431             }
432
433             uebTopicSource = dmaapTopicSources.remove(topic);
434         }
435
436         uebTopicSource.shutdown();
437     }
438
439     @Override
440     public void destroy() {
441         List<DmaapTopicSource> readers = this.inventory();
442         for (DmaapTopicSource reader : readers) {
443             reader.shutdown();
444         }
445
446         synchronized (this) {
447             this.dmaapTopicSources.clear();
448         }
449     }
450
451     /**
452      * {@inheritDoc}
453      */
454     @Override
455     public DmaapTopicSource get(String topic) {
456
457         if (topic == null || topic.isEmpty()) {
458             throw new IllegalArgumentException(MISSING_TOPIC);
459         }
460
461         synchronized (this) {
462             if (dmaapTopicSources.containsKey(topic)) {
463                 return dmaapTopicSources.get(topic);
464             } else {
465                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
466             }
467         }
468     }
469
470     @Override
471     public synchronized List<DmaapTopicSource> inventory() {
472         return new ArrayList<>(this.dmaapTopicSources.values());
473     }
474
475     @Override
476     public String toString() {
477         StringBuilder builder = new StringBuilder();
478         builder.append("IndexedDmaapTopicSourceFactory []");
479         return builder.toString();
480     }
481
482 }
483