2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.event.comm.bus.internal;
23 import java.net.MalformedURLException;
24 import java.util.List;
27 import org.onap.policy.drools.event.comm.Topic;
28 import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * This topic reader implementation specializes in reading messages
34 * from a DMaaP topic and notifying its listeners.
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
37 implements DmaapTopicSource, Runnable {
39 private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
42 protected final String userName;
43 protected final String password;
45 protected String environment = null;
46 protected String aftEnvironment = null;
47 protected String partner = null;
48 protected String latitude = null;
49 protected String longitude = null;
51 protected Map<String,String> additionalProps = null;
56 * @param servers DMaaP servers
57 * @param topic DMaaP Topic to be monitored
58 * @param apiKey DMaaP API Key (optional)
59 * @param apiSecret DMaaP API Secret (optional)
60 * @param consumerGroup DMaaP Reader Consumer Group
61 * @param consumerInstance DMaaP Reader Instance
62 * @param fetchTimeout DMaaP fetch timeout
63 * @param fetchLimit DMaaP fetch limit
64 * @param environment DME2 Environment
65 * @param aftEnvironment DME2 AFT Environment
66 * @param partner DME2 Partner
67 * @param latitude DME2 Latitude
68 * @param longitude DME2 Longitude
69 * @param additionalProps Additional properties to pass to DME2
70 * @param useHttps does connection use HTTPS?
71 * @param allowSelfSignedCerts are self-signed certificates allowed?
73 * @throws IllegalArgumentException An invalid parameter passed in
// Builds a DMaaP topic source that also carries the DME2 attributes.
//
// servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout,
// fetchLimit, useHttps and allowSelfSignedCerts are forwarded to the superclass
// constructor. userName, password, environment, aftEnvironment, partner, latitude,
// longitude and additionalProps are stored on this instance.
// userName and password are accepted here but are not described in the parameter
// documentation that precedes this constructor.
// Any exception caught at the end is rethrown wrapped in an IllegalArgumentException.
75 public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
76 String apiKey, String apiSecret,
77 String userName, String password,
78 String consumerGroup, String consumerInstance,
79 int fetchTimeout, int fetchLimit,
80 String environment, String aftEnvironment, String partner,
81 String latitude, String longitude, Map<String,String> additionalProps,
82 boolean useHttps, boolean allowSelfSignedCerts) {
// Bus-level settings are handled by the superclass.
84 super(servers, topic, apiKey, apiSecret,
85 consumerGroup, consumerInstance,
86 fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
// Credentials are kept in final fields.
88 this.userName = userName;
89 this.password = password;
// DME2 attributes.
91 this.environment = environment;
92 this.aftEnvironment = aftEnvironment;
93 this.partner = partner;
95 this.latitude = latitude;
96 this.longitude = longitude;
// The caller's map is stored as-is; no defensive copy is made.
98 this.additionalProps = additionalProps;
// NOTE(review): the statements guarded by this catch are not visible in this excerpt.
// The log text below suggests topic initialisation is attempted here - TODO confirm.
101 } catch (Exception e) {
// The exception is not handed to the logger; it survives only as the cause of the
// IllegalArgumentException thrown on the next line.
102 logger.error("ERROR during init of topic {}", this.topic);
103 throw new IllegalArgumentException(e);
109 * @param servers DMaaP servers
110 * @param topic DMaaP Topic to be monitored
111 * @param apiKey DMaaP API Key (optional)
112 * @param apiSecret DMaaP API Secret (optional)
113 * @param consumerGroup DMaaP Reader Consumer Group
114 * @param consumerInstance DMaaP Reader Instance
115 * @param fetchTimeout DMaaP fetch timeout
116 * @param fetchLimit DMaaP fetch limit
117 * @param useHttps does connection use HTTPS?
118 * @param allowSelfSignedCerts are self-signed certificates allowed?
119 * @throws IllegalArgumentException An invalid parameter passed in
// Builds a DMaaP topic source without DME2 attributes.
//
// servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout,
// fetchLimit, useHttps and allowSelfSignedCerts are forwarded to the superclass
// constructor; userName and password are stored on this instance.
// environment, aftEnvironment, partner, latitude, longitude and additionalProps are
// not assigned by this constructor.
// userName and password are accepted here but are not described in the parameter
// documentation that precedes this constructor.
// Any exception caught at the end is rethrown wrapped in an IllegalArgumentException.
121 public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
122 String apiKey, String apiSecret,
123 String userName, String password,
124 String consumerGroup, String consumerInstance,
125 int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
// Bus-level settings are handled by the superclass.
128 super(servers, topic, apiKey, apiSecret,
129 consumerGroup, consumerInstance,
130 fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
// Credentials are kept in final fields.
132 this.userName = userName;
133 this.password = password;
// NOTE(review): the statements guarded by this catch are not visible in this excerpt.
// The log text below suggests topic creation/initialisation is attempted here - TODO confirm.
137 } catch (Exception e) {
// Logged at WARN with the topic, the exception message, and the exception itself as the
// trailing argument; the same exception then becomes the cause of the rethrown one.
138 logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
139 throw new IllegalArgumentException(e);
145 * Initialize the Cambria or MR Client
// Chooses which bus consumer wrapper to construct from the settings held on this
// instance, then logs that initialisation has finished.
//
//   1. anyNullOrEmpty(userName, password) is true:
//        CambriaConsumerWrapper built without credentials.
//   2. otherwise, allNullOrEmpty(environment, aftEnvironment, latitude, longitude, partner) is true:
//        CambriaConsumerWrapper built with userName and password.
//   3. remaining case:
//        DmaapDmeConsumerWrapper built with credentials and the DME2 attributes.
//
// anyNullOrEmpty and allNullOrEmpty are defined outside this excerpt; their names suggest
// null-or-empty checks over the given strings - TODO confirm.
// NOTE(review): the statement that receives each newly created wrapper is not visible in
// this excerpt - TODO confirm where the wrapper is stored.
// Declared to throw MalformedURLException; presumably raised by the wrapper constructors -
// TODO confirm.
148 public void init() throws MalformedURLException {
149 if (anyNullOrEmpty(this.userName, this.password)) {
// Case 1: no userName/password arguments are passed to this wrapper.
151 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
152 this.apiKey, this.apiSecret,
153 this.consumerGroup, this.consumerInstance,
154 this.fetchTimeout, this.fetchLimit,
155 this.useHttps, this.allowSelfSignedCerts);
156 } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
// Case 2: same wrapper type as case 1, but userName and password are passed as well.
158 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
159 this.apiKey, this.apiSecret,
160 this.userName, this.password,
161 this.consumerGroup, this.consumerInstance,
162 this.fetchTimeout, this.fetchLimit,
163 this.useHttps, this.allowSelfSignedCerts);
// Case 3. NOTE(review): the branch header for this final case is not visible in this excerpt.
// Unlike the two Cambria wrappers above, this call is not given allowSelfSignedCerts.
166 new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,
167 this.apiKey, this.apiSecret,
168 this.userName, this.password,
169 this.consumerGroup, this.consumerInstance,
170 this.fetchTimeout, this.fetchLimit,
171 this.environment, this.aftEnvironment, this.partner,
172 this.latitude, this.longitude, this.additionalProps, this.useHttps);
// This source is the log argument, so the message is rendered through its toString().
175 logger.info("{}: INITTED", this);
182 public CommInfrastructure getTopicCommInfrastructure() {
183 return Topic.CommInfrastructure.DMAAP;
187 public String toString() {
188 StringBuilder builder = new StringBuilder();
189 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
190 .append((password == null || password.isEmpty()) ? "-" : password.length())
191 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
192 .append(", toString()=").append(super.toString()).append("]");
193 return builder.toString();