X-Git-Url: https://gerrit.onap.org/r/gitweb?p=policy%2Fengine.git;a=blobdiff_plain;f=PolicyEngineUtils%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Futils%2FBusPublisher.java;h=f5c792caf321ae7532ce974b0f7faeb526a4602a;hp=745213529626d7f528309088c36226b848a065f5;hb=e9b8aa0223e6f042c0533176ae8222fb061852de;hpb=3c95fa7249cccc3d0eb3c9e89ba708a3167d41ba diff --git a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java index 745213529..f5c792caf 100644 --- a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java +++ b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * PolicyEngineUtils * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; + import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.onap.policy.common.logging.eelf.PolicyLogger; @@ -31,109 +32,109 @@ import org.onap.policy.common.logging.flexlogger.FlexLogger; import org.onap.policy.common.logging.flexlogger.Logger; public interface BusPublisher { - - /** - * sends a message - * - * @param partition id - * @param message the message - * @return true if success, false otherwise - * @throws IllegalArgumentException if no message provided - */ - public boolean send(String partitionId, String message); - - /** - * closes the publisher - */ - public void close(); - - /** - * DmaapClient library wrapper - */ - public static class DmaapPublisherWrapper implements BusPublisher { - private static Logger logger = FlexLogger.getLogger(DmaapPublisherWrapper.class); - /** - * MR based Publisher - */ - protected MRSimplerBatchPublisher publisher; - - public DmaapPublisherWrapper(List servers, String topic, - String aafLogin, - String aafPassword) { - - ArrayList dmaapServers = new ArrayList<>(); - for (String server: servers) { - dmaapServers.add(server + ":3904"); - } - - this.publisher = - new MRSimplerBatchPublisher.Builder(). - againstUrls(dmaapServers). - onTopic(topic). - build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - this.publisher.setUsername(aafLogin); - this.publisher.setPassword(aafPassword); - - Properties props = new Properties(); - props.setProperty("Protocol", "http"); - props.setProperty("contenttype", "application/json"); - - this.publisher.setProps(props); - - this.publisher.setHost(servers.get(0)); - - if (PolicyLogger.isInfoEnabled()) - PolicyLogger.info(DmaapPublisherWrapper.class.getName(), - "CREATION: " + this); - } - - /** - * {@inheritDoc} - */ - @Override - public void close() { - if (logger.isInfoEnabled()) - logger.info(DmaapPublisherWrapper.class.getName()+ - "CREATION: " + this); - - try { - this.publisher.close(1, TimeUnit.SECONDS); - } catch (Exception e) { - logger.warn(DmaapPublisherWrapper.class.getName()+ - "CLOSE: " + this + " because of " + - e.getMessage(), e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean send(String partitionId, String message){ - if (message == null) - throw new IllegalArgumentException("No message provided"); - - this.publisher.send(partitionId, message); - return true; - - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("DmaapPublisherWrapper ["). - append("publisher.getAuthDate()=").append(publisher.getAuthDate()). - append(", publisher.getAuthKey()=").append(publisher.getAuthKey()). - append(", publisher.getHost()=").append(publisher.getHost()). - append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()). - append(", publisher.getUsername()=").append(publisher.getUsername()). - append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()). - append("]"); - return builder.toString(); - } - } + + /** + * sends a message + * . + * @param partitionId id + * @param message the message + * @return true if success, false otherwise + * @throws IllegalArgumentException if no message provided + */ + public boolean send(String partitionId, String message); + + /** + * closes the publisher. + */ + public void close(); + + /** + * DmaapClient library wrapper. + */ + public static class DmaapPublisherWrapper implements BusPublisher { + private static Logger logger = FlexLogger.getLogger(DmaapPublisherWrapper.class); + /** + * MR based Publisher. + */ + protected MRSimplerBatchPublisher publisher; + + /** + * DmaapPublisherWrapper constructor. + * + * @param servers list of servers + * @param topic topic + * @param aafLogin login + * @param aafPassword password + */ + public DmaapPublisherWrapper(List servers, String topic, String aafLogin, String aafPassword) { + + ArrayList dmaapServers = new ArrayList<>(); + for (String server : servers) { + dmaapServers.add(server + ":3904"); + } + + this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + this.publisher.setUsername(aafLogin); + this.publisher.setPassword(aafPassword); + + Properties props = new Properties(); + props.setProperty("Protocol", "http"); + props.setProperty("contenttype", "application/json"); + + this.publisher.setProps(props); + + this.publisher.setHost(servers.get(0)); + + if (PolicyLogger.isInfoEnabled()) { + PolicyLogger.info(DmaapPublisherWrapper.class.getName(), "CREATION: " + this); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + if (logger.isInfoEnabled()) { + logger.info(DmaapPublisherWrapper.class.getName() + "CREATION: " + this); + } + + try { + this.publisher.close(1, TimeUnit.SECONDS); + } catch (Exception e) { + logger.warn(DmaapPublisherWrapper.class.getName() + "CLOSE: " + this + " because of " + e.getMessage(), + e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) { + if (message == null) { + throw new IllegalArgumentException("No message provided"); + } + + this.publisher.send(partitionId, message); + return true; + + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("DmaapPublisherWrapper [").append("publisher.getAuthDate()=").append(publisher.getAuthDate()) + .append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).append(", publisher.getHost()=") + .append(publisher.getHost()).append(", publisher.getProtocolFlag()=") + .append(publisher.getProtocolFlag()).append(", publisher.getUsername()=") + .append(publisher.getUsername()).append(", publisher.getPendingMessageCount()=") + .append(publisher.getPendingMessageCount()).append("]"); + return builder.toString(); + } + } }