5fb3aedb5968a5745dc9422eae48b2efd29aba1b
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.Map;
33 import java.util.Properties;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.dmaap.mr.client.MRClientFactory;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
39 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
40 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
41 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
42 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Wrapper around libraries to consume from message bus.
48  */
49 public interface BusConsumer {
50
51     /**
52      * fetch messages.
53      *
54      * @return list of messages
55      * @throws IOException when error encountered by underlying libraries
56      */
57     public Iterable<String> fetch() throws IOException;
58
59     /**
60      * close underlying library consumer.
61      */
62     public void close();
63
64     /**
65      * Cambria based consumer.
66      */
67     public static class CambriaConsumerWrapper implements BusConsumer {
68
69         /**
70          * logger.
71          */
72         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
73
74         /**
75          * Used to build the consumer.
76          */
77         private final ConsumerBuilder builder;
78
79         /**
80          * Cambria client.
81          */
82         private final CambriaConsumer consumer;
83
84         /**
85          * fetch timeout.
86          */
87         protected int fetchTimeout;
88
89         /**
90          * close condition.
91          */
92         protected CountDownLatch closeCondition = new CountDownLatch(1);
93
94         /**
95          * Cambria Consumer Wrapper.
96          * BusTopicParam object contains the following parameters
97          * servers messaging bus hosts.
98          * topic topic
99          * apiKey API Key
100          * apiSecret API Secret
101          * consumerGroup Consumer Group
102          * consumerInstance Consumer Instance
103          * fetchTimeout Fetch Timeout
104          * fetchLimit Fetch Limit
105          *
106          * @param busTopicParams - The parameters for the bus topic
107          * @throws GeneralSecurityException - Security exception
108          * @throws MalformedURLException - Malformed URL exception
109          */
110         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
111
112             this.fetchTimeout = busTopicParams.getFetchTimeout();
113
114             this.builder = new CambriaClientBuilders.ConsumerBuilder();
115
116             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
117                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
118                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
119
120             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
121             builder.withSocketTimeout(fetchTimeout + 30000);
122
123             if (busTopicParams.isUseHttps()) {
124                 builder.usingHttps();
125
126                 if (busTopicParams.isAllowSelfSignedCerts()) {
127                     builder.allowSelfSignedCertificates();
128                 }
129             }
130
131             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
132                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
133             }
134
135             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
136                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
137             }
138
139             try {
140                 this.consumer = builder.build();
141             } catch (MalformedURLException | GeneralSecurityException e) {
142                 throw new IllegalArgumentException(e);
143             }
144         }
145
146         @Override
147         public Iterable<String> fetch() throws IOException {
148             try {
149                 return this.consumer.fetch();
150             } catch (final IOException e) { //NOSONAR
151                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
152                         this.fetchTimeout);
153                 sleepAfterFetchFailure();
154                 throw e;
155             }
156         }
157
158         private void sleepAfterFetchFailure() {
159             try {
160                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
161
162             } catch (InterruptedException e) {
163                 logger.warn("{}: interrupted while handling fetch error", this, e);
164                 Thread.currentThread().interrupt();
165             }
166         }
167
168         @Override
169         public void close() {
170             this.closeCondition.countDown();
171             this.consumer.close();
172         }
173
174         @Override
175         public String toString() {
176             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
177         }
178     }
179
180     /**
181      * MR based consumer.
182      */
183     public abstract class DmaapConsumerWrapper implements BusConsumer {
184
185         /**
186          * logger.
187          */
188         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
189
190         /**
191          * Name of the "protocol" property.
192          */
193         protected static final String PROTOCOL_PROP = "Protocol";
194
195         /**
196          * fetch timeout.
197          */
198         protected int fetchTimeout;
199
200         /**
201          * close condition.
202          */
203         protected CountDownLatch closeCondition = new CountDownLatch(1);
204
205         /**
206          * MR Consumer.
207          */
208         protected MRConsumerImpl consumer;
209
210         /**
211          * MR Consumer Wrapper.
212          *
213          * <p>servers          messaging bus hosts
214          * topic            topic
215          * apiKey           API Key
216          * apiSecret        API Secret
217          * username         AAF Login
218          * password         AAF Password
219          * consumerGroup    Consumer Group
220          * consumerInstance Consumer Instance
221          * fetchTimeout     Fetch Timeout
222          * fetchLimit       Fetch Limit
223          *
224          * @param busTopicParams contains above listed attributes
225          * @throws MalformedURLException URL should be valid
226          */
227         protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
228
229             this.fetchTimeout = busTopicParams.getFetchTimeout();
230
231             if (busTopicParams.isTopicInvalid()) {
232                 throw new IllegalArgumentException("No topic for DMaaP");
233             }
234
235             this.consumer = new MRConsumerImplBuilder()
236                             .setHostPart(busTopicParams.getServers())
237                             .setTopic(busTopicParams.getTopic())
238                             .setConsumerGroup(busTopicParams.getConsumerGroup())
239                             .setConsumerId(busTopicParams.getConsumerInstance())
240                             .setTimeoutMs(busTopicParams.getFetchTimeout())
241                             .setLimit(busTopicParams.getFetchLimit())
242                             .setApiKey(busTopicParams.getApiKey())
243                             .setApiSecret(busTopicParams.getApiSecret())
244                             .createMRConsumerImpl();
245
246             this.consumer.setUsername(busTopicParams.getUserName());
247             this.consumer.setPassword(busTopicParams.getPassword());
248         }
249
250         @Override
251         public Iterable<String> fetch() throws IOException {
252             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
253             if (response == null) {
254                 logger.warn("{}: DMaaP NULL response received", this);
255
256                 sleepAfterFetchFailure();
257                 return new ArrayList<>();
258             } else {
259                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
260                         response.getResponseMessage());
261
262                 if (!"200".equals(response.getResponseCode())) {
263
264                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
265                             response.getResponseMessage());
266
267                     sleepAfterFetchFailure();
268
269                     /* fall through */
270                 }
271             }
272
273             if (response.getActualMessages() == null) {
274                 return new ArrayList<>();
275             } else {
276                 return response.getActualMessages();
277             }
278         }
279
280         private void sleepAfterFetchFailure() {
281             try {
282                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
283
284             } catch (InterruptedException e) {
285                 logger.warn("{}: interrupted while handling fetch error", this, e);
286                 Thread.currentThread().interrupt();
287             }
288         }
289
290         @Override
291         public void close() {
292             this.closeCondition.countDown();
293             this.consumer.close();
294         }
295
296         @Override
297         public String toString() {
298             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
299                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
300                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
301                     + consumer.getUsername() + "]";
302         }
303     }
304
305     /**
306      * MR based consumer.
307      */
308     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
309
310         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
311
312         private final Properties props;
313
314         /**
315          * BusTopicParams contain the following parameters.
316          * MR Consumer Wrapper.
317          *
318          * <p>servers messaging bus hosts
319          * topic topic
320          * apiKey API Key
321          * apiSecret API Secret
322          * aafLogin AAF Login
323          * aafPassword AAF Password
324          * consumerGroup Consumer Group
325          * consumerInstance Consumer Instance
326          * fetchTimeout Fetch Timeout
327          * fetchLimit Fetch Limit
328          *
329          * @param busTopicParams contains above listed params
330          * @throws MalformedURLException URL should be valid
331          */
332         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
333
334             super(busTopicParams);
335
336             // super constructor sets servers = {""} if empty to avoid errors when using DME2
337             if (busTopicParams.isServersInvalid()) {
338                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
339             }
340
341             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
342
343             props = new Properties();
344
345             if (busTopicParams.isUseHttps()) {
346                 props.setProperty(PROTOCOL_PROP, "https");
347                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
348
349             } else {
350                 props.setProperty(PROTOCOL_PROP, "http");
351                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
352             }
353
354             this.consumer.setProps(props);
355             logger.info("{}: CREATION", this);
356         }
357
358         @Override
359         public String toString() {
360             final MRConsumerImpl consumer = this.consumer;
361
362             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
363                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
364                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
365                     + consumer.getUsername() + "]";
366         }
367     }
368
369     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
370
371         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
372
373         private final Properties props;
374
375         /**
376          * Constructor.
377          *
378          * @param busTopicParams topic paramters
379          *
380          * @throws MalformedURLException must provide a valid URL
381          */
382         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
383
384
385             super(busTopicParams);
386
387
388             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
389                             ? busTopicParams.getAdditionalProps().get(
390                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
391                             : null);
392
393             if (busTopicParams.isEnvironmentInvalid()) {
394                 throw parmException(busTopicParams.getTopic(),
395                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
396             }
397             if (busTopicParams.isAftEnvironmentInvalid()) {
398                 throw parmException(busTopicParams.getTopic(),
399                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
400             }
401             if (busTopicParams.isLatitudeInvalid()) {
402                 throw parmException(busTopicParams.getTopic(),
403                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
404             }
405             if (busTopicParams.isLongitudeInvalid()) {
406                 throw parmException(busTopicParams.getTopic(),
407                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
408             }
409
410             if ((busTopicParams.isPartnerInvalid())
411                     && StringUtils.isBlank(dme2RouteOffer)) {
412                 throw new IllegalArgumentException(
413                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
414                                 + "." + busTopicParams.getTopic()
415                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
416                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
417                                 + busTopicParams.getTopic()
418                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
419             }
420
421             final String serviceName = busTopicParams.getServers().get(0);
422
423             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
424
425             this.consumer.setUsername(busTopicParams.getUserName());
426             this.consumer.setPassword(busTopicParams.getPassword());
427
428             props = new Properties();
429
430             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
431
432             props.setProperty("username", busTopicParams.getUserName());
433             props.setProperty("password", busTopicParams.getPassword());
434
435             /* These are required, no defaults */
436             props.setProperty("topic", busTopicParams.getTopic());
437
438             props.setProperty("Environment", busTopicParams.getEnvironment());
439             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
440
441             if (busTopicParams.getPartner() != null) {
442                 props.setProperty("Partner", busTopicParams.getPartner());
443             }
444             if (dme2RouteOffer != null) {
445                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
446             }
447
448             props.setProperty("Latitude", busTopicParams.getLatitude());
449             props.setProperty("Longitude", busTopicParams.getLongitude());
450
451             /* These are optional, will default to these values if not set in additionalProps */
452             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
453             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
454             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
455             props.setProperty("Version", "1.0");
456             props.setProperty("SubContextPath", "/");
457             props.setProperty("sessionstickinessrequired", "no");
458
459             /* These should not change */
460             props.setProperty("TransportType", "DME2");
461             props.setProperty("MethodType", "GET");
462
463             if (busTopicParams.isUseHttps()) {
464                 props.setProperty(PROTOCOL_PROP, "https");
465
466             } else {
467                 props.setProperty(PROTOCOL_PROP, "http");
468             }
469
470             props.setProperty("contenttype", "application/json");
471
472             if (busTopicParams.isAdditionalPropsValid()) {
473                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
474                     props.put(entry.getKey(), entry.getValue());
475                 }
476             }
477
478             MRClientFactory.prop = props;
479             this.consumer.setProps(props);
480
481             logger.info("{}: CREATION", this);
482         }
483
484         private IllegalArgumentException parmException(String topic, String propnm) {
485             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
486                     + topic + propnm + " property for DME2 in DMaaP");
487
488         }
489     }
490 }
491
492