5b4cfd42ac54d2559fee042f008aed4795a3ed53
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33
34 /**
35  * DMAAP Topic Sink Factory
36  */
37 public interface DmaapTopicSinkFactory {
38         
39         /**
40          * Instantiates a new DMAAP Topic Sink
41          * 
42          * @param servers list of servers
43          * @param topic topic name
44          * @param apiKey API Key
45          * @param apiSecret API Secret
46          * @param userName AAF user name
47          * @param password AAF password
48          * @param partitionKey Consumer Group
49          * @param managed is this sink endpoint managed?
50          * 
51          * @return an DMAAP Topic Sink
52          * @throws IllegalArgumentException if invalid parameters are present
53          */
54         public DmaapTopicSink build(List<String> servers, 
55                                                                 String topic, 
56                                                                 String apiKey, 
57                                                                 String apiSecret,
58                                                                 String userName,
59                                                                 String password,
60                                                                 String partitionKey,
61                                                                 boolean managed)
62                         throws IllegalArgumentException;
63         
64         /**
65          * Creates an DMAAP Topic Sink based on properties files
66          * 
67          * @param properties Properties containing initialization values
68          * 
69          * @return an DMAAP Topic Sink
70          * @throws IllegalArgumentException if invalid parameters are present
71          */
72         public List<DmaapTopicSink> build(Properties properties)
73                         throws IllegalArgumentException;
74         
75         /**
76          * Instantiates a new DMAAP Topic Sink
77          * 
78          * @param servers list of servers
79          * @param topic topic name
80          * 
81          * @return an DMAAP Topic Sink
82          * @throws IllegalArgumentException if invalid parameters are present
83          */
84         public DmaapTopicSink build(List<String> servers, String topic)
85                         throws IllegalArgumentException;
86         
87         /**
88          * Destroys an DMAAP Topic Sink based on a topic
89          * 
90          * @param topic topic name
91          * @throws IllegalArgumentException if invalid parameters are present
92          */
93         public void destroy(String topic);
94
95         /**
96          * gets an DMAAP Topic Sink based on topic name
97          * @param topic the topic name
98          * 
99          * @return an DMAAP Topic Sink with topic name
100          * @throws IllegalArgumentException if an invalid topic is provided
101          * @throws IllegalStateException if the DMAAP Topic Reader is 
102          * an incorrect state
103          */
104         public DmaapTopicSink get(String topic)
105                            throws IllegalArgumentException, IllegalStateException;
106         
107         /**
108          * Provides a snapshot of the DMAAP Topic Sinks
109          * @return a list of the DMAAP Topic Sinks
110          */
111         public List<DmaapTopicSink> inventory();
112
113         /**
114          * Destroys all DMAAP Topic Sinks
115          */
116         public void destroy();
117 }
118
119 /* ------------- implementation ----------------- */
120
121 /**
122  * Factory of DMAAP Reader Topics indexed by topic name
123  */
124 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
125         // get an instance of logger 
126         private static Logger  logger = FlexLogger.getLogger(IndexedDmaapTopicSinkFactory.class);       
127         
128         /**
129          * DMAAP Topic Name Index
130          */
131         protected HashMap<String, DmaapTopicSink> dmaapTopicWriters =
132                         new HashMap<String, DmaapTopicSink>();
133
134         /**
135          * {@inheritDoc}
136          */
137         @Override
138         public DmaapTopicSink build(List<String> servers, 
139                                                                 String topic, 
140                                                                 String apiKey, 
141                                                                 String apiSecret,
142                                                                 String userName,
143                                                                 String password,
144                                                                 String partitionKey,
145                                                                 boolean managed) 
146                         throws IllegalArgumentException {
147                 
148                 if (topic == null || topic.isEmpty()) {
149                         throw new IllegalArgumentException("A topic must be provided");
150                 }
151                 
152                 synchronized (this) {
153                         if (dmaapTopicWriters.containsKey(topic)) {
154                                 return dmaapTopicWriters.get(topic);
155                         }
156                         
157                         DmaapTopicSink dmaapTopicSink = 
158                                         new InlineDmaapTopicSink(servers, topic, 
159                                                                                      apiKey, apiSecret,
160                                                                                      userName, password,
161                                                                                      partitionKey);
162                         
163                         if (managed)
164                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
165                         return dmaapTopicSink;
166                 }
167         }
168         
169
170         /**
171          * {@inheritDoc}
172          */
173         @Override
174         public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
175                 return this.build(servers, topic, null, null, null, null, null, true);
176         }
177         
178
179         /**
180          * {@inheritDoc}
181          */
182         @Override
183         public List<DmaapTopicSink> build(Properties properties) throws IllegalArgumentException {
184                 
185                 String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS);
186                 if (writeTopics == null || writeTopics.isEmpty()) {
187                         logger.warn("No topic for DMAAP Sink " + properties);
188                         return new ArrayList<DmaapTopicSink>();
189                 }
190                 List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
191                 
192                 synchronized(this) {
193                         List<DmaapTopicSink> dmaapTopicWriters = new ArrayList<DmaapTopicSink>();
194                         for (String topic: writeTopicList) {
195                                 
196                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + 
197                                                                         topic + 
198                                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
199                                 if (servers == null || servers.isEmpty()) {
200                                         logger.error("No DMAAP servers provided in " + properties);
201                                         continue;
202                                 }
203                                 
204                                 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
205                                 
206                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
207                                                                                "." + topic + 
208                                                                                PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);          
209                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
210                                                           "." + topic + 
211                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
212                                 
213                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
214                                                           "." + topic + 
215                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
216                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
217                                                                             "." + topic + 
218                                                                             PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
219                                 
220                                 String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
221                                                              "." + topic + 
222                                                              PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
223                                 
224                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
225                                                                                       PolicyProperties.PROPERTY_MANAGED_SUFFIX);
226                                 boolean managed = true;
227                                 if (managedString != null && !managedString.isEmpty()) {
228                                         managed = Boolean.parseBoolean(managedString);
229                                 }
230                                 
231                                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, 
232                                                                                                            apiKey, apiSecret, aafMechId, aafPassword,
233                                                                                                            partitionKey, managed);
234                                 dmaapTopicWriters.add(dmaapTopicSink);
235                         }
236                         return dmaapTopicWriters;
237                 }
238         }
239         
240         /**
241          * {@inheritDoc}
242          */
243         @Override
244         public void destroy(String topic) 
245                    throws IllegalArgumentException {
246                 
247                 if (topic == null || topic.isEmpty()) {
248                         throw new IllegalArgumentException("A topic must be provided");
249                 }
250                 
251                 DmaapTopicSink dmaapTopicWriter;
252                 synchronized(this) {
253                         if (!dmaapTopicWriters.containsKey(topic)) {
254                                 return;
255                         }
256                         
257                         dmaapTopicWriter = dmaapTopicWriters.remove(topic);
258                 }
259                 
260                 dmaapTopicWriter.shutdown();
261         }
262         
263         /**
264          * {@inheritDoc}
265          */
266         @Override
267         public void destroy() {
268                 List<DmaapTopicSink> writers = this.inventory();
269                 for (DmaapTopicSink writer: writers) {
270                         writer.shutdown();
271                 }
272                 
273                 synchronized(this) {
274                         this.dmaapTopicWriters.clear();
275                 }
276         }
277
278         /**
279          * {@inheritDoc}
280          */
281         @Override
282         public DmaapTopicSink get(String topic) 
283                         throws IllegalArgumentException, IllegalStateException {
284                 
285                 if (topic == null || topic.isEmpty()) {
286                         throw new IllegalArgumentException("A topic must be provided");
287                 }
288                 
289                 synchronized(this) {
290                         if (dmaapTopicWriters.containsKey(topic)) {
291                                 return dmaapTopicWriters.get(topic);
292                         } else {
293                                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
294                         }
295                 }
296         }
297
298         /**
299          * {@inheritDoc}
300          */
301         @Override
302         public synchronized List<DmaapTopicSink> inventory() {
303                  List<DmaapTopicSink> writers = 
304                                  new ArrayList<DmaapTopicSink>(this.dmaapTopicWriters.values());
305                  return writers;
306         }
307         
308 }