Sonar cleanup
[policy/drools-pdp.git] / policy-endpoints / src / main / java / org / onap / policy / drools / event / comm / bus / internal / SingleThreadedDmaapTopicSource.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.onap.policy.drools.event.comm.Topic;
28 import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This topic reader implementation specializes in reading messages
34  * over DMAAP topic and notifying its listeners
35  */
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
37                                             implements DmaapTopicSource, Runnable {     
38
39         private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
40         
41
42         protected final String userName;
43         protected final String password;
44         
45         protected String environment = null;
46         protected String aftEnvironment = null;
47         protected String partner = null;
48         protected String latitude = null;
49         protected String longitude = null;
50         
51         protected Map<String,String> additionalProps = null;
52
53         
54         /**
55          * 
56          * @param servers DMaaP servers
57          * @param topic DMaaP Topic to be monitored
58          * @param apiKey DMaaP API Key (optional)
59          * @param apiSecret DMaaP API Secret (optional)
60          * @param consumerGroup DMaaP Reader Consumer Group
61          * @param consumerInstance DMaaP Reader Instance
62          * @param fetchTimeout DMaaP fetch timeout
63          * @param fetchLimit DMaaP fetch limit
64          * @param environment DME2 Environment
65          * @param aftEnvironment DME2 AFT Environment
66          * @param partner DME2 Partner
67          * @param latitude DME2 Latitude
68          * @param longitude DME2 Longitude
69          * @param additionalProps Additional properties to pass to DME2
70          * @param useHttps does connection use HTTPS?
71          * @param allowSelfSignedCerts are self-signed certificates allow
72          * 
73          * @throws IllegalArgumentException An invalid parameter passed in
74          */
75         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
76                                                                                         String apiKey, String apiSecret,
77                                                                                         String userName, String password,
78                                                                                         String consumerGroup, String consumerInstance,
79                                                                                         int fetchTimeout, int fetchLimit,
80                                                                                         String environment, String aftEnvironment, String partner,
81                                                                                         String latitude, String longitude, Map<String,String> additionalProps,
82                                                                                         boolean useHttps, boolean allowSelfSignedCerts)
83                         throws IllegalArgumentException {
84                         
85                 super(servers, topic, apiKey, apiSecret, 
86                           consumerGroup, consumerInstance, 
87                           fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
88                 
89                 this.userName = userName;
90                 this.password = password;
91                 
92                 this.environment = environment;
93                 this.aftEnvironment = aftEnvironment;
94                 this.partner = partner;
95                 
96                 this.latitude = latitude;
97                 this.longitude = longitude;
98                 
99                 this.additionalProps = additionalProps;
100                 try {
101                         this.init();
102                 } catch (Exception e) {
103                         logger.error("ERROR during init of topic {}", this.topic);
104                         throw new IllegalArgumentException(e);
105                 }
106         }
107
108         /**
109          * 
110          * @param servers DMaaP servers
111          * @param topic DMaaP Topic to be monitored
112          * @param apiKey DMaaP API Key (optional)
113          * @param apiSecret DMaaP API Secret (optional)
114          * @param consumerGroup DMaaP Reader Consumer Group
115          * @param consumerInstance DMaaP Reader Instance
116          * @param fetchTimeout DMaaP fetch timeout
117          * @param fetchLimit DMaaP fetch limit
118          * @param useHttps does connection use HTTPS?
119          * @param allowSelfSignedCerts are self-signed certificates allow
120          * @throws IllegalArgumentException An invalid parameter passed in
121          */
122         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
123                                                       String apiKey, String apiSecret,
124                                                       String userName, String password,
125                                                       String consumerGroup, String consumerInstance, 
126                                                       int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
127                         throws IllegalArgumentException {
128                 
129                 
130                 super(servers, topic, apiKey, apiSecret, 
131                           consumerGroup, consumerInstance, 
132                           fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
133                 
134                 this.userName = userName;
135                 this.password = password;               
136                 
137                 try {
138                         this.init();
139                 } catch (Exception e) {
140                         logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
141                         throw new IllegalArgumentException(e);
142                 }
143         }
144         
145
146         /**
147          * Initialize the Cambria or MR Client
148          */
149         @Override
150         public void init() throws MalformedURLException {
151                 if (this.userName == null || this.userName.isEmpty() || 
152                                 this.password == null || this.password.isEmpty()) {
153                                 this.consumer =
154                                                 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
155                                                                                            this.apiKey, this.apiSecret,
156                                                                                            this.consumerGroup, this.consumerInstance,
157                                                                                            this.fetchTimeout, this.fetchLimit,
158                                                                                            this.useHttps, this.allowSelfSignedCerts);
159                 } else if ((this.environment == null    || this.environment.isEmpty()) &&
160                                    (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
161                                    (this.latitude == null          || this.latitude.isEmpty()) &&
162                                    (this.longitude == null         || this.longitude.isEmpty()) &&
163                                    (this.partner == null           || this.partner.isEmpty())) {
164                         this.consumer =
165                                         new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
166                                                                                     this.apiKey, this.apiSecret,
167                                                                                     this.userName, this.password,
168                                                                                     this.consumerGroup, this.consumerInstance,
169                                                                                     this.fetchTimeout, this.fetchLimit,
170                                                                                     this.useHttps, this.allowSelfSignedCerts);
171                 } else {
172                         this.consumer =
173                                         new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, 
174                                                                                     this.apiKey, this.apiSecret,
175                                                                                     this.userName, this.password,
176                                                                                     this.consumerGroup, this.consumerInstance,
177                                                                                     this.fetchTimeout, this.fetchLimit,
178                                                                                     this.environment, this.aftEnvironment, this.partner,
179                                                                                     this.latitude, this.longitude, this.additionalProps, this.useHttps);
180                 }
181                         
182                 logger.info("{}: INITTED", this);
183         }
184         
185         /**
186          * {@inheritDoc}
187          */
188         @Override
189         public CommInfrastructure getTopicCommInfrastructure() {
190                 return Topic.CommInfrastructure.DMAAP;
191         }
192
193         @Override
194         public String toString() {
195                 StringBuilder builder = new StringBuilder();
196                 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
197                                 .append((password == null || password.isEmpty()) ? "-" : password.length())
198                                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
199                                 .append(", toString()=").append(super.toString()).append("]");
200                 return builder.toString();
201         }
202
203
204 }