20f4c91c5ef1591dfc9d3fb3bf1fb1e61c507548
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.Map;
33 import java.util.Properties;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import lombok.Getter;
37 import org.apache.commons.lang3.StringUtils;
38 import org.onap.dmaap.mr.client.MRClientFactory;
39 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
40 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
41 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
42 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
43 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Wrapper around libraries to consume from message bus.
49  */
50 public interface BusConsumer {
51
52     /**
53      * fetch messages.
54      *
55      * @return list of messages
56      * @throws IOException when error encountered by underlying libraries
57      */
58     public Iterable<String> fetch() throws IOException;
59
60     /**
61      * close underlying library consumer.
62      */
63     public void close();
64
65     /**
66      * Consumer that handles fetch() failures by sleeping.
67      */
68     public abstract static class FetchingBusConsumer implements BusConsumer {
69         private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
70
71         /**
72          * Fetch timeout.
73          */
74         protected int fetchTimeout;
75
76         /**
77          * Time to sleep on a fetch failure.
78          */
79         @Getter
80         private final int sleepTime;
81
82         /**
83          * Counted down when {@link #close()} is invoked.
84          */
85         private final CountDownLatch closeCondition = new CountDownLatch(1);
86
87
88         /**
89          * Constructs the object.
90          *
91          * @param busTopicParams parameters for the bus topic
92          */
93         protected FetchingBusConsumer(BusTopicParams busTopicParams) {
94             this.fetchTimeout = busTopicParams.getFetchTimeout();
95
96             if (this.fetchTimeout <= 0) {
97                 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
98             } else {
99                 // don't sleep too long, even if fetch timeout is large
100                 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
101             }
102         }
103
104         /**
105          * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
106          * or the thread is interrupted, then this will return immediately.
107          */
108         protected void sleepAfterFetchFailure() {
109             try {
110                 logger.info("{}: backoff for {}ms", this, sleepTime);
111                 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
112                     logger.info("{}: closed while handling fetch error", this);
113                 }
114
115             } catch (InterruptedException e) {
116                 logger.warn("{}: interrupted while handling fetch error", this, e);
117                 Thread.currentThread().interrupt();
118             }
119         }
120
121         @Override
122         public void close() {
123             this.closeCondition.countDown();
124         }
125     }
126
127     /**
128      * Cambria based consumer.
129      */
130     public static class CambriaConsumerWrapper extends FetchingBusConsumer {
131
132         /**
133          * logger.
134          */
135         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
136
137         /**
138          * Used to build the consumer.
139          */
140         private final ConsumerBuilder builder;
141
142         /**
143          * Cambria client.
144          */
145         private final CambriaConsumer consumer;
146
147         /**
148          * Cambria Consumer Wrapper.
149          * BusTopicParam object contains the following parameters
150          * servers messaging bus hosts.
151          * topic topic
152          * apiKey API Key
153          * apiSecret API Secret
154          * consumerGroup Consumer Group
155          * consumerInstance Consumer Instance
156          * fetchTimeout Fetch Timeout
157          * fetchLimit Fetch Limit
158          *
159          * @param busTopicParams - The parameters for the bus topic
160          * @throws GeneralSecurityException - Security exception
161          * @throws MalformedURLException - Malformed URL exception
162          */
163         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
164             super(busTopicParams);
165
166             this.builder = new CambriaClientBuilders.ConsumerBuilder();
167
168             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
169                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
170                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
171
172             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
173             builder.withSocketTimeout(fetchTimeout + 30000);
174
175             if (busTopicParams.isUseHttps()) {
176                 builder.usingHttps();
177
178                 if (busTopicParams.isAllowSelfSignedCerts()) {
179                     builder.allowSelfSignedCertificates();
180                 }
181             }
182
183             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
184                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
185             }
186
187             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
188                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
189             }
190
191             try {
192                 this.consumer = builder.build();
193             } catch (MalformedURLException | GeneralSecurityException e) {
194                 throw new IllegalArgumentException(e);
195             }
196         }
197
198         @Override
199         public Iterable<String> fetch() throws IOException {
200             try {
201                 return this.consumer.fetch();
202             } catch (final IOException e) { //NOSONAR
203                 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
204                 sleepAfterFetchFailure();
205                 throw e;
206             }
207         }
208
209         @Override
210         public void close() {
211             super.close();
212             this.consumer.close();
213         }
214
215         @Override
216         public String toString() {
217             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
218         }
219     }
220
221     /**
222      * MR based consumer.
223      */
224     public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
225
226         /**
227          * logger.
228          */
229         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
230
231         /**
232          * Name of the "protocol" property.
233          */
234         protected static final String PROTOCOL_PROP = "Protocol";
235
236         /**
237          * MR Consumer.
238          */
239         protected MRConsumerImpl consumer;
240
241         /**
242          * MR Consumer Wrapper.
243          *
244          * <p>servers          messaging bus hosts
245          * topic            topic
246          * apiKey           API Key
247          * apiSecret        API Secret
248          * username         AAF Login
249          * password         AAF Password
250          * consumerGroup    Consumer Group
251          * consumerInstance Consumer Instance
252          * fetchTimeout     Fetch Timeout
253          * fetchLimit       Fetch Limit
254          *
255          * @param busTopicParams contains above listed attributes
256          * @throws MalformedURLException URL should be valid
257          */
258         protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
259             super(busTopicParams);
260
261             if (busTopicParams.isTopicInvalid()) {
262                 throw new IllegalArgumentException("No topic for DMaaP");
263             }
264
265             this.consumer = new MRConsumerImplBuilder()
266                             .setHostPart(busTopicParams.getServers())
267                             .setTopic(busTopicParams.getTopic())
268                             .setConsumerGroup(busTopicParams.getConsumerGroup())
269                             .setConsumerId(busTopicParams.getConsumerInstance())
270                             .setTimeoutMs(busTopicParams.getFetchTimeout())
271                             .setLimit(busTopicParams.getFetchLimit())
272                             .setApiKey(busTopicParams.getApiKey())
273                             .setApiSecret(busTopicParams.getApiSecret())
274                             .createMRConsumerImpl();
275
276             this.consumer.setUsername(busTopicParams.getUserName());
277             this.consumer.setPassword(busTopicParams.getPassword());
278         }
279
280         @Override
281         public Iterable<String> fetch() throws IOException {
282             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
283             if (response == null) {
284                 logger.warn("{}: DMaaP NULL response received", this);
285
286                 sleepAfterFetchFailure();
287                 return new ArrayList<>();
288             } else {
289                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
290                         response.getResponseMessage());
291
292                 if (!"200".equals(response.getResponseCode())) {
293
294                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
295                             response.getResponseMessage());
296
297                     sleepAfterFetchFailure();
298
299                     /* fall through */
300                 }
301             }
302
303             if (response.getActualMessages() == null) {
304                 return new ArrayList<>();
305             } else {
306                 return response.getActualMessages();
307             }
308         }
309
310         @Override
311         public void close() {
312             super.close();
313             this.consumer.close();
314         }
315
316         @Override
317         public String toString() {
318             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
319                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
320                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
321                     + consumer.getUsername() + "]";
322         }
323     }
324
325     /**
326      * MR based consumer.
327      */
328     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
329
330         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
331
332         private final Properties props;
333
334         /**
335          * BusTopicParams contain the following parameters.
336          * MR Consumer Wrapper.
337          *
338          * <p>servers messaging bus hosts
339          * topic topic
340          * apiKey API Key
341          * apiSecret API Secret
342          * aafLogin AAF Login
343          * aafPassword AAF Password
344          * consumerGroup Consumer Group
345          * consumerInstance Consumer Instance
346          * fetchTimeout Fetch Timeout
347          * fetchLimit Fetch Limit
348          *
349          * @param busTopicParams contains above listed params
350          * @throws MalformedURLException URL should be valid
351          */
352         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
353
354             super(busTopicParams);
355
356             // super constructor sets servers = {""} if empty to avoid errors when using DME2
357             if (busTopicParams.isServersInvalid()) {
358                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
359             }
360
361             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
362
363             props = new Properties();
364
365             if (busTopicParams.isUseHttps()) {
366                 props.setProperty(PROTOCOL_PROP, "https");
367                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
368
369             } else {
370                 props.setProperty(PROTOCOL_PROP, "http");
371                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
372             }
373
374             this.consumer.setProps(props);
375             logger.info("{}: CREATION", this);
376         }
377
378         @Override
379         public String toString() {
380             final MRConsumerImpl consumer = this.consumer;
381
382             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
383                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
384                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
385                     + consumer.getUsername() + "]";
386         }
387     }
388
389     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
390
391         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
392
393         private final Properties props;
394
395         /**
396          * Constructor.
397          *
398          * @param busTopicParams topic paramters
399          *
400          * @throws MalformedURLException must provide a valid URL
401          */
402         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
403
404
405             super(busTopicParams);
406
407
408             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
409                             ? busTopicParams.getAdditionalProps().get(
410                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
411                             : null);
412
413             if (busTopicParams.isEnvironmentInvalid()) {
414                 throw parmException(busTopicParams.getTopic(),
415                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
416             }
417             if (busTopicParams.isAftEnvironmentInvalid()) {
418                 throw parmException(busTopicParams.getTopic(),
419                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
420             }
421             if (busTopicParams.isLatitudeInvalid()) {
422                 throw parmException(busTopicParams.getTopic(),
423                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
424             }
425             if (busTopicParams.isLongitudeInvalid()) {
426                 throw parmException(busTopicParams.getTopic(),
427                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
428             }
429
430             if ((busTopicParams.isPartnerInvalid())
431                     && StringUtils.isBlank(dme2RouteOffer)) {
432                 throw new IllegalArgumentException(
433                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
434                                 + "." + busTopicParams.getTopic()
435                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
436                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
437                                 + busTopicParams.getTopic()
438                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
439             }
440
441             final String serviceName = busTopicParams.getServers().get(0);
442
443             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
444
445             this.consumer.setUsername(busTopicParams.getUserName());
446             this.consumer.setPassword(busTopicParams.getPassword());
447
448             props = new Properties();
449
450             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
451
452             props.setProperty("username", busTopicParams.getUserName());
453             props.setProperty("password", busTopicParams.getPassword());
454
455             /* These are required, no defaults */
456             props.setProperty("topic", busTopicParams.getTopic());
457
458             props.setProperty("Environment", busTopicParams.getEnvironment());
459             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
460
461             if (busTopicParams.getPartner() != null) {
462                 props.setProperty("Partner", busTopicParams.getPartner());
463             }
464             if (dme2RouteOffer != null) {
465                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
466             }
467
468             props.setProperty("Latitude", busTopicParams.getLatitude());
469             props.setProperty("Longitude", busTopicParams.getLongitude());
470
471             /* These are optional, will default to these values if not set in additionalProps */
472             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
473             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
474             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
475             props.setProperty("Version", "1.0");
476             props.setProperty("SubContextPath", "/");
477             props.setProperty("sessionstickinessrequired", "no");
478
479             /* These should not change */
480             props.setProperty("TransportType", "DME2");
481             props.setProperty("MethodType", "GET");
482
483             if (busTopicParams.isUseHttps()) {
484                 props.setProperty(PROTOCOL_PROP, "https");
485
486             } else {
487                 props.setProperty(PROTOCOL_PROP, "http");
488             }
489
490             props.setProperty("contenttype", "application/json");
491
492             if (busTopicParams.isAdditionalPropsValid()) {
493                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
494                     props.put(entry.getKey(), entry.getValue());
495                 }
496             }
497
498             MRClientFactory.prop = props;
499             this.consumer.setProps(props);
500
501             logger.info("{}: CREATION", this);
502         }
503
504         private IllegalArgumentException parmException(String topic, String propnm) {
505             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
506                     + topic + propnm + " property for DME2 in DMaaP");
507
508         }
509     }
510 }
511
512