41611f4e1cbc704da4efc1889d1e0471639e7f9b
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Source Factory.
39  */
40 public interface DmaapTopicSourceFactory {
41     String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42     String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43     String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44     String DME2_VERSION_PROPERTY = "Version";
45     String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46     String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47     String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48     String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49
50     /**
51      * Creates an DMAAP Topic Source based on properties files.
52      * 
53      * @param properties Properties containing initialization values
54      * 
55      * @return an DMAAP Topic Source
56      * @throws IllegalArgumentException if invalid parameters are present
57      */
58     List<DmaapTopicSource> build(Properties properties);
59
60     /**
61      * Instantiates a new DMAAP Topic Source.
62      *
63      * servers list of servers
64      * topic topic name
65      * apiKey API Key
66      * apiSecret API Secret
67      * userName user name
68      * password password
69      * consumerGroup Consumer Group
70      * consumerInstance Consumer Instance
71      * fetchTimeout Read Fetch Timeout
72      * fetchLimit Fetch Limit
73      * managed is this endpoind managed?
74      * useHttps does the connection use HTTPS?
75      * allowSelfSignedCerts does connection allow self-signed certificates?
76      * @param busTopicParams parameter object
77      * @return an DMAAP Topic Source
78      * @throws IllegalArgumentException if invalid parameters are present
79      */
80     DmaapTopicSource build(BusTopicParams busTopicParams);
81
82     /**
83      * Instantiates a new DMAAP Topic Source.
84      * 
85      * @param servers list of servers
86      * @param topic topic name
87      * @param apiKey API Key
88      * @param apiSecret API Secret
89      * 
90      * @return an DMAAP Topic Source
91      * @throws IllegalArgumentException if invalid parameters are present
92      */
93     DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
94
95     /**
96      * Instantiates a new DMAAP Topic Source.
97      * 
98      * @param servers list of servers
99      * @param topic topic name
100      * 
101      * @return an DMAAP Topic Source
102      * @throws IllegalArgumentException if invalid parameters are present
103      */
104     DmaapTopicSource build(List<String> servers, String topic);
105
106     /**
107      * Destroys an DMAAP Topic Source based on a topic.
108      * 
109      * @param topic topic name
110      * @throws IllegalArgumentException if invalid parameters are present
111      */
112     void destroy(String topic);
113
114     /**
115      * Destroys all DMAAP Topic Sources.
116      */
117     void destroy();
118
119     /**
120      * Gets an DMAAP Topic Source based on topic name.
121      * 
122      * @param topic the topic name
123      * @return an DMAAP Topic Source with topic name
124      * @throws IllegalArgumentException if an invalid topic is provided
125      * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
126      */
127     DmaapTopicSource get(String topic);
128
129     /**
130      * Provides a snapshot of the DMAAP Topic Sources.
131      * 
132      * @return a list of the DMAAP Topic Sources
133      */
134     List<DmaapTopicSource> inventory();
135 }
136
137
138 /* ------------- implementation ----------------- */
139
140 /**
141  * Factory of DMAAP Source Topics indexed by topic name.
142  */
143
144 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
145     private static final String MISSING_TOPIC = "A topic must be provided";
146
147     /**
148      * Logger.
149      */
150     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
151
152     /**
153      * DMaaP Topic Name Index.
154      */
155     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
156
157
158     /**
159      * {@inheritDoc}
160      */
161     @Override
162     public DmaapTopicSource build(BusTopicParams busTopicParams) {
163
164         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
165             throw new IllegalArgumentException(MISSING_TOPIC);
166         }
167
168         synchronized (this) {
169             if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
170                 return dmaapTopicSources.get(busTopicParams.getTopic());
171             }
172
173             DmaapTopicSource dmaapTopicSource =
174                     new SingleThreadedDmaapTopicSource(busTopicParams);
175
176             if (busTopicParams.isManaged()) {
177                 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
178             }
179             return dmaapTopicSource;
180         }
181     }
182
183     /**
184      * {@inheritDoc}
185      */
186     @Override
187     public List<DmaapTopicSource> build(Properties properties) {
188
189         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
190         if (readTopics == null || readTopics.isEmpty()) {
191             logger.info("{}: no topic for DMaaP Source", this);
192             return new ArrayList<>();
193         }
194         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
195
196         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
197         synchronized (this) {
198             for (String topic : readTopicList) {
199                 if (this.dmaapTopicSources.containsKey(topic)) {
200                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
201                     continue;
202                 }
203
204                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
205                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
206
207                 List<String> serverList;
208                 if (servers != null && !servers.isEmpty()) {
209                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
210                 } else {
211                     serverList = new ArrayList<>();
212                 }
213
214                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
215                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
216
217                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
218                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
219
220                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
221                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
222
223                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
224                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
225
226                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
227                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
228
229                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
230                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
231
232                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
233                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
234
235                 /* DME2 Properties */
236
237                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
238                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
239
240                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
241                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
242
243                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
244                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
245
246                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
247                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
248
249                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
250                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
251
252                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
253                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
254
255                 String dme2EpReadTimeoutMs =
256                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
257                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
258
259                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
260                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
261
262                 String dme2RoundtripTimeoutMs =
263                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
264                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
265
266                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
267                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
268
269                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
270                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
271
272                 String dme2SessionStickinessRequired =
273                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
274                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
275
276                 Map<String, String> dme2AdditionalProps = new HashMap<>();
277
278                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
279                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
280                 }
281                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
282                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
283                 }
284                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
285                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
286                 }
287                 if (dme2Version != null && !dme2Version.isEmpty()) {
288                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
289                 }
290                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
291                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
292                 }
293                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
294                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
295                 }
296                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
297                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
298                 }
299
300
301                 if (servers == null || servers.isEmpty()) {
302
303                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
304                     continue;
305                 }
306
307                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
308                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
309                     try {
310                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
311                     } catch (NumberFormatException nfe) {
312                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
313                                 topic);
314                     }
315                 }
316
317                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
318                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
319                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
320                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
321                     try {
322                         fetchLimit = Integer.parseInt(fetchLimitString);
323                     } catch (NumberFormatException nfe) {
324                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
325                                 topic);
326                     }
327                 }
328
329                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
330                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
331                 boolean managed = true;
332                 if (managedString != null && !managedString.isEmpty()) {
333                     managed = Boolean.parseBoolean(managedString);
334                 }
335
336                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
337                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
338
339                 // default is to use HTTP if no https property exists
340                 boolean useHttps = false;
341                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
342                     useHttps = Boolean.parseBoolean(useHttpsString);
343                 }
344
345                 String allowSelfSignedCertsString =
346                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
347                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
348
349                 // default is to disallow self-signed certs
350                 boolean allowSelfSignedCerts = false;
351                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
352                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
353                 }
354
355
356                 DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
357                         .servers(serverList)
358                         .topic(topic)
359                         .apiKey(apiKey)
360                         .apiSecret(apiSecret)
361                         .userName(aafMechId)
362                         .password(aafPassword)
363                         .consumerGroup(consumerGroup)
364                         .consumerInstance(consumerInstance)
365                         .fetchTimeout(fetchTimeout)
366                         .fetchLimit(fetchLimit)
367                         .environment(dme2Environment)
368                         .aftEnvironment(dme2AftEnvironment)
369                         .partner(dme2Partner)
370                         .latitude(dme2Latitude)
371                         .longitude(dme2Longitude)
372                         .additionalProps(dme2AdditionalProps)
373                         .managed(managed)
374                         .useHttps(useHttps)
375                         .allowSelfSignedCerts(allowSelfSignedCerts)
376                         .build());
377
378                 dmaapTopicSourceLst.add(uebTopicSource);
379             }
380         }
381         return dmaapTopicSourceLst;
382     }
383
384     /**
385      * {@inheritDoc}
386      * 
387      * @throws IllegalArgumentException
388      */
389     @Override
390     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
391         return this.build(BusTopicParams.builder()
392                 .servers(servers)
393                 .topic(topic)
394                 .apiKey(apiKey)
395                 .apiSecret(apiSecret)
396                 .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
397                 .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
398                 .managed(true)
399                 .useHttps(false)
400                 .allowSelfSignedCerts(false)
401                 .build());
402     }
403
404     /**
405      * {@inheritDoc}
406      * 
407      * @throws IllegalArgumentException
408      */
409     @Override
410     public DmaapTopicSource build(List<String> servers, String topic) {
411         return this.build(servers, topic, null, null);
412     }
413
414     /**
415      * {@inheritDoc}
416      */
417     @Override
418     public void destroy(String topic) {
419
420         if (topic == null || topic.isEmpty()) {
421             throw new IllegalArgumentException(MISSING_TOPIC);
422         }
423
424         DmaapTopicSource uebTopicSource;
425
426         synchronized (this) {
427             if (!dmaapTopicSources.containsKey(topic)) {
428                 return;
429             }
430
431             uebTopicSource = dmaapTopicSources.remove(topic);
432         }
433
434         uebTopicSource.shutdown();
435     }
436
437     @Override
438     public void destroy() {
439         List<DmaapTopicSource> readers = this.inventory();
440         for (DmaapTopicSource reader : readers) {
441             reader.shutdown();
442         }
443
444         synchronized (this) {
445             this.dmaapTopicSources.clear();
446         }
447     }
448
449     /**
450      * {@inheritDoc}
451      */
452     @Override
453     public DmaapTopicSource get(String topic) {
454
455         if (topic == null || topic.isEmpty()) {
456             throw new IllegalArgumentException(MISSING_TOPIC);
457         }
458
459         synchronized (this) {
460             if (dmaapTopicSources.containsKey(topic)) {
461                 return dmaapTopicSources.get(topic);
462             } else {
463                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
464             }
465         }
466     }
467
468     @Override
469     public synchronized List<DmaapTopicSource> inventory() {
470         return new ArrayList<>(this.dmaapTopicSources.values());
471     }
472
473     @Override
474     public String toString() {
475         StringBuilder builder = new StringBuilder();
476         builder.append("IndexedDmaapTopicSourceFactory []");
477         return builder.toString();
478     }
479
480 }
481