bf2a403815400c5af5c2157f384109eea2dc5970
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource;
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33
34 /**
35  * UEB Topic Source Factory
36  */
37 public interface UebTopicSourceFactory {
38         
39         /**
40          * Creates an UEB Topic Source based on properties files
41          * 
42          * @param properties Properties containing initialization values
43          * 
44          * @return an UEB Topic Source
45          * @throws IllegalArgumentException if invalid parameters are present
46          */
47         public List<UebTopicSource> build(Properties properties)
48                         throws IllegalArgumentException;
49         
50         /**
51          * Instantiates a new UEB Topic Source
52          * 
53          * @param servers list of servers
54          * @param topic topic name
55          * @param apiKey API Key
56          * @param apiSecret API Secret
57          * @param consumerGroup Consumer Group
58          * @param consumerInstance Consumer Instance
59          * @param fetchTimeout Read Fetch Timeout
60          * @param fetchLimit Fetch Limit
61          * @param managed is this source endpoint managed?
62          * 
63          * @return an UEB Topic Source
64          * @throws IllegalArgumentException if invalid parameters are present
65          */
66         public UebTopicSource build(List<String> servers, 
67                                                                 String topic, 
68                                                                 String apiKey, 
69                                                                 String apiSecret, 
70                                                                 String consumerGroup, 
71                                                                 String consumerInstance,
72                                                                 int fetchTimeout,
73                                                                 int fetchLimit,
74                                                                 boolean managed)
75                         throws IllegalArgumentException;
76         
77         /**
78          * Instantiates a new UEB Topic Source
79          * 
80          * @param servers list of servers
81          * @param topic topic name
82          * @param apiKey API Key
83          * @param apiSecret API Secret
84          * 
85          * @return an UEB Topic Source
86          * @throws IllegalArgumentException if invalid parameters are present
87          */
88         public UebTopicSource build(List<String> servers, 
89                                                                 String topic, 
90                                                                 String apiKey, 
91                                                                 String apiSecret)
92                         throws IllegalArgumentException;
93
94         /**
95          * Instantiates a new UEB Topic Source
96          * 
97          * @param uebTopicSourceType Implementation type
98          * @param servers list of servers
99          * @param topic topic name
100          * 
101          * @return an UEB Topic Source
102          * @throws IllegalArgumentException if invalid parameters are present
103          */
104         public UebTopicSource build(List<String> servers, 
105                                                                 String topic)
106                         throws IllegalArgumentException;        
107         
108         /**
109          * Destroys an UEB Topic Source based on a topic
110          * 
111          * @param topic topic name
112          * @throws IllegalArgumentException if invalid parameters are present
113          */
114         public void destroy(String topic);
115         
116         /**
117          * Destroys all UEB Topic Sources
118          */
119         public void destroy();
120         
121         /**
122          * gets an UEB Topic Source based on topic name
123          * @param topic the topic name
124          * @return an UEB Topic Source with topic name
125          * @throws IllegalArgumentException if an invalid topic is provided
126          * @throws IllegalStateException if the UEB Topic Source is 
127          * an incorrect state
128          */
129         public UebTopicSource get(String topic)
130                    throws IllegalArgumentException, IllegalStateException;
131         
132         /**
133          * Provides a snapshot of the UEB Topic Sources
134          * @return a list of the UEB Topic Sources
135          */
136         public List<UebTopicSource> inventory();
137 }
138
139 /* ------------- implementation ----------------- */
140
141 /**
142  * Factory of UEB Source Topics indexed by topic name
143  */
144 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
145         // get an instance of logger 
146         private static Logger  logger = FlexLogger.getLogger(IndexedUebTopicSourceFactory.class);       
147         /**
148          * UEB Topic Name Index
149          */
150         protected HashMap<String, UebTopicSource> uebTopicSources =
151                         new HashMap<String, UebTopicSource>();
152
153         /**
154          * {@inheritDoc}
155          */
156         @Override
157         public UebTopicSource build(List<String> servers, 
158                                                                 String topic, 
159                                                                 String apiKey, 
160                                                                 String apiSecret, 
161                                                                 String consumerGroup, 
162                                                                 String consumerInstance,
163                                                                 int fetchTimeout,
164                                                                 int fetchLimit,
165                                                                 boolean managed) 
166         throws IllegalArgumentException {
167                 
168                 if (topic == null || topic.isEmpty()) {
169                         throw new IllegalArgumentException("A topic must be provided");
170                 }
171                 
172                 synchronized(this) {
173                         if (uebTopicSources.containsKey(topic)) {
174                                 return uebTopicSources.get(topic);
175                         }
176                         
177                         UebTopicSource uebTopicSource = 
178                                         new SingleThreadedUebTopicSource(servers, topic, 
179                                                                                                          apiKey, apiSecret,
180                                                                                                          consumerGroup, consumerInstance, 
181                                                                                                          fetchTimeout, fetchLimit);
182                         
183                         if (managed)
184                                 uebTopicSources.put(topic, uebTopicSource);
185                         
186                         return uebTopicSource;
187                 }
188         }
189         
190         /**
191          * {@inheritDoc}
192          */
193         @Override
194         public List<UebTopicSource> build(Properties properties) 
195                         throws IllegalArgumentException {
196                 
197                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
198                 if (readTopics == null || readTopics.isEmpty()) {
199                         logger.warn("No topic for UEB Source " + properties);
200                         return new ArrayList<UebTopicSource>();
201                 }
202                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
203                 
204                 List<UebTopicSource> uebTopicSources = new ArrayList<UebTopicSource>();
205                 synchronized(this) {
206                         for (String topic: readTopicList) {
207                                 
208                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + 
209                                                         topic + 
210                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
211                                 
212                                 if (servers == null || servers.isEmpty()) {
213                                         logger.error("No UEB servers provided in " + properties);
214                                         continue;
215                                 }
216                                 
217                                 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
218                                 
219                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
220                                                                                    "." + topic + 
221                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
222                                 
223                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
224                                                                                   "." + topic + 
225                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
226                                 
227                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
228                                                                                       "." + topic + 
229                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
230                                 
231                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
232                                                                                          "." + topic + 
233                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
234                                 
235                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
236                                                                                            "." + topic + 
237                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
238                                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
239                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
240                                         try {
241                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
242                                         } catch (NumberFormatException nfe) {
243                                                 logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
244                                         }
245                                 }
246                                         
247                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
248                                                                  "." + topic + 
249                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
250                                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
251                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
252                                         try {
253                                                 fetchLimit = Integer.parseInt(fetchLimitString);
254                                         } catch (NumberFormatException nfe) {
255                                                 logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
256                                         }
257                                 }
258                                 
259                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
260                                                                                       topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
261                                 boolean managed = true;
262                                 if (managedString != null && !managedString.isEmpty()) {
263                                         managed = Boolean.parseBoolean(managedString);
264                                 }
265                         
266                                 UebTopicSource uebTopicSource = this.build(serverList, topic, 
267                                                                                                            apiKey, apiSecret,
268                                                                                                            consumerGroup, consumerInstance, 
269                                                                                                            fetchTimeout, fetchLimit, managed);
270                                 uebTopicSources.add(uebTopicSource);
271                         }
272                 }
273                 return uebTopicSources;
274         }
275         
276         /**
277          * {@inheritDoc}
278          */
279         @Override
280         public UebTopicSource build(List<String> servers, 
281                                                                 String topic,
282                                                                 String apiKey, 
283                                                                 String apiSecret) {
284                 return this.build(servers, topic, 
285                                                   apiKey, apiSecret,
286                                                   null, null,
287                                                   UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
288                                                   UebTopicSource.DEFAULT_LIMIT_FETCH, true);
289         }
290
291         /**
292          * {@inheritDoc}
293          */
294         @Override
295         public UebTopicSource build(List<String> servers, String topic) {
296                 return this.build(servers, topic, null, null);
297         }       
298
299         /**
300          * {@inheritDoc}
301          */
302         @Override
303         public void destroy(String topic) 
304                    throws IllegalArgumentException {
305                 
306                 if (topic == null || topic.isEmpty()) {
307                         throw new IllegalArgumentException("A topic must be provided");
308                 }
309                 
310                 UebTopicSource uebTopicSource;
311                 
312                 synchronized(this) {
313                         if (!uebTopicSources.containsKey(topic)) {
314                                 return;
315                         }
316                         
317                         uebTopicSource = uebTopicSources.remove(topic);
318                 }
319                 
320                 uebTopicSource.shutdown();
321         }
322
323         /**
324          * {@inheritDoc}
325          */
326         @Override
327         public UebTopicSource get(String topic) 
328                throws IllegalArgumentException, IllegalStateException {
329                 
330                 if (topic == null || topic.isEmpty()) {
331                         throw new IllegalArgumentException("A topic must be provided");
332                 }
333                 
334                 synchronized(this) {
335                         if (uebTopicSources.containsKey(topic)) {
336                                 return uebTopicSources.get(topic);
337                         } else {
338                                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
339                         }
340                 }
341         }
342
343         @Override
344         public synchronized List<UebTopicSource> inventory() {
345                  List<UebTopicSource> readers = 
346                                  new ArrayList<UebTopicSource>(this.uebTopicSources.values());
347                  return readers;
348         }
349
350         @Override
351         public void destroy() {
352                 List<UebTopicSource> readers = this.inventory();
353                 for (UebTopicSource reader: readers) {
354                         reader.shutdown();
355                 }
356                 
357                 synchronized(this) {
358                         this.uebTopicSources.clear();
359                 }
360         }
361         
362 }