60ab2e9e7b0bf1bb252eb3bf7c1eef46a9224648
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.ArrayList;
31 import java.util.Map;
32 import java.util.Properties;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.MRClientFactory;
37 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
39 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
40 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
41 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Wrapper around libraries to consume from message bus.
47  */
48 public interface BusConsumer {
49
50     /**
51      * fetch messages.
52      *
53      * @return list of messages
54      * @throws IOException when error encountered by underlying libraries
55      */
56     public Iterable<String> fetch() throws IOException;
57
58     /**
59      * close underlying library consumer.
60      */
61     public void close();
62
63     /**
64      * Cambria based consumer.
65      */
66     public static class CambriaConsumerWrapper implements BusConsumer {
67
68         /**
69          * logger.
70          */
71         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
72
73         /**
74          * Used to build the consumer.
75          */
76         private final ConsumerBuilder builder;
77
78         /**
79          * Cambria client.
80          */
81         private final CambriaConsumer consumer;
82
83         /**
84          * fetch timeout.
85          */
86         protected int fetchTimeout;
87
88         /**
89          * close condition.
90          */
91         protected CountDownLatch closeCondition = new CountDownLatch(1);
92
93         /**
94          * Cambria Consumer Wrapper.
95          * BusTopicParam object contains the following parameters
96          * servers messaging bus hosts.
97          * topic topic
98          * apiKey API Key
99          * apiSecret API Secret
100          * consumerGroup Consumer Group
101          * consumerInstance Consumer Instance
102          * fetchTimeout Fetch Timeout
103          * fetchLimit Fetch Limit
104          *
105          * @param busTopicParams - The parameters for the bus topic
106          * @throws GeneralSecurityException - Security exception
107          * @throws MalformedURLException - Malformed URL exception
108          */
109         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
110
111             this.fetchTimeout = busTopicParams.getFetchTimeout();
112
113             this.builder = new CambriaClientBuilders.ConsumerBuilder();
114
115             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
116                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
117                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
118
119             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
120             builder.withSocketTimeout(fetchTimeout + 30000);
121
122             if (busTopicParams.isUseHttps()) {
123                 builder.usingHttps();
124
125                 if (busTopicParams.isAllowSelfSignedCerts()) {
126                     builder.allowSelfSignedCertificates();
127                 }
128             }
129
130             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
131                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
132             }
133
134             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
135                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
136             }
137
138             try {
139                 this.consumer = builder.build();
140             } catch (MalformedURLException | GeneralSecurityException e) {
141                 throw new IllegalArgumentException(e);
142             }
143         }
144
145         @Override
146         public Iterable<String> fetch() throws IOException {
147             try {
148                 return this.consumer.fetch();
149             } catch (final IOException e) { //NOSONAR
150                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
151                         this.fetchTimeout);
152                 sleepAfterFetchFailure();
153                 throw e;
154             }
155         }
156
157         private void sleepAfterFetchFailure() {
158             try {
159                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
160
161             } catch (InterruptedException e) {
162                 logger.warn("{}: interrupted while handling fetch error", this, e);
163                 Thread.currentThread().interrupt();
164             }
165         }
166
167         @Override
168         public void close() {
169             this.closeCondition.countDown();
170             this.consumer.close();
171         }
172
173         @Override
174         public String toString() {
175             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
176         }
177     }
178
179     /**
180      * MR based consumer.
181      */
182     public abstract class DmaapConsumerWrapper implements BusConsumer {
183
184         /**
185          * logger.
186          */
187         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
188
189         /**
190          * Name of the "protocol" property.
191          */
192         protected static final String PROTOCOL_PROP = "Protocol";
193
194         /**
195          * fetch timeout.
196          */
197         protected int fetchTimeout;
198
199         /**
200          * close condition.
201          */
202         protected CountDownLatch closeCondition = new CountDownLatch(1);
203
204         /**
205          * MR Consumer.
206          */
207         protected MRConsumerImpl consumer;
208
209         /**
210          * MR Consumer Wrapper.
211          *
212          * <p>servers          messaging bus hosts
213          * topic            topic
214          * apiKey           API Key
215          * apiSecret        API Secret
216          * username         AAF Login
217          * password         AAF Password
218          * consumerGroup    Consumer Group
219          * consumerInstance Consumer Instance
220          * fetchTimeout     Fetch Timeout
221          * fetchLimit       Fetch Limit
222          *
223          * @param busTopicParams contains above listed attributes
224          * @throws MalformedURLException URL should be valid
225          */
226         public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
227
228             this.fetchTimeout = busTopicParams.getFetchTimeout();
229
230             if (busTopicParams.isTopicInvalid()) {
231                 throw new IllegalArgumentException("No topic for DMaaP");
232             }
233
234             this.consumer = new MRConsumerImplBuilder()
235                             .setHostPart(busTopicParams.getServers())
236                             .setTopic(busTopicParams.getTopic())
237                             .setConsumerGroup(busTopicParams.getConsumerGroup())
238                             .setConsumerId(busTopicParams.getConsumerInstance())
239                             .setTimeoutMs(busTopicParams.getFetchTimeout())
240                             .setLimit(busTopicParams.getFetchLimit())
241                             .setApiKey(busTopicParams.getApiKey())
242                             .setApiSecret(busTopicParams.getApiSecret())
243                             .createMRConsumerImpl();
244
245             this.consumer.setUsername(busTopicParams.getUserName());
246             this.consumer.setPassword(busTopicParams.getPassword());
247         }
248
249         @Override
250         public Iterable<String> fetch() throws IOException {
251             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
252             if (response == null) {
253                 logger.warn("{}: DMaaP NULL response received", this);
254
255                 sleepAfterFetchFailure();
256                 return new ArrayList<>();
257             } else {
258                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
259                         response.getResponseMessage());
260
261                 if (!"200".equals(response.getResponseCode())) {
262
263                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
264                             response.getResponseMessage());
265
266                     sleepAfterFetchFailure();
267
268                     /* fall through */
269                 }
270             }
271
272             if (response.getActualMessages() == null) {
273                 return new ArrayList<>();
274             } else {
275                 return response.getActualMessages();
276             }
277         }
278
279         private void sleepAfterFetchFailure() {
280             try {
281                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
282
283             } catch (InterruptedException e) {
284                 logger.warn("{}: interrupted while handling fetch error", this, e);
285                 Thread.currentThread().interrupt();
286             }
287         }
288
289         @Override
290         public void close() {
291             this.closeCondition.countDown();
292             this.consumer.close();
293         }
294
295         @Override
296         public String toString() {
297             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
298                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
299                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
300                     + consumer.getUsername() + "]";
301         }
302     }
303
304     /**
305      * MR based consumer.
306      */
307     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
308
309         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
310
311         private final Properties props;
312
313         /**
314          * BusTopicParams contain the following parameters.
315          * MR Consumer Wrapper.
316          *
317          * <p>servers messaging bus hosts
318          * topic topic
319          * apiKey API Key
320          * apiSecret API Secret
321          * aafLogin AAF Login
322          * aafPassword AAF Password
323          * consumerGroup Consumer Group
324          * consumerInstance Consumer Instance
325          * fetchTimeout Fetch Timeout
326          * fetchLimit Fetch Limit
327          *
328          * @param busTopicParams contains above listed params
329          * @throws MalformedURLException URL should be valid
330          */
331         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
332
333             super(busTopicParams);
334
335             // super constructor sets servers = {""} if empty to avoid errors when using DME2
336             if (busTopicParams.isServersInvalid()) {
337                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
338             }
339
340             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
341
342             props = new Properties();
343
344             if (busTopicParams.isUseHttps()) {
345                 props.setProperty(PROTOCOL_PROP, "https");
346                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
347
348             } else {
349                 props.setProperty(PROTOCOL_PROP, "http");
350                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
351             }
352
353             this.consumer.setProps(props);
354             logger.info("{}: CREATION", this);
355         }
356
357         @Override
358         public String toString() {
359             final MRConsumerImpl consumer = this.consumer;
360
361             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
362                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
363                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
364                     + consumer.getUsername() + "]";
365         }
366     }
367
368     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
369
370         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
371
372         private final Properties props;
373
374         /**
375          * Constructor.
376          *
377          * @param busTopicParams topic paramters
378          *
379          * @throws MalformedURLException must provide a valid URL
380          */
381         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
382
383
384             super(busTopicParams);
385
386
387             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
388                             ? busTopicParams.getAdditionalProps().get(
389                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
390                             : null);
391
392             if (busTopicParams.isEnvironmentInvalid()) {
393                 throw parmException(busTopicParams.getTopic(),
394                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
395             }
396             if (busTopicParams.isAftEnvironmentInvalid()) {
397                 throw parmException(busTopicParams.getTopic(),
398                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
399             }
400             if (busTopicParams.isLatitudeInvalid()) {
401                 throw parmException(busTopicParams.getTopic(),
402                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
403             }
404             if (busTopicParams.isLongitudeInvalid()) {
405                 throw parmException(busTopicParams.getTopic(),
406                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
407             }
408
409             if ((busTopicParams.isPartnerInvalid())
410                     && StringUtils.isBlank(dme2RouteOffer)) {
411                 throw new IllegalArgumentException(
412                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
413                                 + "." + busTopicParams.getTopic()
414                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
415                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
416                                 + busTopicParams.getTopic()
417                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
418             }
419
420             final String serviceName = busTopicParams.getServers().get(0);
421
422             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
423
424             this.consumer.setUsername(busTopicParams.getUserName());
425             this.consumer.setPassword(busTopicParams.getPassword());
426
427             props = new Properties();
428
429             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
430
431             props.setProperty("username", busTopicParams.getUserName());
432             props.setProperty("password", busTopicParams.getPassword());
433
434             /* These are required, no defaults */
435             props.setProperty("topic", busTopicParams.getTopic());
436
437             props.setProperty("Environment", busTopicParams.getEnvironment());
438             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
439
440             if (busTopicParams.getPartner() != null) {
441                 props.setProperty("Partner", busTopicParams.getPartner());
442             }
443             if (dme2RouteOffer != null) {
444                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
445             }
446
447             props.setProperty("Latitude", busTopicParams.getLatitude());
448             props.setProperty("Longitude", busTopicParams.getLongitude());
449
450             /* These are optional, will default to these values if not set in additionalProps */
451             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
452             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
453             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
454             props.setProperty("Version", "1.0");
455             props.setProperty("SubContextPath", "/");
456             props.setProperty("sessionstickinessrequired", "no");
457
458             /* These should not change */
459             props.setProperty("TransportType", "DME2");
460             props.setProperty("MethodType", "GET");
461
462             if (busTopicParams.isUseHttps()) {
463                 props.setProperty(PROTOCOL_PROP, "https");
464
465             } else {
466                 props.setProperty(PROTOCOL_PROP, "http");
467             }
468
469             props.setProperty("contenttype", "application/json");
470
471             if (busTopicParams.isAdditionalPropsValid()) {
472                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
473                     props.put(entry.getKey(), entry.getValue());
474                 }
475             }
476
477             MRClientFactory.prop = props;
478             this.consumer.setProps(props);
479
480             logger.info("{}: CREATION", this);
481         }
482
483         private IllegalArgumentException parmException(String topic, String propnm) {
484             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
485                     + topic + propnm + " property for DME2 in DMaaP");
486
487         }
488     }
489 }
490
491