8ac41424cda7603253d0616e12986032b5af5f6c
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
34  * its listeners
35  */
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
37
38     private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
39
40
41     protected final String userName;
42     protected final String password;
43
44     protected String environment = null;
45     protected String aftEnvironment = null;
46     protected String partner = null;
47     protected String latitude = null;
48     protected String longitude = null;
49
50     protected Map<String, String> additionalProps = null;
51
52
53     /**
54      * 
55      * @param servers DMaaP servers
56      * @param topic DMaaP Topic to be monitored
57      * @param apiKey DMaaP API Key (optional)
58      * @param apiSecret DMaaP API Secret (optional)
59      * @param consumerGroup DMaaP Reader Consumer Group
60      * @param consumerInstance DMaaP Reader Instance
61      * @param fetchTimeout DMaaP fetch timeout
62      * @param fetchLimit DMaaP fetch limit
63      * @param environment DME2 Environment
64      * @param aftEnvironment DME2 AFT Environment
65      * @param partner DME2 Partner
66      * @param latitude DME2 Latitude
67      * @param longitude DME2 Longitude
68      * @param additionalProps Additional properties to pass to DME2
69      * @param useHttps does connection use HTTPS?
70      * @param allowSelfSignedCerts are self-signed certificates allow
71      * 
72      * @throws IllegalArgumentException An invalid parameter passed in
73      */
74     public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
75             String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
76             int fetchLimit, String environment, String aftEnvironment, String partner, String latitude,
77             String longitude, Map<String, String> additionalProps, boolean useHttps, boolean allowSelfSignedCerts) {
78
79         super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
80                 allowSelfSignedCerts);
81
82         this.userName = userName;
83         this.password = password;
84
85         this.environment = environment;
86         this.aftEnvironment = aftEnvironment;
87         this.partner = partner;
88
89         this.latitude = latitude;
90         this.longitude = longitude;
91
92         this.additionalProps = additionalProps;
93         try {
94             this.init();
95         } catch (Exception e) {
96             logger.error("ERROR during init of topic {}", this.topic);
97             throw new IllegalArgumentException(e);
98         }
99     }
100
101     /**
102      * 
103      * @param servers DMaaP servers
104      * @param topic DMaaP Topic to be monitored
105      * @param apiKey DMaaP API Key (optional)
106      * @param apiSecret DMaaP API Secret (optional)
107      * @param consumerGroup DMaaP Reader Consumer Group
108      * @param consumerInstance DMaaP Reader Instance
109      * @param fetchTimeout DMaaP fetch timeout
110      * @param fetchLimit DMaaP fetch limit
111      * @param useHttps does connection use HTTPS?
112      * @param allowSelfSignedCerts are self-signed certificates allow
113      * @throws IllegalArgumentException An invalid parameter passed in
114      */
115     public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
116             String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
117             int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
118
119
120         super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
121                 allowSelfSignedCerts);
122
123         this.userName = userName;
124         this.password = password;
125
126         try {
127             this.init();
128         } catch (Exception e) {
129             logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
130             throw new IllegalArgumentException(e);
131         }
132     }
133
134
135     /**
136      * Initialize the Cambria or MR Client
137      */
138     @Override
139     public void init() throws MalformedURLException {
140         if (anyNullOrEmpty(this.userName, this.password)) {
141             this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
142                     this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit,
143                     this.useHttps, this.allowSelfSignedCerts);
144         } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
145             this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
146                     this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
147                     this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
148         } else {
149             this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
150                     this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
151                     this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner,
152                     this.latitude, this.longitude, this.additionalProps, this.useHttps);
153         }
154
155         logger.info("{}: INITTED", this);
156     }
157
158     /**
159      * {@inheritDoc}
160      */
161     @Override
162     public CommInfrastructure getTopicCommInfrastructure() {
163         return Topic.CommInfrastructure.DMAAP;
164     }
165
166     @Override
167     public String toString() {
168         StringBuilder builder = new StringBuilder();
169         builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
170                 .append((password == null || password.isEmpty()) ? "-" : password.length())
171                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
172                 .append(super.toString()).append("]");
173         return builder.toString();
174     }
175
176
177 }