de5fed6aeb319647a5f351571453f26c0a1fb70f
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaConsumerWrapper;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmeConsumerWrapper;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
36  * its listeners
37  */
38 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
39
40     private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
41
42
43     protected final String userName;
44     protected final String password;
45
46     protected String environment = null;
47     protected String aftEnvironment = null;
48     protected String partner = null;
49     protected String latitude = null;
50     protected String longitude = null;
51
52     protected Map<String, String> additionalProps = null;
53
54
55     /**
56      * 
57      * @param servers DMaaP servers
58      * @param topic DMaaP Topic to be monitored
59      * @param apiKey DMaaP API Key (optional)
60      * @param apiSecret DMaaP API Secret (optional)
61      * @param consumerGroup DMaaP Reader Consumer Group
62      * @param consumerInstance DMaaP Reader Instance
63      * @param fetchTimeout DMaaP fetch timeout
64      * @param fetchLimit DMaaP fetch limit
65      * @param environment DME2 Environment
66      * @param aftEnvironment DME2 AFT Environment
67      * @param partner DME2 Partner
68      * @param latitude DME2 Latitude
69      * @param longitude DME2 Longitude
70      * @param additionalProps Additional properties to pass to DME2
71      * @param useHttps does connection use HTTPS?
72      * @param allowSelfSignedCerts are self-signed certificates allow
73      * 
74      * @throws IllegalArgumentException An invalid parameter passed in
75      */
76     public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
77             String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
78             int fetchLimit, String environment, String aftEnvironment, String partner, String latitude,
79             String longitude, Map<String, String> additionalProps, boolean useHttps, boolean allowSelfSignedCerts) {
80
81         super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
82                 allowSelfSignedCerts);
83
84         this.userName = userName;
85         this.password = password;
86
87         this.environment = environment;
88         this.aftEnvironment = aftEnvironment;
89         this.partner = partner;
90
91         this.latitude = latitude;
92         this.longitude = longitude;
93
94         this.additionalProps = additionalProps;
95         try {
96             this.init();
97         } catch (Exception e) {
98             logger.error("ERROR during init of topic {}", this.topic);
99             throw new IllegalArgumentException(e);
100         }
101     }
102
103     /**
104      * 
105      * @param servers DMaaP servers
106      * @param topic DMaaP Topic to be monitored
107      * @param apiKey DMaaP API Key (optional)
108      * @param apiSecret DMaaP API Secret (optional)
109      * @param consumerGroup DMaaP Reader Consumer Group
110      * @param consumerInstance DMaaP Reader Instance
111      * @param fetchTimeout DMaaP fetch timeout
112      * @param fetchLimit DMaaP fetch limit
113      * @param useHttps does connection use HTTPS?
114      * @param allowSelfSignedCerts are self-signed certificates allow
115      * @throws IllegalArgumentException An invalid parameter passed in
116      */
117     public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
118             String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
119             int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
120
121
122         super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
123                 allowSelfSignedCerts);
124
125         this.userName = userName;
126         this.password = password;
127
128         try {
129             this.init();
130         } catch (Exception e) {
131             logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
132             throw new IllegalArgumentException(e);
133         }
134     }
135
136
137     /**
138      * Initialize the Cambria or MR Client
139      */
140     @Override
141     public void init() throws MalformedURLException {
142         if (anyNullOrEmpty(this.userName, this.password)) {
143             this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
144                     this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps,
145                     this.allowSelfSignedCerts);
146         } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
147             this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
148                     this.userName, this.password, this.consumerGroup, this.consumerInstance, this.fetchTimeout,
149                     this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
150         } else {
151             this.consumer = new DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
152                     this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
153                     this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner,
154                     this.latitude, this.longitude, this.additionalProps, this.useHttps);
155         }
156
157         logger.info("{}: INITTED", this);
158     }
159
160     /**
161      * {@inheritDoc}
162      */
163     @Override
164     public CommInfrastructure getTopicCommInfrastructure() {
165         return Topic.CommInfrastructure.DMAAP;
166     }
167
168     @Override
169     public String toString() {
170         StringBuilder builder = new StringBuilder();
171         builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
172                 .append((password == null || password.isEmpty()) ? "-" : password.length())
173                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
174                 .append(super.toString()).append("]");
175         return builder.toString();
176     }
177
178
179 }