4c96f9befb792d2115f41be5a8fb3b648bbde46b
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.openecomp.policy.drools.properties.PolicyProperties;
34
35 /**
36  * DMAAP Topic Sink Factory
37  */
38 public interface DmaapTopicSinkFactory {
39         public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40         public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41         public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42         public final String DME2_VERSION_PROPERTY = "Version";
43         public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44         public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45         public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46         public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
47
48         /**
49          * Instantiates a new DMAAP Topic Sink
50          * 
51          * @param servers list of servers
52          * @param topic topic name
53          * @param apiKey API Key
54          * @param apiSecret API Secret
55          * @param userName AAF user name
56          * @param password AAF password
57          * @param partitionKey Consumer Group
58          * @param environment DME2 environment
59          * @param aftEnvironment DME2 AFT environment
60          * @param partner DME2 Partner
61          * @param latitude DME2 latitude
62          * @param longitude DME2 longitude
63          * @param additionalProps additional properties to pass to DME2
64          * @param managed is this sink endpoint managed?
65          * 
66          * @return an DMAAP Topic Sink
67          * @throws IllegalArgumentException if invalid parameters are present
68          */
69         public DmaapTopicSink build(List<String> servers, 
70                                                                 String topic, 
71                                                                 String apiKey, 
72                                                                 String apiSecret,
73                                                                 String userName,
74                                                                 String password,
75                                                                 String partitionKey,
76                                                                 String environment,
77                                                                 String aftEnvironment,
78                                                                 String partner,
79                                                                 String latitude,
80                                                                 String longitude,
81                                                                 Map<String,String> additionalProps,
82                                                                 boolean managed,
83                                                                 boolean useHttps,
84                                                                 boolean allowSelfSignedCerts) ;
85         
86         /**
87          * Instantiates a new DMAAP Topic Sink
88          * 
89          * @param servers list of servers
90          * @param topic topic name
91          * @param apiKey API Key
92          * @param apiSecret API Secret
93          * @param userName AAF user name
94          * @param password AAF password
95          * @param partitionKey Consumer Group
96          * @param managed is this sink endpoint managed?
97          * 
98          * @return an DMAAP Topic Sink
99          * @throws IllegalArgumentException if invalid parameters are present
100          */
101         public DmaapTopicSink build(List<String> servers, 
102                                                                 String topic, 
103                                                                 String apiKey, 
104                                                                 String apiSecret,
105                                                                 String userName,
106                                                                 String password,
107                                                                 String partitionKey,
108                                                                 boolean managed,
109                                                                 boolean useHttps,
110                                                                 boolean allowSelfSignedCerts)
111                         throws IllegalArgumentException;
112         
113         /**
114          * Creates an DMAAP Topic Sink based on properties files
115          * 
116          * @param properties Properties containing initialization values
117          * 
118          * @return an DMAAP Topic Sink
119          * @throws IllegalArgumentException if invalid parameters are present
120          */
121         public List<DmaapTopicSink> build(Properties properties)
122                         throws IllegalArgumentException;
123         
124         /**
125          * Instantiates a new DMAAP Topic Sink
126          * 
127          * @param servers list of servers
128          * @param topic topic name
129          * 
130          * @return an DMAAP Topic Sink
131          * @throws IllegalArgumentException if invalid parameters are present
132          */
133         public DmaapTopicSink build(List<String> servers, String topic)
134                         throws IllegalArgumentException;
135         
136         /**
137          * Destroys an DMAAP Topic Sink based on a topic
138          * 
139          * @param topic topic name
140          * @throws IllegalArgumentException if invalid parameters are present
141          */
142         public void destroy(String topic);
143
144         /**
145          * gets an DMAAP Topic Sink based on topic name
146          * @param topic the topic name
147          * 
148          * @return an DMAAP Topic Sink with topic name
149          * @throws IllegalArgumentException if an invalid topic is provided
150          * @throws IllegalStateException if the DMAAP Topic Reader is 
151          * an incorrect state
152          */
153         public DmaapTopicSink get(String topic)
154                            throws IllegalArgumentException, IllegalStateException;
155         
156         /**
157          * Provides a snapshot of the DMAAP Topic Sinks
158          * @return a list of the DMAAP Topic Sinks
159          */
160         public List<DmaapTopicSink> inventory();
161
162         /**
163          * Destroys all DMAAP Topic Sinks
164          */
165         public void destroy();
166 }
167
168 /* ------------- implementation ----------------- */
169
170 /**
171  * Factory of DMAAP Reader Topics indexed by topic name
172  */
173 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
174         // get an instance of logger 
175         private static Logger  logger = FlexLogger.getLogger(IndexedDmaapTopicSinkFactory.class);       
176         
177         /**
178          * DMAAP Topic Name Index
179          */
180         protected HashMap<String, DmaapTopicSink> dmaapTopicWriters =
181                         new HashMap<String, DmaapTopicSink>();
182
183         /**
184          * {@inheritDoc}
185          */
186         @Override
187         public DmaapTopicSink build(List<String> servers, 
188                                                                 String topic, 
189                                                                 String apiKey, 
190                                                                 String apiSecret,
191                                                                 String userName,
192                                                                 String password,
193                                                                 String partitionKey,
194                                                                 String environment,
195                                                                 String aftEnvironment,
196                                                                 String partner,
197                                                                 String latitude,
198                                                                 String longitude,
199                                                                 Map<String,String> additionalProps,
200                                                                 boolean managed,
201                                                                 boolean useHttps,
202                                                                 boolean allowSelfSignedCerts) 
203                         throws IllegalArgumentException {
204                 
205                 if (topic == null || topic.isEmpty()) {
206                         throw new IllegalArgumentException("A topic must be provided");
207                 }
208                 
209                 synchronized (this) {
210                         if (dmaapTopicWriters.containsKey(topic)) {
211                                 return dmaapTopicWriters.get(topic);
212                         }
213                         
214                         DmaapTopicSink dmaapTopicSink = 
215                                         new InlineDmaapTopicSink(servers, topic, 
216                                                                                      apiKey, apiSecret,
217                                                                                      userName, password,
218                                                                                      partitionKey,
219                                                                                      environment, aftEnvironment, 
220                                                                                      partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
221                         
222                         if (managed)
223                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
224                         return dmaapTopicSink;
225                 }
226         }
227         
228         /**
229          * {@inheritDoc}
230          */
231         @Override
232         public DmaapTopicSink build(List<String> servers, 
233                                                                 String topic, 
234                                                                 String apiKey, 
235                                                                 String apiSecret,
236                                                                 String userName,
237                                                                 String password,
238                                                                 String partitionKey,
239                                                                 boolean managed,
240                                                                 boolean useHttps, boolean allowSelfSignedCerts) 
241                         throws IllegalArgumentException {
242                 
243                 if (topic == null || topic.isEmpty()) {
244                         throw new IllegalArgumentException("A topic must be provided");
245                 }
246                 
247                 synchronized (this) {
248                         if (dmaapTopicWriters.containsKey(topic)) {
249                                 return dmaapTopicWriters.get(topic);
250                         }
251                         
252                         DmaapTopicSink dmaapTopicSink = 
253                                         new InlineDmaapTopicSink(servers, topic, 
254                                                                                      apiKey, apiSecret,
255                                                                                      userName, password,
256                                                                                      partitionKey, useHttps, allowSelfSignedCerts);
257                         
258                         if (managed)
259                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
260                         return dmaapTopicSink;
261                 }
262         }
263         
264
265         /**
266          * {@inheritDoc}
267          */
268         @Override
269         public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
270                 return this.build(servers, topic, null, null, null, null, null, true, false, false);
271         }
272         
273
274         /**
275          * {@inheritDoc}
276          */
277         @Override
278         public List<DmaapTopicSink> build(Properties properties) throws IllegalArgumentException {
279                 
280                 String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS);
281                 if (writeTopics == null || writeTopics.isEmpty()) {
282                         logger.warn("No topic for DMAAP Sink " + properties);
283                         return new ArrayList<DmaapTopicSink>();
284                 }
285                 
286                 List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
287                 List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<DmaapTopicSink>();
288                 synchronized(this) {
289                         for (String topic: writeTopicList) {
290                                 if (this.dmaapTopicWriters.containsKey(topic)) {
291                                         newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
292                                         continue;
293                                 }
294                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + 
295                                                                         topic + 
296                                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
297                                 
298                                 List<String> serverList;
299                                 if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
300                                 else serverList = new ArrayList<>();
301                                 
302                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
303                                                                                "." + topic + 
304                                                                                PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);          
305                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
306                                                           "." + topic + 
307                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
308                                 
309                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
310                                                           "." + topic + 
311                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
312                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
313                                                                             "." + topic + 
314                                                                             PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
315                                 
316                                 String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
317                                                              "." + topic + 
318                                                              PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
319                                 
320                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
321                                                                                       PolicyProperties.PROPERTY_MANAGED_SUFFIX);
322                                 
323                                 /* DME2 Properties */
324                                 
325                                 String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
326                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
327
328                                 String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
329                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
330
331                                 String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
332                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
333                                 
334                                 String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
335                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
336
337                                 String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
338                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
339
340                                 String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
341                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
342
343                                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
344                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
345
346                                 String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
347                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
348
349                                 String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS
350                                                 + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
351
352                                 String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
353                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
354
355                                 String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
356                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
357
358                                 String dme2SessionStickinessRequired = properties
359                                                 .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
360                                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
361                                 
362                                 Map<String,String> dme2AdditionalProps = new HashMap<>();
363                                 
364                                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
365                                         dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
366                                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
367                                         dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
368                                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
369                                         dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
370                                 if (dme2Version != null && !dme2Version.isEmpty())
371                                         dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
372                                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
373                                         dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
374                                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
375                                         dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
376                                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
377                                         dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
378
379                                 if (servers == null || servers.isEmpty()) {
380                                         logger.error("No DMaaP servers or DME2 ServiceName provided");
381                                         continue;
382                                 }
383                                 
384                                 boolean managed = true;
385                                 if (managedString != null && !managedString.isEmpty()) {
386                                         managed = Boolean.parseBoolean(managedString);
387                                 }
388                                 
389                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
390                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
391
392                                 //default is to use HTTP if no https property exists
393                                 boolean useHttps = false;
394                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
395                                         useHttps = Boolean.parseBoolean(useHttpsString);
396                                 }
397                                 
398                                 
399                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
400                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
401
402                                         //default is to disallow self-signed certs 
403                                 boolean allowSelfSignedCerts = false;
404                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
405                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
406                                 }                               
407                                 
408                                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, 
409                                                                                                            apiKey, apiSecret,
410                                                                                                            aafMechId, aafPassword,
411                                                                                                            partitionKey,
412                                                                                                            dme2Environment, dme2AftEnvironment,
413                                                                                                            dme2Partner, dme2Latitude, dme2Longitude,
414                                                                                                            dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
415                                 
416                                 newDmaapTopicSinks.add(dmaapTopicSink);
417                         }
418                         return newDmaapTopicSinks;
419                 }
420         }
421         
422         /**
423          * {@inheritDoc}
424          */
425         @Override
426         public void destroy(String topic) 
427                    throws IllegalArgumentException {
428                 
429                 if (topic == null || topic.isEmpty()) {
430                         throw new IllegalArgumentException("A topic must be provided");
431                 }
432                 
433                 DmaapTopicSink dmaapTopicWriter;
434                 synchronized(this) {
435                         if (!dmaapTopicWriters.containsKey(topic)) {
436                                 return;
437                         }
438                         
439                         dmaapTopicWriter = dmaapTopicWriters.remove(topic);
440                 }
441                 
442                 dmaapTopicWriter.shutdown();
443         }
444         
445         /**
446          * {@inheritDoc}
447          */
448         @Override
449         public void destroy() {
450                 List<DmaapTopicSink> writers = this.inventory();
451                 for (DmaapTopicSink writer: writers) {
452                         writer.shutdown();
453                 }
454                 
455                 synchronized(this) {
456                         this.dmaapTopicWriters.clear();
457                 }
458         }
459
460         /**
461          * {@inheritDoc}
462          */
463         @Override
464         public DmaapTopicSink get(String topic) 
465                         throws IllegalArgumentException, IllegalStateException {
466                 
467                 if (topic == null || topic.isEmpty()) {
468                         throw new IllegalArgumentException("A topic must be provided");
469                 }
470                 
471                 synchronized(this) {
472                         if (dmaapTopicWriters.containsKey(topic)) {
473                                 return dmaapTopicWriters.get(topic);
474                         } else {
475                                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
476                         }
477                 }
478         }
479
480         /**
481          * {@inheritDoc}
482          */
483         @Override
484         public synchronized List<DmaapTopicSink> inventory() {
485                  List<DmaapTopicSink> writers = 
486                                  new ArrayList<DmaapTopicSink>(this.dmaapTopicWriters.values());
487                  return writers;
488         }
489         
490 }