206018a9ea317fe46eb2f03d5efa9e275b0780e2
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Sink Factory.
39  */
40 public interface DmaapTopicSinkFactory {
41     public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42     public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43     public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44     public final String DME2_VERSION_PROPERTY = "Version";
45     public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46     public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47     public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48     public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49
50     /**
51      * Instantiates a new DMAAP Topic Sink.
52      *
53      * @param servers         list of servers
54      * @param topic           topic name
55      * @param apiKey          API Key
56      * @param apiSecret       API Secret
57      * @param userName        AAF user name
58      * @param password        AAF password
59      * @param partitionKey    Consumer Group
60      * @param environment     DME2 environment
61      * @param aftEnvironment  DME2 AFT environment
62      * @param partner         DME2 Partner
63      * @param latitude        DME2 latitude
64      * @param longitude       DME2 longitude
65      * @param additionalProps additional properties to pass to DME2
66      * @param managed         is this sink endpoint managed?
67      * @return an DMAAP Topic Sink
68      * @throws IllegalArgumentException if invalid parameters are present
69      */
70     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
71                                 String password, String partitionKey, String environment, String aftEnvironment, 
72                                 String partner, String latitude, String longitude, Map<String, String> additionalProps,
73                                 boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
74
75     /**
76      * Instantiates a new DMAAP Topic Sink.
77      *
78      * @param servers      list of servers
79      * @param topic        topic name
80      * @param apiKey       API Key
81      * @param apiSecret    API Secret
82      * @param userName     AAF user name
83      * @param password     AAF password
84      * @param partitionKey Consumer Group
85      * @param managed      is this sink endpoint managed?
86      * @return an DMAAP Topic Sink
87      * @throws IllegalArgumentException if invalid parameters are present
88      */
89     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
90                                 String password, String partitionKey, boolean managed, 
91                                 boolean useHttps, boolean allowSelfSignedCerts);
92
93     /**
94      * Creates an DMAAP Topic Sink based on properties files.
95      *
96      * @param properties Properties containing initialization values
97      * @return an DMAAP Topic Sink
98      * @throws IllegalArgumentException if invalid parameters are present
99      */
100     public List<DmaapTopicSink> build(Properties properties);
101
102     /**
103      * Instantiates a new DMAAP Topic Sink.
104      *
105      * @param servers list of servers
106      * @param topic   topic name
107      * @return an DMAAP Topic Sink
108      * @throws IllegalArgumentException if invalid parameters are present
109      */
110     public DmaapTopicSink build(List<String> servers, String topic);
111
112     /**
113      * Destroys an DMAAP Topic Sink based on a topic.
114      *
115      * @param topic topic name
116      * @throws IllegalArgumentException if invalid parameters are present
117      */
118     public void destroy(String topic);
119
120     /**
121      * Destroys all DMAAP Topic Sinks.
122      */
123     public void destroy();
124
125     /**
126      * Gets an DMAAP Topic Sink based on topic name.
127      *
128      * @param topic the topic name
129      * @return an DMAAP Topic Sink with topic name
130      * @throws IllegalArgumentException if an invalid topic is provided
131      * @throws IllegalStateException    if the DMAAP Topic Reader is an incorrect state
132      */
133     public DmaapTopicSink get(String topic);
134
135     /**
136      * Provides a snapshot of the DMAAP Topic Sinks.
137      *
138      * @return a list of the DMAAP Topic Sinks
139      */
140     public List<DmaapTopicSink> inventory();
141 }
142
143
144 /* ------------- implementation ----------------- */
145
146 /**
147  * Factory of DMAAP Reader Topics indexed by topic name.
148  */
149 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
150     private static final String MISSING_TOPIC = "A topic must be provided";
151
152     /**
153      * Logger.
154      */
155     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
156
157     /**
158      * DMAAP Topic Name Index.
159      */
160     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
161
162     @Override
163     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
164                                 String password, String partitionKey, String environment, String aftEnvironment, 
165                                 String partner, String latitude, String longitude, Map<String, String> additionalProps,
166                                 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
167
168         if (topic == null || topic.isEmpty()) {
169             throw new IllegalArgumentException(MISSING_TOPIC);
170         }
171
172         synchronized (this) {
173             if (dmaapTopicWriters.containsKey(topic)) {
174                 return dmaapTopicWriters.get(topic);
175             }
176
177             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
178                     .servers(servers)
179                     .topic(topic)
180                     .apiKey(apiKey)
181                     .apiSecret(apiSecret)
182                     .userName(userName)
183                     .password(password)
184                     .partitionId(partitionKey)
185                     .environment(environment)
186                     .aftEnvironment(aftEnvironment)
187                     .partner(partner)
188                     .latitude(latitude)
189                     .longitude(longitude)
190                     .additionalProps(additionalProps)
191                     .useHttps(useHttps)
192                     .allowSelfSignedCerts(allowSelfSignedCerts)
193                     .build());
194
195             if (managed) {
196                 dmaapTopicWriters.put(topic, dmaapTopicSink);
197             }
198             return dmaapTopicSink;
199         }
200     }
201
202     @Override
203     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
204                                 String password, String partitionKey, boolean managed, boolean useHttps,
205                                 boolean allowSelfSignedCerts) {
206
207         if (topic == null || topic.isEmpty()) {
208             throw new IllegalArgumentException(MISSING_TOPIC);
209         }
210
211         synchronized (this) {
212             if (dmaapTopicWriters.containsKey(topic)) {
213                 return dmaapTopicWriters.get(topic);
214             }
215
216             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
217                     .servers(servers)
218                     .topic(topic)
219                     .apiKey(apiKey)
220                     .apiSecret(apiSecret)
221                     .userName(userName)
222                     .password(password)
223                     .partitionId(partitionKey)
224                     .useHttps(useHttps)
225                     .allowSelfSignedCerts(allowSelfSignedCerts)
226                     .build());
227
228             if (managed) {
229                 dmaapTopicWriters.put(topic, dmaapTopicSink);
230             }
231             return dmaapTopicSink;
232         }
233     }
234
235     @Override
236     public DmaapTopicSink build(List<String> servers, String topic) {
237         return this.build(servers, topic, null, null, null, null, null, true, false, false);
238     }
239
240     @Override
241     public List<DmaapTopicSink> build(Properties properties) {
242
243         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
244         if (writeTopics == null || writeTopics.isEmpty()) {
245             logger.info("{}: no topic for DMaaP Sink", this);
246             return new ArrayList<>();
247         }
248
249         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
250         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
251         synchronized (this) {
252             for (String topic : writeTopicList) {
253                 if (this.dmaapTopicWriters.containsKey(topic)) {
254                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
255                     continue;
256                 }
257                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
258                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
259
260                 List<String> serverList;
261                 if (servers != null && !servers.isEmpty()) {
262                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
263                 } else {
264                     serverList = new ArrayList<>();
265                 }
266
267                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
268                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
269                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
270                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
271
272                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
273                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
274                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
275                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
276
277                 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
278                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
279
280                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
281                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
282
283                 /* DME2 Properties */
284
285                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
286                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
287
288                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
289                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
290
291                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
292                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
293
294                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
295                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
296
297                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
298                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
299
300                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
301                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
302
303                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
304                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
305
306                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
307                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
308
309                 String dme2RoundtripTimeoutMs =
310                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
311                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
312
313                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
314                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
315
316                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
317                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
318
319                 String dme2SessionStickinessRequired =
320                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
321                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
322
323                 Map<String, String> dme2AdditionalProps = new HashMap<>();
324
325                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
326                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
327                 }
328                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
329                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
330                 }
331                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
332                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
333                 }
334                 if (dme2Version != null && !dme2Version.isEmpty()) {
335                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
336                 }
337                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
338                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
339                 }
340                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
341                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
342                 }
343                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
344                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
345                 }
346
347                 if (servers == null || servers.isEmpty()) {
348                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
349                     continue;
350                 }
351
352                 boolean managed = true;
353                 if (managedString != null && !managedString.isEmpty()) {
354                     managed = Boolean.parseBoolean(managedString);
355                 }
356
357                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
358                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
359
360                 // default is to use HTTP if no https property exists
361                 boolean useHttps = false;
362                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
363                     useHttps = Boolean.parseBoolean(useHttpsString);
364                 }
365
366
367                 String allowSelfSignedCertsString =
368                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
369                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
370
371                 // default is to disallow self-signed certs
372                 boolean allowSelfSignedCerts = false;
373                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
374                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
375                 }
376
377                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword,
378                         partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude,
379                         dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
380
381                 newDmaapTopicSinks.add(dmaapTopicSink);
382             }
383             return newDmaapTopicSinks;
384         }
385     }
386
387     @Override
388     public void destroy(String topic) {
389
390         if (topic == null || topic.isEmpty()) {
391             throw new IllegalArgumentException(MISSING_TOPIC);
392         }
393
394         DmaapTopicSink dmaapTopicWriter;
395         synchronized (this) {
396             if (!dmaapTopicWriters.containsKey(topic)) {
397                 return;
398             }
399
400             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
401         }
402
403         dmaapTopicWriter.shutdown();
404     }
405
406     @Override
407     public void destroy() {
408         List<DmaapTopicSink> writers = this.inventory();
409         for (DmaapTopicSink writer : writers) {
410             writer.shutdown();
411         }
412
413         synchronized (this) {
414             this.dmaapTopicWriters.clear();
415         }
416     }
417
418     @Override
419     public DmaapTopicSink get(String topic) {
420
421         if (topic == null || topic.isEmpty()) {
422             throw new IllegalArgumentException(MISSING_TOPIC);
423         }
424
425         synchronized (this) {
426             if (dmaapTopicWriters.containsKey(topic)) {
427                 return dmaapTopicWriters.get(topic);
428             } else {
429                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
430             }
431         }
432     }
433
434     @Override
435     public synchronized List<DmaapTopicSink> inventory() {
436         return new ArrayList<>(this.dmaapTopicWriters.values());
437     }
438
439     @Override
440     public String toString() {
441         StringBuilder builder = new StringBuilder();
442         builder.append("IndexedDmaapTopicSinkFactory []");
443         return builder.toString();
444     }
445
446 }