f8d85eb7a6a08a1785a4ecc6b0a257efd6ff51bc
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33 /**
34  * DMAAP Topic Source Factory
35  */
36 public interface DmaapTopicSourceFactory {
37         
38         /**
39          * Creates an DMAAP Topic Source based on properties files
40          * 
41          * @param properties Properties containing initialization values
42          * 
43          * @return an DMAAP Topic Source
44          * @throws IllegalArgumentException if invalid parameters are present
45          */
46         public List<DmaapTopicSource> build(Properties properties)
47                         throws IllegalArgumentException;
48         
49         /**
50          * Instantiates a new DMAAP Topic Source
51          * 
52          * @param servers list of servers
53          * @param topic topic name
54          * @param apiKey API Key
55          * @param apiSecret API Secret
56          * @param userName user name
57          * @param password password
58          * @param consumerGroup Consumer Group
59          * @param consumerInstance Consumer Instance
60          * @param fetchTimeout Read Fetch Timeout
61          * @param fetchLimit Fetch Limit
62          * @param managed is this endpoind managed?
63          * 
64          * @return an DMAAP Topic Source
65          * @throws IllegalArgumentException if invalid parameters are present
66          */
67         public DmaapTopicSource build(List<String> servers, 
68                                                                 String topic, 
69                                                                 String apiKey, 
70                                                                 String apiSecret, 
71                                                                 String userName, 
72                                                                 String password,
73                                                                 String consumerGroup, 
74                                                                 String consumerInstance,
75                                                                 int fetchTimeout,
76                                                                 int fetchLimit,
77                                                                 boolean managed)
78                         throws IllegalArgumentException;
79         
80         /**
81          * Instantiates a new DMAAP Topic Source
82          * 
83          * @param servers list of servers
84          * @param topic topic name
85          * @param apiKey API Key
86          * @param apiSecret API Secret
87          * 
88          * @return an DMAAP Topic Source
89          * @throws IllegalArgumentException if invalid parameters are present
90          */
91         public DmaapTopicSource build(List<String> servers, 
92                                                                 String topic, 
93                                                                 String apiKey, 
94                                                                 String apiSecret)
95                         throws IllegalArgumentException;
96
97         /**
98          * Instantiates a new DMAAP Topic Source
99          * 
100          * @param uebTopicReaderType Implementation type
101          * @param servers list of servers
102          * @param topic topic name
103          * 
104          * @return an DMAAP Topic Source
105          * @throws IllegalArgumentException if invalid parameters are present
106          */
107         public DmaapTopicSource build(List<String> servers, 
108                                                                 String topic)
109                         throws IllegalArgumentException;        
110         
111         /**
112          * Destroys an DMAAP Topic Source based on a topic
113          * 
114          * @param topic topic name
115          * @throws IllegalArgumentException if invalid parameters are present
116          */
117         public void destroy(String topic);
118         
119         /**
120          * Destroys all DMAAP Topic Sources
121          */
122         public void destroy();
123         
124         /**
125          * gets an DMAAP Topic Source based on topic name
126          * @param topic the topic name
127          * @return an DMAAP Topic Source with topic name
128          * @throws IllegalArgumentException if an invalid topic is provided
129          * @throws IllegalStateException if the DMAAP Topic Source is 
130          * an incorrect state
131          */
132         public DmaapTopicSource get(String topic)
133                    throws IllegalArgumentException, IllegalStateException;
134         
135         /**
136          * Provides a snapshot of the DMAAP Topic Sources
137          * @return a list of the DMAAP Topic Sources
138          */
139         public List<DmaapTopicSource> inventory();
140 }
141
142
143 /* ------------- implementation ----------------- */
144
145 /**
146  * Factory of DMAAP Source Topics indexed by topic name
147  */
148
149 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
150         // get an instance of logger 
151         private static Logger  logger = FlexLogger.getLogger(IndexedDmaapTopicSourceFactory.class);             
152         /**
153          * UEB Topic Name Index
154          */
155         protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
156                         new HashMap<String, DmaapTopicSource>();
157
158         /**
159          * {@inheritDoc}
160          */
161         @Override
162         public DmaapTopicSource build(List<String> servers, 
163                                                                 String topic, 
164                                                                 String apiKey, 
165                                                                 String apiSecret, 
166                                                                 String userName, 
167                                                                 String password,
168                                                                 String consumerGroup, 
169                                                                 String consumerInstance,
170                                                                 int fetchTimeout,
171                                                                 int fetchLimit,
172                                                                 boolean managed) 
173                         throws IllegalArgumentException {
174                 
175                 if (topic == null || topic.isEmpty()) {
176                         throw new IllegalArgumentException("A topic must be provided");
177                 }
178                 
179                 synchronized(this) {
180                         if (dmaapTopicSources.containsKey(topic)) {
181                                 return dmaapTopicSources.get(topic);
182                         }
183                         
184                         DmaapTopicSource dmaapTopicSource = 
185                                         new SingleThreadedDmaapTopicSource(servers, topic, 
186                                                                                                          apiKey, apiSecret, userName, password,
187                                                                                                          consumerGroup, consumerInstance, 
188                                                                                                          fetchTimeout, fetchLimit);
189                         
190                         if (managed)
191                                 dmaapTopicSources.put(topic, dmaapTopicSource);
192                         
193                         return dmaapTopicSource;
194                 }
195         }
196         
197         /**
198          * {@inheritDoc}
199          */
200         @Override
201         public List<DmaapTopicSource> build(Properties properties) 
202                         throws IllegalArgumentException {
203                 
204                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
205                 if (readTopics == null || readTopics.isEmpty()) {
206                         logger.warn("No topic for UEB Source " + properties);
207                         return new ArrayList<DmaapTopicSource>();
208                 }
209                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
210                 
211                 List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>();
212                 synchronized(this) {
213                         for (String topic: readTopicList) {
214                                 
215                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + 
216                                                         topic + 
217                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
218                                 
219                                 if (servers == null || servers.isEmpty()) {
220                                         logger.error("No UEB servers provided in " + properties);
221                                         continue;
222                                 }
223                                 
224                                 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
225                                 
226                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
227                                                                                    "." + topic + 
228                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
229                                 
230                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
231                                                                                   "." + topic + 
232                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
233                                 
234                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
235                                                                                           "." + topic + 
236                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
237
238                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
239                                                                                           "." + topic + 
240                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
241                                 
242                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
243                                                                                       "." + topic + 
244                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
245                                 
246                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
247                                                                                          "." + topic + 
248                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
249                                 
250                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
251                                                                                            "." + topic + 
252                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
253                                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
254                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
255                                         try {
256                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
257                                         } catch (NumberFormatException nfe) {
258                                                 logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
259                                         }
260                                 }
261                                         
262                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
263                                                                  "." + topic + 
264                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
265                                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
266                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
267                                         try {
268                                                 fetchLimit = Integer.parseInt(fetchLimitString);
269                                         } catch (NumberFormatException nfe) {
270                                                 logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
271                                         }
272                                 }
273                                 
274                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
275                                                                "." + topic + 
276                                                               PolicyProperties.PROPERTY_MANAGED_SUFFIX);
277                                 boolean managed = true;
278                                 if (managedString != null && !managedString.isEmpty()) {
279                                         managed = Boolean.parseBoolean(managedString);
280                                 }
281                                 
282                                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, 
283                                                                                                            apiKey, apiSecret, aafMechId, aafPassword,
284                                                                                                            consumerGroup, consumerInstance, 
285                                                                                                            fetchTimeout, fetchLimit, managed);
286                                 dmaapTopicSource_s.add(uebTopicSource);
287                         }
288                 }
289                 return dmaapTopicSource_s;
290         }
291         
292         /**
293          * {@inheritDoc}
294          */
295         @Override
296         public DmaapTopicSource build(List<String> servers, 
297                                                                 String topic,
298                                                                 String apiKey, 
299                                                                 String apiSecret) {
300                 return this.build(servers, topic, 
301                                                   apiKey, apiSecret, null, null,
302                                                   null, null,
303                                                   DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
304                                                   DmaapTopicSource.DEFAULT_LIMIT_FETCH,
305                                                   true);
306         }
307
308         /**
309          * {@inheritDoc}
310          */
311         @Override
312         public DmaapTopicSource build(List<String> servers, String topic) {
313                 return this.build(servers, topic, null, null);
314         }       
315
316         /**
317          * {@inheritDoc}
318          */
319         @Override
320         public void destroy(String topic) 
321                    throws IllegalArgumentException {
322                 
323                 if (topic == null || topic.isEmpty()) {
324                         throw new IllegalArgumentException("A topic must be provided");
325                 }
326                 
327                 DmaapTopicSource uebTopicSource;
328                 
329                 synchronized(this) {
330                         if (!dmaapTopicSources.containsKey(topic)) {
331                                 return;
332                         }
333                         
334                         uebTopicSource = dmaapTopicSources.remove(topic);
335                 }
336                 
337                 uebTopicSource.shutdown();
338         }
339
340         /**
341          * {@inheritDoc}
342          */
343         @Override
344         public DmaapTopicSource get(String topic) 
345                throws IllegalArgumentException, IllegalStateException {
346                 
347                 if (topic == null || topic.isEmpty()) {
348                         throw new IllegalArgumentException("A topic must be provided");
349                 }
350                 
351                 synchronized(this) {
352                         if (dmaapTopicSources.containsKey(topic)) {
353                                 return dmaapTopicSources.get(topic);
354                         } else {
355                                 throw new IllegalArgumentException("DmaapTopicSource for " + topic + " not found");
356                         }
357                 }
358         }
359
360         @Override
361         public synchronized List<DmaapTopicSource> inventory() {
362                  List<DmaapTopicSource> readers = 
363                                  new ArrayList<DmaapTopicSource>(this.dmaapTopicSources.values());
364                  return readers;
365         }
366
367         @Override
368         public void destroy() {
369                 List<DmaapTopicSource> readers = this.inventory();
370                 for (DmaapTopicSource reader: readers) {
371                         reader.shutdown();
372                 }
373                 
374                 synchronized(this) {
375                         this.dmaapTopicSources.clear();
376                 }
377         }
378         
379 }
380