ddc3321fa21e4f0bcd46ef165e7f75c7324897c4
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of DMAAP Source Topics indexed by topic name.
38  */
39
40 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
41     private static final String MISSING_TOPIC = "A topic must be provided";
42
43     /**
44      * Logger.
45      */
46     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
47
48     /**
49      * DMaaP Topic Name Index.
50      */
51     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
52
53     @Override
54     public DmaapTopicSource build(BusTopicParams busTopicParams) {
55
56         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
57             throw new IllegalArgumentException(MISSING_TOPIC);
58         }
59
60         synchronized (this) {
61             if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
62                 return dmaapTopicSources.get(busTopicParams.getTopic());
63             }
64
65             DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams);
66
67             if (busTopicParams.isManaged()) {
68                 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
69             }
70             return dmaapTopicSource;
71         }
72     }
73
74     @Override
75     public List<DmaapTopicSource> build(Properties properties) {
76
77         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
78         if (readTopics == null || readTopics.isEmpty()) {
79             logger.info("{}: no topic for DMaaP Source", this);
80             return new ArrayList<>();
81         }
82         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
83
84         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
85         synchronized (this) {
86             for (String topic : readTopicList) {
87                 if (this.dmaapTopicSources.containsKey(topic)) {
88                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
89                     continue;
90                 }
91
92                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
93                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
94
95                 List<String> serverList;
96                 if (servers != null && !servers.isEmpty()) {
97                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
98                 } else {
99                     serverList = new ArrayList<>();
100                 }
101
102                 final String effectiveTopic = properties.getProperty(
103                     PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
104                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic);
105
106                 final String apiKey = properties.getProperty(
107                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
108                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
109
110                 final String apiSecret = properties.getProperty(
111                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
112                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
113
114                 final String aafMechId = properties.getProperty(
115                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
116                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
117
118                 final String aafPassword = properties.getProperty(
119                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
120                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
121
122                 final String consumerGroup = properties.getProperty(
123                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
124                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
125
126                 final String consumerInstance = properties.getProperty(
127                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
128                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
129
130                 final String fetchTimeoutString = properties.getProperty(
131                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
132                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
133
134                 /* DME2 Properties */
135
136                 final String dme2Environment = properties.getProperty(
137                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
138                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
139
140                 final String dme2AftEnvironment = properties.getProperty(
141                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
142                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
143
144                 final String dme2Partner = properties.getProperty(
145                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
146                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
147
148                 final String dme2RouteOffer = properties.getProperty(
149                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
150                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
151
152                 final String dme2Latitude = properties.getProperty(
153                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
154                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
155
156                 final String dme2Longitude = properties.getProperty(
157                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
158                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
159
160                 final String dme2EpReadTimeoutMs =
161                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
162                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
163
164                 final String dme2EpConnTimeout = properties.getProperty(
165                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
166                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
167
168                 final String dme2RoundtripTimeoutMs =
169                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
170                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
171
172                 final String dme2Version = properties.getProperty(
173                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
174                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
175
176                 final String dme2SubContextPath = properties.getProperty(
177                                 PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
178                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
179
180                 final String dme2SessionStickinessRequired =
181                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
182                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
183
184                 Map<String, String> dme2AdditionalProps = new HashMap<>();
185
186                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
187                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
188                 }
189                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
190                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
191                 }
192                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
193                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
194                 }
195                 if (dme2Version != null && !dme2Version.isEmpty()) {
196                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
197                 }
198                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
199                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
200                 }
201                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
202                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
203                 }
204                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
205                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
206                 }
207
208                 if (servers == null || servers.isEmpty()) {
209                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
210                     continue;
211                 }
212
213                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
214                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
215                     try {
216                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
217                     } catch (NumberFormatException nfe) {
218                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
219                                 topic);
220                     }
221                 }
222
223                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
224                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
225                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
226                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
227                     try {
228                         fetchLimit = Integer.parseInt(fetchLimitString);
229                     } catch (NumberFormatException nfe) {
230                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
231                                 topic);
232                     }
233                 }
234
235                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
236                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
237                 boolean managed = true;
238                 if (managedString != null && !managedString.isEmpty()) {
239                     managed = Boolean.parseBoolean(managedString);
240                 }
241
242                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
243                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
244
245                 // default is to use HTTP if no https property exists
246                 boolean useHttps = false;
247                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
248                     useHttps = Boolean.parseBoolean(useHttpsString);
249                 }
250
251                 String allowSelfSignedCertsString =
252                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
253                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
254
255                 // default is to disallow self-signed certs
256                 boolean allowSelfSignedCerts = false;
257                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
258                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
259                 }
260
261
262                 DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
263                         .servers(serverList)
264                         .topic(topic)
265                         .effectiveTopic(effectiveTopic)
266                         .apiKey(apiKey)
267                         .apiSecret(apiSecret)
268                         .userName(aafMechId)
269                         .password(aafPassword)
270                         .consumerGroup(consumerGroup)
271                         .consumerInstance(consumerInstance)
272                         .fetchTimeout(fetchTimeout)
273                         .fetchLimit(fetchLimit)
274                         .environment(dme2Environment)
275                         .aftEnvironment(dme2AftEnvironment)
276                         .partner(dme2Partner)
277                         .latitude(dme2Latitude)
278                         .longitude(dme2Longitude)
279                         .additionalProps(dme2AdditionalProps)
280                         .managed(managed)
281                         .useHttps(useHttps)
282                         .allowSelfSignedCerts(allowSelfSignedCerts)
283                         .build());
284
285                 dmaapTopicSourceLst.add(uebTopicSource);
286             }
287         }
288         return dmaapTopicSourceLst;
289     }
290
291     @Override
292     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
293         return this.build(BusTopicParams.builder()
294                 .servers(servers)
295                 .topic(topic)
296                 .apiKey(apiKey)
297                 .apiSecret(apiSecret)
298                 .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
299                 .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
300                 .managed(true)
301                 .useHttps(false)
302                 .allowSelfSignedCerts(false)
303                 .build());
304     }
305
306     @Override
307     public DmaapTopicSource build(List<String> servers, String topic) {
308         return this.build(servers, topic, null, null);
309     }
310
311     /**
312      * Makes a new source.
313      * 
314      * @param busTopicParams parameters to use to configure the source
315      * @return a new source
316      */
317     protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
318         return new SingleThreadedDmaapTopicSource(busTopicParams);
319     }
320
321     @Override
322     public void destroy(String topic) {
323
324         if (topic == null || topic.isEmpty()) {
325             throw new IllegalArgumentException(MISSING_TOPIC);
326         }
327
328         DmaapTopicSource uebTopicSource;
329
330         synchronized (this) {
331             if (!dmaapTopicSources.containsKey(topic)) {
332                 return;
333             }
334
335             uebTopicSource = dmaapTopicSources.remove(topic);
336         }
337
338         uebTopicSource.shutdown();
339     }
340
341     @Override
342     public void destroy() {
343         List<DmaapTopicSource> readers = this.inventory();
344         for (DmaapTopicSource reader : readers) {
345             reader.shutdown();
346         }
347
348         synchronized (this) {
349             this.dmaapTopicSources.clear();
350         }
351     }
352
353     @Override
354     public DmaapTopicSource get(String topic) {
355
356         if (topic == null || topic.isEmpty()) {
357             throw new IllegalArgumentException(MISSING_TOPIC);
358         }
359
360         synchronized (this) {
361             if (dmaapTopicSources.containsKey(topic)) {
362                 return dmaapTopicSources.get(topic);
363             } else {
364                 throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found");
365             }
366         }
367     }
368
369     @Override
370     public synchronized List<DmaapTopicSource> inventory() {
371         return new ArrayList<>(this.dmaapTopicSources.values());
372     }
373
374     @Override
375     public String toString() {
376         return "IndexedDmaapTopicSourceFactory []";
377     }
378
379 }