474d4a8082aa68d5e6632292f496221192fba93c
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource;
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33
34 /**
35  * UEB Topic Source Factory
36  */
37 public interface UebTopicSourceFactory {
38         
39         /**
40          * Creates an UEB Topic Source based on properties files
41          * 
42          * @param properties Properties containing initialization values
43          * 
44          * @return an UEB Topic Source
45          * @throws IllegalArgumentException if invalid parameters are present
46          */
47         public List<UebTopicSource> build(Properties properties)
48                         throws IllegalArgumentException;
49         
50         /**
51          * Instantiates a new UEB Topic Source
52          * 
53          * @param servers list of servers
54          * @param topic topic name
55          * @param apiKey API Key
56          * @param apiSecret API Secret
57          * @param consumerGroup Consumer Group
58          * @param consumerInstance Consumer Instance
59          * @param fetchTimeout Read Fetch Timeout
60          * @param fetchLimit Fetch Limit
61          * @param managed is this source endpoint managed?
62          * 
63          * @return an UEB Topic Source
64          * @throws IllegalArgumentException if invalid parameters are present
65          */
66         public UebTopicSource build(List<String> servers, 
67                                                                 String topic, 
68                                                                 String apiKey, 
69                                                                 String apiSecret, 
70                                                                 String consumerGroup, 
71                                                                 String consumerInstance,
72                                                                 int fetchTimeout,
73                                                                 int fetchLimit,
74                                                                 boolean managed,
75                                                                 boolean useHttps,
76                                                                 boolean allowSelfSignedCerts)
77                         throws IllegalArgumentException;
78         
79         /**
80          * Instantiates a new UEB Topic Source
81          * 
82          * @param servers list of servers
83          * @param topic topic name
84          * @param apiKey API Key
85          * @param apiSecret API Secret
86          * 
87          * @return an UEB Topic Source
88          * @throws IllegalArgumentException if invalid parameters are present
89          */
90         public UebTopicSource build(List<String> servers, 
91                                                                 String topic, 
92                                                                 String apiKey, 
93                                                                 String apiSecret)
94                         throws IllegalArgumentException;
95
96         /**
97          * Instantiates a new UEB Topic Source
98          * 
99          * @param uebTopicSourceType Implementation type
100          * @param servers list of servers
101          * @param topic topic name
102          * 
103          * @return an UEB Topic Source
104          * @throws IllegalArgumentException if invalid parameters are present
105          */
106         public UebTopicSource build(List<String> servers, 
107                                                                 String topic)
108                         throws IllegalArgumentException;        
109         
110         /**
111          * Destroys an UEB Topic Source based on a topic
112          * 
113          * @param topic topic name
114          * @throws IllegalArgumentException if invalid parameters are present
115          */
116         public void destroy(String topic);
117         
118         /**
119          * Destroys all UEB Topic Sources
120          */
121         public void destroy();
122         
123         /**
124          * gets an UEB Topic Source based on topic name
125          * @param topic the topic name
126          * @return an UEB Topic Source with topic name
127          * @throws IllegalArgumentException if an invalid topic is provided
128          * @throws IllegalStateException if the UEB Topic Source is 
129          * an incorrect state
130          */
131         public UebTopicSource get(String topic)
132                    throws IllegalArgumentException, IllegalStateException;
133         
134         /**
135          * Provides a snapshot of the UEB Topic Sources
136          * @return a list of the UEB Topic Sources
137          */
138         public List<UebTopicSource> inventory();
139 }
140
141 /* ------------- implementation ----------------- */
142
143 /**
144  * Factory of UEB Source Topics indexed by topic name
145  */
146 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
147         // get an instance of logger 
148         private static Logger  logger = FlexLogger.getLogger(IndexedUebTopicSourceFactory.class);       
149         /**
150          * UEB Topic Name Index
151          */
152         protected HashMap<String, UebTopicSource> uebTopicSources =
153                         new HashMap<String, UebTopicSource>();
154
155         /**
156          * {@inheritDoc}
157          */
158         @Override
159         public UebTopicSource build(List<String> servers, 
160                                                                 String topic, 
161                                                                 String apiKey, 
162                                                                 String apiSecret, 
163                                                                 String consumerGroup, 
164                                                                 String consumerInstance,
165                                                                 int fetchTimeout,
166                                                                 int fetchLimit,
167                                                                 boolean managed,
168                                                                 boolean useHttps,
169                                                                 boolean allowSelfSignedCerts) 
170         throws IllegalArgumentException {
171                 if (servers == null || servers.isEmpty()) {
172                         throw new IllegalArgumentException("UEB Server(s) must be provided");
173                 }
174                 
175                 if (topic == null || topic.isEmpty()) {
176                         throw new IllegalArgumentException("A topic must be provided");
177                 }
178                 
179                 synchronized(this) {
180                         if (uebTopicSources.containsKey(topic)) {
181                                 return uebTopicSources.get(topic);
182                         }
183                         
184                         UebTopicSource uebTopicSource = 
185                                         new SingleThreadedUebTopicSource(servers, topic, 
186                                                                                                          apiKey, apiSecret,
187                                                                                                          consumerGroup, consumerInstance, 
188                                                                                                          fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
189                         
190                         if (managed)
191                                 uebTopicSources.put(topic, uebTopicSource);
192                         
193                         return uebTopicSource;
194                 }
195         }
196         
197         /**
198          * {@inheritDoc}
199          */
200         @Override
201         public List<UebTopicSource> build(Properties properties) 
202                         throws IllegalArgumentException {
203                 
204                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
205                 if (readTopics == null || readTopics.isEmpty()) {
206                         logger.warn("No topic for UEB Source " + properties);
207                         return new ArrayList<UebTopicSource>();
208                 }
209                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
210                 
211                 List<UebTopicSource> newUebTopicSources = new ArrayList<UebTopicSource>();
212                 synchronized(this) {
213                         for (String topic: readTopicList) {
214                                 if (this.uebTopicSources.containsKey(topic)) {
215                                         newUebTopicSources.add(this.uebTopicSources.get(topic));
216                                         continue;
217                                 }
218                                 
219                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + 
220                                                         topic + 
221                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
222                                 
223                                 if (servers == null || servers.isEmpty()) {
224                                         logger.error("No UEB servers provided in " + properties);
225                                         continue;
226                                 }
227                                 
228                                 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
229                                 
230                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
231                                                                                    "." + topic + 
232                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
233                                 
234                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
235                                                                                   "." + topic + 
236                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
237                                 
238                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
239                                                                                       "." + topic + 
240                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
241                                 
242                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
243                                                                                          "." + topic + 
244                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
245                                 
246                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
247                                                                                            "." + topic + 
248                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
249                                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
250                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
251                                         try {
252                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
253                                         } catch (NumberFormatException nfe) {
254                                                 logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
255                                         }
256                                 }
257                                         
258                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
259                                                                  "." + topic + 
260                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
261                                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
262                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
263                                         try {
264                                                 fetchLimit = Integer.parseInt(fetchLimitString);
265                                         } catch (NumberFormatException nfe) {
266                                                 logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
267                                         }
268                                 }
269                                 
270                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
271                                                                                       topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
272                                 boolean managed = true;
273                                 if (managedString != null && !managedString.isEmpty()) {
274                                         managed = Boolean.parseBoolean(managedString);
275                                 }
276                         
277                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
278                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
279
280                                         //default is to use HTTP if no https property exists
281                                 boolean useHttps = false;
282                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
283                                         useHttps = Boolean.parseBoolean(useHttpsString);
284                                 }
285
286                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
287                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
288
289                                         //default is to disallow self-signed certs 
290                                 boolean allowSelfSignedCerts = false;
291                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
292                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
293                                 }
294                                 
295                                 UebTopicSource uebTopicSource = this.build(serverList, topic, 
296                                                                                                            apiKey, apiSecret,
297                                                                                                            consumerGroup, consumerInstance, 
298                                                                                                            fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
299                                 newUebTopicSources.add(uebTopicSource);
300                         }
301                 }
302                 return newUebTopicSources;
303         }
304         
305         /**
306          * {@inheritDoc}
307          */
308         @Override
309         public UebTopicSource build(List<String> servers, 
310                                                                 String topic,
311                                                                 String apiKey, 
312                                                                 String apiSecret) {
313                 
314                 return this.build(servers, topic, 
315                                                   apiKey, apiSecret,
316                                                   null, null,
317                                                   UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
318                                                   UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
319         }
320
321         /**
322          * {@inheritDoc}
323          */
324         @Override
325         public UebTopicSource build(List<String> servers, String topic) {
326                 return this.build(servers, topic, null, null);
327         }       
328
329         /**
330          * {@inheritDoc}
331          */
332         @Override
333         public void destroy(String topic) 
334                    throws IllegalArgumentException {
335                 
336                 if (topic == null || topic.isEmpty()) {
337                         throw new IllegalArgumentException("A topic must be provided");
338                 }
339                 
340                 UebTopicSource uebTopicSource;
341                 
342                 synchronized(this) {
343                         if (!uebTopicSources.containsKey(topic)) {
344                                 return;
345                         }
346                         
347                         uebTopicSource = uebTopicSources.remove(topic);
348                 }
349                 
350                 uebTopicSource.shutdown();
351         }
352
353         /**
354          * {@inheritDoc}
355          */
356         @Override
357         public UebTopicSource get(String topic) 
358                throws IllegalArgumentException, IllegalStateException {
359                 
360                 if (topic == null || topic.isEmpty()) {
361                         throw new IllegalArgumentException("A topic must be provided");
362                 }
363                 
364                 synchronized(this) {
365                         if (uebTopicSources.containsKey(topic)) {
366                                 return uebTopicSources.get(topic);
367                         } else {
368                                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
369                         }
370                 }
371         }
372
373         @Override
374         public synchronized List<UebTopicSource> inventory() {
375                  List<UebTopicSource> readers = 
376                                  new ArrayList<UebTopicSource>(this.uebTopicSources.values());
377                  return readers;
378         }
379
380         @Override
381         public void destroy() {
382                 List<UebTopicSource> readers = this.inventory();
383                 for (UebTopicSource reader: readers) {
384                         reader.shutdown();
385                 }
386                 
387                 synchronized(this) {
388                         this.uebTopicSources.clear();
389                 }
390         }
391         
392 }