Update dmaap 1.1.9 changes
[policy/engine.git] / PolicyEngineUtils / src / main / java / org / onap / policy / utils / BusConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * PolicyEngineUtils
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.utils;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.Properties;
26 import org.onap.dmaap.mr.client.MRClient.MRApiException;
27 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
28 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
29
30 public interface BusConsumer {
31         
32         /**
33          * fetch messages
34          * 
35          * @return list of messages
36          * @throws MRApiException when error encountered by underlying libraries
37          */
38         public Iterable<String> fetch() throws MRApiException;
39         
40         /**
41          * close underlying library consumer
42          */
43         public void close();
44         
45         /**
46          * MR based consumer
47          */
48         public static class DmaapConsumerWrapper implements BusConsumer {
49                 
50                 /**
51                  * MR Consumer
52                  */
53                 protected MRConsumerImpl consumer;
54                 
55                 /**
56                  * MR Consumer Wrapper
57                  * 
58                  * @param servers messaging bus hosts
59                  * @param topic topic
60                  * @param apiKey API Key
61                  * @param apiSecret API Secret
62                  * @param aafLogin AAF Login
63                  * @param aafPassword AAF Password
64                  * @param consumerGroup Consumer Group
65                  * @param consumerInstance Consumer Instance
66                  * @param fetchTimeout Fetch Timeout
67                  * @param fetchLimit Fetch Limit
68                  */
69                 public DmaapConsumerWrapper(List<String> servers, String topic, 
70                                                                 String aafLogin, String aafPassword,
71                                                                 String consumerGroup, String consumerInstance,
72                                                                 int fetchTimeout, int fetchLimit) throws MalformedURLException{
73                                         
74                         this.consumer = new MRConsumerImpl(servers, topic, 
75                                                                                            consumerGroup, consumerInstance, 
76                                                                                            fetchTimeout, fetchLimit, 
77                                                                                    null, aafLogin, aafPassword);
78                         
79                         this.consumer.setUsername(aafLogin);
80                         this.consumer.setPassword(aafPassword);
81                         
82                         this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
83                         
84                         Properties props = new Properties();
85                         props.setProperty("Protocol", "http");
86                         this.consumer.setProps(props);
87                         this.consumer.setHost(servers.get(0) + ":3904");
88                 }
89                 
90                 /**
91                  * {@inheritDoc}
92                  */
93                 @Override
94                 public Iterable<String> fetch() throws MRApiException {
95                         try {
96                 return this.consumer.fetch();
97             } catch (Exception e) {
98                 throw new MRApiException("Error during MR consumer Fetch ",e);
99             }
100                 }
101                 
102                 /**
103                  * {@inheritDoc}
104                  */
105                 @Override
106                 public void close() {
107                         this.consumer.close();
108                 }
109                 
110                 @Override
111                 public String toString() {
112                         StringBuilder builder = new StringBuilder();
113                         builder.
114                         append("DmaapConsumerWrapper [").
115                         append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
116                         append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
117                         append(", consumer.getHost()=").append(consumer.getHost()).
118                         append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
119                         append(", consumer.getUsername()=").append(consumer.getUsername()).
120                         append("]");
121                         return builder.toString();
122                 }
123         }
124
125 }