08e8dfe8e0dce882662a8e7b43cd64022cbda26c
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
/**
 * DMAAP Topic Sink Factory.
 *
 * <p>Declares the operations for building, looking up, listing and destroying
 * {@link DmaapTopicSink} instances, together with the DME2 property names that the
 * implementation in this file uses as keys of the "additional properties" map.
 */
public interface DmaapTopicSinkFactory {
    // DME2 property names. The implementation in this file copies the configured per-topic
    // values into the additional-properties map under most of these keys.
    String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
    String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
    String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
    String DME2_VERSION_PROPERTY = "Version";
    String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
    String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
    String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
    String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";

    /**
     * Instantiates a new DMAAP Topic Sink from a parameter object.
     *
     * <p>The parameter object is expected to carry the following values:
     * <ul>
     * <li>servers - list of servers</li>
     * <li>topic - topic name</li>
     * <li>apiKey - API Key</li>
     * <li>apiSecret - API Secret</li>
     * <li>userName - AAF user name</li>
     * <li>password - AAF password</li>
     * <li>partitionKey - Consumer Group (wording kept from the original documentation;
     *     NOTE(review): for a sink this is presumably the partition key - confirm)</li>
     * <li>environment - DME2 environment</li>
     * <li>aftEnvironment - DME2 AFT environment</li>
     * <li>partner - DME2 Partner</li>
     * <li>latitude - DME2 latitude</li>
     * <li>longitude - DME2 longitude</li>
     * <li>additionalProps - additional properties to pass to DME2</li>
     * <li>managed - is this sink endpoint managed?</li>
     * </ul>
     *
     * @param busTopicParams parameter object
     * @return DmaapTopicSink object
     * @throws IllegalArgumentException if invalid parameters are present
     */
    DmaapTopicSink build(BusTopicParams busTopicParams);

    /**
     * Creates DMAAP Topic Sinks based on a set of properties.
     *
     * @param properties Properties containing initialization values
     * @return the DMAAP Topic Sinks built from the properties
     * @throws IllegalArgumentException if invalid parameters are present
     */
    List<DmaapTopicSink> build(Properties properties);

    /**
     * Instantiates a new DMAAP Topic Sink.
     *
     * @param servers list of servers
     * @param topic   topic name
     * @return a DMAAP Topic Sink
     * @throws IllegalArgumentException if invalid parameters are present
     */
    DmaapTopicSink build(List<String> servers, String topic);

    /**
     * Destroys a DMAAP Topic Sink based on a topic.
     *
     * @param topic topic name
     * @throws IllegalArgumentException if invalid parameters are present
     */
    void destroy(String topic);

    /**
     * Destroys all DMAAP Topic Sinks.
     */
    void destroy();

    /**
     * Gets a DMAAP Topic Sink based on topic name.
     *
     * @param topic the topic name
     * @return the DMAAP Topic Sink with that topic name
     * @throws IllegalArgumentException if an invalid topic is provided
     * @throws IllegalStateException    if the DMAAP Topic Sink cannot be provided in the current
     *                                  state (the implementation in this file throws it when no
     *                                  sink is indexed under the topic)
     */
    DmaapTopicSink get(String topic);

    /**
     * Provides a snapshot of the DMAAP Topic Sinks.
     *
     * @return a list of the DMAAP Topic Sinks
     */
    List<DmaapTopicSink> inventory();
}
121
122
123 /* ------------- implementation ----------------- */
124
125 /**
 * Factory of DMAAP Topic Sinks (writers) indexed by topic name.
127  */
128 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
129     private static final String MISSING_TOPIC = "A topic must be provided";
130
131     /**
132      * Logger.
133      */
134     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
135
136     /**
137      * DMAAP Topic Name Index.
138      */
139     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
140
141     @Override
142     public DmaapTopicSink build(BusTopicParams busTopicParams) {
143
144         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
145             throw new IllegalArgumentException(MISSING_TOPIC);
146         }
147
148         synchronized (this) {
149             if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
150                 return dmaapTopicWriters.get(busTopicParams.getTopic());
151             }
152
153             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(busTopicParams);
154
155             if (busTopicParams.isManaged()) {
156                 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
157             }
158             return dmaapTopicSink;
159         }
160     }
161
162     @Override
163     public DmaapTopicSink build(List<String> servers, String topic) {
164         return this.build(BusTopicParams.builder()
165                 .servers(servers)
166                 .topic(topic)
167                 .managed(true)
168                 .useHttps(false)
169                 .allowSelfSignedCerts(false)
170                 .build());
171     }
172
173     @Override
174     public List<DmaapTopicSink> build(Properties properties) {
175
176         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
177         if (writeTopics == null || writeTopics.isEmpty()) {
178             logger.info("{}: no topic for DMaaP Sink", this);
179             return new ArrayList<>();
180         }
181
182         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
183         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
184         synchronized (this) {
185             for (String topic : writeTopicList) {
186                 if (this.dmaapTopicWriters.containsKey(topic)) {
187                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
188                     continue;
189                 }
190                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
191                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
192
193                 List<String> serverList;
194                 if (servers != null && !servers.isEmpty()) {
195                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
196                 } else {
197                     serverList = new ArrayList<>();
198                 }
199
200                 final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS 
201                                 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
202                 final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS 
203                                 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
204
205                 final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS 
206                                 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
207                 final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS 
208                                 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
209
210                 final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
211                                 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
212
213                 final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS 
214                                 + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
215
216                 /* DME2 Properties */
217
218                 final String dme2Environment = properties.getProperty(
219                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
220                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
221
222                 final String dme2AftEnvironment = properties.getProperty(
223                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
224                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
225
226                 final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS 
227                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
228
229                 final String dme2RouteOffer = properties.getProperty(
230                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS 
231                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
232
233                 final String dme2Latitude = properties.getProperty(
234                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
235                                                 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
236
237                 final String dme2Longitude = properties.getProperty(
238                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS 
239                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
240
241                 final String dme2EpReadTimeoutMs = properties.getProperty(
242                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
243                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
244
245                 final String dme2EpConnTimeout = properties.getProperty(
246                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
247                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
248
249                 final String dme2RoundtripTimeoutMs =
250                                 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS 
251                                 + "." + topic 
252                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
253
254                 final String dme2Version = properties.getProperty(
255                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
256                                                 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
257
258                 final String dme2SubContextPath = properties.getProperty(
259                                 PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
260                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
261
262                 final String dme2SessionStickinessRequired =
263                                 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
264                                 + "." + topic
265                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
266
267                 Map<String, String> dme2AdditionalProps = new HashMap<>();
268
269                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
270                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
271                 }
272                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
273                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
274                 }
275                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
276                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
277                 }
278                 if (dme2Version != null && !dme2Version.isEmpty()) {
279                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
280                 }
281                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
282                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
283                 }
284                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
285                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
286                 }
287                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
288                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
289                 }
290
291                 if (servers == null || servers.isEmpty()) {
292                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
293                     continue;
294                 }
295
296                 boolean managed = true;
297                 if (managedString != null && !managedString.isEmpty()) {
298                     managed = Boolean.parseBoolean(managedString);
299                 }
300
301                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
302                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
303
304                 // default is to use HTTP if no https property exists
305                 boolean useHttps = false;
306                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
307                     useHttps = Boolean.parseBoolean(useHttpsString);
308                 }
309
310
311                 String allowSelfSignedCertsString =
312                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
313                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
314
315                 // default is to disallow self-signed certs
316                 boolean allowSelfSignedCerts = false;
317                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
318                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
319                 }
320
321                 DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
322                         .servers(serverList)
323                         .topic(topic)
324                         .apiKey(apiKey)
325                         .apiSecret(apiSecret)
326                         .userName(aafMechId)
327                         .password(aafPassword)
328                         .partitionId(partitionKey)
329                         .environment(dme2Environment)
330                         .aftEnvironment(dme2AftEnvironment)
331                         .partner(dme2Partner)
332                         .latitude(dme2Latitude)
333                         .longitude(dme2Longitude)
334                         .additionalProps(dme2AdditionalProps)
335                         .managed(managed)
336                         .useHttps(useHttps)
337                         .allowSelfSignedCerts(allowSelfSignedCerts)
338                         .build());
339
340                 newDmaapTopicSinks.add(dmaapTopicSink);
341             }
342             return newDmaapTopicSinks;
343         }
344     }
345
346     @Override
347     public void destroy(String topic) {
348
349         if (topic == null || topic.isEmpty()) {
350             throw new IllegalArgumentException(MISSING_TOPIC);
351         }
352
353         DmaapTopicSink dmaapTopicWriter;
354         synchronized (this) {
355             if (!dmaapTopicWriters.containsKey(topic)) {
356                 return;
357             }
358
359             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
360         }
361
362         dmaapTopicWriter.shutdown();
363     }
364
365     @Override
366     public void destroy() {
367         List<DmaapTopicSink> writers = this.inventory();
368         for (DmaapTopicSink writer : writers) {
369             writer.shutdown();
370         }
371
372         synchronized (this) {
373             this.dmaapTopicWriters.clear();
374         }
375     }
376
377     @Override
378     public DmaapTopicSink get(String topic) {
379
380         if (topic == null || topic.isEmpty()) {
381             throw new IllegalArgumentException(MISSING_TOPIC);
382         }
383
384         synchronized (this) {
385             if (dmaapTopicWriters.containsKey(topic)) {
386                 return dmaapTopicWriters.get(topic);
387             } else {
388                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
389             }
390         }
391     }
392
393     @Override
394     public synchronized List<DmaapTopicSink> inventory() {
395         return new ArrayList<>(this.dmaapTopicWriters.values());
396     }
397
398     @Override
399     public String toString() {
400         StringBuilder builder = new StringBuilder();
401         builder.append("IndexedDmaapTopicSinkFactory []");
402         return builder.toString();
403     }
404
405 }