2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.event.comm.bus.internal;
23 import java.net.MalformedURLException;
24 import java.security.GeneralSecurityException;
25 import java.util.ArrayList;
26 import java.util.List;
28 import java.util.Properties;
30 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
31 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
34 import com.att.nsa.cambria.client.CambriaClientBuilders;
35 import com.att.nsa.cambria.client.CambriaConsumer;
36 import com.att.nsa.mr.client.MRClientFactory;
37 import com.att.nsa.mr.client.impl.MRConsumerImpl;
38 import com.att.nsa.mr.client.response.MRConsumerResponse;
39 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
40 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
43 * Wrapper around libraries to consume from message bus
46 public interface BusConsumer {
51 * @return list of messages
52 * @throws Exception when error encountered by underlying libraries
54 public Iterable<String> fetch() throws Exception;
57 * close underlying library consumer
62 * Cambria based consumer
64 public static class CambriaConsumerWrapper implements BusConsumer {
68 protected CambriaConsumer consumer;
71 * Cambria Consumer Wrapper
73 * @param servers messaging bus hosts
75 * @param apiKey API Key
76 * @param apiSecret API Secret
77 * @param consumerGroup Consumer Group
78 * @param consumerInstance Consumer Instance
79 * @param fetchTimeout Fetch Timeout
80 * @param fetchLimit Fetch Limit
81 * @throws IllegalArgumentException if the Cambria client cannot be built; wraps the
82 * MalformedURLException or GeneralSecurityException raised by the builder
// Configures a Cambria ConsumerBuilder from the arguments and stores the consumer it builds.
// NOTE(review): servers, topic and useHttps are never referenced in the lines visible here, and
// the second builder chain below has no terminating ';' -- statements appear to be missing from
// this listing. Confirm against the full file before changing anything in this constructor.
84 public CambriaConsumerWrapper(List<String> servers, String topic,
85 String apiKey, String apiSecret,
86 String consumerGroup, String consumerInstance,
87 int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts)
88 throws IllegalArgumentException {
90 ConsumerBuilder builder =
91 new CambriaClientBuilders.ConsumerBuilder();
// Self-signed variant: same group/instance, timeout and limit settings, plus
// allowSelfSignedCertificates().
96 if(useSelfSignedCerts){
97 builder.knownAs(consumerGroup, consumerInstance)
100 .waitAtServer(fetchTimeout)
101 .receivingAtMost(fetchLimit)
103 .allowSelfSignedCertificates();
// NOTE(review): presumably the branch without self-signed certificates -- the branch keyword and
// the end of this chain are not visible; confirm.
106 builder.knownAs(consumerGroup, consumerInstance)
109 .waitAtServer(fetchTimeout)
110 .receivingAtMost(fetchLimit)
// NOTE(review): presumably the non-HTTPS branch -- its guarding condition is not visible; confirm.
115 builder.knownAs(consumerGroup, consumerInstance)
118 .waitAtServer(fetchTimeout)
119 .receivingAtMost(fetchLimit);
// Credentials are applied only when both the key and the secret are non-null and non-empty.
122 if (apiKey != null && !apiKey.isEmpty() &&
123 apiSecret != null && !apiSecret.isEmpty()) {
124 builder.authenticatedBy(apiKey, apiSecret);
// Build the client; the checked exceptions from build() are rethrown as an unchecked
// IllegalArgumentException with the original exception as its cause.
128 this.consumer = builder.build();
129 } catch (MalformedURLException | GeneralSecurityException e) {
130 throw new IllegalArgumentException(e);
137 public Iterable<String> fetch() throws Exception {
138 return this.consumer.fetch();
144 public void close() {
145 this.consumer.close();
149 public String toString() {
150 return "CambriaConsumerWrapper []";
157 public abstract class DmaapConsumerWrapper implements BusConsumer {
159 protected int fetchTimeout;
160 protected Object closeCondition = new Object();
165 protected MRConsumerImpl consumer;
168 * MR Consumer Wrapper
170 * @param servers messaging bus hosts
172 * @param apiKey API Key
173 * @param apiSecret API Secret
174 * @param username AAF Login
175 * @param password AAF Password
176 * @param consumerGroup Consumer Group
177 * @param consumerInstance Consumer Instance
178 * @param fetchTimeout Fetch Timeout
179 * @param fetchLimit Fetch Limit
180 * @throws MalformedURLException
182 public DmaapConsumerWrapper(List<String> servers, String topic,
183 String apiKey, String apiSecret,
184 String username, String password,
185 String consumerGroup, String consumerInstance,
186 int fetchTimeout, int fetchLimit, boolean useHttps)
187 throws MalformedURLException {
189 this.fetchTimeout = fetchTimeout;
191 if (topic == null || topic.isEmpty()) {
192 throw new IllegalArgumentException("No topic for DMaaP");
195 this.consumer = new MRConsumerImpl(servers, topic,
196 consumerGroup, consumerInstance,
197 fetchTimeout, fetchLimit,
198 null, apiKey, apiSecret);
200 this.consumer.setUsername(username);
201 this.consumer.setPassword(password);
207 public Iterable<String> fetch() throws Exception {
208 MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
210 if (PolicyLogger.isDebugEnabled() && response != null)
211 PolicyLogger.debug(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage());
213 if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) {
214 if (response.getResponseCode() == null)
215 PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received response code null");
217 PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage());
219 synchronized (closeCondition) {
220 closeCondition.wait(fetchTimeout);
224 if (response.getActualMessages() == null)
225 return new ArrayList<String>();
227 return response.getActualMessages();
233 public void close() {
234 synchronized (closeCondition) {
235 closeCondition.notifyAll();
238 this.consumer.close();
242 public String toString() {
243 StringBuilder builder = new StringBuilder();
245 append("DmaapConsumerWrapper [").
246 append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
247 append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
248 append(", consumer.getHost()=").append(consumer.getHost()).
249 append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
250 append(", consumer.getUsername()=").append(consumer.getUsername()).
252 return builder.toString();
259 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
260 private Properties props;
263 * MR Consumer Wrapper
265 * @param servers messaging bus hosts
267 * @param apiKey API Key
268 * @param apiSecret API Secret
269 * @param aafLogin AAF Login
270 * @param aafPassword AAF Password
271 * @param consumerGroup Consumer Group
272 * @param consumerInstance Consumer Instance
273 * @param fetchTimeout Fetch Timeout
274 * @param fetchLimit Fetch Limit
275 * @throws MalformedURLException
277 public DmaapAafConsumerWrapper(List<String> servers, String topic,
278 String apiKey, String apiSecret,
279 String aafLogin, String aafPassword,
280 String consumerGroup, String consumerInstance,
281 int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException {
283 super(servers, topic, apiKey, apiSecret,
284 aafLogin, aafPassword,
285 consumerGroup, consumerInstance,
286 fetchTimeout, fetchLimit, useHttps);
288 // super constructor sets servers = {""} if empty to avoid errors when using DME2
289 if ((servers.size() == 1 && servers.get(0).equals("")) ||
290 (servers == null) || (servers.size() == 0)) {
291 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
294 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
296 props = new Properties();
299 props.setProperty("Protocol", "https");
300 this.consumer.setHost(servers.get(0) + ":3905");
304 props.setProperty("Protocol", "http");
305 this.consumer.setHost(servers.get(0) + ":3904");
308 this.consumer.setProps(props);
309 PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this);
313 public String toString() {
314 StringBuilder builder = new StringBuilder();
315 MRConsumerImpl consumer = (MRConsumerImpl) this.consumer;
318 append("DmaapConsumerWrapper [").
319 append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
320 append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
321 append(", consumer.getHost()=").append(consumer.getHost()).
322 append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
323 append(", consumer.getUsername()=").append(consumer.getUsername()).
325 return builder.toString();
329 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
330 private Properties props;
332 public DmaapDmeConsumerWrapper(List<String> servers, String topic,
333 String apiKey, String apiSecret,
334 String dme2Login, String dme2Password,
335 String consumerGroup, String consumerInstance,
336 int fetchTimeout, int fetchLimit,
337 String environment, String aftEnvironment, String dme2Partner,
338 String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException {
342 super(servers, topic, apiKey, apiSecret,
343 dme2Login, dme2Password,
344 consumerGroup, consumerInstance,
345 fetchTimeout, fetchLimit, useHttps);
348 String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
350 if (environment == null || environment.isEmpty()) {
351 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
352 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
353 } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
354 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
355 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
356 } if (latitude == null || latitude.isEmpty()) {
357 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
358 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
359 } if (longitude == null || longitude.isEmpty()) {
360 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
361 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
364 if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
365 throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
366 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
367 PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
370 String serviceName = servers.get(0);
372 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
374 this.consumer.setUsername(dme2Login);
375 this.consumer.setPassword(dme2Password);
377 props = new Properties();
379 props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
381 props.setProperty("username", dme2Login);
382 props.setProperty("password", dme2Password);
384 /* These are required, no defaults */
385 props.setProperty("topic", topic);
387 props.setProperty("Environment", environment);
388 props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
390 if (dme2Partner != null)
391 props.setProperty("Partner", dme2Partner);
392 if (dme2RouteOffer != null)
393 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
395 props.setProperty("Latitude", latitude);
396 props.setProperty("Longitude", longitude);
398 /* These are optional, will default to these values if not set in additionalProps */
399 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
400 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
401 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
402 props.setProperty("Version", "1.0");
403 props.setProperty("SubContextPath", "/");
404 props.setProperty("sessionstickinessrequired", "no");
406 /* These should not change */
407 props.setProperty("TransportType", "DME2");
408 props.setProperty("MethodType", "GET");
411 props.setProperty("Protocol", "https");
415 props.setProperty("Protocol", "http");
418 props.setProperty("contenttype", "application/json");
420 if (additionalProps != null) {
421 for(String key : additionalProps.keySet())
422 props.put(key, additionalProps.get(key));
425 MRClientFactory.prop = props;
426 this.consumer.setProps(props);
428 PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this);