X-Git-Url: https://gerrit.onap.org/r/gitweb?p=policy%2Fengine.git;a=blobdiff_plain;f=PolicyEngineUtils%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Futils%2FBusConsumer.java;h=c3fe9023fb2306b50b99c720e0b95da4c297b0b6;hp=0d4c86b3d9b738b022ec60d4f06a603a232f63ec;hb=5bcdc95e91aeabf68770b175fe210da38e76867f;hpb=677d872505a8b74a175023ff199b123ff2147dc8 diff --git a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java index 0d4c86b3d..c3fe9023f 100644 --- a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java +++ b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java @@ -3,13 +3,14 @@ * PolicyEngineUtils * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Modifications copyright (c) 2019 Nokia * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,98 +29,100 @@ import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; public interface BusConsumer { - - /** - * fetch messages - * - * @return list of messages - * @throws MRApiException when error encountered by underlying libraries - */ - public Iterable fetch() throws MRApiException; - - /** - * close underlying library consumer - */ - public void close(); - - /** - * MR based consumer - */ - public static class DmaapConsumerWrapper implements BusConsumer { - - /** - * MR Consumer - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - */ - public DmaapConsumerWrapper(List servers, String topic, - String aafLogin, String aafPassword, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) throws MalformedURLException{ - - this.consumer = new MRConsumerImpl(servers, topic, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, - null, aafLogin, aafPassword); - - this.consumer.setUsername(aafLogin); - this.consumer.setPassword(aafPassword); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - Properties props = new Properties(); - props.setProperty("Protocol", "http"); - this.consumer.setProps(props); - this.consumer.setHost(servers.get(0) + ":3904"); - } - - /** - * {@inheritDoc} - */ - @Override - public Iterable fetch() throws MRApiException { - try { - return this.consumer.fetch(); + + /** + * fetch messages + * + * @return list of messages + * @throws MRApiException when error encountered by underlying libraries + */ + Iterable fetch() throws MRApiException; + + /** + * close underlying library consumer + */ + void close(); + + /** + * MR based consumer + */ + class DmaapConsumerWrapper implements BusConsumer { + + /** + * MR Consumer + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + */ + public DmaapConsumerWrapper(List servers, String topic, + String aafLogin, String aafPassword, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) throws MalformedURLException { + + this(new MRConsumerImpl(servers, topic, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, + null, aafLogin, aafPassword), aafLogin, aafPassword, servers.get(0)); + + } + + DmaapConsumerWrapper(MRConsumerImpl consumer, String aafLogin, String aafPassword, String host) { + this.consumer = consumer; + this.consumer.setUsername(aafLogin); + this.consumer.setPassword(aafPassword); + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + this.consumer.setHost(host + ":3904"); + + Properties props = new Properties(); + props.setProperty("Protocol", "http"); + this.consumer.setProps(props); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterable fetch() throws MRApiException { + try { + return consumer.fetch(); } catch (Exception e) { - throw new MRApiException("Error during MR consumer Fetch ",e); + throw new MRApiException("Error during MR consumer Fetch ", e); } - } - - /** - * {@inheritDoc} - */ - @Override - public void close() { - this.consumer.close(); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder. - append("DmaapConsumerWrapper ["). - append("consumer.getAuthDate()=").append(consumer.getAuthDate()). - append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). - append(", consumer.getHost()=").append(consumer.getHost()). - append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). - append(", consumer.getUsername()=").append(consumer.getUsername()). - append("]"); - return builder.toString(); - } - } + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + this.consumer.close(); + } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder. + append("DmaapConsumerWrapper ["). + append("consumer.getAuthDate()=").append(consumer.getAuthDate()). + append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). + append(", consumer.getHost()=").append(consumer.getHost()). + append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). + append(", consumer.getUsername()=").append(consumer.getUsername()). + append("]"); + return builder.toString(); + } + } }