26e8d4132fe3f0ad775f5ab914962b6071ba9ebe
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * DMAAP Topic Sink Factory
37  */
38 public interface DmaapTopicSinkFactory {
39     public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40     public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41     public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42     public final String DME2_VERSION_PROPERTY = "Version";
43     public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44     public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45     public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46     public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
47
48     /**
49      * Instantiates a new DMAAP Topic Sink
50      * 
51      * @param servers list of servers
52      * @param topic topic name
53      * @param apiKey API Key
54      * @param apiSecret API Secret
55      * @param userName AAF user name
56      * @param password AAF password
57      * @param partitionKey Consumer Group
58      * @param environment DME2 environment
59      * @param aftEnvironment DME2 AFT environment
60      * @param partner DME2 Partner
61      * @param latitude DME2 latitude
62      * @param longitude DME2 longitude
63      * @param additionalProps additional properties to pass to DME2
64      * @param managed is this sink endpoint managed?
65      * 
66      * @return an DMAAP Topic Sink
67      * @throws IllegalArgumentException if invalid parameters are present
68      */
69     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
70             String password, String partitionKey, String environment, String aftEnvironment, String partner,
71             String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
72             boolean allowSelfSignedCerts);
73
74     /**
75      * Instantiates a new DMAAP Topic Sink
76      * 
77      * @param servers list of servers
78      * @param topic topic name
79      * @param apiKey API Key
80      * @param apiSecret API Secret
81      * @param userName AAF user name
82      * @param password AAF password
83      * @param partitionKey Consumer Group
84      * @param managed is this sink endpoint managed?
85      * 
86      * @return an DMAAP Topic Sink
87      * @throws IllegalArgumentException if invalid parameters are present
88      */
89     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
90             String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
91
92     /**
93      * Creates an DMAAP Topic Sink based on properties files
94      * 
95      * @param properties Properties containing initialization values
96      * 
97      * @return an DMAAP Topic Sink
98      * @throws IllegalArgumentException if invalid parameters are present
99      */
100     public List<DmaapTopicSink> build(Properties properties);
101
102     /**
103      * Instantiates a new DMAAP Topic Sink
104      * 
105      * @param servers list of servers
106      * @param topic topic name
107      * 
108      * @return an DMAAP Topic Sink
109      * @throws IllegalArgumentException if invalid parameters are present
110      */
111     public DmaapTopicSink build(List<String> servers, String topic);
112
113     /**
114      * Destroys an DMAAP Topic Sink based on a topic
115      * 
116      * @param topic topic name
117      * @throws IllegalArgumentException if invalid parameters are present
118      */
119     public void destroy(String topic);
120
121     /**
122      * gets an DMAAP Topic Sink based on topic name
123      * 
124      * @param topic the topic name
125      * 
126      * @return an DMAAP Topic Sink with topic name
127      * @throws IllegalArgumentException if an invalid topic is provided
128      * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
129      */
130     public DmaapTopicSink get(String topic);
131
132     /**
133      * Provides a snapshot of the DMAAP Topic Sinks
134      * 
135      * @return a list of the DMAAP Topic Sinks
136      */
137     public List<DmaapTopicSink> inventory();
138
139     /**
140      * Destroys all DMAAP Topic Sinks
141      */
142     public void destroy();
143 }
144
145
146 /* ------------- implementation ----------------- */
147
148 /**
149  * Factory of DMAAP Reader Topics indexed by topic name
150  */
151 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
152     private static final String MISSING_TOPIC = "A topic must be provided";
153
154     /**
155      * Logger
156      */
157     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
158
159     /**
160      * DMAAP Topic Name Index
161      */
162     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
163
164     @Override
165     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
166             String password, String partitionKey, String environment, String aftEnvironment, String partner,
167             String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
168             boolean allowSelfSignedCerts) {
169
170         if (topic == null || topic.isEmpty()) {
171             throw new IllegalArgumentException(MISSING_TOPIC);
172         }
173
174         synchronized (this) {
175             if (dmaapTopicWriters.containsKey(topic)) {
176                 return dmaapTopicWriters.get(topic);
177             }
178
179             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName,
180                     password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps,
181                     useHttps, allowSelfSignedCerts);
182
183             if (managed) {
184                 dmaapTopicWriters.put(topic, dmaapTopicSink);
185             }
186             return dmaapTopicSink;
187         }
188     }
189
190     @Override
191     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
192             String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
193
194         if (topic == null || topic.isEmpty()) {
195             throw new IllegalArgumentException(MISSING_TOPIC);
196         }
197
198         synchronized (this) {
199             if (dmaapTopicWriters.containsKey(topic)) {
200                 return dmaapTopicWriters.get(topic);
201             }
202
203             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName,
204                     password, partitionKey, useHttps, allowSelfSignedCerts);
205
206             if (managed) {
207                 dmaapTopicWriters.put(topic, dmaapTopicSink);
208             }
209             return dmaapTopicSink;
210         }
211     }
212
213     @Override
214     public DmaapTopicSink build(List<String> servers, String topic) {
215         return this.build(servers, topic, null, null, null, null, null, true, false, false);
216     }
217
218     @Override
219     public List<DmaapTopicSink> build(Properties properties) {
220
221         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
222         if (writeTopics == null || writeTopics.isEmpty()) {
223             logger.info("{}: no topic for DMaaP Sink", this);
224             return new ArrayList<>();
225         }
226
227         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
228         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
229         synchronized (this) {
230             for (String topic : writeTopicList) {
231                 if (this.dmaapTopicWriters.containsKey(topic)) {
232                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
233                     continue;
234                 }
235                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
236                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
237
238                 List<String> serverList;
239                 if (servers != null && !servers.isEmpty()) {
240                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
241                 } else {
242                     serverList = new ArrayList<>();
243                 }
244
245                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
246                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
247                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
248                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
249
250                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
251                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
252                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
253                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
254
255                 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
256                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
257
258                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
259                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
260
261                 /* DME2 Properties */
262
263                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
264                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
265
266                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
267                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
268
269                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
270                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
271
272                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
273                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
274
275                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
276                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
277
278                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
279                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
280
281                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
282                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
283
284                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
285                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
286
287                 String dme2RoundtripTimeoutMs =
288                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
289                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
290
291                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
292                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
293
294                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
295                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
296
297                 String dme2SessionStickinessRequired =
298                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
299                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
300
301                 Map<String, String> dme2AdditionalProps = new HashMap<>();
302
303                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
304                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
305                 }
306                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
307                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
308                 }
309                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
310                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
311                 }
312                 if (dme2Version != null && !dme2Version.isEmpty()) {
313                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
314                 }
315                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
316                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
317                 }
318                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
319                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
320                 }
321                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
322                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
323                 }
324
325                 if (servers == null || servers.isEmpty()) {
326                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
327                     continue;
328                 }
329
330                 boolean managed = true;
331                 if (managedString != null && !managedString.isEmpty()) {
332                     managed = Boolean.parseBoolean(managedString);
333                 }
334
335                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
336                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
337
338                 // default is to use HTTP if no https property exists
339                 boolean useHttps = false;
340                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
341                     useHttps = Boolean.parseBoolean(useHttpsString);
342                 }
343
344
345                 String allowSelfSignedCertsString =
346                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
347                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
348
349                 // default is to disallow self-signed certs
350                 boolean allowSelfSignedCerts = false;
351                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
352                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
353                 }
354
355                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword,
356                         partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude,
357                         dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
358
359                 newDmaapTopicSinks.add(dmaapTopicSink);
360             }
361             return newDmaapTopicSinks;
362         }
363     }
364
365     @Override
366     public void destroy(String topic) {
367
368         if (topic == null || topic.isEmpty()) {
369             throw new IllegalArgumentException(MISSING_TOPIC);
370         }
371
372         DmaapTopicSink dmaapTopicWriter;
373         synchronized (this) {
374             if (!dmaapTopicWriters.containsKey(topic)) {
375                 return;
376             }
377
378             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
379         }
380
381         dmaapTopicWriter.shutdown();
382     }
383
384     @Override
385     public void destroy() {
386         List<DmaapTopicSink> writers = this.inventory();
387         for (DmaapTopicSink writer : writers) {
388             writer.shutdown();
389         }
390
391         synchronized (this) {
392             this.dmaapTopicWriters.clear();
393         }
394     }
395
396     @Override
397     public DmaapTopicSink get(String topic) {
398
399         if (topic == null || topic.isEmpty()) {
400             throw new IllegalArgumentException(MISSING_TOPIC);
401         }
402
403         synchronized (this) {
404             if (dmaapTopicWriters.containsKey(topic)) {
405                 return dmaapTopicWriters.get(topic);
406             } else {
407                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
408             }
409         }
410     }
411
412     @Override
413     public synchronized List<DmaapTopicSink> inventory() {
414         return new ArrayList<>(this.dmaapTopicWriters.values());
415     }
416
417     @Override
418     public String toString() {
419         StringBuilder builder = new StringBuilder();
420         builder.append("IndexedDmaapTopicSinkFactory []");
421         return builder.toString();
422     }
423
424 }