77b894db359635e713f50d38aa3319de3cb128b0
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource;
30 import org.slf4j.LoggerFactory;
31 import org.slf4j.Logger;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33
34 /**
35  * UEB Topic Source Factory
36  */
37 public interface UebTopicSourceFactory {
38         
39         /**
40          * Creates an UEB Topic Source based on properties files
41          * 
42          * @param properties Properties containing initialization values
43          * 
44          * @return an UEB Topic Source
45          * @throws IllegalArgumentException if invalid parameters are present
46          */
47         public List<UebTopicSource> build(Properties properties)
48                         throws IllegalArgumentException;
49         
50         /**
51          * Instantiates a new UEB Topic Source
52          * 
53          * @param servers list of servers
54          * @param topic topic name
55          * @param apiKey API Key
56          * @param apiSecret API Secret
57          * @param consumerGroup Consumer Group
58          * @param consumerInstance Consumer Instance
59          * @param fetchTimeout Read Fetch Timeout
60          * @param fetchLimit Fetch Limit
61          * @param managed is this source endpoint managed?
62          * 
63          * @return an UEB Topic Source
64          * @throws IllegalArgumentException if invalid parameters are present
65          */
66         public UebTopicSource build(List<String> servers, 
67                                                                 String topic, 
68                                                                 String apiKey, 
69                                                                 String apiSecret, 
70                                                                 String consumerGroup, 
71                                                                 String consumerInstance,
72                                                                 int fetchTimeout,
73                                                                 int fetchLimit,
74                                                                 boolean managed,
75                                                                 boolean useHttps,
76                                                                 boolean allowSelfSignedCerts)
77                         throws IllegalArgumentException;
78         
79         /**
80          * Instantiates a new UEB Topic Source
81          * 
82          * @param servers list of servers
83          * @param topic topic name
84          * @param apiKey API Key
85          * @param apiSecret API Secret
86          * 
87          * @return an UEB Topic Source
88          * @throws IllegalArgumentException if invalid parameters are present
89          */
90         public UebTopicSource build(List<String> servers, 
91                                                                 String topic, 
92                                                                 String apiKey, 
93                                                                 String apiSecret)
94                         throws IllegalArgumentException;
95
96         /**
97          * Instantiates a new UEB Topic Source
98          * 
99          * @param uebTopicSourceType Implementation type
100          * @param servers list of servers
101          * @param topic topic name
102          * 
103          * @return an UEB Topic Source
104          * @throws IllegalArgumentException if invalid parameters are present
105          */
106         public UebTopicSource build(List<String> servers, 
107                                                                 String topic)
108                         throws IllegalArgumentException;        
109         
110         /**
111          * Destroys an UEB Topic Source based on a topic
112          * 
113          * @param topic topic name
114          * @throws IllegalArgumentException if invalid parameters are present
115          */
116         public void destroy(String topic);
117         
118         /**
119          * Destroys all UEB Topic Sources
120          */
121         public void destroy();
122         
123         /**
124          * gets an UEB Topic Source based on topic name
125          * @param topic the topic name
126          * @return an UEB Topic Source with topic name
127          * @throws IllegalArgumentException if an invalid topic is provided
128          * @throws IllegalStateException if the UEB Topic Source is 
129          * an incorrect state
130          */
131         public UebTopicSource get(String topic)
132                    throws IllegalArgumentException, IllegalStateException;
133         
134         /**
135          * Provides a snapshot of the UEB Topic Sources
136          * @return a list of the UEB Topic Sources
137          */
138         public List<UebTopicSource> inventory();
139 }
140
141 /* ------------- implementation ----------------- */
142
143 /**
144  * Factory of UEB Source Topics indexed by topic name
145  */
146 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
147         /**
148          * Logger
149          */
150         private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);     
151         
152         /**
153          * UEB Topic Name Index
154          */
155         protected HashMap<String, UebTopicSource> uebTopicSources =
156                         new HashMap<String, UebTopicSource>();
157
158         /**
159          * {@inheritDoc}
160          */
161         @Override
162         public UebTopicSource build(List<String> servers, 
163                                                                 String topic, 
164                                                                 String apiKey, 
165                                                                 String apiSecret, 
166                                                                 String consumerGroup, 
167                                                                 String consumerInstance,
168                                                                 int fetchTimeout,
169                                                                 int fetchLimit,
170                                                                 boolean managed,
171                                                                 boolean useHttps,
172                                                                 boolean allowSelfSignedCerts) 
173         throws IllegalArgumentException {
174                 if (servers == null || servers.isEmpty()) {
175                         throw new IllegalArgumentException("UEB Server(s) must be provided");
176                 }
177                 
178                 if (topic == null || topic.isEmpty()) {
179                         throw new IllegalArgumentException("A topic must be provided");
180                 }
181                 
182                 synchronized(this) {
183                         if (uebTopicSources.containsKey(topic)) {
184                                 return uebTopicSources.get(topic);
185                         }
186                         
187                         UebTopicSource uebTopicSource = 
188                                         new SingleThreadedUebTopicSource(servers, topic, 
189                                                                                                          apiKey, apiSecret,
190                                                                                                          consumerGroup, consumerInstance, 
191                                                                                                          fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
192                         
193                         if (managed)
194                                 uebTopicSources.put(topic, uebTopicSource);
195                         
196                         return uebTopicSource;
197                 }
198         }
199         
200         /**
201          * {@inheritDoc}
202          */
203         @Override
204         public List<UebTopicSource> build(Properties properties) 
205                         throws IllegalArgumentException {
206                 
207                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
208                 if (readTopics == null || readTopics.isEmpty()) {
209                         logger.info("{}: no topic for UEB Source", this);
210                         return new ArrayList<UebTopicSource>();
211                 }
212                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
213                 
214                 List<UebTopicSource> newUebTopicSources = new ArrayList<UebTopicSource>();
215                 synchronized(this) {
216                         for (String topic: readTopicList) {
217                                 if (this.uebTopicSources.containsKey(topic)) {
218                                         newUebTopicSources.add(this.uebTopicSources.get(topic));
219                                         continue;
220                                 }
221                                 
222                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + 
223                                                         topic + 
224                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
225                                 
226                                 if (servers == null || servers.isEmpty()) {
227                                         logger.error("{}: no UEB servers configured for sink {}", this, topic);
228                                         continue;
229                                 }
230                                 
231                                 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
232                                 
233                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
234                                                                                    "." + topic + 
235                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
236                                 
237                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
238                                                                                   "." + topic + 
239                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
240                                 
241                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
242                                                                                       "." + topic + 
243                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
244                                 
245                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
246                                                                                          "." + topic + 
247                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
248                                 
249                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
250                                                                                            "." + topic + 
251                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
252                                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
253                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
254                                         try {
255                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
256                                         } catch (NumberFormatException nfe) {
257                                                 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", 
258                                                                 this, fetchTimeoutString, topic);
259                                         }
260                                 }
261                                         
262                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
263                                                                  "." + topic + 
264                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
265                                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
266                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
267                                         try {
268                                                 fetchLimit = Integer.parseInt(fetchLimitString);
269                                         } catch (NumberFormatException nfe) {
270                                                 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", 
271                                                             this, fetchLimitString, topic);
272                                         }
273                                 }
274                                 
275                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
276                                                                                       topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
277                                 boolean managed = true;
278                                 if (managedString != null && !managedString.isEmpty()) {
279                                         managed = Boolean.parseBoolean(managedString);
280                                 }
281                         
282                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
283                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
284
285                                         //default is to use HTTP if no https property exists
286                                 boolean useHttps = false;
287                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
288                                         useHttps = Boolean.parseBoolean(useHttpsString);
289                                 }
290
291                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
292                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
293
294                                         //default is to disallow self-signed certs 
295                                 boolean allowSelfSignedCerts = false;
296                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
297                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
298                                 }
299                                 
300                                 UebTopicSource uebTopicSource = this.build(serverList, topic, 
301                                                                                                            apiKey, apiSecret,
302                                                                                                            consumerGroup, consumerInstance, 
303                                                                                                            fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
304                                 newUebTopicSources.add(uebTopicSource);
305                         }
306                 }
307                 return newUebTopicSources;
308         }
309         
310         /**
311          * {@inheritDoc}
312          */
313         @Override
314         public UebTopicSource build(List<String> servers, 
315                                                                 String topic,
316                                                                 String apiKey, 
317                                                                 String apiSecret) {
318                 
319                 return this.build(servers, topic, 
320                                                   apiKey, apiSecret,
321                                                   null, null,
322                                                   UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
323                                                   UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
324         }
325
326         /**
327          * {@inheritDoc}
328          */
329         @Override
330         public UebTopicSource build(List<String> servers, String topic) {
331                 return this.build(servers, topic, null, null);
332         }       
333
334         /**
335          * {@inheritDoc}
336          */
337         @Override
338         public void destroy(String topic) 
339                    throws IllegalArgumentException {
340                 
341                 if (topic == null || topic.isEmpty()) {
342                         throw new IllegalArgumentException("A topic must be provided");
343                 }
344                 
345                 UebTopicSource uebTopicSource;
346                 
347                 synchronized(this) {
348                         if (!uebTopicSources.containsKey(topic)) {
349                                 return;
350                         }
351                         
352                         uebTopicSource = uebTopicSources.remove(topic);
353                 }
354                 
355                 uebTopicSource.shutdown();
356         }
357
358         /**
359          * {@inheritDoc}
360          */
361         @Override
362         public UebTopicSource get(String topic) 
363                throws IllegalArgumentException, IllegalStateException {
364                 
365                 if (topic == null || topic.isEmpty()) {
366                         throw new IllegalArgumentException("A topic must be provided");
367                 }
368                 
369                 synchronized(this) {
370                         if (uebTopicSources.containsKey(topic)) {
371                                 return uebTopicSources.get(topic);
372                         } else {
373                                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
374                         }
375                 }
376         }
377
378         @Override
379         public synchronized List<UebTopicSource> inventory() {
380                  List<UebTopicSource> readers = 
381                                  new ArrayList<UebTopicSource>(this.uebTopicSources.values());
382                  return readers;
383         }
384
385         @Override
386         public void destroy() {
387                 List<UebTopicSource> readers = this.inventory();
388                 for (UebTopicSource reader: readers) {
389                         reader.shutdown();
390                 }
391                 
392                 synchronized(this) {
393                         this.uebTopicSources.clear();
394                 }
395         }
396
397         @Override
398         public String toString() {
399                 StringBuilder builder = new StringBuilder();
400                 builder.append("IndexedUebTopicSourceFactory []");
401                 return builder.toString();
402         }
403         
404 }