X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=PolicyEngineUtils%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Futils%2FBusConsumer.java;h=b64d6143e298ec327e88dcb3cf911ea6788a2525;hb=refs%2Fchanges%2F64%2F97264%2F6;hp=bf92835c6a76ddaf6fcebec57077239a353c79b8;hpb=073cc188efe9abb4c010cf674e34e2cf46ef1c52;p=policy%2Fengine.git diff --git a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java index bf92835c6..b64d6143e 100644 --- a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java +++ b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java @@ -1,99 +1,123 @@ +/*- + * ============LICENSE_START======================================================= + * PolicyEngineUtils + * ================================================================================ + * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. + * Modifications copyright (c) 2019 Nokia + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + package org.onap.policy.utils; +import java.net.MalformedURLException; import java.util.List; import java.util.Properties; -import com.att.nsa.mr.client.impl.MRConsumerImpl; -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; +import org.onap.dmaap.mr.client.MRClient.MRApiException; +import org.onap.dmaap.mr.client.impl.MRConsumerImpl; +import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; public interface BusConsumer { - - /** - * fetch messages - * - * @return list of messages - * @throws Exception when error encountered by underlying libraries - */ - public Iterable fetch() throws Exception; - - /** - * close underlying library consumer - */ - public void close(); - - /** - * MR based consumer - */ - public static class DmaapConsumerWrapper implements BusConsumer { - - /** - * MR Consumer - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - */ - public DmaapConsumerWrapper(List servers, String topic, - String aafLogin, String aafPassword, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) - throws Exception { - - this.consumer = new MRConsumerImpl(servers, topic, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, - null, aafLogin, aafPassword); - - this.consumer.setUsername(aafLogin); - this.consumer.setPassword(aafPassword); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - Properties props = new Properties(); - props.setProperty("Protocol", "http"); - this.consumer.setProps(props); - this.consumer.setHost(servers.get(0) + ":3904"); - } - - /** - * {@inheritDoc} - */ - public Iterable fetch() throws Exception { - return this.consumer.fetch(); - } - - /** - * {@inheritDoc} - */ - public void close() { - this.consumer.close(); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder. - append("DmaapConsumerWrapper ["). - append("consumer.getAuthDate()=").append(consumer.getAuthDate()). - append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). - append(", consumer.getHost()=").append(consumer.getHost()). - append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). - append(", consumer.getUsername()=").append(consumer.getUsername()). - append("]"); - return builder.toString(); - } - } + /** + * fetch messages. + * + * @return list of messages + * @throws MRApiException when error encountered by underlying libraries + */ + Iterable fetch() throws MRApiException; + + /** + * close underlying library consumer. + */ + void close(); + + /** + * MR based consumer. + */ + class DmaapConsumerWrapper implements BusConsumer { + + /** + * MR Consumer. + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper. + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + */ + public DmaapConsumerWrapper(List servers, String topic, String aafLogin, String aafPassword, + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit) + throws MalformedURLException { + + this(new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, null, + aafLogin, aafPassword), aafLogin, aafPassword, servers.get(0)); + + } + + DmaapConsumerWrapper(MRConsumerImpl consumer, String aafLogin, String aafPassword, String host) { + this.consumer = consumer; + this.consumer.setUsername(aafLogin); + this.consumer.setPassword(aafPassword); + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + this.consumer.setHost(host + ":3904"); + + Properties props = new Properties(); + props.setProperty("Protocol", "http"); + this.consumer.setProps(props); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterable fetch() throws MRApiException { + try { + return consumer.fetch(); + } catch (Exception e) { + throw new MRApiException("Error during MR consumer Fetch ", e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + this.consumer.close(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=").append(consumer.getAuthDate()) + .append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).append(", consumer.getHost()=") + .append(consumer.getHost()).append(", consumer.getProtocolFlag()=") + .append(consumer.getProtocolFlag()).append(", consumer.getUsername()=") + .append(consumer.getUsername()).append("]"); + return builder.toString(); + } + } }