0806c3d3bb6749d73fc73a3f0e742783e5c1ea53
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
22
23 import com.att.nsa.mr.client.impl.MRConsumerImpl;
24 import com.att.nsa.mr.client.response.MRConsumerResponse;
25
26 import java.io.IOException;
27 import java.net.MalformedURLException;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * MR based consumer
37  */
38 public abstract class DmaapConsumerWrapper implements BusConsumer {
39
40     /**
41      * logger
42      */
43     private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
44
45     /**
46      * Name of the "protocol" property.
47      */
48     protected static final String PROTOCOL_PROP = "Protocol";
49
50     /**
51      * fetch timeout
52      */
53     protected int fetchTimeout;
54
55     /**
56      * close condition
57      */
58     protected Object closeCondition = new Object();
59
60     /**
61      * MR Consumer
62      */
63     protected MRConsumerImpl consumer;
64
65     /**
66      * MR Consumer Wrapper
67      *
68      * @param servers messaging bus hosts
69      * @param topic topic
70      * @param apiKey API Key
71      * @param apiSecret API Secret
72      * @param username AAF Login
73      * @param password AAF Password
74      * @param consumerGroup Consumer Group
75      * @param consumerInstance Consumer Instance
76      * @param fetchTimeout Fetch Timeout
77      * @param fetchLimit Fetch Limit
78      * @throws MalformedURLException
79      */
80     public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username,
81             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit)
82             throws MalformedURLException {
83
84         this.fetchTimeout = fetchTimeout;
85
86         if (topic == null || topic.isEmpty()) {
87             throw new IllegalArgumentException("No topic for DMaaP");
88         }
89
90         this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
91                 null, apiKey, apiSecret);
92
93         this.consumer.setUsername(username);
94         this.consumer.setPassword(password);
95     }
96
97     @Override
98     public Iterable<String> fetch() throws InterruptedException, IOException {
99         final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
100         if (response == null) {
101             logger.warn("{}: DMaaP NULL response received", this);
102
103             synchronized (closeCondition) {
104                 closeCondition.wait(fetchTimeout);
105             }
106             return new ArrayList<>();
107         } else {
108             logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), response.getResponseMessage());
109
110             if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
111
112                 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
113                         response.getResponseMessage());
114
115                 synchronized (closeCondition) {
116                     closeCondition.wait(fetchTimeout);
117                 }
118
119                 /* fall through */
120             }
121         }
122
123         if (response.getActualMessages() == null) {
124             return new ArrayList<>();
125         } else {
126             return response.getActualMessages();
127         }
128     }
129
130     @Override
131     public void close() {
132         synchronized (closeCondition) {
133             closeCondition.notifyAll();
134         }
135
136         this.consumer.close();
137     }
138
139     @Override
140     public String toString() {
141         return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
142                 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
143                 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
144                 + consumer.getUsername() + "]";
145     }
146 }