[POLICY-11] New REST APIs to obtain facts info
[policy/drools-pdp.git] / policy-endpoints / src / main / java / org / openecomp / policy / drools / event / comm / bus / internal / BusConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.security.GeneralSecurityException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
31 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33
34 import com.att.nsa.cambria.client.CambriaClientBuilders;
35 import com.att.nsa.cambria.client.CambriaConsumer;
36 import com.att.nsa.mr.client.MRClientFactory;
37 import com.att.nsa.mr.client.impl.MRConsumerImpl;
38 import com.att.nsa.mr.client.response.MRConsumerResponse;
39 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
40 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
41
42 /**
43  * Wrapper around libraries to consume from message bus
44  *
45  */
46 public interface BusConsumer {
47         
48         /**
49          * fetch messages
50          * 
51          * @return list of messages
52          * @throws Exception when error encountered by underlying libraries
53          */
54         public Iterable<String> fetch() throws Exception;
55         
56         /**
57          * close underlying library consumer
58          */
59         public void close();
60
61         /**
62          * Cambria based consumer
63          */
64         public static class CambriaConsumerWrapper implements BusConsumer {
65                 /**
66                  * Cambria client
67                  */
68                 protected CambriaConsumer consumer;
69                 
70                 /**
71                  * Cambria Consumer Wrapper
72                  * 
73                  * @param servers messaging bus hosts
74                  * @param topic topic
75                  * @param apiKey API Key
76                  * @param apiSecret API Secret
77                  * @param consumerGroup Consumer Group
78                  * @param consumerInstance Consumer Instance
79                  * @param fetchTimeout Fetch Timeout
80                  * @param fetchLimit Fetch Limit
81                  * @throws GeneralSecurityException 
82                  * @throws MalformedURLException 
83                  */
84                 public CambriaConsumerWrapper(List<String> servers, String topic, 
85                                                                   String apiKey, String apiSecret,
86                                                                   String consumerGroup, String consumerInstance,
87                                                                   int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) 
88                        throws IllegalArgumentException {
89                         
90                         ConsumerBuilder builder = 
91                                         new CambriaClientBuilders.ConsumerBuilder();
92                         
93                         
94                         if (useHttps){
95                                 
96                                 if(useSelfSignedCerts){
97                                         builder.knownAs(consumerGroup, consumerInstance)
98                                         .usingHosts(servers)
99                                         .onTopic(topic)
100                                         .waitAtServer(fetchTimeout)
101                                         .receivingAtMost(fetchLimit)
102                                         .usingHttps()
103                                         .allowSelfSignedCertificates();
104                                 }
105                                 else{
106                                         builder.knownAs(consumerGroup, consumerInstance)
107                                         .usingHosts(servers)
108                                         .onTopic(topic)
109                                         .waitAtServer(fetchTimeout)
110                                         .receivingAtMost(fetchLimit)
111                                         .usingHttps();
112                                 }
113                         }
114                         else{
115                                 builder.knownAs(consumerGroup, consumerInstance)
116                                .usingHosts(servers)
117                                .onTopic(topic)
118                                .waitAtServer(fetchTimeout)
119                                .receivingAtMost(fetchLimit);
120                         }
121                         
122                         if (apiKey != null && !apiKey.isEmpty() &&
123                                 apiSecret != null && !apiSecret.isEmpty()) {
124                                 builder.authenticatedBy(apiKey, apiSecret);
125                         }
126                                         
127                         try {
128                                 this.consumer = builder.build();
129                         } catch (MalformedURLException | GeneralSecurityException e) {
130                                 throw new IllegalArgumentException(e);
131                         }
132                 }
133                 
134                 /**
135                  * {@inheritDoc}
136                  */
137                 public Iterable<String> fetch() throws Exception {
138                         return this.consumer.fetch();
139                 }
140                 
141                 /**
142                  * {@inheritDoc}
143                  */
144                 public void close() {
145                         this.consumer.close();
146                 }
147                 
148                 @Override
149                 public String toString() {
150                         return "CambriaConsumerWrapper []";
151                 }
152         }
153         
154         /**
155          * MR based consumer
156          */
157         public abstract class DmaapConsumerWrapper implements BusConsumer {
158                 
159                 protected int fetchTimeout;
160                 protected Object closeCondition = new Object();
161                         
162                 /**
163                  * MR Consumer
164                  */
165                 protected MRConsumerImpl consumer;
166                 
167                 /**
168                  * MR Consumer Wrapper
169                  * 
170                  * @param servers messaging bus hosts
171                  * @param topic topic
172                  * @param apiKey API Key
173                  * @param apiSecret API Secret
174                  * @param username AAF Login
175                  * @param password AAF Password
176                  * @param consumerGroup Consumer Group
177                  * @param consumerInstance Consumer Instance
178                  * @param fetchTimeout Fetch Timeout
179                  * @param fetchLimit Fetch Limit
180                  * @throws MalformedURLException 
181                  */
182                 public DmaapConsumerWrapper(List<String> servers, String topic, 
183                                                                 String apiKey, String apiSecret,
184                                                                 String username, String password,
185                                                                 String consumerGroup, String consumerInstance,
186                                                                 int fetchTimeout, int fetchLimit, boolean useHttps)                                                                     
187                 throws MalformedURLException {
188                         
189                         this.fetchTimeout = fetchTimeout;
190                         
191                         if (topic == null || topic.isEmpty()) {
192                                 throw new IllegalArgumentException("No topic for DMaaP");
193                         }
194                                         
195                         this.consumer = new MRConsumerImpl(servers, topic, 
196                                                                                            consumerGroup, consumerInstance, 
197                                                                                            fetchTimeout, fetchLimit, 
198                                                                                    null, apiKey, apiSecret);
199                         
200                         this.consumer.setUsername(username);
201                         this.consumer.setPassword(password);
202                 }
203                 
204                 /**
205                  * {@inheritDoc}
206                  */
207                 public Iterable<String> fetch() throws Exception {
208                         MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
209
210                         if (PolicyLogger.isDebugEnabled() && response != null)
211                                 PolicyLogger.debug(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage());
212
213                         if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) {
214                                 if (response.getResponseCode() == null)
215                                         PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received response code null"); 
216                                 else
217                                         PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage());
218                                 
219                                 synchronized (closeCondition) {
220                                         closeCondition.wait(fetchTimeout);
221                                 }
222                         }
223                         
224                         if (response.getActualMessages() == null)
225                                 return new ArrayList<String>();
226                         else
227                                 return response.getActualMessages();
228                 }
229                 
230                 /**
231                  * {@inheritDoc}
232                  */
233                 public void close() {
234                         synchronized (closeCondition) {
235                                 closeCondition.notifyAll();
236                         }
237                         
238                         this.consumer.close();
239                 }
240                 
241                 @Override
242                 public String toString() {
243                         StringBuilder builder = new StringBuilder();
244                         builder.
245                         append("DmaapConsumerWrapper [").
246                         append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
247                         append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
248                         append(", consumer.getHost()=").append(consumer.getHost()).
249                         append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
250                         append(", consumer.getUsername()=").append(consumer.getUsername()).
251                         append("]");
252                         return builder.toString();
253                 }
254         }
255
256         /**
257          * MR based consumer
258          */
259         public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
260                 private Properties props;
261                 
262                 /**
263                  * MR Consumer Wrapper
264                  * 
265                  * @param servers messaging bus hosts
266                  * @param topic topic
267                  * @param apiKey API Key
268                  * @param apiSecret API Secret
269                  * @param aafLogin AAF Login
270                  * @param aafPassword AAF Password
271                  * @param consumerGroup Consumer Group
272                  * @param consumerInstance Consumer Instance
273                  * @param fetchTimeout Fetch Timeout
274                  * @param fetchLimit Fetch Limit
275                  * @throws MalformedURLException 
276                  */
277                 public DmaapAafConsumerWrapper(List<String> servers, String topic, 
278                                                                         String apiKey, String apiSecret,
279                                                                         String aafLogin, String aafPassword,
280                                                                         String consumerGroup, String consumerInstance,
281                                                                         int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException {
282                         
283                         super(servers, topic, apiKey, apiSecret,
284                                                                 aafLogin, aafPassword,
285                                                                 consumerGroup, consumerInstance,
286                                                                 fetchTimeout, fetchLimit, useHttps);
287                         
288                         // super constructor sets servers = {""} if empty to avoid errors when using DME2
289                         if ((servers.size() == 1 && servers.get(0).equals("")) ||
290                                 (servers == null) || (servers.size() == 0)) {
291                                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
292                         }
293
294                         this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
295
296                         props = new Properties();
297                         
298                         if(useHttps){
299                                 props.setProperty("Protocol", "https");
300                                 this.consumer.setHost(servers.get(0) + ":3905");
301                                 
302                         }
303                         else{
304                                 props.setProperty("Protocol", "http");
305                                 this.consumer.setHost(servers.get(0) + ":3904");
306                         }
307
308                         this.consumer.setProps(props);
309                         PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this);
310                 }
311                 
312                 @Override
313                 public String toString() {
314                         StringBuilder builder = new StringBuilder();
315                         MRConsumerImpl consumer = (MRConsumerImpl) this.consumer;
316                         
317                         builder.
318                         append("DmaapConsumerWrapper [").
319                         append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
320                         append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
321                         append(", consumer.getHost()=").append(consumer.getHost()).
322                         append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
323                         append(", consumer.getUsername()=").append(consumer.getUsername()).
324                         append("]");
325                         return builder.toString();
326                 }
327         }
328         
329         public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
330                 private Properties props;
331                 
332                 public DmaapDmeConsumerWrapper(List<String> servers, String topic, 
333                                                                 String apiKey, String apiSecret,
334                                                                 String dme2Login, String dme2Password,
335                                                                 String consumerGroup, String consumerInstance,
336                                                                 int fetchTimeout, int fetchLimit,
337                                                                 String environment, String aftEnvironment, String dme2Partner,
338                                                                 String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException {
339                         
340                         
341                         
342                         super(servers, topic, apiKey, apiSecret,
343                                                                 dme2Login, dme2Password,
344                                                                 consumerGroup, consumerInstance,
345                                                                 fetchTimeout, fetchLimit, useHttps);
346                         
347                         
348                         String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
349                         
350                         if (environment == null || environment.isEmpty()) {
351                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
352                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
353                         } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
354                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
355                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
356                         } if (latitude == null || latitude.isEmpty()) {
357                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
358                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
359                         } if (longitude == null || longitude.isEmpty()) {
360                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
361                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
362                         } 
363                         
364                         if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
365                                 throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
366                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + 
367                                                 PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
368                         }
369                         
370                         String serviceName = servers.get(0);
371                         
372                         this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
373                         
374                         this.consumer.setUsername(dme2Login);
375                         this.consumer.setPassword(dme2Password);
376                         
377                         props = new Properties();
378                         
379                         props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
380                         
381                         props.setProperty("username", dme2Login);
382                         props.setProperty("password", dme2Password);
383                         
384                         /* These are required, no defaults */
385                         props.setProperty("topic", topic);
386                         
387                         props.setProperty("Environment", environment);
388                         props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
389                         
390                         if (dme2Partner != null)
391                                 props.setProperty("Partner", dme2Partner);
392                         if (dme2RouteOffer != null)
393                                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
394                         
395                         props.setProperty("Latitude", latitude);
396                         props.setProperty("Longitude", longitude);
397                         
398                         /* These are optional, will default to these values if not set in additionalProps */
399                         props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
400                         props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
401                         props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
402                         props.setProperty("Version", "1.0");
403                         props.setProperty("SubContextPath", "/");
404                         props.setProperty("sessionstickinessrequired", "no");
405                         
406                         /* These should not change */
407                         props.setProperty("TransportType", "DME2");
408                         props.setProperty("MethodType", "GET");
409                         
410                         if(useHttps){
411                                 props.setProperty("Protocol", "https");
412                                 
413                         }
414                         else{
415                                 props.setProperty("Protocol", "http");
416                         }
417                         
418                         props.setProperty("contenttype", "application/json");
419                         
420                         if (additionalProps != null) {
421                                 for(String key : additionalProps.keySet())
422                                         props.put(key, additionalProps.get(key));
423                         }
424                         
425                         MRClientFactory.prop = props;
426                         this.consumer.setProps(props);
427                         
428                         PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this);
429                 }
430         }
431 }
432
433
434
435