2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.utils;
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.Properties;
26 import org.onap.dmaap.mr.client.MRClient.MRApiException;
27 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
28 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
30 public interface BusConsumer {
35 * @return list of messages
36 * @throws MRApiException when error encountered by underlying libraries
38 public Iterable<String> fetch() throws MRApiException;
41 * close underlying library consumer
48 public static class DmaapConsumerWrapper implements BusConsumer {
53 protected MRConsumerImpl consumer;
58 * @param servers messaging bus hosts
60 * @param apiKey API Key
61 * @param apiSecret API Secret
62 * @param aafLogin AAF Login
63 * @param aafPassword AAF Password
64 * @param consumerGroup Consumer Group
65 * @param consumerInstance Consumer Instance
66 * @param fetchTimeout Fetch Timeout
67 * @param fetchLimit Fetch Limit
69 public DmaapConsumerWrapper(List<String> servers, String topic,
70 String aafLogin, String aafPassword,
71 String consumerGroup, String consumerInstance,
72 int fetchTimeout, int fetchLimit) throws MalformedURLException{
74 this.consumer = new MRConsumerImpl(servers, topic,
75 consumerGroup, consumerInstance,
76 fetchTimeout, fetchLimit,
77 null, aafLogin, aafPassword);
79 this.consumer.setUsername(aafLogin);
80 this.consumer.setPassword(aafPassword);
82 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
84 Properties props = new Properties();
85 props.setProperty("Protocol", "http");
86 this.consumer.setProps(props);
87 this.consumer.setHost(servers.get(0) + ":3904");
94 public Iterable<String> fetch() throws MRApiException {
96 return this.consumer.fetch();
97 } catch (Exception e) {
98 throw new MRApiException("Error during MR consumer Fetch ",e);
106 public void close() {
107 this.consumer.close();
111 public String toString() {
112 StringBuilder builder = new StringBuilder();
114 append("DmaapConsumerWrapper [").
115 append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
116 append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
117 append(", consumer.getHost()=").append(consumer.getHost()).
118 append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
119 append(", consumer.getUsername()=").append(consumer.getUsername()).
121 return builder.toString();