4c36a39a80a7df9fd2a50f22e3365fabae5bec4a
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Sink Factory.
39  */
40 public interface DmaapTopicSinkFactory {
41
42     String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
43     String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
44     String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
45     String DME2_VERSION_PROPERTY = "Version";
46     String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
47     String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
48     String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
49     String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
50
51     /**
52      * <pre>
53      * Instantiate a new DMAAP Topic Sink, with following params.
54      * servers         list of servers
55      * topic           topic name
56      * apiKey          API Key
57      * apiSecret       API Secret
58      * userName        AAF user name
59      * password        AAF password
60      * partitionKey    Consumer Group
61      * environment     DME2 environment
62      * aftEnvironment  DME2 AFT environment
63      * partner         DME2 Partner
64      * latitude        DME2 latitude
65      * longitude       DME2 longitude
66      * additionalProps additional properties to pass to DME2
67      * managed         is this sink endpoint managed?
68      * </pre>
69      * @param busTopicParams parameter object
70      * @return DmaapTopicSink object
71      * @throws IllegalArgumentException if invalid parameters are present
72      */
73     DmaapTopicSink build(BusTopicParams busTopicParams);
74
75     /**
76      * Creates an DMAAP Topic Sink based on properties files.
77      *
78      * @param properties Properties containing initialization values
79      * @return an DMAAP Topic Sink
80      * @throws IllegalArgumentException if invalid parameters are present
81      */
82     List<DmaapTopicSink> build(Properties properties);
83
84     /**
85      * Instantiates a new DMAAP Topic Sink.
86      *
87      * @param servers list of servers
88      * @param topic topic name
89      * @return an DMAAP Topic Sink
90      * @throws IllegalArgumentException if invalid parameters are present
91      */
92     DmaapTopicSink build(List<String> servers, String topic);
93
94     /**
95      * Destroys an DMAAP Topic Sink based on a topic.
96      *
97      * @param topic topic name
98      * @throws IllegalArgumentException if invalid parameters are present
99      */
100     void destroy(String topic);
101
102     /**
103      * Destroys all DMAAP Topic Sinks.
104      */
105     void destroy();
106
107     /**
108      * Gets an DMAAP Topic Sink based on topic name.
109      *
110      * @param topic the topic name
111      * @return an DMAAP Topic Sink with topic name
112      * @throws IllegalArgumentException if an invalid topic is provided
113      * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
114      */
115     DmaapTopicSink get(String topic);
116
117     /**
118      * Provides a snapshot of the DMAAP Topic Sinks.
119      *
120      * @return a list of the DMAAP Topic Sinks
121      */
122     List<DmaapTopicSink> inventory();
123 }
124
125
126 /* ------------- implementation ----------------- */
127
128 /**
129  * Factory of DMAAP Reader Topics indexed by topic name.
130  */
131 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
132
133     private static final String MISSING_TOPIC = "A topic must be provided";
134
135     /**
136      * Logger.
137      */
138     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
139
140     /**
141      * DMAAP Topic Name Index.
142      */
143     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
144
145     @Override
146     public DmaapTopicSink build(BusTopicParams busTopicParams) {
147
148         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
149             throw new IllegalArgumentException(MISSING_TOPIC);
150         }
151
152         synchronized (this) {
153             if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
154                 return dmaapTopicWriters.get(busTopicParams.getTopic());
155             }
156
157             DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams);
158
159             if (busTopicParams.isManaged()) {
160                 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
161             }
162             return dmaapTopicSink;
163         }
164     }
165
166     @Override
167     public DmaapTopicSink build(List<String> servers, String topic) {
168         return this.build(BusTopicParams.builder()
169                 .servers(servers)
170                 .topic(topic)
171                 .managed(true)
172                 .useHttps(false)
173                 .allowSelfSignedCerts(false)
174                 .build());
175     }
176
177     @Override
178     public List<DmaapTopicSink> build(Properties properties) {
179
180         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
181         if (writeTopics == null || writeTopics.isEmpty()) {
182             logger.info("{}: no topic for DMaaP Sink", this);
183             return new ArrayList<>();
184         }
185
186         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
187         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
188         synchronized (this) {
189             for (String topic : writeTopicList) {
190                 if (this.dmaapTopicWriters.containsKey(topic)) {
191                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
192                     continue;
193                 }
194                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
195                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
196
197                 List<String> serverList;
198                 if (servers != null && !servers.isEmpty()) {
199                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
200                 } else {
201                     serverList = new ArrayList<>();
202                 }
203
204                 final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
205                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
206                 final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
207                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
208
209                 final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
210                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
211                 final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
212                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
213
214                 final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
215                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
216
217                 final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
218                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
219
220                 /* DME2 Properties */
221
222                 final String dme2Environment = properties.getProperty(
223                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
224                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
225
226                 final String dme2AftEnvironment = properties.getProperty(
227                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
228                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
229
230                 final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
231                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
232
233                 final String dme2RouteOffer = properties.getProperty(
234                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
235                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
236
237                 final String dme2Latitude = properties.getProperty(
238                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
239                                 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
240
241                 final String dme2Longitude = properties.getProperty(
242                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
243                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
244
245                 final String dme2EpReadTimeoutMs = properties.getProperty(
246                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
247                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
248
249                 final String dme2EpConnTimeout = properties.getProperty(
250                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
251                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
252
253                 final String dme2RoundtripTimeoutMs =
254                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
255                                 + "." + topic
256                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
257
258                 final String dme2Version = properties.getProperty(
259                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
260                                 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
261
262                 final String dme2SubContextPath = properties.getProperty(
263                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
264                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
265
266                 final String dme2SessionStickinessRequired =
267                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
268                                 + "." + topic
269                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
270
271                 Map<String, String> dme2AdditionalProps = new HashMap<>();
272
273                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
274                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
275                 }
276                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
277                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
278                 }
279                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
280                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
281                 }
282                 if (dme2Version != null && !dme2Version.isEmpty()) {
283                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
284                 }
285                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
286                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
287                 }
288                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
289                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
290                 }
291                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
292                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
293                 }
294
295                 if (servers == null || servers.isEmpty()) {
296                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
297                     continue;
298                 }
299
300                 boolean managed = true;
301                 if (managedString != null && !managedString.isEmpty()) {
302                     managed = Boolean.parseBoolean(managedString);
303                 }
304
305                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
306                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
307
308                 // default is to use HTTP if no https property exists
309                 boolean useHttps = false;
310                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
311                     useHttps = Boolean.parseBoolean(useHttpsString);
312                 }
313
314                 String allowSelfSignedCertsString =
315                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
316                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
317
318                 // default is to disallow self-signed certs
319                 boolean allowSelfSignedCerts = false;
320                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
321                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
322                 }
323
324                 DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
325                         .servers(serverList)
326                         .topic(topic)
327                         .apiKey(apiKey)
328                         .apiSecret(apiSecret)
329                         .userName(aafMechId)
330                         .password(aafPassword)
331                         .partitionId(partitionKey)
332                         .environment(dme2Environment)
333                         .aftEnvironment(dme2AftEnvironment)
334                         .partner(dme2Partner)
335                         .latitude(dme2Latitude)
336                         .longitude(dme2Longitude)
337                         .additionalProps(dme2AdditionalProps)
338                         .managed(managed)
339                         .useHttps(useHttps)
340                         .allowSelfSignedCerts(allowSelfSignedCerts)
341                         .build());
342
343                 newDmaapTopicSinks.add(dmaapTopicSink);
344             }
345             return newDmaapTopicSinks;
346         }
347     }
348
349     /**
350      * Makes a new sink.
351      *
352      * @param busTopicParams parameters to use to configure the sink
353      * @return a new sink
354      */
355     protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
356         return new InlineDmaapTopicSink(busTopicParams);
357     }
358
359     @Override
360     public void destroy(String topic) {
361
362         if (topic == null || topic.isEmpty()) {
363             throw new IllegalArgumentException(MISSING_TOPIC);
364         }
365
366         DmaapTopicSink dmaapTopicWriter;
367         synchronized (this) {
368             if (!dmaapTopicWriters.containsKey(topic)) {
369                 return;
370             }
371
372             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
373         }
374
375         dmaapTopicWriter.shutdown();
376     }
377
378     @Override
379     public void destroy() {
380         List<DmaapTopicSink> writers = this.inventory();
381         for (DmaapTopicSink writer : writers) {
382             writer.shutdown();
383         }
384
385         synchronized (this) {
386             this.dmaapTopicWriters.clear();
387         }
388     }
389
390     @Override
391     public DmaapTopicSink get(String topic) {
392
393         if (topic == null || topic.isEmpty()) {
394             throw new IllegalArgumentException(MISSING_TOPIC);
395         }
396
397         synchronized (this) {
398             if (dmaapTopicWriters.containsKey(topic)) {
399                 return dmaapTopicWriters.get(topic);
400             } else {
401                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
402             }
403         }
404     }
405
406     @Override
407     public synchronized List<DmaapTopicSink> inventory() {
408         return new ArrayList<>(this.dmaapTopicWriters.values());
409     }
410
411     @Override
412     public String toString() {
413         return "IndexedDmaapTopicSinkFactory []";
414     }
415
416 }