08a1db8f9d61225dc6e9d148c8486c8cabed2447
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Sink Factory
39  */
40 public interface DmaapTopicSinkFactory {
41     public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42     public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43     public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44     public final String DME2_VERSION_PROPERTY = "Version";
45     public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46     public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47     public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48     public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49
50     /**
51      * Instantiates a new DMAAP Topic Sink
52      *
53      * @param servers         list of servers
54      * @param topic           topic name
55      * @param apiKey          API Key
56      * @param apiSecret       API Secret
57      * @param userName        AAF user name
58      * @param password        AAF password
59      * @param partitionKey    Consumer Group
60      * @param environment     DME2 environment
61      * @param aftEnvironment  DME2 AFT environment
62      * @param partner         DME2 Partner
63      * @param latitude        DME2 latitude
64      * @param longitude       DME2 longitude
65      * @param additionalProps additional properties to pass to DME2
66      * @param managed         is this sink endpoint managed?
67      * @return an DMAAP Topic Sink
68      * @throws IllegalArgumentException if invalid parameters are present
69      */
70     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
71                                 String password, String partitionKey, String environment, String aftEnvironment, String partner,
72                                 String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
73                                 boolean allowSelfSignedCerts);
74
75     /**
76      * Instantiates a new DMAAP Topic Sink
77      *
78      * @param servers      list of servers
79      * @param topic        topic name
80      * @param apiKey       API Key
81      * @param apiSecret    API Secret
82      * @param userName     AAF user name
83      * @param password     AAF password
84      * @param partitionKey Consumer Group
85      * @param managed      is this sink endpoint managed?
86      * @return an DMAAP Topic Sink
87      * @throws IllegalArgumentException if invalid parameters are present
88      */
89     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
90                                 String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
91
92     /**
93      * Creates an DMAAP Topic Sink based on properties files
94      *
95      * @param properties Properties containing initialization values
96      * @return an DMAAP Topic Sink
97      * @throws IllegalArgumentException if invalid parameters are present
98      */
99     public List<DmaapTopicSink> build(Properties properties);
100
101     /**
102      * Instantiates a new DMAAP Topic Sink
103      *
104      * @param servers list of servers
105      * @param topic   topic name
106      * @return an DMAAP Topic Sink
107      * @throws IllegalArgumentException if invalid parameters are present
108      */
109     public DmaapTopicSink build(List<String> servers, String topic);
110
111     /**
112      * Destroys an DMAAP Topic Sink based on a topic
113      *
114      * @param topic topic name
115      * @throws IllegalArgumentException if invalid parameters are present
116      */
117     public void destroy(String topic);
118
119     /**
120      * gets an DMAAP Topic Sink based on topic name
121      *
122      * @param topic the topic name
123      * @return an DMAAP Topic Sink with topic name
124      * @throws IllegalArgumentException if an invalid topic is provided
125      * @throws IllegalStateException    if the DMAAP Topic Reader is an incorrect state
126      */
127     public DmaapTopicSink get(String topic);
128
129     /**
130      * Provides a snapshot of the DMAAP Topic Sinks
131      *
132      * @return a list of the DMAAP Topic Sinks
133      */
134     public List<DmaapTopicSink> inventory();
135
136     /**
137      * Destroys all DMAAP Topic Sinks
138      */
139     public void destroy();
140 }
141
142
143 /* ------------- implementation ----------------- */
144
145 /**
146  * Factory of DMAAP Reader Topics indexed by topic name
147  */
148 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
149     private static final String MISSING_TOPIC = "A topic must be provided";
150
151     /**
152      * Logger
153      */
154     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
155
156     /**
157      * DMAAP Topic Name Index
158      */
159     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
160
161     @Override
162     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
163                                 String password, String partitionKey, String environment, String aftEnvironment, String partner,
164                                 String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
165                                 boolean allowSelfSignedCerts) {
166
167         if (topic == null || topic.isEmpty()) {
168             throw new IllegalArgumentException(MISSING_TOPIC);
169         }
170
171         synchronized (this) {
172             if (dmaapTopicWriters.containsKey(topic)) {
173                 return dmaapTopicWriters.get(topic);
174             }
175
176             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
177                     .servers(servers)
178                     .topic(topic)
179                     .apiKey(apiKey)
180                     .apiSecret(apiSecret)
181                     .userName(userName)
182                     .password(password)
183                     .partitionId(partitionKey)
184                     .environment(environment)
185                     .aftEnvironment(aftEnvironment)
186                     .partner(partner)
187                     .latitude(latitude)
188                     .longitude(longitude)
189                     .additionalProps(additionalProps)
190                     .useHttps(useHttps)
191                     .allowSelfSignedCerts(allowSelfSignedCerts)
192                     .build());
193
194             if (managed) {
195                 dmaapTopicWriters.put(topic, dmaapTopicSink);
196             }
197             return dmaapTopicSink;
198         }
199     }
200
201     @Override
202     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
203                                 String password, String partitionKey, boolean managed, boolean useHttps,
204                                 boolean allowSelfSignedCerts) {
205
206         if (topic == null || topic.isEmpty()) {
207             throw new IllegalArgumentException(MISSING_TOPIC);
208         }
209
210         synchronized (this) {
211             if (dmaapTopicWriters.containsKey(topic)) {
212                 return dmaapTopicWriters.get(topic);
213             }
214
215             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
216                     .servers(servers)
217                     .topic(topic)
218                     .apiKey(apiKey)
219                     .apiSecret(apiSecret)
220                     .userName(userName)
221                     .password(password)
222                     .partitionId(partitionKey)
223                     .useHttps(useHttps)
224                     .allowSelfSignedCerts(allowSelfSignedCerts)
225                     .build());
226
227             if (managed) {
228                 dmaapTopicWriters.put(topic, dmaapTopicSink);
229             }
230             return dmaapTopicSink;
231         }
232     }
233
234     @Override
235     public DmaapTopicSink build(List<String> servers, String topic) {
236         return this.build(servers, topic, null, null, null, null, null, true, false, false);
237     }
238
239     @Override
240     public List<DmaapTopicSink> build(Properties properties) {
241
242         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
243         if (writeTopics == null || writeTopics.isEmpty()) {
244             logger.info("{}: no topic for DMaaP Sink", this);
245             return new ArrayList<>();
246         }
247
248         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
249         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
250         synchronized (this) {
251             for (String topic : writeTopicList) {
252                 if (this.dmaapTopicWriters.containsKey(topic)) {
253                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
254                     continue;
255                 }
256                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
257                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
258
259                 List<String> serverList;
260                 if (servers != null && !servers.isEmpty()) {
261                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
262                 } else {
263                     serverList = new ArrayList<>();
264                 }
265
266                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
267                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
268                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
269                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
270
271                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
272                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
273                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
274                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
275
276                 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
277                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
278
279                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
280                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
281
282                 /* DME2 Properties */
283
284                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
285                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
286
287                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
288                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
289
290                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
291                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
292
293                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
294                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
295
296                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
297                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
298
299                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
300                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
301
302                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
303                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
304
305                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
306                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
307
308                 String dme2RoundtripTimeoutMs =
309                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
310                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
311
312                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
313                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
314
315                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
316                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
317
318                 String dme2SessionStickinessRequired =
319                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
320                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
321
322                 Map<String, String> dme2AdditionalProps = new HashMap<>();
323
324                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
325                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
326                 }
327                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
328                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
329                 }
330                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
331                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
332                 }
333                 if (dme2Version != null && !dme2Version.isEmpty()) {
334                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
335                 }
336                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
337                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
338                 }
339                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
340                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
341                 }
342                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
343                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
344                 }
345
346                 if (servers == null || servers.isEmpty()) {
347                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
348                     continue;
349                 }
350
351                 boolean managed = true;
352                 if (managedString != null && !managedString.isEmpty()) {
353                     managed = Boolean.parseBoolean(managedString);
354                 }
355
356                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
357                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
358
359                 // default is to use HTTP if no https property exists
360                 boolean useHttps = false;
361                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
362                     useHttps = Boolean.parseBoolean(useHttpsString);
363                 }
364
365
366                 String allowSelfSignedCertsString =
367                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
368                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
369
370                 // default is to disallow self-signed certs
371                 boolean allowSelfSignedCerts = false;
372                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
373                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
374                 }
375
376                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword,
377                         partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude,
378                         dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
379
380                 newDmaapTopicSinks.add(dmaapTopicSink);
381             }
382             return newDmaapTopicSinks;
383         }
384     }
385
386     @Override
387     public void destroy(String topic) {
388
389         if (topic == null || topic.isEmpty()) {
390             throw new IllegalArgumentException(MISSING_TOPIC);
391         }
392
393         DmaapTopicSink dmaapTopicWriter;
394         synchronized (this) {
395             if (!dmaapTopicWriters.containsKey(topic)) {
396                 return;
397             }
398
399             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
400         }
401
402         dmaapTopicWriter.shutdown();
403     }
404
405     @Override
406     public void destroy() {
407         List<DmaapTopicSink> writers = this.inventory();
408         for (DmaapTopicSink writer : writers) {
409             writer.shutdown();
410         }
411
412         synchronized (this) {
413             this.dmaapTopicWriters.clear();
414         }
415     }
416
417     @Override
418     public DmaapTopicSink get(String topic) {
419
420         if (topic == null || topic.isEmpty()) {
421             throw new IllegalArgumentException(MISSING_TOPIC);
422         }
423
424         synchronized (this) {
425             if (dmaapTopicWriters.containsKey(topic)) {
426                 return dmaapTopicWriters.get(topic);
427             } else {
428                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
429             }
430         }
431     }
432
433     @Override
434     public synchronized List<DmaapTopicSink> inventory() {
435         return new ArrayList<>(this.dmaapTopicWriters.values());
436     }
437
438     @Override
439     public String toString() {
440         StringBuilder builder = new StringBuilder();
441         builder.append("IndexedDmaapTopicSinkFactory []");
442         return builder.toString();
443     }
444
445 }