8b8ca530345cd701bc335d3de731ac2bd02ea0f8
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.slf4j.LoggerFactory;
31 import org.slf4j.Logger;
32 import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.openecomp.policy.drools.properties.PolicyProperties;
34
35 /**
36  * DMAAP Topic Sink Factory
37  */
38 public interface DmaapTopicSinkFactory {
39         public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40         public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41         public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42         public final String DME2_VERSION_PROPERTY = "Version";
43         public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44         public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45         public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46         public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
47
48         /**
49          * Instantiates a new DMAAP Topic Sink
50          * 
51          * @param servers list of servers
52          * @param topic topic name
53          * @param apiKey API Key
54          * @param apiSecret API Secret
55          * @param userName AAF user name
56          * @param password AAF password
57          * @param partitionKey Consumer Group
58          * @param environment DME2 environment
59          * @param aftEnvironment DME2 AFT environment
60          * @param partner DME2 Partner
61          * @param latitude DME2 latitude
62          * @param longitude DME2 longitude
63          * @param additionalProps additional properties to pass to DME2
64          * @param managed is this sink endpoint managed?
65          * 
66          * @return an DMAAP Topic Sink
67          * @throws IllegalArgumentException if invalid parameters are present
68          */
69         public DmaapTopicSink build(List<String> servers, 
70                                                                 String topic, 
71                                                                 String apiKey, 
72                                                                 String apiSecret,
73                                                                 String userName,
74                                                                 String password,
75                                                                 String partitionKey,
76                                                                 String environment,
77                                                                 String aftEnvironment,
78                                                                 String partner,
79                                                                 String latitude,
80                                                                 String longitude,
81                                                                 Map<String,String> additionalProps,
82                                                                 boolean managed,
83                                                                 boolean useHttps,
84                                                                 boolean allowSelfSignedCerts) ;
85         
86         /**
87          * Instantiates a new DMAAP Topic Sink
88          * 
89          * @param servers list of servers
90          * @param topic topic name
91          * @param apiKey API Key
92          * @param apiSecret API Secret
93          * @param userName AAF user name
94          * @param password AAF password
95          * @param partitionKey Consumer Group
96          * @param managed is this sink endpoint managed?
97          * 
98          * @return an DMAAP Topic Sink
99          * @throws IllegalArgumentException if invalid parameters are present
100          */
101         public DmaapTopicSink build(List<String> servers, 
102                                                                 String topic, 
103                                                                 String apiKey, 
104                                                                 String apiSecret,
105                                                                 String userName,
106                                                                 String password,
107                                                                 String partitionKey,
108                                                                 boolean managed,
109                                                                 boolean useHttps,
110                                                                 boolean allowSelfSignedCerts)
111                         throws IllegalArgumentException;
112         
113         /**
114          * Creates an DMAAP Topic Sink based on properties files
115          * 
116          * @param properties Properties containing initialization values
117          * 
118          * @return an DMAAP Topic Sink
119          * @throws IllegalArgumentException if invalid parameters are present
120          */
121         public List<DmaapTopicSink> build(Properties properties)
122                         throws IllegalArgumentException;
123         
124         /**
125          * Instantiates a new DMAAP Topic Sink
126          * 
127          * @param servers list of servers
128          * @param topic topic name
129          * 
130          * @return an DMAAP Topic Sink
131          * @throws IllegalArgumentException if invalid parameters are present
132          */
133         public DmaapTopicSink build(List<String> servers, String topic)
134                         throws IllegalArgumentException;
135         
136         /**
137          * Destroys an DMAAP Topic Sink based on a topic
138          * 
139          * @param topic topic name
140          * @throws IllegalArgumentException if invalid parameters are present
141          */
142         public void destroy(String topic);
143
144         /**
145          * gets an DMAAP Topic Sink based on topic name
146          * @param topic the topic name
147          * 
148          * @return an DMAAP Topic Sink with topic name
149          * @throws IllegalArgumentException if an invalid topic is provided
150          * @throws IllegalStateException if the DMAAP Topic Reader is 
151          * an incorrect state
152          */
153         public DmaapTopicSink get(String topic)
154                            throws IllegalArgumentException, IllegalStateException;
155         
156         /**
157          * Provides a snapshot of the DMAAP Topic Sinks
158          * @return a list of the DMAAP Topic Sinks
159          */
160         public List<DmaapTopicSink> inventory();
161
162         /**
163          * Destroys all DMAAP Topic Sinks
164          */
165         public void destroy();
166 }
167
168 /* ------------- implementation ----------------- */
169
170 /**
171  * Factory of DMAAP Reader Topics indexed by topic name
172  */
173 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
174         /**
175          * Logger
176          */
177         private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);     
178         
179         /**
180          * DMAAP Topic Name Index
181          */
182         protected HashMap<String, DmaapTopicSink> dmaapTopicWriters =
183                         new HashMap<String, DmaapTopicSink>();
184
185         /**
186          * {@inheritDoc}
187          */
188         @Override
189         public DmaapTopicSink build(List<String> servers, 
190                                                                 String topic, 
191                                                                 String apiKey, 
192                                                                 String apiSecret,
193                                                                 String userName,
194                                                                 String password,
195                                                                 String partitionKey,
196                                                                 String environment,
197                                                                 String aftEnvironment,
198                                                                 String partner,
199                                                                 String latitude,
200                                                                 String longitude,
201                                                                 Map<String,String> additionalProps,
202                                                                 boolean managed,
203                                                                 boolean useHttps,
204                                                                 boolean allowSelfSignedCerts) 
205                         throws IllegalArgumentException {
206                 
207                 if (topic == null || topic.isEmpty()) {
208                         throw new IllegalArgumentException("A topic must be provided");
209                 }
210                 
211                 synchronized (this) {
212                         if (dmaapTopicWriters.containsKey(topic)) {
213                                 return dmaapTopicWriters.get(topic);
214                         }
215                         
216                         DmaapTopicSink dmaapTopicSink = 
217                                         new InlineDmaapTopicSink(servers, topic, 
218                                                                                      apiKey, apiSecret,
219                                                                                      userName, password,
220                                                                                      partitionKey,
221                                                                                      environment, aftEnvironment, 
222                                                                                      partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
223                         
224                         if (managed)
225                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
226                         return dmaapTopicSink;
227                 }
228         }
229         
230         /**
231          * {@inheritDoc}
232          */
233         @Override
234         public DmaapTopicSink build(List<String> servers, 
235                                                                 String topic, 
236                                                                 String apiKey, 
237                                                                 String apiSecret,
238                                                                 String userName,
239                                                                 String password,
240                                                                 String partitionKey,
241                                                                 boolean managed,
242                                                                 boolean useHttps, boolean allowSelfSignedCerts) 
243                         throws IllegalArgumentException {
244                 
245                 if (topic == null || topic.isEmpty()) {
246                         throw new IllegalArgumentException("A topic must be provided");
247                 }
248                 
249                 synchronized (this) {
250                         if (dmaapTopicWriters.containsKey(topic)) {
251                                 return dmaapTopicWriters.get(topic);
252                         }
253                         
254                         DmaapTopicSink dmaapTopicSink = 
255                                         new InlineDmaapTopicSink(servers, topic, 
256                                                                                      apiKey, apiSecret,
257                                                                                      userName, password,
258                                                                                      partitionKey, useHttps, allowSelfSignedCerts);
259                         
260                         if (managed)
261                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
262                         return dmaapTopicSink;
263                 }
264         }
265         
266
267         /**
268          * {@inheritDoc}
269          */
270         @Override
271         public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
272                 return this.build(servers, topic, null, null, null, null, null, true, false, false);
273         }
274         
275
276         /**
277          * {@inheritDoc}
278          */
279         @Override
280         public List<DmaapTopicSink> build(Properties properties) throws IllegalArgumentException {
281                 
282                 String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS);
283                 if (writeTopics == null || writeTopics.isEmpty()) {
284                         logger.info("{}: no topic for DMaaP Sink", this);
285                         return new ArrayList<DmaapTopicSink>();
286                 }
287                 
288                 List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
289                 List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<DmaapTopicSink>();
290                 synchronized(this) {
291                         for (String topic: writeTopicList) {
292                                 if (this.dmaapTopicWriters.containsKey(topic)) {
293                                         newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
294                                         continue;
295                                 }
296                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + 
297                                                                         topic + 
298                                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
299                                 
300                                 List<String> serverList;
301                                 if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
302                                 else serverList = new ArrayList<>();
303                                 
304                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
305                                                                                "." + topic + 
306                                                                                PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);          
307                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
308                                                           "." + topic + 
309                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
310                                 
311                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
312                                                           "." + topic + 
313                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
314                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
315                                                                             "." + topic + 
316                                                                             PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
317                                 
318                                 String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
319                                                              "." + topic + 
320                                                              PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
321                                 
322                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
323                                                                                       PolicyProperties.PROPERTY_MANAGED_SUFFIX);
324                                 
325                                 /* DME2 Properties */
326                                 
327                                 String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
328                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
329
330                                 String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
331                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
332
333                                 String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
334                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
335                                 
336                                 String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
337                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
338
339                                 String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
340                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
341
342                                 String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
343                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
344
345                                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
346                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
347
348                                 String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
349                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
350
351                                 String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS
352                                                 + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
353
354                                 String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
355                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
356
357                                 String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
358                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
359
360                                 String dme2SessionStickinessRequired = properties
361                                                 .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
362                                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
363                                 
364                                 Map<String,String> dme2AdditionalProps = new HashMap<>();
365                                 
366                                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
367                                         dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
368                                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
369                                         dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
370                                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
371                                         dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
372                                 if (dme2Version != null && !dme2Version.isEmpty())
373                                         dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
374                                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
375                                         dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
376                                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
377                                         dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
378                                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
379                                         dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
380
381                                 if (servers == null || servers.isEmpty()) {
382                                         logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
383                                         continue;
384                                 }
385                                 
386                                 boolean managed = true;
387                                 if (managedString != null && !managedString.isEmpty()) {
388                                         managed = Boolean.parseBoolean(managedString);
389                                 }
390                                 
391                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
392                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
393
394                                 //default is to use HTTP if no https property exists
395                                 boolean useHttps = false;
396                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
397                                         useHttps = Boolean.parseBoolean(useHttpsString);
398                                 }
399                                 
400                                 
401                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
402                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
403
404                                         //default is to disallow self-signed certs 
405                                 boolean allowSelfSignedCerts = false;
406                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
407                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
408                                 }                               
409                                 
410                                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, 
411                                                                                                            apiKey, apiSecret,
412                                                                                                            aafMechId, aafPassword,
413                                                                                                            partitionKey,
414                                                                                                            dme2Environment, dme2AftEnvironment,
415                                                                                                            dme2Partner, dme2Latitude, dme2Longitude,
416                                                                                                            dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
417                                 
418                                 newDmaapTopicSinks.add(dmaapTopicSink);
419                         }
420                         return newDmaapTopicSinks;
421                 }
422         }
423         
424         /**
425          * {@inheritDoc}
426          */
427         @Override
428         public void destroy(String topic) 
429                    throws IllegalArgumentException {
430                 
431                 if (topic == null || topic.isEmpty()) {
432                         throw new IllegalArgumentException("A topic must be provided");
433                 }
434                 
435                 DmaapTopicSink dmaapTopicWriter;
436                 synchronized(this) {
437                         if (!dmaapTopicWriters.containsKey(topic)) {
438                                 return;
439                         }
440                         
441                         dmaapTopicWriter = dmaapTopicWriters.remove(topic);
442                 }
443                 
444                 dmaapTopicWriter.shutdown();
445         }
446         
447         /**
448          * {@inheritDoc}
449          */
450         @Override
451         public void destroy() {
452                 List<DmaapTopicSink> writers = this.inventory();
453                 for (DmaapTopicSink writer: writers) {
454                         writer.shutdown();
455                 }
456                 
457                 synchronized(this) {
458                         this.dmaapTopicWriters.clear();
459                 }
460         }
461
462         /**
463          * {@inheritDoc}
464          */
465         @Override
466         public DmaapTopicSink get(String topic) 
467                         throws IllegalArgumentException, IllegalStateException {
468                 
469                 if (topic == null || topic.isEmpty()) {
470                         throw new IllegalArgumentException("A topic must be provided");
471                 }
472                 
473                 synchronized(this) {
474                         if (dmaapTopicWriters.containsKey(topic)) {
475                                 return dmaapTopicWriters.get(topic);
476                         } else {
477                                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
478                         }
479                 }
480         }
481
482         /**
483          * {@inheritDoc}
484          */
485         @Override
486         public synchronized List<DmaapTopicSink> inventory() {
487                  List<DmaapTopicSink> writers = 
488                                  new ArrayList<DmaapTopicSink>(this.dmaapTopicWriters.values());
489                  return writers;
490         }
491
492         @Override
493         public String toString() {
494                 StringBuilder builder = new StringBuilder();
495                 builder.append("IndexedDmaapTopicSinkFactory []");
496                 return builder.toString();
497         }
498         
499 }