2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.event.comm.bus.internal;
23 import java.net.MalformedURLException;
24 import java.security.GeneralSecurityException;
25 import java.util.ArrayList;
26 import java.util.List;
28 import java.util.Properties;
29 import java.util.concurrent.TimeUnit;
31 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
32 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
33 import org.openecomp.policy.drools.properties.PolicyProperties;
34 import org.slf4j.LoggerFactory;
36 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
37 import com.att.nsa.cambria.client.CambriaClientBuilders;
38 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
39 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
40 import com.att.nsa.mr.client.response.MRPublisherResponse;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
42 import com.fasterxml.jackson.annotation.JsonIgnore;
// Contract for message-bus publishers. The nested wrapper classes in this
// file (Cambria-based and DMaaP-based) declare that they implement it.
44 public interface BusPublisher {
50 * @param message the message
51 * @return true if success, false otherwise
52 * @throws IllegalArgumentException if no message provided
// Publishes one message under the given partition id; the boolean result
// reports success, as stated in the javadoc fragment above.
54 public boolean send(String partitionId, String message) throws IllegalArgumentException;
57 * closes the publisher
62 * Cambria based library publisher
// BusPublisher implementation that delegates to a CambriaBatchingPublisher.
64 public static class CambriaPublisherWrapper implements BusPublisher {
67 * The actual Cambria publisher
// Underlying Cambria client; assigned once in the constructor from the
// builder. Declared volatile.
70 protected volatile CambriaBatchingPublisher publisher;
// Configures a PublisherBuilder with the given hosts, optionally adds
// API-key authentication, and builds the publisher.
// NOTE(review): apiKey is used below but its parameter declaration is not
// visible in this view; neither are the lines that consume topic/useHttps.
72 public CambriaPublisherWrapper(List<String> servers, String topic,
74 String apiSecret, boolean useHttps) throws IllegalArgumentException {
75 PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
// usingHosts(servers) appears twice; presumably one call per useHttps
// branch -- the branching lines are not visible here, confirm against the
// complete file.
80 builder.usingHosts(servers)
85 builder.usingHosts(servers)
// Credentials are applied only when BOTH apiKey and apiSecret are non-null
// and non-empty; otherwise the builder is left unauthenticated.
90 if (apiKey != null && !apiKey.isEmpty() &&
91 apiSecret != null && !apiSecret.isEmpty()) {
92 builder.authenticatedBy(apiKey, apiSecret);
96 this.publisher = builder.build();
// Build failures are rethrown unchecked, keeping the original exception as
// the cause.
97 } catch (MalformedURLException | GeneralSecurityException e) {
98 throw new IllegalArgumentException(e);
// Sends the message under partitionId through the wrapped Cambria publisher.
// Throws IllegalArgumentException("No message provided") from a guard whose
// condition is not visible in this view. Any exception raised by
// publisher.send is caught and reported as a warning; the value returned on
// each path is not visible here.
106 public boolean send(String partitionId, String message)
107 throws IllegalArgumentException {
109 throw new IllegalArgumentException("No message provided");
112 this.publisher.send(partitionId, message);
113 } catch (Exception e) {
// The warning text embeds the full message body and this wrapper's
// toString() output.
114 PolicyLogger.warn(CambriaPublisherWrapper.class.getName(),
115 "SEND of " + message + " IN " +
116 this + " cannot be performed because of " +
// Closes the wrapped Cambria publisher. Exceptions from publisher.close()
// are caught and logged as a warning instead of being propagated.
127 public void close() {
// NOTE(review): this is the close path but the info message is prefixed
// "CREATION: " -- looks like a copy/paste from a creation log; confirm and
// consider a "CLOSE" prefix. The brace-less if guards only the info call.
128 if (PolicyLogger.isInfoEnabled())
129 PolicyLogger.info(CambriaPublisherWrapper.class.getName(),
130 "CREATION: " + this);
133 this.publisher.close();
134 } catch (Exception e) {
135 PolicyLogger.warn(CambriaPublisherWrapper.class.getName(),
136 "CLOSE on " + this + " FAILED because of " +
// Renders the wrapper as "CambriaPublisherWrapper [" followed by the
// publisher's pending message count. This string is what the send/close
// log messages of this class concatenate via `this`.
// NOTE(review): publisher is dereferenced without a null check in the
// visible lines.
143 public String toString() {
144 StringBuilder builder = new StringBuilder();
145 builder.append("CambriaPublisherWrapper [").
146 append("publisher.getPendingMessageCount()=").
147 append(publisher.getPendingMessageCount()).
149 return builder.toString();
155 * DmaapClient library wrapper
// Abstract base for the DMaaP publishers: owns an MRSimplerBatchPublisher
// and the Properties object that is handed to it.
157 public abstract class DmaapPublisherWrapper implements BusPublisher {
// Underlying MR client, created in the constructor per protocol.
161 protected MRSimplerBatchPublisher publisher;
// Properties passed to publisher.setProps(); subclasses may add entries.
162 protected Properties props;
165 * MR Publisher Wrapper
167 * @param servers messaging bus hosts
169 * @param username AAF or DME2 Login
170 * @param password AAF or DME2 Password
// Builds the MR publisher for the given protocol (AAF_AUTH or DME2), sets
// credentials, fills the base properties and applies them to the publisher.
// NOTE(review): the username parameter declaration and several interior
// lines are not visible in this view.
172 public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
173 List<String> servers, String topic,
175 String password, boolean useHttps) throws IllegalArgumentException {
// A null or empty topic is rejected up front.
178 if (topic == null || topic.isEmpty()) {
179 throw new IllegalArgumentException("No topic for DMaaP");
// AAF_AUTH: servers are mandatory. Each host gets a port suffix, ":3905" in
// one loop and ":3904" in the other -- presumably selected by useHttps; the
// branch lines are not visible here.
182 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
183 if (servers == null || servers.isEmpty())
184 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
186 ArrayList<String> dmaapServers = new ArrayList<String>();
188 for (String server: servers) {
189 dmaapServers.add(server + ":3905");
194 for (String server: servers) {
195 dmaapServers.add(server + ":3904");
201 new MRSimplerBatchPublisher.Builder().
202 againstUrls(dmaapServers).
206 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
// DME2: the servers list is not used for the URL list; a single fixed
// entry "0.0.0.0:3904" is passed to the builder instead.
207 } else if (protocol == ProtocolTypeConstants.DME2) {
208 ArrayList<String> dmaapServers = new ArrayList<String>();
209 dmaapServers.add("0.0.0.0:3904");
212 new MRSimplerBatchPublisher.Builder().
213 againstUrls(dmaapServers).
217 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
// Route the MR client's own logging to an SLF4J logger named after the
// client class.
221 this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
223 this.publisher.setUsername(username);
224 this.publisher.setPassword(password);
226 props = new Properties();
// "Protocol" is set to "https" or "http"; presumably chosen by useHttps
// (the condition is not visible in this view).
230 props.setProperty("Protocol", "https");
234 props.setProperty("Protocol", "http");
238 props.setProperty("contenttype", "application/json");
// NOTE(review): java.util.Properties does not accept null values, so a null
// username or password would raise NullPointerException here unless a guard
// exists on a line not visible in this view -- confirm.
239 props.setProperty("username", username);
240 props.setProperty("password", password);
242 props.setProperty("topic", topic);
244 this.publisher.setProps(props);
// Only AAF_AUTH sets an explicit host: the first server, which was checked
// to exist for this protocol above.
246 if (protocol == ProtocolTypeConstants.AAF_AUTH)
247 this.publisher.setHost(servers.get(0));
249 if (PolicyLogger.isInfoEnabled()) {
250 PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
251 "CREATION: " + this);
252 PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
253 "BusPublisher.DmaapPublisherWrapper using Protocol: " + protocol.getValue());
// Closes the wrapped MR publisher, passing (1, TimeUnit.SECONDS) to its
// close call -- presumably a one-second timeout; confirm against the client
// API. Exceptions are caught and logged as a warning.
262 public void close() {
// NOTE(review): this is the close path but the info message is prefixed
// "CREATION: " -- looks like a copy/paste from the constructor's creation
// log; confirm and consider a "CLOSE" prefix.
263 if (PolicyLogger.isInfoEnabled())
264 PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
265 "CREATION: " + this);
268 this.publisher.close(1, TimeUnit.SECONDS);
269 } catch (Exception e) {
270 PolicyLogger.warn(DmaapPublisherWrapper.class.getName(),
271 "CLOSE: " + this + " because of " +
// Sends one message through the MR publisher and flushes it immediately.
// Throws IllegalArgumentException("No message provided") from a guard whose
// condition is not visible in this view; the returned value is likewise not
// visible here.
280 public boolean send(String partitionId, String message)
281 throws IllegalArgumentException {
283 throw new IllegalArgumentException("No message provided");
// A fresh response holder is installed before each send, then the message
// is handed to the publisher and the batch is sent with its response
// collected.
285 this.publisher.setPubResponse(new MRPublisherResponse());
286 this.publisher.send(partitionId, message);
287 MRPublisherResponse response = this.publisher.sendBatchWithResponse();
// The response code and message are only logged (at debug level, when a
// response is present); the visible lines do not inspect them further.
288 if (PolicyLogger.isDebugEnabled() && response != null) {
289 PolicyLogger.debug(DmaapPublisherWrapper.class.getName(),
290 "DMaaP publisher received " + response.getResponseCode() + ": "
291 + response.getResponseMessage());
// Renders the wrapper as "DmaapPublisherWrapper [" followed by the
// publisher's auth date, auth key, host, protocol flag, username and pending
// message count.
// NOTE(review): the auth key and username are included, and this string is
// concatenated into the info/warn log messages of this class -- confirm that
// exposing them in logs is acceptable.
300 public String toString() {
301 StringBuilder builder = new StringBuilder();
302 builder.append("DmaapPublisherWrapper [").
303 append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
304 append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
305 append(", publisher.getHost()=").append(publisher.getHost()).
306 append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
307 append(", publisher.getUsername()=").append(publisher.getUsername()).
308 append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
310 return builder.toString();
315 * DmaapClient library wrapper
// DMaaP publisher variant that always uses the AAF_AUTH protocol.
317 public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
// NOTE(review): this redeclares a field with the same name and type as
// DmaapPublisherWrapper.publisher and therefore hides it inside this class.
// The visible constructor only calls super(...), which assigns the
// base-class field, so this one is never assigned in the visible code --
// confirm whether the redeclaration is intentional.
321 protected MRSimplerBatchPublisher publisher;
// Delegates to the base constructor with ProtocolTypeConstants.AAF_AUTH.
// The aafLogin parameter declaration is not visible in this view.
323 public DmaapAafPublisherWrapper(List<String> servers, String topic,
325 String aafPassword, boolean useHttps) {
327 super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
// DMaaP publisher variant that uses the DME2 protocol and adds the
// DME2-specific properties on top of the base ones.
331 public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
// Calls the base constructor with ProtocolTypeConstants.DME2, validates the
// DME2 settings, writes them (plus defaults and any additional properties)
// into props, and re-applies props to the publisher.
// Throws IllegalArgumentException when a required DME2 setting is missing.
332 public DmaapDmePublisherWrapper(List<String> servers, String topic,
333 String username, String password,
334 String environment, String aftEnvironment, String dme2Partner,
335 String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
337 super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
// NOTE(review): additionalProps is dereferenced here with no null check in
// the visible lines -- a null map would raise NullPointerException; confirm
// callers always pass a map.
344 String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
// environment, aftEnvironment, latitude and longitude are each required to
// be non-null and non-empty; the message names the expected property key.
346 if (environment == null || environment.isEmpty()) {
347 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
348 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
349 } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
350 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
351 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
352 } if (latitude == null || latitude.isEmpty()) {
353 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
354 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
355 } if (longitude == null || longitude.isEmpty()) {
356 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
357 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
// At least one of dme2Partner / dme2RouteOffer must be non-empty.
// NOTE(review): the partner key in this message is built from
// PROPERTY_DMAAP_SOURCE_TOPICS while every other message in this (sink)
// constructor uses PROPERTY_DMAAP_SINK_TOPICS -- looks inconsistent; confirm.
360 if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
361 throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
362 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
363 PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
// The first entry of servers is used as the DME2 service name.
// NOTE(review): the base constructor checks servers for null/empty only
// under AAF_AUTH, so for DME2 a null or empty list is not rejected by any
// visible code before this get(0) -- confirm.
366 String serviceName = servers.get(0);
368 /* These are required, no defaults */
369 props.setProperty("Environment", environment);
370 props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
372 props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
// Partner and route offer are written only when non-null (an empty string
// is still written).
374 if (dme2Partner != null)
375 props.setProperty("Partner", dme2Partner);
376 if (dme2RouteOffer != null)
377 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
379 props.setProperty("Latitude", latitude);
380 props.setProperty("Longitude", longitude);
382 // ServiceName also a default, found in additionalProps
// The comment below says "optionalProps"; the parameter visible in the
// signature is additionalProps.
384 /* These are optional, will default to these values if not set in optionalProps */
385 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
386 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
387 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
388 props.setProperty("Version", "1.0");
389 props.setProperty("SubContextPath", "/");
390 props.setProperty("sessionstickinessrequired", "no");
392 /* These should not change */
393 props.setProperty("TransportType", "DME2");
394 props.setProperty("MethodType", "POST");
// Additional properties are copied last, so an entry written here replaces
// any same-named key set above. Lines between the lookup and the
// setProperty call are not visible in this view.
396 for (String key : additionalProps.keySet()) {
397 String value = additionalProps.get(key);
400 props.setProperty(key, value);
// Push the now-extended properties to the publisher again (the base
// constructor already applied the base set).
403 this.publisher.setProps(props);