2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.event.comm.bus.internal;
23 import java.net.MalformedURLException;
24 import java.security.GeneralSecurityException;
25 import java.util.ArrayList;
26 import java.util.List;
28 import java.util.Properties;
29 import java.util.concurrent.TimeUnit;
31 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
37 import com.att.nsa.cambria.client.CambriaClientBuilders;
38 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
39 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
40 import com.att.nsa.mr.client.response.MRPublisherResponse;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
42 import com.fasterxml.jackson.annotation.JsonIgnore;
44 public interface BusPublisher {
50 * @param message the message
51 * @return true if success, false otherwise
52 * @throws IllegalArgumentException if no message provided
// Publishing contract shared by every BusPublisher implementation: takes a partition id
// and the message text, reports success as a boolean, and declares IllegalArgumentException
// for a missing message.
// NOTE(review): no @param entry for partitionId is visible in this listing -- confirm it is documented.
54 public boolean send(String partitionId, String message) throws IllegalArgumentException;
57 * closes the publisher
62 * Cambria based library publisher
// BusPublisher implementation that wraps a CambriaBatchingPublisher obtained from
// CambriaClientBuilders.PublisherBuilder and delegates send/close to it.
// NOTE(review): this listing is discontinuous (some statement openers/closers and parameter
// lines are not shown); the notes below describe only the statements that are visible.
64 public static class CambriaPublisherWrapper implements BusPublisher {
// NOTE(review): never reassigned in the visible code; could be declared final.
66 private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
69 * The actual Cambria publisher
// Assigned once in the constructor from builder.build(); read by send, close and toString.
72 protected volatile CambriaBatchingPublisher publisher;
// Configures a PublisherBuilder for the given hosts and builds the wrapped publisher.
// NOTE(review): apiKey is used below but its parameter declaration is not visible here --
// presumably it sits between topic and apiSecret; confirm against the full file.
74 public CambriaPublisherWrapper(List<String> servers, String topic,
76 String apiSecret, boolean useHttps) throws IllegalArgumentException {
77 PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
// usingHosts(servers) appears twice; presumably one call per useHttps branch, with the
// rest of each builder chain not shown -- TODO confirm.
82 builder.usingHosts(servers)
87 builder.usingHosts(servers)
// Credentials are applied only when both apiKey and apiSecret are non-null and non-empty.
92 if (apiKey != null && !apiKey.isEmpty() &&
93 apiSecret != null && !apiSecret.isEmpty()) {
94 builder.authenticatedBy(apiKey, apiSecret);
98 this.publisher = builder.build();
// MalformedURLException / GeneralSecurityException are rethrown as IllegalArgumentException
// with the original exception kept as the cause.
99 } catch (MalformedURLException | GeneralSecurityException e) {
100 throw new IllegalArgumentException(e);
// Hands partitionId and message to the wrapped publisher.
// Throws IllegalArgumentException("No message provided"); the guarding condition is not
// visible in this listing -- presumably a null check on message.
108 public boolean send(String partitionId, String message)
109 throws IllegalArgumentException {
111 throw new IllegalArgumentException("No message provided");
114 this.publisher.send(partitionId, message);
// Any exception from the underlying send is logged at WARN; the trailing argument e is
// passed beyond the three placeholders so the logger receives the throwable itself.
// NOTE(review): the full message text is written to the log here.
115 } catch (Exception e) {
116 logger.warn("{}: SEND of {} cannot be performed because of {}",
117 this, message, e.getMessage(), e);
// Logs the close request, then closes the wrapped publisher; a failure is logged at WARN.
127 public void close() {
128 logger.info("{}: CLOSE", this);
131 this.publisher.close();
132 } catch (Exception e) {
133 logger.warn("{}: CLOSE FAILED because of {}",
134 this, e.getMessage(),e);
// Renders the wrapper together with the wrapped publisher's pending message count.
// Dereferences publisher, so it relies on the constructor having completed.
140 public String toString() {
141 StringBuilder builder = new StringBuilder();
142 builder.append("CambriaPublisherWrapper [").
143 append("publisher.getPendingMessageCount()=").
144 append(publisher.getPendingMessageCount()).
146 return builder.toString();
152 * DmaapClient library wrapper
// Base BusPublisher for DMaaP: builds an MRSimplerBatchPublisher for either the AAF_AUTH or
// the DME2 protocol, and keeps the Properties object handed to it in the protected field props.
// Declared inside the BusPublisher interface, so it is implicitly a static member class.
// NOTE(review): this listing is discontinuous; notes describe only the visible statements.
154 public abstract class DmaapPublisherWrapper implements BusPublisher {
// NOTE(review): never reassigned in the visible code; could be declared final.
156 private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
// The wrapped publisher and the property set passed to it via setProps.
161 protected MRSimplerBatchPublisher publisher;
162 protected Properties props;
165 * MR Publisher Wrapper
167 * @param servers messaging bus hosts
169 * @param username AAF or DME2 Login
170 * @param password AAF or DME2 Password
// NOTE(review): username is used below but its parameter declaration is not visible in this
// listing -- presumably it sits between topic and password; confirm.
172 public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
173 List<String> servers, String topic,
175 String password, boolean useHttps) throws IllegalArgumentException {
// A topic is mandatory for every protocol.
178 if (topic == null || topic.isEmpty())
179 throw new IllegalArgumentException("No topic for DMaaP");
// AAF_AUTH: servers must be non-empty; each host is suffixed with a port before being handed
// to the builder. servers is validated only in this branch.
182 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
183 if (servers == null || servers.isEmpty())
184 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
186 ArrayList<String> dmaapServers = new ArrayList<String>();
// Two loops append ":3905" and ":3904"; presumably the https and http variants selected by
// useHttps (the selecting condition is not visible) -- TODO confirm.
188 for (String server: servers) {
189 dmaapServers.add(server + ":3905");
194 for (String server: servers) {
195 dmaapServers.add(server + ":3904");
201 new MRSimplerBatchPublisher.Builder().
202 againstUrls(dmaapServers).
206 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
// DME2: the builder is given the fixed entry "0.0.0.0:3904" instead of the caller's servers.
207 } else if (protocol == ProtocolTypeConstants.DME2) {
208 ArrayList<String> dmaapServers = new ArrayList<String>();
209 dmaapServers.add("0.0.0.0:3904");
212 new MRSimplerBatchPublisher.Builder().
213 againstUrls(dmaapServers).
217 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
// NOTE(review): no branch for other protocol values is visible; if none exists, publisher is
// still unassigned here and the next call would fail with a NullPointerException -- confirm.
220 this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
222 this.publisher.setUsername(username);
223 this.publisher.setPassword(password);
225 props = new Properties();
// "Protocol" is set to "https" or "http"; presumably chosen by useHttps (condition not visible).
228 props.setProperty("Protocol", "https");
230 props.setProperty("Protocol", "http");
233 props.setProperty("contenttype", "application/json");
// NOTE(review): Properties.setProperty rejects null values with a NullPointerException, and no
// null check on username/password is visible -- confirm callers always supply both.
234 props.setProperty("username", username);
235 props.setProperty("password", password);
237 props.setProperty("topic", topic);
239 this.publisher.setProps(props);
// For AAF_AUTH the first configured server becomes the publisher host (servers was verified
// non-empty in the AAF_AUTH branch above).
241 if (protocol == ProtocolTypeConstants.AAF_AUTH)
242 this.publisher.setHost(servers.get(0));
// Passing this as a log argument invokes toString(), which reads several publisher getters.
244 logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
// Logs the close request, then closes the wrapped publisher with a 1-second timeout argument;
// a failure is logged at WARN with the throwable as the trailing argument.
251 public void close() {
252 logger.info("{}: CLOSE", this);
255 this.publisher.close(1, TimeUnit.SECONDS);
256 } catch (Exception e) {
257 logger.warn("{}: CLOSE FAILED because of {}",
258 this, e.getMessage(), e);
// Installs a fresh MRPublisherResponse, queues the message, then calls sendBatchWithResponse()
// and logs the response code and message at DEBUG when a response is returned.
// Throws IllegalArgumentException("No message provided"); the guarding condition and the
// method's return statement are not visible in this listing.
266 public boolean send(String partitionId, String message)
267 throws IllegalArgumentException {
269 throw new IllegalArgumentException("No message provided");
271 this.publisher.setPubResponse(new MRPublisherResponse());
272 this.publisher.send(partitionId, message);
273 MRPublisherResponse response = this.publisher.sendBatchWithResponse();
274 if (response != null) {
275 logger.debug("DMaaP publisher received {} : {}",
276 response.getResponseCode(),
277 response.getResponseMessage());
// Renders the wrapper from the wrapped publisher's getters.
// NOTE(review): the output includes getAuthKey() and getUsername(), and this string is written
// to the log by the constructor and close() -- confirm that exposure is acceptable.
284 public String toString() {
285 StringBuilder builder = new StringBuilder();
286 builder.append("DmaapPublisherWrapper [").
287 append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
288 append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
289 append(", publisher.getHost()=").append(publisher.getHost()).
290 append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
291 append(", publisher.getUsername()=").append(publisher.getUsername()).
292 append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
294 return builder.toString();
299 * DmaapClient library wrapper
// DmaapPublisherWrapper specialisation that fixes the protocol to ProtocolTypeConstants.AAF_AUTH;
// everything else is delegated to the superclass constructor.
301 public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
// NOTE(review): this redeclares a field with the same name and type as the protected
// publisher field of DmaapPublisherWrapper and therefore hides it. Nothing visible assigns
// this copy (the superclass constructor sets its own field), so code reading publisher
// through this class would see null. Looks like an accidental duplicate -- confirm and remove.
305 protected MRSimplerBatchPublisher publisher;
// NOTE(review): aafLogin is passed to super(...) but its parameter declaration is not visible
// in this listing -- presumably it sits between topic and aafPassword; confirm.
307 public DmaapAafPublisherWrapper(List<String> servers, String topic,
309 String aafPassword, boolean useHttps) {
311 super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
// DmaapPublisherWrapper specialisation for ProtocolTypeConstants.DME2: after the superclass
// constructor runs, it validates the DME2 settings, adds them to the inherited props, and
// re-applies props to the publisher.
// NOTE(review): this listing is discontinuous and the end of the class is not visible; notes
// describe only the statements that are shown.
315 public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
// NOTE(review): Map is used in the signature but no java.util.Map import appears in the
// visible import list -- confirm it is imported.
316 public DmaapDmePublisherWrapper(List<String> servers, String topic,
317 String username, String password,
318 String environment, String aftEnvironment, String dme2Partner,
319 String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
321 super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
// NOTE(review): additionalProps is dereferenced here with no visible null check -- confirm
// callers never pass null.
328 String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
// environment, aftEnvironment, latitude and longitude are each mandatory; a null or empty
// value raises IllegalArgumentException naming the corresponding sink-topic property.
// The checks are separate if statements, not an else-if chain; each one throws.
330 if (environment == null || environment.isEmpty()) {
331 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
332 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
333 } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
334 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
335 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
336 } if (latitude == null || latitude.isEmpty()) {
337 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
338 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
339 } if (longitude == null || longitude.isEmpty()) {
340 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
341 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
// At least one of dme2Partner / dme2RouteOffer must be non-empty.
// NOTE(review): the first property named in this message uses PROPERTY_DMAAP_SOURCE_TOPICS,
// while every other message in this constructor (including the second half of this one) uses
// PROPERTY_DMAAP_SINK_TOPICS -- looks like a copy/paste slip in a sink-side class; confirm.
344 if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
345 throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
346 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
347 PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
// The first servers entry is used as the DME2 service name.
// NOTE(review): no null/empty check on servers is visible on the DME2 path (the superclass
// validates servers only for AAF_AUTH), so this can fail on a null or empty list -- confirm.
350 String serviceName = servers.get(0);
352 /* These are required, no defaults */
353 props.setProperty("Environment", environment);
354 props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
356 props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
// Partner and route offer are written only when non-null; an empty string is still written.
358 if (dme2Partner != null)
359 props.setProperty("Partner", dme2Partner);
360 if (dme2RouteOffer != null)
361 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
363 props.setProperty("Latitude", latitude);
364 props.setProperty("Longitude", longitude);
366 // ServiceName also a default, found in additionalProps
368 /* These are optional, will default to these values if not set in additionalProps */
369 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
370 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
371 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
372 props.setProperty("Version", "1.0");
373 props.setProperty("SubContextPath", "/");
374 props.setProperty("sessionstickinessrequired", "no");
376 /* These should not change */
377 props.setProperty("TransportType", "DME2");
378 props.setProperty("MethodType", "POST");
// Caller-supplied entries are copied last, so they replace any same-named value set above
// (including the two marked "should not change").
// NOTE(review): setProperty rejects null values; a guard on value is presumably in the lines
// not shown between the lookup and the setProperty call -- confirm.
380 for (String key : additionalProps.keySet()) {
381 String value = additionalProps.get(key);
384 props.setProperty(key, value);
// Re-apply props so the publisher is handed the DME2 additions made after the superclass
// constructor's own setProps call.
387 this.publisher.setProps(props);