c3d02d145d523013502525c9c988242f50201372
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.openecomp.policy.drools.properties.PolicyProperties;
34
35 /**
36  * DMAAP Topic Sink Factory
37  */
38 public interface DmaapTopicSinkFactory {
39         public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40         public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41         public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42         public final String DME2_VERSION_PROPERTY = "Version";
43         public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44         public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45         public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46         public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
47
48         /**
49          * Instantiates a new DMAAP Topic Sink
50          * 
51          * @param servers list of servers
52          * @param topic topic name
53          * @param apiKey API Key
54          * @param apiSecret API Secret
55          * @param userName AAF user name
56          * @param password AAF password
57          * @param partitionKey Consumer Group
58          * @param environment DME2 environment
59          * @param aftEnvironment DME2 AFT environment
60          * @param partner DME2 Partner
61          * @param latitude DME2 latitude
62          * @param longitude DME2 longitude
63          * @param additionalProps additional properties to pass to DME2
64          * @param managed is this sink endpoint managed?
65          * 
66          * @return an DMAAP Topic Sink
67          * @throws IllegalArgumentException if invalid parameters are present
68          */
69         public DmaapTopicSink build(List<String> servers, 
70                                                                 String topic, 
71                                                                 String apiKey, 
72                                                                 String apiSecret,
73                                                                 String userName,
74                                                                 String password,
75                                                                 String partitionKey,
76                                                                 String environment,
77                                                                 String aftEnvironment,
78                                                                 String partner,
79                                                                 String latitude,
80                                                                 String longitude,
81                                                                 Map<String,String> additionalProps,
82                                                                 boolean managed,
83                                                                 boolean useHttps,
84                                                                 boolean allowSelfSignedCerts) ;
85         
86         /**
87          * Instantiates a new DMAAP Topic Sink
88          * 
89          * @param servers list of servers
90          * @param topic topic name
91          * @param apiKey API Key
92          * @param apiSecret API Secret
93          * @param userName AAF user name
94          * @param password AAF password
95          * @param partitionKey Consumer Group
96          * @param managed is this sink endpoint managed?
97          * 
98          * @return an DMAAP Topic Sink
99          * @throws IllegalArgumentException if invalid parameters are present
100          */
101         public DmaapTopicSink build(List<String> servers, 
102                                                                 String topic, 
103                                                                 String apiKey, 
104                                                                 String apiSecret,
105                                                                 String userName,
106                                                                 String password,
107                                                                 String partitionKey,
108                                                                 boolean managed,
109                                                                 boolean useHttps,
110                                                                 boolean allowSelfSignedCerts)
111                         throws IllegalArgumentException;
112         
113         /**
114          * Creates an DMAAP Topic Sink based on properties files
115          * 
116          * @param properties Properties containing initialization values
117          * 
118          * @return an DMAAP Topic Sink
119          * @throws IllegalArgumentException if invalid parameters are present
120          */
121         public List<DmaapTopicSink> build(Properties properties)
122                         throws IllegalArgumentException;
123         
124         /**
125          * Instantiates a new DMAAP Topic Sink
126          * 
127          * @param servers list of servers
128          * @param topic topic name
129          * 
130          * @return an DMAAP Topic Sink
131          * @throws IllegalArgumentException if invalid parameters are present
132          */
133         public DmaapTopicSink build(List<String> servers, String topic)
134                         throws IllegalArgumentException;
135         
136         /**
137          * Destroys an DMAAP Topic Sink based on a topic
138          * 
139          * @param topic topic name
140          * @throws IllegalArgumentException if invalid parameters are present
141          */
142         public void destroy(String topic);
143
144         /**
145          * gets an DMAAP Topic Sink based on topic name
146          * @param topic the topic name
147          * 
148          * @return an DMAAP Topic Sink with topic name
149          * @throws IllegalArgumentException if an invalid topic is provided
150          * @throws IllegalStateException if the DMAAP Topic Reader is 
151          * an incorrect state
152          */
153         public DmaapTopicSink get(String topic)
154                            throws IllegalArgumentException, IllegalStateException;
155         
156         /**
157          * Provides a snapshot of the DMAAP Topic Sinks
158          * @return a list of the DMAAP Topic Sinks
159          */
160         public List<DmaapTopicSink> inventory();
161
162         /**
163          * Destroys all DMAAP Topic Sinks
164          */
165         public void destroy();
166 }
167
168 /* ------------- implementation ----------------- */
169
170 /**
171  * Factory of DMAAP Reader Topics indexed by topic name
172  */
173 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
174         // get an instance of logger 
175         private static Logger  logger = FlexLogger.getLogger(IndexedDmaapTopicSinkFactory.class);       
176         
177         /**
178          * DMAAP Topic Name Index
179          */
180         protected HashMap<String, DmaapTopicSink> dmaapTopicWriters =
181                         new HashMap<String, DmaapTopicSink>();
182
183         /**
184          * {@inheritDoc}
185          */
186         @Override
187         public DmaapTopicSink build(List<String> servers, 
188                                                                 String topic, 
189                                                                 String apiKey, 
190                                                                 String apiSecret,
191                                                                 String userName,
192                                                                 String password,
193                                                                 String partitionKey,
194                                                                 String environment,
195                                                                 String aftEnvironment,
196                                                                 String partner,
197                                                                 String latitude,
198                                                                 String longitude,
199                                                                 Map<String,String> additionalProps,
200                                                                 boolean managed,
201                                                                 boolean useHttps,
202                                                                 boolean allowSelfSignedCerts) 
203                         throws IllegalArgumentException {
204                 
205                 if (topic == null || topic.isEmpty()) {
206                         throw new IllegalArgumentException("A topic must be provided");
207                 }
208                 
209                 synchronized (this) {
210                         if (dmaapTopicWriters.containsKey(topic)) {
211                                 return dmaapTopicWriters.get(topic);
212                         }
213                         
214                         DmaapTopicSink dmaapTopicSink = 
215                                         new InlineDmaapTopicSink(servers, topic, 
216                                                                                      apiKey, apiSecret,
217                                                                                      userName, password,
218                                                                                      partitionKey,
219                                                                                      environment, aftEnvironment, 
220                                                                                      partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
221                         
222                         if (managed)
223                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
224                         return dmaapTopicSink;
225                 }
226         }
227         
228         /**
229          * {@inheritDoc}
230          */
231         @Override
232         public DmaapTopicSink build(List<String> servers, 
233                                                                 String topic, 
234                                                                 String apiKey, 
235                                                                 String apiSecret,
236                                                                 String userName,
237                                                                 String password,
238                                                                 String partitionKey,
239                                                                 boolean managed,
240                                                                 boolean useHttps, boolean allowSelfSignedCerts) 
241                         throws IllegalArgumentException {
242                 
243                 if (topic == null || topic.isEmpty()) {
244                         throw new IllegalArgumentException("A topic must be provided");
245                 }
246                 
247                 synchronized (this) {
248                         if (dmaapTopicWriters.containsKey(topic)) {
249                                 return dmaapTopicWriters.get(topic);
250                         }
251                         
252                         DmaapTopicSink dmaapTopicSink = 
253                                         new InlineDmaapTopicSink(servers, topic, 
254                                                                                      apiKey, apiSecret,
255                                                                                      userName, password,
256                                                                                      partitionKey, useHttps, allowSelfSignedCerts);
257                         
258                         if (managed)
259                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
260                         return dmaapTopicSink;
261                 }
262         }
263         
264
265         /**
266          * {@inheritDoc}
267          */
268         @Override
269         public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
270                 return this.build(servers, topic, null, null, null, null, null, true, false, false);
271         }
272         
273
274         /**
275          * {@inheritDoc}
276          */
277         @Override
278         public List<DmaapTopicSink> build(Properties properties) throws IllegalArgumentException {
279                 
280                 String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS);
281                 if (writeTopics == null || writeTopics.isEmpty()) {
282                         logger.warn("No topic for DMAAP Sink " + properties);
283                         return new ArrayList<DmaapTopicSink>();
284                 }
285                 List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
286                 
287                 synchronized(this) {
288                         List<DmaapTopicSink> dmaapTopicWriters = new ArrayList<DmaapTopicSink>();
289                         for (String topic: writeTopicList) {
290                                 
291                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + 
292                                                                         topic + 
293                                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
294                                 
295                                 List<String> serverList;
296                                 if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
297                                 else serverList = new ArrayList<>();
298                                 
299                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
300                                                                                "." + topic + 
301                                                                                PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);          
302                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
303                                                           "." + topic + 
304                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
305                                 
306                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
307                                                           "." + topic + 
308                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
309                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
310                                                                             "." + topic + 
311                                                                             PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
312                                 
313                                 String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
314                                                              "." + topic + 
315                                                              PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
316                                 
317                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
318                                                                                       PolicyProperties.PROPERTY_MANAGED_SUFFIX);
319                                 
320                                 /* DME2 Properties */
321                                 
322                                 String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
323                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
324
325                                 String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
326                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
327
328                                 String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
329                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
330                                 
331                                 String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
332                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
333
334                                 String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
335                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
336
337                                 String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
338                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
339
340                                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
341                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
342
343                                 String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
344                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
345
346                                 String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS
347                                                 + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
348
349                                 String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
350                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
351
352                                 String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
353                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
354
355                                 String dme2SessionStickinessRequired = properties
356                                                 .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
357                                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
358                                 
359                                 Map<String,String> dme2AdditionalProps = new HashMap<>();
360                                 
361                                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
362                                         dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
363                                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
364                                         dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
365                                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
366                                         dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
367                                 if (dme2Version != null && !dme2Version.isEmpty())
368                                         dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
369                                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
370                                         dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
371                                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
372                                         dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
373                                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
374                                         dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
375
376                                 if (servers == null || servers.isEmpty()) {
377                                         logger.error("No DMaaP servers or DME2 ServiceName provided");
378                                         continue;
379                                 }
380                                 
381                                 boolean managed = true;
382                                 if (managedString != null && !managedString.isEmpty()) {
383                                         managed = Boolean.parseBoolean(managedString);
384                                 }
385                                 
386                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
387                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
388
389                                 //default is to use HTTP if no https property exists
390                                 boolean useHttps = false;
391                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
392                                         useHttps = Boolean.parseBoolean(useHttpsString);
393                                 }
394                                 
395                                 
396                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
397                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
398
399                                         //default is to disallow self-signed certs 
400                                 boolean allowSelfSignedCerts = false;
401                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
402                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
403                                 }                               
404                                 
405                                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, 
406                                                                                                            apiKey, apiSecret,
407                                                                                                            aafMechId, aafPassword,
408                                                                                                            partitionKey,
409                                                                                                            dme2Environment, dme2AftEnvironment,
410                                                                                                            dme2Partner, dme2Latitude, dme2Longitude,
411                                                                                                            dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
412                                 
413                                 dmaapTopicWriters.add(dmaapTopicSink);
414                         }
415                         return dmaapTopicWriters;
416                 }
417         }
418         
419         /**
420          * {@inheritDoc}
421          */
422         @Override
423         public void destroy(String topic) 
424                    throws IllegalArgumentException {
425                 
426                 if (topic == null || topic.isEmpty()) {
427                         throw new IllegalArgumentException("A topic must be provided");
428                 }
429                 
430                 DmaapTopicSink dmaapTopicWriter;
431                 synchronized(this) {
432                         if (!dmaapTopicWriters.containsKey(topic)) {
433                                 return;
434                         }
435                         
436                         dmaapTopicWriter = dmaapTopicWriters.remove(topic);
437                 }
438                 
439                 dmaapTopicWriter.shutdown();
440         }
441         
442         /**
443          * {@inheritDoc}
444          */
445         @Override
446         public void destroy() {
447                 List<DmaapTopicSink> writers = this.inventory();
448                 for (DmaapTopicSink writer: writers) {
449                         writer.shutdown();
450                 }
451                 
452                 synchronized(this) {
453                         this.dmaapTopicWriters.clear();
454                 }
455         }
456
457         /**
458          * {@inheritDoc}
459          */
460         @Override
461         public DmaapTopicSink get(String topic) 
462                         throws IllegalArgumentException, IllegalStateException {
463                 
464                 if (topic == null || topic.isEmpty()) {
465                         throw new IllegalArgumentException("A topic must be provided");
466                 }
467                 
468                 synchronized(this) {
469                         if (dmaapTopicWriters.containsKey(topic)) {
470                                 return dmaapTopicWriters.get(topic);
471                         } else {
472                                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
473                         }
474                 }
475         }
476
477         /**
478          * {@inheritDoc}
479          */
480         @Override
481         public synchronized List<DmaapTopicSink> inventory() {
482                  List<DmaapTopicSink> writers = 
483                                  new ArrayList<DmaapTopicSink>(this.dmaapTopicWriters.values());
484                  return writers;
485         }
486         
487 }