0c008f116f6a3affc7dda3be797e501b480f7ca5
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP Policy Engine - Common Modules
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of DMAAP Source Topics indexed by topic name.
38  */
39
40 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
41     private static final String MISSING_TOPIC = "A topic must be provided";
42
43     /**
44      * Logger.
45      */
46     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
47
48     /**
49      * DMaaP Topic Name Index.
50      */
51     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
52
53     @Override
54     public DmaapTopicSource build(BusTopicParams busTopicParams) {
55
56         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
57             throw new IllegalArgumentException(MISSING_TOPIC);
58         }
59
60         synchronized (this) {
61             if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
62                 return dmaapTopicSources.get(busTopicParams.getTopic());
63             }
64
65             DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams);
66
67             if (busTopicParams.isManaged()) {
68                 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
69             }
70             return dmaapTopicSource;
71         }
72     }
73
74     @Override
75     public List<DmaapTopicSource> build(Properties properties) {
76
77         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
78         if (readTopics == null || readTopics.isEmpty()) {
79             logger.info("{}: no topic for DMaaP Source", this);
80             return new ArrayList<>();
81         }
82         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
83
84         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
85         synchronized (this) {
86             for (String topic : readTopicList) {
87                 if (this.dmaapTopicSources.containsKey(topic)) {
88                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
89                     continue;
90                 }
91
92                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
93                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
94
95                 List<String> serverList;
96                 if (servers != null && !servers.isEmpty()) {
97                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
98                 } else {
99                     serverList = new ArrayList<>();
100                 }
101
102                 final String apiKey = properties.getProperty(
103                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
104                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
105
106                 final String apiSecret = properties.getProperty(
107                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
108                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
109
110                 final String aafMechId = properties.getProperty(
111                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
112                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
113
114                 final String aafPassword = properties.getProperty(
115                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
116                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
117
118                 final String consumerGroup = properties.getProperty(
119                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
120                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
121
122                 final String consumerInstance = properties.getProperty(
123                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
124                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
125
126                 final String fetchTimeoutString = properties.getProperty(
127                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
128                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
129
130                 /* DME2 Properties */
131
132                 final String dme2Environment = properties.getProperty(
133                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
134                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
135
136                 final String dme2AftEnvironment = properties.getProperty(
137                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
138                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
139
140                 final String dme2Partner = properties.getProperty(
141                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
142                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
143
144                 final String dme2RouteOffer = properties.getProperty(
145                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
146                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
147
148                 final String dme2Latitude = properties.getProperty(
149                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
150                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
151
152                 final String dme2Longitude = properties.getProperty(
153                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
154                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
155
156                 final String dme2EpReadTimeoutMs =
157                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
158                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
159
160                 final String dme2EpConnTimeout = properties.getProperty(
161                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
162                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
163
164                 final String dme2RoundtripTimeoutMs =
165                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
166                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
167
168                 final String dme2Version = properties.getProperty(
169                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
170                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
171
172                 final String dme2SubContextPath = properties.getProperty(
173                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
174                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
175
176                 final String dme2SessionStickinessRequired =
177                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
178                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
179
180                 Map<String, String> dme2AdditionalProps = new HashMap<>();
181
182                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
183                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
184                 }
185                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
186                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
187                 }
188                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
189                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
190                 }
191                 if (dme2Version != null && !dme2Version.isEmpty()) {
192                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
193                 }
194                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
195                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
196                 }
197                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
198                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
199                 }
200                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
201                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
202                 }
203
204
205                 if (servers == null || servers.isEmpty()) {
206
207                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
208                     continue;
209                 }
210
211                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
212                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
213                     try {
214                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
215                     } catch (NumberFormatException nfe) {
216                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
217                                 topic);
218                     }
219                 }
220
221                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
222                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
223                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
224                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
225                     try {
226                         fetchLimit = Integer.parseInt(fetchLimitString);
227                     } catch (NumberFormatException nfe) {
228                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
229                                 topic);
230                     }
231                 }
232
233                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
234                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
235                 boolean managed = true;
236                 if (managedString != null && !managedString.isEmpty()) {
237                     managed = Boolean.parseBoolean(managedString);
238                 }
239
240                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
241                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
242
243                 // default is to use HTTP if no https property exists
244                 boolean useHttps = false;
245                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
246                     useHttps = Boolean.parseBoolean(useHttpsString);
247                 }
248
249                 String allowSelfSignedCertsString =
250                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
251                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
252
253                 // default is to disallow self-signed certs
254                 boolean allowSelfSignedCerts = false;
255                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
256                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
257                 }
258
259
260                 DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
261                         .servers(serverList)
262                         .topic(topic)
263                         .apiKey(apiKey)
264                         .apiSecret(apiSecret)
265                         .userName(aafMechId)
266                         .password(aafPassword)
267                         .consumerGroup(consumerGroup)
268                         .consumerInstance(consumerInstance)
269                         .fetchTimeout(fetchTimeout)
270                         .fetchLimit(fetchLimit)
271                         .environment(dme2Environment)
272                         .aftEnvironment(dme2AftEnvironment)
273                         .partner(dme2Partner)
274                         .latitude(dme2Latitude)
275                         .longitude(dme2Longitude)
276                         .additionalProps(dme2AdditionalProps)
277                         .managed(managed)
278                         .useHttps(useHttps)
279                         .allowSelfSignedCerts(allowSelfSignedCerts)
280                         .build());
281
282                 dmaapTopicSourceLst.add(uebTopicSource);
283             }
284         }
285         return dmaapTopicSourceLst;
286     }
287
288     @Override
289     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
290         return this.build(BusTopicParams.builder()
291                 .servers(servers)
292                 .topic(topic)
293                 .apiKey(apiKey)
294                 .apiSecret(apiSecret)
295                 .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
296                 .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
297                 .managed(true)
298                 .useHttps(false)
299                 .allowSelfSignedCerts(false)
300                 .build());
301     }
302
303     @Override
304     public DmaapTopicSource build(List<String> servers, String topic) {
305         return this.build(servers, topic, null, null);
306     }
307
308     /**
309      * Makes a new source.
310      * 
311      * @param busTopicParams parameters to use to configure the source
312      * @return a new source
313      */
314     protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
315         return new SingleThreadedDmaapTopicSource(busTopicParams);
316     }
317
318     @Override
319     public void destroy(String topic) {
320
321         if (topic == null || topic.isEmpty()) {
322             throw new IllegalArgumentException(MISSING_TOPIC);
323         }
324
325         DmaapTopicSource uebTopicSource;
326
327         synchronized (this) {
328             if (!dmaapTopicSources.containsKey(topic)) {
329                 return;
330             }
331
332             uebTopicSource = dmaapTopicSources.remove(topic);
333         }
334
335         uebTopicSource.shutdown();
336     }
337
338     @Override
339     public void destroy() {
340         List<DmaapTopicSource> readers = this.inventory();
341         for (DmaapTopicSource reader : readers) {
342             reader.shutdown();
343         }
344
345         synchronized (this) {
346             this.dmaapTopicSources.clear();
347         }
348     }
349
350     @Override
351     public DmaapTopicSource get(String topic) {
352
353         if (topic == null || topic.isEmpty()) {
354             throw new IllegalArgumentException(MISSING_TOPIC);
355         }
356
357         synchronized (this) {
358             if (dmaapTopicSources.containsKey(topic)) {
359                 return dmaapTopicSources.get(topic);
360             } else {
361                 throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found");
362             }
363         }
364     }
365
366     @Override
367     public synchronized List<DmaapTopicSource> inventory() {
368         return new ArrayList<>(this.dmaapTopicSources.values());
369     }
370
371     @Override
372     public String toString() {
373         return "IndexedDmaapTopicSourceFactory []";
374     }
375
376 }