fedde284a78e2a9c9f74b7d8786e8683323c92da
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
22
23 import com.att.nsa.cambria.client.CambriaClientBuilders;
24 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
25 import com.att.nsa.cambria.client.CambriaConsumer;
26
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.List;
31
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.FilterableBusConsumer;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Cambria based consumer
38  */
39 public class CambriaConsumerWrapper implements FilterableBusConsumer {
40
41     /**
42      * logger
43      */
44     private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
45
46     /**
47      * Used to build the consumer.
48      */
49     private final ConsumerBuilder builder;
50
51     /**
52      * Locked while updating {@link #consumer} and {@link #newConsumer}.
53      */
54     private final Object consLocker = new Object();
55
56     /**
57      * Cambria client
58      */
59     private CambriaConsumer consumer;
60
61     /**
62      * Cambria client to use for next fetch
63      */
64     private CambriaConsumer newConsumer = null;
65
66     /**
67      * fetch timeout
68      */
69     protected int fetchTimeout;
70
71     /**
72      * close condition
73      */
74     protected Object closeCondition = new Object();
75
76     /**
77      * Cambria Consumer Wrapper
78      *
79      * @param servers messaging bus hosts
80      * @param topic topic
81      * @param apiKey API Key
82      * @param apiSecret API Secret
83      * @param consumerGroup Consumer Group
84      * @param consumerInstance Consumer Instance
85      * @param fetchTimeout Fetch Timeout
86      * @param fetchLimit Fetch Limit
87      * @throws GeneralSecurityException
88      * @throws MalformedURLException
89      */
90     public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
91             String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
92             boolean useSelfSignedCerts) {
93         this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
94                 useHttps, useSelfSignedCerts);
95     }
96
97     public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username,
98             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
99             boolean useHttps, boolean useSelfSignedCerts) {
100
101         this.fetchTimeout = fetchTimeout;
102
103         this.builder = new CambriaClientBuilders.ConsumerBuilder();
104
105         builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic).waitAtServer(fetchTimeout)
106                 .receivingAtMost(fetchLimit);
107
108         // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
109         builder.withSocketTimeout(fetchTimeout + 30000);
110
111         if (useHttps) {
112             builder.usingHttps();
113
114             if (useSelfSignedCerts) {
115                 builder.allowSelfSignedCertificates();
116             }
117         }
118
119         if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
120             builder.authenticatedBy(apiKey, apiSecret);
121         }
122
123         if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
124             builder.authenticatedByHttp(username, password);
125         }
126
127         try {
128             this.consumer = builder.build();
129         } catch (MalformedURLException | GeneralSecurityException e) {
130             throw new IllegalArgumentException(e);
131         }
132     }
133
134     @Override
135     public Iterable<String> fetch() throws IOException, InterruptedException {
136         try {
137             return getCurrentConsumer().fetch();
138         } catch (final IOException e) {
139             logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
140                     this.fetchTimeout);
141             synchronized (this.closeCondition) {
142                 this.closeCondition.wait(this.fetchTimeout);
143             }
144
145             throw e;
146         }
147     }
148
149     @Override
150     public void close() {
151         synchronized (closeCondition) {
152             closeCondition.notifyAll();
153         }
154
155         getCurrentConsumer().close();
156     }
157
158     private CambriaConsumer getCurrentConsumer() {
159         CambriaConsumer old = null;
160         CambriaConsumer ret;
161
162         synchronized (consLocker) {
163             if (this.newConsumer != null) {
164                 // replace old consumer with new consumer
165                 old = this.consumer;
166                 this.consumer = this.newConsumer;
167                 this.newConsumer = null;
168             }
169
170             ret = this.consumer;
171         }
172
173         if (old != null) {
174             old.close();
175         }
176
177         return ret;
178     }
179
180     @Override
181     public void setFilter(String filter) {
182         logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
183         builder.withServerSideFilter(filter);
184
185         try {
186             CambriaConsumer previous;
187             synchronized (consLocker) {
188                 previous = this.newConsumer;
189                 this.newConsumer = builder.build();
190             }
191
192             if (previous != null) {
193                 // there was already a new consumer - close it
194                 previous.close();
195             }
196
197         } catch (MalformedURLException | GeneralSecurityException e) {
198             /*
199              * Since an exception occurred, "consumer" still has its old value, thus it should not
200              * be closed at this point.
201              */
202             throw new IllegalArgumentException(e);
203         }
204     }
205
206     @Override
207     public String toString() {
208         return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
209     }
210 }