6a9a2d6d39cde043bc392e551f31ce029497c293
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.onap.policy.drools.event.comm.Topic;
28 import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This topic reader implementation specializes in reading messages
34  * over DMAAP topic and notifying its listeners
35  */
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
37                                             implements DmaapTopicSource, Runnable {     
38
39         private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
40         
41
42         protected final String userName;
43         protected final String password;
44         
45         protected String environment = null;
46         protected String aftEnvironment = null;
47         protected String partner = null;
48         protected String latitude = null;
49         protected String longitude = null;
50         
51         protected Map<String,String> additionalProps = null;
52
53         
54         /**
55          * 
56          * @param servers DMaaP servers
57          * @param topic DMaaP Topic to be monitored
58          * @param apiKey DMaaP API Key (optional)
59          * @param apiSecret DMaaP API Secret (optional)
60          * @param consumerGroup DMaaP Reader Consumer Group
61          * @param consumerInstance DMaaP Reader Instance
62          * @param fetchTimeout DMaaP fetch timeout
63          * @param fetchLimit DMaaP fetch limit
64          * @param environment DME2 Environment
65          * @param aftEnvironment DME2 AFT Environment
66          * @param partner DME2 Partner
67          * @param latitude DME2 Latitude
68          * @param longitude DME2 Longitude
69          * @param additionalProps Additional properties to pass to DME2
70          * @param useHttps does connection use HTTPS?
71          * @param allowSelfSignedCerts are self-signed certificates allow
72          * 
73          * @throws IllegalArgumentException An invalid parameter passed in
74          */
75         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
76                                                                                         String apiKey, String apiSecret,
77                                                                                         String userName, String password,
78                                                                                         String consumerGroup, String consumerInstance,
79                                                                                         int fetchTimeout, int fetchLimit,
80                                                                                         String environment, String aftEnvironment, String partner,
81                                                                                         String latitude, String longitude, Map<String,String> additionalProps,
82                                                                                         boolean useHttps, boolean allowSelfSignedCerts) {
83                         
84                 super(servers, topic, apiKey, apiSecret, 
85                           consumerGroup, consumerInstance, 
86                           fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
87                 
88                 this.userName = userName;
89                 this.password = password;
90                 
91                 this.environment = environment;
92                 this.aftEnvironment = aftEnvironment;
93                 this.partner = partner;
94                 
95                 this.latitude = latitude;
96                 this.longitude = longitude;
97                 
98                 this.additionalProps = additionalProps;
99                 try {
100                         this.init();
101                 } catch (Exception e) {
102                         logger.error("ERROR during init of topic {}", this.topic);
103                         throw new IllegalArgumentException(e);
104                 }
105         }
106
107         /**
108          * 
109          * @param servers DMaaP servers
110          * @param topic DMaaP Topic to be monitored
111          * @param apiKey DMaaP API Key (optional)
112          * @param apiSecret DMaaP API Secret (optional)
113          * @param consumerGroup DMaaP Reader Consumer Group
114          * @param consumerInstance DMaaP Reader Instance
115          * @param fetchTimeout DMaaP fetch timeout
116          * @param fetchLimit DMaaP fetch limit
117          * @param useHttps does connection use HTTPS?
118          * @param allowSelfSignedCerts are self-signed certificates allow
119          * @throws IllegalArgumentException An invalid parameter passed in
120          */
121         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
122                                                       String apiKey, String apiSecret,
123                                                       String userName, String password,
124                                                       String consumerGroup, String consumerInstance, 
125                                                       int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
126                 
127                 
128                 super(servers, topic, apiKey, apiSecret, 
129                           consumerGroup, consumerInstance, 
130                           fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
131                 
132                 this.userName = userName;
133                 this.password = password;               
134                 
135                 try {
136                         this.init();
137                 } catch (Exception e) {
138                         logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
139                         throw new IllegalArgumentException(e);
140                 }
141         }
142         
143
144         /**
145          * Initialize the Cambria or MR Client
146          */
147         @Override
148         public void init() throws MalformedURLException {
149                 if (anyNullOrEmpty(this.userName, this.password)) {
150                                 this.consumer =
151                                                 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
152                                                                                            this.apiKey, this.apiSecret,
153                                                                                            this.consumerGroup, this.consumerInstance,
154                                                                                            this.fetchTimeout, this.fetchLimit,
155                                                                                            this.useHttps, this.allowSelfSignedCerts);
156                 } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
157                         this.consumer =
158                                         new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
159                                                                                     this.apiKey, this.apiSecret,
160                                                                                     this.userName, this.password,
161                                                                                     this.consumerGroup, this.consumerInstance,
162                                                                                     this.fetchTimeout, this.fetchLimit,
163                                                                                     this.useHttps, this.allowSelfSignedCerts);
164                 } else {
165                         this.consumer =
166                                         new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, 
167                                                                                     this.apiKey, this.apiSecret,
168                                                                                     this.userName, this.password,
169                                                                                     this.consumerGroup, this.consumerInstance,
170                                                                                     this.fetchTimeout, this.fetchLimit,
171                                                                                     this.environment, this.aftEnvironment, this.partner,
172                                                                                     this.latitude, this.longitude, this.additionalProps, this.useHttps);
173                 }
174                         
175                 logger.info("{}: INITTED", this);
176         }
177         
178         /**
179          * {@inheritDoc}
180          */
181         @Override
182         public CommInfrastructure getTopicCommInfrastructure() {
183                 return Topic.CommInfrastructure.DMAAP;
184         }
185
186         @Override
187         public String toString() {
188                 StringBuilder builder = new StringBuilder();
189                 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
190                                 .append((password == null || password.isEmpty()) ? "-" : password.length())
191                                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
192                                 .append(", toString()=").append(super.toString()).append("]");
193                 return builder.toString();
194         }
195
196
197 }