6c1bc8a01246128c34bdac0d3f785953837c6833
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.onap.policy.drools.event.comm.Topic;
28 import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This topic reader implementation specializes in reading messages
34  * over DMAAP topic and notifying its listeners
35  */
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
37                                             implements DmaapTopicSource, Runnable {     
38
39         private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
40         
41         protected boolean allowSelfSignedCerts;
42         protected final String userName;
43         protected final String password;
44         
45         protected String environment = null;
46         protected String aftEnvironment = null;
47         protected String partner = null;
48         protected String latitude = null;
49         protected String longitude = null;
50         
51         protected Map<String,String> additionalProps = null;
52
53         
54         /**
55          * 
56          * @param servers DMaaP servers
57          * @param topic DMaaP Topic to be monitored
58          * @param apiKey DMaaP API Key (optional)
59          * @param apiSecret DMaaP API Secret (optional)
60          * @param consumerGroup DMaaP Reader Consumer Group
61          * @param consumerInstance DMaaP Reader Instance
62          * @param fetchTimeout DMaaP fetch timeout
63          * @param fetchLimit DMaaP fetch limit
64          * @param environment DME2 Environment
65          * @param aftEnvironment DME2 AFT Environment
66          * @param partner DME2 Partner
67          * @param latitude DME2 Latitude
68          * @param longitude DME2 Longitude
69          * @param additionalProps Additional properties to pass to DME2
70          * @param useHttps does connection use HTTPS?
71          * @param allowSelfSignedCerts are self-signed certificates allow
72          * 
73          * @throws IllegalArgumentException An invalid parameter passed in
74          */
75         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
76                                                                                         String apiKey, String apiSecret,
77                                                                                         String userName, String password,
78                                                                                         String consumerGroup, String consumerInstance,
79                                                                                         int fetchTimeout, int fetchLimit,
80                                                                                         String environment, String aftEnvironment, String partner,
81                                                                                         String latitude, String longitude, Map<String,String> additionalProps,
82                                                                                         boolean useHttps, boolean allowSelfSignedCerts)
83                         throws IllegalArgumentException {
84                         
85                 super(servers, topic, apiKey, apiSecret, 
86                           consumerGroup, consumerInstance, 
87                           fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
88                 
89                 this.userName = userName;
90                 this.password = password;
91                 
92                 this.environment = environment;
93                 this.aftEnvironment = aftEnvironment;
94                 this.partner = partner;
95                 
96                 this.latitude = latitude;
97                 this.longitude = longitude;
98                 
99                 this.additionalProps = additionalProps;
100                 try {
101                         this.init();
102                 } catch (Exception e) {
103                         logger.error("ERROR during init of topic {}", this.topic);
104                         throw new IllegalArgumentException(e);
105                 }
106         }
107
108         /**
109          * 
110          * @param servers DMaaP servers
111          * @param topic DMaaP Topic to be monitored
112          * @param apiKey DMaaP API Key (optional)
113          * @param apiSecret DMaaP API Secret (optional)
114          * @param consumerGroup DMaaP Reader Consumer Group
115          * @param consumerInstance DMaaP Reader Instance
116          * @param fetchTimeout DMaaP fetch timeout
117          * @param fetchLimit DMaaP fetch limit
118          * @param useHttps does connection use HTTPS?
119          * @param allowSelfSignedCerts are self-signed certificates allow
120          * @throws IllegalArgumentException An invalid parameter passed in
121          */
122         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
123                                                       String apiKey, String apiSecret,
124                                                       String userName, String password,
125                                                       String consumerGroup, String consumerInstance, 
126                                                       int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
127                         throws IllegalArgumentException {
128                 
129                 
130                 super(servers, topic, apiKey, apiSecret, 
131                           consumerGroup, consumerInstance, 
132                           fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
133                 
134                 this.userName = userName;
135                 this.password = password;               
136                 
137                 try {
138                         this.init();
139                 } catch (Exception e) {
140                         logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
141                         throw new IllegalArgumentException(e);
142                 }
143         }
144         
145
146         /**
147          * Initialize the Cambria or MR Client
148          */
149         @Override
150         public void init() throws MalformedURLException {
151                 if (this.userName == null || this.userName.isEmpty() || 
152                                 this.password == null || this.password.isEmpty()) {
153                                 this.consumer =
154                                                 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
155                                                                                            this.apiKey, this.apiSecret,
156                                                                                            this.consumerGroup, this.consumerInstance,
157                                                                                            this.fetchTimeout, this.fetchLimit,
158                                                                                            this.useHttps, this.allowSelfSignedCerts);
159                 } else if ((this.environment == null    || this.environment.isEmpty()) &&
160                                    (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
161                                    (this.latitude == null          || this.latitude.isEmpty()) &&
162                                    (this.longitude == null         || this.longitude.isEmpty()) &&
163                                    (this.partner == null           || this.partner.isEmpty())) {
164                         this.consumer =
165                                         new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, 
166                                                                                     this.apiKey, this.apiSecret,
167                                                                                     this.userName, this.password,
168                                                                                     this.consumerGroup, this.consumerInstance,
169                                                                                     this.fetchTimeout, this.fetchLimit, this.useHttps);
170                 } else {
171                         this.consumer =
172                                         new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, 
173                                                                                     this.apiKey, this.apiSecret,
174                                                                                     this.userName, this.password,
175                                                                                     this.consumerGroup, this.consumerInstance,
176                                                                                     this.fetchTimeout, this.fetchLimit,
177                                                                                     this.environment, this.aftEnvironment, this.partner,
178                                                                                     this.latitude, this.longitude, this.additionalProps, this.useHttps);
179                 }
180                         
181                 logger.info("{}: INITTED", this);
182         }
183         
184         /**
185          * {@inheritDoc}
186          */
187         @Override
188         public CommInfrastructure getTopicCommInfrastructure() {
189                 return Topic.CommInfrastructure.DMAAP;
190         }
191
192         @Override
193         public String toString() {
194                 StringBuilder builder = new StringBuilder();
195                 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
196                                 .append((password == null || password.isEmpty()) ? "-" : password.length())
197                                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
198                                 .append(", toString()=").append(super.toString()).append("]");
199                 return builder.toString();
200         }
201
202
203 }