8542d572dacab1df35c4e71a35ce430e78179be5
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8  * Copyright (C) 2022 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import java.io.IOException;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Properties;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41 import lombok.Getter;
42 import org.apache.commons.lang3.StringUtils;
43 import org.apache.kafka.clients.consumer.ConsumerConfig;
44 import org.apache.kafka.clients.consumer.ConsumerRecord;
45 import org.apache.kafka.clients.consumer.ConsumerRecords;
46 import org.apache.kafka.clients.consumer.KafkaConsumer;
47 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
48 import org.apache.kafka.common.TopicPartition;
49 import org.onap.dmaap.mr.client.MRClientFactory;
50 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
51 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
52 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
53 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
54 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * Wrapper around libraries to consume from message bus.
60  */
61 public interface BusConsumer {
62
    /**
     * Fetches messages from the message bus.
     *
     * @return the messages that were fetched
     * @throws IOException when error encountered by underlying libraries
     */
    public Iterable<String> fetch() throws IOException;
70
    /**
     * Closes the underlying library consumer.
     */
    public void close();
75
76     /**
77      * Consumer that handles fetch() failures by sleeping.
78      */
79     public abstract static class FetchingBusConsumer implements BusConsumer {
80         private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
81
82         /**
83          * Fetch timeout.
84          */
85         protected int fetchTimeout;
86
87         /**
88          * Time to sleep on a fetch failure.
89          */
90         @Getter
91         private final int sleepTime;
92
93         /**
94          * Counted down when {@link #close()} is invoked.
95          */
96         private final CountDownLatch closeCondition = new CountDownLatch(1);
97
98
99         /**
100          * Constructs the object.
101          *
102          * @param busTopicParams parameters for the bus topic
103          */
104         protected FetchingBusConsumer(BusTopicParams busTopicParams) {
105             this.fetchTimeout = busTopicParams.getFetchTimeout();
106
107             if (this.fetchTimeout <= 0) {
108                 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
109             } else {
110                 // don't sleep too long, even if fetch timeout is large
111                 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
112             }
113         }
114
115         /**
116          * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
117          * or the thread is interrupted, then this will return immediately.
118          */
119         protected void sleepAfterFetchFailure() {
120             try {
121                 logger.info("{}: backoff for {}ms", this, sleepTime);
122                 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
123                     logger.info("{}: closed while handling fetch error", this);
124                 }
125
126             } catch (InterruptedException e) {
127                 logger.warn("{}: interrupted while handling fetch error", this, e);
128                 Thread.currentThread().interrupt();
129             }
130         }
131
132         @Override
133         public void close() {
134             this.closeCondition.countDown();
135         }
136     }
137
138     /**
139      * Cambria based consumer.
140      */
141     public static class CambriaConsumerWrapper extends FetchingBusConsumer {
142
143         /**
144          * logger.
145          */
146         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
147
148         /**
149          * Used to build the consumer.
150          */
151         private final ConsumerBuilder builder;
152
153         /**
154          * Cambria client.
155          */
156         private final CambriaConsumer consumer;
157
158         /**
159          * Cambria Consumer Wrapper.
160          * BusTopicParam object contains the following parameters
161          * servers messaging bus hosts.
162          * topic topic
163          * apiKey API Key
164          * apiSecret API Secret
165          * consumerGroup Consumer Group
166          * consumerInstance Consumer Instance
167          * fetchTimeout Fetch Timeout
168          * fetchLimit Fetch Limit
169          *
170          * @param busTopicParams - The parameters for the bus topic
171          * @throws GeneralSecurityException - Security exception
172          * @throws MalformedURLException - Malformed URL exception
173          */
174         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
175             super(busTopicParams);
176
177             this.builder = new CambriaClientBuilders.ConsumerBuilder();
178
179             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
180                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
181                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
182
183             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
184             builder.withSocketTimeout(fetchTimeout + 30000);
185
186             if (busTopicParams.isUseHttps()) {
187                 builder.usingHttps();
188
189                 if (busTopicParams.isAllowSelfSignedCerts()) {
190                     builder.allowSelfSignedCertificates();
191                 }
192             }
193
194             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
195                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
196             }
197
198             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
199                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
200             }
201
202             try {
203                 this.consumer = builder.build();
204             } catch (MalformedURLException | GeneralSecurityException e) {
205                 throw new IllegalArgumentException(e);
206             }
207         }
208
209         @Override
210         public Iterable<String> fetch() throws IOException {
211             try {
212                 return this.consumer.fetch();
213             } catch (final IOException e) { //NOSONAR
214                 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
215                 sleepAfterFetchFailure();
216                 throw e;
217             }
218         }
219
220         @Override
221         public void close() {
222             super.close();
223             this.consumer.close();
224         }
225
226         @Override
227         public String toString() {
228             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
229         }
230     }
231
232     /**
233      * Kafka based consumer.
234      */
235     public static class KafkaConsumerWrapper extends FetchingBusConsumer {
236
237         /**
238          * logger.
239          */
240         private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
241
242         private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
243
244         /**
245          * Kafka consumer.
246          */
247         protected KafkaConsumer<String, String> consumer;
248         protected Properties kafkaProps;
249
250         /**
251          * Kafka Consumer Wrapper.
252          * BusTopicParam object contains the following parameters
253          * servers messaging bus hosts.
254          * topic topic
255          *
256          * @param busTopicParams - The parameters for the bus topic
257          * @throws GeneralSecurityException - Security exception
258          * @throws MalformedURLException - Malformed URL exception
259          */
260         public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
261             super(busTopicParams);
262
263             if (busTopicParams.isTopicInvalid()) {
264                 throw new IllegalArgumentException("No topic for Kafka");
265             }
266
267             //Setup Properties for consumer
268             kafkaProps = new Properties();
269             kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
270                     busTopicParams.getServers().get(0));
271
272             if (busTopicParams.isAdditionalPropsValid()) {
273                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
274                     kafkaProps.put(entry.getKey(), entry.getValue());
275                 }
276             }
277
278             if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
279                 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
280             }
281             if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
282                 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
283             }
284             if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
285                 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
286             }
287             consumer = new KafkaConsumer<>(kafkaProps);
288             //Subscribe to the topic
289             consumer.subscribe(Arrays.asList(busTopicParams.getTopic()));
290         }
291
292         @Override
293         public Iterable<String> fetch() throws IOException {
294             ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
295             if (records == null || records.count() <= 0) {
296                 return Collections.emptyList();
297             }
298             List<String> messages = new ArrayList<>(records.count());
299             try {
300                 for (TopicPartition partition : records.partitions()) {
301                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
302                     for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
303                         messages.add(partitionRecord.value());
304                     }
305                     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
306                     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
307                 }
308             } catch (Exception e) {
309                 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
310                 sleepAfterFetchFailure();
311                 throw e;
312             }
313             return messages;
314         }
315
316         @Override
317         public void close() {
318             super.close();
319             this.consumer.close();
320             logger.info("Kafka Consumer exited {}", this);
321         }
322
323         @Override
324         public String toString() {
325             return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
326         }
327     }
328
329     /**
330      * MR based consumer.
331      */
332     public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
333
334         /**
335          * logger.
336          */
337         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
338
339         /**
340          * Name of the "protocol" property.
341          */
342         protected static final String PROTOCOL_PROP = "Protocol";
343
344         /**
345          * MR Consumer.
346          */
347         protected MRConsumerImpl consumer;
348
349         /**
350          * MR Consumer Wrapper.
351          *
352          * <p>servers          messaging bus hosts
353          * topic            topic
354          * apiKey           API Key
355          * apiSecret        API Secret
356          * username         AAF Login
357          * password         AAF Password
358          * consumerGroup    Consumer Group
359          * consumerInstance Consumer Instance
360          * fetchTimeout     Fetch Timeout
361          * fetchLimit       Fetch Limit
362          *
363          * @param busTopicParams contains above listed attributes
364          * @throws MalformedURLException URL should be valid
365          */
366         protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
367             super(busTopicParams);
368
369             if (busTopicParams.isTopicInvalid()) {
370                 throw new IllegalArgumentException("No topic for DMaaP");
371             }
372
373             this.consumer = new MRConsumerImplBuilder()
374                             .setHostPart(busTopicParams.getServers())
375                             .setTopic(busTopicParams.getTopic())
376                             .setConsumerGroup(busTopicParams.getConsumerGroup())
377                             .setConsumerId(busTopicParams.getConsumerInstance())
378                             .setTimeoutMs(busTopicParams.getFetchTimeout())
379                             .setLimit(busTopicParams.getFetchLimit())
380                             .setApiKey(busTopicParams.getApiKey())
381                             .setApiSecret(busTopicParams.getApiSecret())
382                             .createMRConsumerImpl();
383
384             this.consumer.setUsername(busTopicParams.getUserName());
385             this.consumer.setPassword(busTopicParams.getPassword());
386         }
387
388         @Override
389         public Iterable<String> fetch() throws IOException {
390             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
391             if (response == null) {
392                 logger.warn("{}: DMaaP NULL response received", this);
393
394                 sleepAfterFetchFailure();
395                 return new ArrayList<>();
396             } else {
397                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
398                         response.getResponseMessage());
399
400                 if (!"200".equals(response.getResponseCode())) {
401
402                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
403                             response.getResponseMessage());
404
405                     sleepAfterFetchFailure();
406
407                     /* fall through */
408                 }
409             }
410
411             if (response.getActualMessages() == null) {
412                 return new ArrayList<>();
413             } else {
414                 return response.getActualMessages();
415             }
416         }
417
418         @Override
419         public void close() {
420             super.close();
421             this.consumer.close();
422         }
423
424         @Override
425         public String toString() {
426             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
427                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
428                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
429                     + consumer.getUsername() + "]";
430         }
431     }
432
433     /**
434      * MR based consumer.
435      */
436     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
437
438         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
439
440         private final Properties props;
441
442         /**
443          * BusTopicParams contain the following parameters.
444          * MR Consumer Wrapper.
445          *
446          * <p>servers messaging bus hosts
447          * topic topic
448          * apiKey API Key
449          * apiSecret API Secret
450          * aafLogin AAF Login
451          * aafPassword AAF Password
452          * consumerGroup Consumer Group
453          * consumerInstance Consumer Instance
454          * fetchTimeout Fetch Timeout
455          * fetchLimit Fetch Limit
456          *
457          * @param busTopicParams contains above listed params
458          * @throws MalformedURLException URL should be valid
459          */
460         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
461
462             super(busTopicParams);
463
464             // super constructor sets servers = {""} if empty to avoid errors when using DME2
465             if (busTopicParams.isServersInvalid()) {
466                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
467             }
468
469             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
470
471             props = new Properties();
472
473             if (busTopicParams.isUseHttps()) {
474                 props.setProperty(PROTOCOL_PROP, "https");
475                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
476
477             } else {
478                 props.setProperty(PROTOCOL_PROP, "http");
479                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
480             }
481
482             this.consumer.setProps(props);
483             logger.info("{}: CREATION", this);
484         }
485
486         @Override
487         public String toString() {
488             final MRConsumerImpl consumer = this.consumer;
489
490             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
491                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
492                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
493                     + consumer.getUsername() + "]";
494         }
495     }
496
497     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
498
499         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
500
501         private final Properties props;
502
503         /**
504          * Constructor.
505          *
506          * @param busTopicParams topic paramters
507          *
508          * @throws MalformedURLException must provide a valid URL
509          */
510         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
511
512
513             super(busTopicParams);
514
515
516             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
517                             ? busTopicParams.getAdditionalProps().get(
518                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
519                             : null);
520
521             if (busTopicParams.isEnvironmentInvalid()) {
522                 throw parmException(busTopicParams.getTopic(),
523                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
524             }
525             if (busTopicParams.isAftEnvironmentInvalid()) {
526                 throw parmException(busTopicParams.getTopic(),
527                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
528             }
529             if (busTopicParams.isLatitudeInvalid()) {
530                 throw parmException(busTopicParams.getTopic(),
531                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
532             }
533             if (busTopicParams.isLongitudeInvalid()) {
534                 throw parmException(busTopicParams.getTopic(),
535                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
536             }
537
538             if ((busTopicParams.isPartnerInvalid())
539                     && StringUtils.isBlank(dme2RouteOffer)) {
540                 throw new IllegalArgumentException(
541                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
542                                 + "." + busTopicParams.getTopic()
543                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
544                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
545                                 + busTopicParams.getTopic()
546                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
547             }
548
549             final String serviceName = busTopicParams.getServers().get(0);
550
551             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
552
553             this.consumer.setUsername(busTopicParams.getUserName());
554             this.consumer.setPassword(busTopicParams.getPassword());
555
556             props = new Properties();
557
558             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
559
560             props.setProperty("username", busTopicParams.getUserName());
561             props.setProperty("password", busTopicParams.getPassword());
562
563             /* These are required, no defaults */
564             props.setProperty("topic", busTopicParams.getTopic());
565
566             props.setProperty("Environment", busTopicParams.getEnvironment());
567             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
568
569             if (busTopicParams.getPartner() != null) {
570                 props.setProperty("Partner", busTopicParams.getPartner());
571             }
572             if (dme2RouteOffer != null) {
573                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
574             }
575
576             props.setProperty("Latitude", busTopicParams.getLatitude());
577             props.setProperty("Longitude", busTopicParams.getLongitude());
578
579             /* These are optional, will default to these values if not set in additionalProps */
580             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
581             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
582             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
583             props.setProperty("Version", "1.0");
584             props.setProperty("SubContextPath", "/");
585             props.setProperty("sessionstickinessrequired", "no");
586
587             /* These should not change */
588             props.setProperty("TransportType", "DME2");
589             props.setProperty("MethodType", "GET");
590
591             if (busTopicParams.isUseHttps()) {
592                 props.setProperty(PROTOCOL_PROP, "https");
593
594             } else {
595                 props.setProperty(PROTOCOL_PROP, "http");
596             }
597
598             props.setProperty("contenttype", "application/json");
599
600             if (busTopicParams.isAdditionalPropsValid()) {
601                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
602                     props.put(entry.getKey(), entry.getValue());
603                 }
604             }
605
606             MRClientFactory.prop = props;
607             this.consumer.setProps(props);
608
609             logger.info("{}: CREATION", this);
610         }
611
612         private IllegalArgumentException parmException(String topic, String propnm) {
613             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
614                     + topic + propnm + " property for DME2 in DMaaP");
615
616         }
617     }
618 }
619
620