1729576f967e665b0bef7212d18b73d54800bad8
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource;
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33
34 /**
35  * UEB Topic Source Factory
36  */
37 public interface UebTopicSourceFactory {
38         
39         /**
40          * Creates an UEB Topic Source based on properties files
41          * 
42          * @param properties Properties containing initialization values
43          * 
44          * @return an UEB Topic Source
45          * @throws IllegalArgumentException if invalid parameters are present
46          */
47         public List<UebTopicSource> build(Properties properties)
48                         throws IllegalArgumentException;
49         
50         /**
51          * Instantiates a new UEB Topic Source
52          * 
53          * @param servers list of servers
54          * @param topic topic name
55          * @param apiKey API Key
56          * @param apiSecret API Secret
57          * @param consumerGroup Consumer Group
58          * @param consumerInstance Consumer Instance
59          * @param fetchTimeout Read Fetch Timeout
60          * @param fetchLimit Fetch Limit
61          * @param managed is this source endpoint managed?
62          * 
63          * @return an UEB Topic Source
64          * @throws IllegalArgumentException if invalid parameters are present
65          */
66         public UebTopicSource build(List<String> servers, 
67                                                                 String topic, 
68                                                                 String apiKey, 
69                                                                 String apiSecret, 
70                                                                 String consumerGroup, 
71                                                                 String consumerInstance,
72                                                                 int fetchTimeout,
73                                                                 int fetchLimit,
74                                                                 boolean managed,
75                                                                 boolean useHttps,
76                                                                 boolean allowSelfSignedCerts)
77                         throws IllegalArgumentException;
78         
79         /**
80          * Instantiates a new UEB Topic Source
81          * 
82          * @param servers list of servers
83          * @param topic topic name
84          * @param apiKey API Key
85          * @param apiSecret API Secret
86          * 
87          * @return an UEB Topic Source
88          * @throws IllegalArgumentException if invalid parameters are present
89          */
90         public UebTopicSource build(List<String> servers, 
91                                                                 String topic, 
92                                                                 String apiKey, 
93                                                                 String apiSecret)
94                         throws IllegalArgumentException;
95
96         /**
97          * Instantiates a new UEB Topic Source
98          * 
99          * @param uebTopicSourceType Implementation type
100          * @param servers list of servers
101          * @param topic topic name
102          * 
103          * @return an UEB Topic Source
104          * @throws IllegalArgumentException if invalid parameters are present
105          */
106         public UebTopicSource build(List<String> servers, 
107                                                                 String topic)
108                         throws IllegalArgumentException;        
109         
110         /**
111          * Destroys an UEB Topic Source based on a topic
112          * 
113          * @param topic topic name
114          * @throws IllegalArgumentException if invalid parameters are present
115          */
116         public void destroy(String topic);
117         
118         /**
119          * Destroys all UEB Topic Sources
120          */
121         public void destroy();
122         
123         /**
124          * gets an UEB Topic Source based on topic name
125          * @param topic the topic name
126          * @return an UEB Topic Source with topic name
127          * @throws IllegalArgumentException if an invalid topic is provided
128          * @throws IllegalStateException if the UEB Topic Source is 
129          * an incorrect state
130          */
131         public UebTopicSource get(String topic)
132                    throws IllegalArgumentException, IllegalStateException;
133         
134         /**
135          * Provides a snapshot of the UEB Topic Sources
136          * @return a list of the UEB Topic Sources
137          */
138         public List<UebTopicSource> inventory();
139 }
140
141 /* ------------- implementation ----------------- */
142
143 /**
144  * Factory of UEB Source Topics indexed by topic name
145  */
146 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
147         // get an instance of logger 
148         private static Logger  logger = FlexLogger.getLogger(IndexedUebTopicSourceFactory.class);       
149         /**
150          * UEB Topic Name Index
151          */
152         protected HashMap<String, UebTopicSource> uebTopicSources =
153                         new HashMap<String, UebTopicSource>();
154
155         /**
156          * {@inheritDoc}
157          */
158         @Override
159         public UebTopicSource build(List<String> servers, 
160                                                                 String topic, 
161                                                                 String apiKey, 
162                                                                 String apiSecret, 
163                                                                 String consumerGroup, 
164                                                                 String consumerInstance,
165                                                                 int fetchTimeout,
166                                                                 int fetchLimit,
167                                                                 boolean managed,
168                                                                 boolean useHttps,
169                                                                 boolean allowSelfSignedCerts) 
170         throws IllegalArgumentException {
171                 if (servers == null || servers.isEmpty()) {
172                         throw new IllegalArgumentException("UEB Server(s) must be provided");
173                 }
174                 
175                 if (topic == null || topic.isEmpty()) {
176                         throw new IllegalArgumentException("A topic must be provided");
177                 }
178                 
179                 synchronized(this) {
180                         if (uebTopicSources.containsKey(topic)) {
181                                 return uebTopicSources.get(topic);
182                         }
183                         
184                         UebTopicSource uebTopicSource = 
185                                         new SingleThreadedUebTopicSource(servers, topic, 
186                                                                                                          apiKey, apiSecret,
187                                                                                                          consumerGroup, consumerInstance, 
188                                                                                                          fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
189                         
190                         if (managed)
191                                 uebTopicSources.put(topic, uebTopicSource);
192                         
193                         return uebTopicSource;
194                 }
195         }
196         
197         /**
198          * {@inheritDoc}
199          */
200         @Override
201         public List<UebTopicSource> build(Properties properties) 
202                         throws IllegalArgumentException {
203                 
204                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
205                 if (readTopics == null || readTopics.isEmpty()) {
206                         logger.warn("No topic for UEB Source " + properties);
207                         return new ArrayList<UebTopicSource>();
208                 }
209                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
210                 
211                 List<UebTopicSource> uebTopicSources = new ArrayList<UebTopicSource>();
212                 synchronized(this) {
213                         for (String topic: readTopicList) {
214                                 
215                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + 
216                                                         topic + 
217                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
218                                 
219                                 if (servers == null || servers.isEmpty()) {
220                                         logger.error("No UEB servers provided in " + properties);
221                                         continue;
222                                 }
223                                 
224                                 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
225                                 
226                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
227                                                                                    "." + topic + 
228                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
229                                 
230                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
231                                                                                   "." + topic + 
232                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
233                                 
234                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
235                                                                                       "." + topic + 
236                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
237                                 
238                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
239                                                                                          "." + topic + 
240                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
241                                 
242                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
243                                                                                            "." + topic + 
244                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
245                                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
246                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
247                                         try {
248                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
249                                         } catch (NumberFormatException nfe) {
250                                                 logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
251                                         }
252                                 }
253                                         
254                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
255                                                                  "." + topic + 
256                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
257                                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
258                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
259                                         try {
260                                                 fetchLimit = Integer.parseInt(fetchLimitString);
261                                         } catch (NumberFormatException nfe) {
262                                                 logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
263                                         }
264                                 }
265                                 
266                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
267                                                                                       topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
268                                 boolean managed = true;
269                                 if (managedString != null && !managedString.isEmpty()) {
270                                         managed = Boolean.parseBoolean(managedString);
271                                 }
272                         
273                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
274                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
275
276                                         //default is to use HTTP if no https property exists
277                                 boolean useHttps = false;
278                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
279                                         useHttps = Boolean.parseBoolean(useHttpsString);
280                                 }
281
282                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
283                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
284
285                                         //default is to disallow self-signed certs 
286                                 boolean allowSelfSignedCerts = false;
287                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
288                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
289                                 }
290                                 
291                                 UebTopicSource uebTopicSource = this.build(serverList, topic, 
292                                                                                                            apiKey, apiSecret,
293                                                                                                            consumerGroup, consumerInstance, 
294                                                                                                            fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
295                                 uebTopicSources.add(uebTopicSource);
296                         }
297                 }
298                 return uebTopicSources;
299         }
300         
301         /**
302          * {@inheritDoc}
303          */
304         @Override
305         public UebTopicSource build(List<String> servers, 
306                                                                 String topic,
307                                                                 String apiKey, 
308                                                                 String apiSecret) {
309                 
310                 return this.build(servers, topic, 
311                                                   apiKey, apiSecret,
312                                                   null, null,
313                                                   UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
314                                                   UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
315         }
316
317         /**
318          * {@inheritDoc}
319          */
320         @Override
321         public UebTopicSource build(List<String> servers, String topic) {
322                 return this.build(servers, topic, null, null);
323         }       
324
325         /**
326          * {@inheritDoc}
327          */
328         @Override
329         public void destroy(String topic) 
330                    throws IllegalArgumentException {
331                 
332                 if (topic == null || topic.isEmpty()) {
333                         throw new IllegalArgumentException("A topic must be provided");
334                 }
335                 
336                 UebTopicSource uebTopicSource;
337                 
338                 synchronized(this) {
339                         if (!uebTopicSources.containsKey(topic)) {
340                                 return;
341                         }
342                         
343                         uebTopicSource = uebTopicSources.remove(topic);
344                 }
345                 
346                 uebTopicSource.shutdown();
347         }
348
349         /**
350          * {@inheritDoc}
351          */
352         @Override
353         public UebTopicSource get(String topic) 
354                throws IllegalArgumentException, IllegalStateException {
355                 
356                 if (topic == null || topic.isEmpty()) {
357                         throw new IllegalArgumentException("A topic must be provided");
358                 }
359                 
360                 synchronized(this) {
361                         if (uebTopicSources.containsKey(topic)) {
362                                 return uebTopicSources.get(topic);
363                         } else {
364                                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
365                         }
366                 }
367         }
368
369         @Override
370         public synchronized List<UebTopicSource> inventory() {
371                  List<UebTopicSource> readers = 
372                                  new ArrayList<UebTopicSource>(this.uebTopicSources.values());
373                  return readers;
374         }
375
376         @Override
377         public void destroy() {
378                 List<UebTopicSource> readers = this.inventory();
379                 for (UebTopicSource reader: readers) {
380                         reader.shutdown();
381                 }
382                 
383                 synchronized(this) {
384                         this.uebTopicSources.clear();
385                 }
386         }
387         
388 }