2ced5bcba9be5498198f86d33acdff7767fe4977
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.List;
24 import java.util.Map;
25
26 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
27 import org.openecomp.policy.drools.event.comm.Topic;
28 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
29
30 /**
31  * This topic reader implementation specializes in reading messages
32  * over DMAAP topic and notifying its listeners
33  */
34 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
35                                             implements DmaapTopicSource, Runnable {
36         
37
38         protected boolean allowSelfSignedCerts;
39         protected final String userName;
40         protected final String password;
41         private String className = SingleThreadedDmaapTopicSource.class.getName();
42         
43         protected String environment = null;
44         protected String aftEnvironment = null;
45         protected String partner = null;
46         protected String latitude = null;
47         protected String longitude = null;
48         
49         protected Map<String,String> additionalProps = null;
50
51         
52         /**
53          * 
54          * @param servers DMaaP servers
55          * @param topic DMaaP Topic to be monitored
56          * @param apiKey DMaaP API Key (optional)
57          * @param apiSecret DMaaP API Secret (optional)
58          * @param consumerGroup DMaaP Reader Consumer Group
59          * @param consumerInstance DMaaP Reader Instance
60          * @param fetchTimeout DMaaP fetch timeout
61          * @param fetchLimit DMaaP fetch limit
62          * @param environment DME2 Environment
63          * @param aftEnvironment DME2 AFT Environment
64          * @param partner DME2 Partner
65          * @param latitude DME2 Latitude
66          * @param longitude DME2 Longitude
67          * @param additionalProps Additional properties to pass to DME2
68          * @param useHttps does connection use HTTPS?
69          * @param allowSelfSignedCerts are self-signed certificates allow
70          * 
71          * @throws IllegalArgumentException An invalid parameter passed in
72          */
73         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
74                                                                                         String apiKey, String apiSecret,
75                                                                                         String userName, String password,
76                                                                                         String consumerGroup, String consumerInstance,
77                                                                                         int fetchTimeout, int fetchLimit,
78                                                                                         String environment, String aftEnvironment, String partner,
79                                                                                         String latitude, String longitude, Map<String,String> additionalProps,
80                                                                                         boolean useHttps, boolean allowSelfSignedCerts)
81                         throws IllegalArgumentException {
82                         
83                 super(servers, topic, apiKey, apiSecret, 
84                           consumerGroup, consumerInstance, 
85                           fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
86                 
87                 this.userName = userName;
88                 this.password = password;
89                 
90                 this.environment = environment;
91                 this.aftEnvironment = aftEnvironment;
92                 this.partner = partner;
93                 
94                 this.latitude = latitude;
95                 this.longitude = longitude;
96                 
97                 this.additionalProps = additionalProps;
98                 try {
99                         this.init();
100                 } catch (Exception e) {
101                         e.printStackTrace();
102                         throw new IllegalArgumentException(e);
103                 }
104         }
105
106         /**
107          * 
108          * @param servers DMaaP servers
109          * @param topic DMaaP Topic to be monitored
110          * @param apiKey DMaaP API Key (optional)
111          * @param apiSecret DMaaP API Secret (optional)
112          * @param consumerGroup DMaaP Reader Consumer Group
113          * @param consumerInstance DMaaP Reader Instance
114          * @param fetchTimeout DMaaP fetch timeout
115          * @param fetchLimit DMaaP fetch limit
116          * @param useHttps does connection use HTTPS?
117          * @param allowSelfSignedCerts are self-signed certificates allow
118          * @throws IllegalArgumentException An invalid parameter passed in
119          */
120         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
121                                                       String apiKey, String apiSecret,
122                                                       String userName, String password,
123                                                       String consumerGroup, String consumerInstance, 
124                                                       int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
125                         throws IllegalArgumentException {
126                 
127                 
128                 super(servers, topic, apiKey, apiSecret, 
129                           consumerGroup, consumerInstance, 
130                           fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
131                 
132                 this.userName = userName;
133                 this.password = password;               
134                 
135                 try {
136                         this.init();
137                 } catch (Exception e) {
138                         e.printStackTrace();
139                         throw new IllegalArgumentException(e);
140                 }
141         }
142         
143
144         /**
145          * Initialize the Cambria or MR Client
146          */
147         @Override
148         public void init() throws Exception {
149                 if (this.userName == null || this.userName.isEmpty() || 
150                                 this.password == null || this.password.isEmpty()) {
151                                 this.consumer =
152                                                 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
153                                                                                            this.apiKey, this.apiSecret,
154                                                                                            this.consumerGroup, this.consumerInstance,
155                                                                                            this.fetchTimeout, this.fetchLimit,
156                                                                                            this.useHttps, this.allowSelfSignedCerts);
157                         } else if ((this.environment == null    || this.environment.isEmpty()) &&
158                                            (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
159                                            (this.latitude == null          || this.latitude.isEmpty()) &&
160                                            (this.longitude == null         || this.longitude.isEmpty()) &&
161                                            (this.partner == null           || this.partner.isEmpty())) {
162                                 this.consumer =
163                                                 new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, 
164                                                                                             this.apiKey, this.apiSecret,
165                                                                                             this.userName, this.password,
166                                                                                             this.consumerGroup, this.consumerInstance,
167                                                                                             this.fetchTimeout, this.fetchLimit, this.useHttps);
168                         } else {
169                                 this.consumer =
170                                                 new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, 
171                                                                                             this.apiKey, this.apiSecret,
172                                                                                             this.userName, this.password,
173                                                                                             this.consumerGroup, this.consumerInstance,
174                                                                                             this.fetchTimeout, this.fetchLimit,
175                                                                                             this.environment, this.aftEnvironment, this.partner,
176                                                                                             this.latitude, this.longitude, this.additionalProps, this.useHttps);
177                         }
178                         
179                 PolicyLogger.info(className, "CREATION: " + this);
180         }
181         
182         /**
183          * {@inheritDoc}
184          */
185         @Override
186         public CommInfrastructure getTopicCommInfrastructure() {
187                 return Topic.CommInfrastructure.DMAAP;
188         }
189
190         @Override
191         public String toString() {
192                 StringBuilder builder = new StringBuilder();
193                 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
194                                 .append((password == null || password.isEmpty()) ? "-" : password.length())
195                                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
196                                 .append(", toString()=").append(super.toString()).append("]");
197                 return builder.toString();
198         }
199
200
201 }