2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.event.comm.bus.internal;
23 import java.net.MalformedURLException;
24 import java.util.List;
27 import org.onap.policy.drools.event.comm.Topic;
28 import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * This topic reader implementation specializes in reading messages
34 * over DMAAP topic and notifying its listeners
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
37 implements DmaapTopicSource, Runnable {
39 private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
42 protected final String userName;
43 protected final String password;
45 protected String environment = null;
46 protected String aftEnvironment = null;
47 protected String partner = null;
48 protected String latitude = null;
49 protected String longitude = null;
51 protected Map<String,String> additionalProps = null;
56 * @param servers DMaaP servers
57 * @param topic DMaaP Topic to be monitored
58 * @param apiKey DMaaP API Key (optional)
59 * @param apiSecret DMaaP API Secret (optional)
60 * @param consumerGroup DMaaP Reader Consumer Group
61 * @param consumerInstance DMaaP Reader Instance
62 * @param fetchTimeout DMaaP fetch timeout
63 * @param fetchLimit DMaaP fetch limit
64 * @param environment DME2 Environment
65 * @param aftEnvironment DME2 AFT Environment
66 * @param partner DME2 Partner
67 * @param latitude DME2 Latitude
68 * @param longitude DME2 Longitude
69 * @param additionalProps Additional properties to pass to DME2
70 * @param useHttps does connection use HTTPS?
71 * @param allowSelfSignedCerts are self-signed certificates allow
73 * @throws IllegalArgumentException An invalid parameter passed in
75 public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
76 String apiKey, String apiSecret,
77 String userName, String password,
78 String consumerGroup, String consumerInstance,
79 int fetchTimeout, int fetchLimit,
80 String environment, String aftEnvironment, String partner,
81 String latitude, String longitude, Map<String,String> additionalProps,
82 boolean useHttps, boolean allowSelfSignedCerts)
83 throws IllegalArgumentException {
85 super(servers, topic, apiKey, apiSecret,
86 consumerGroup, consumerInstance,
87 fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
89 this.userName = userName;
90 this.password = password;
92 this.environment = environment;
93 this.aftEnvironment = aftEnvironment;
94 this.partner = partner;
96 this.latitude = latitude;
97 this.longitude = longitude;
99 this.additionalProps = additionalProps;
102 } catch (Exception e) {
103 logger.error("ERROR during init of topic {}", this.topic);
104 throw new IllegalArgumentException(e);
110 * @param servers DMaaP servers
111 * @param topic DMaaP Topic to be monitored
112 * @param apiKey DMaaP API Key (optional)
113 * @param apiSecret DMaaP API Secret (optional)
114 * @param consumerGroup DMaaP Reader Consumer Group
115 * @param consumerInstance DMaaP Reader Instance
116 * @param fetchTimeout DMaaP fetch timeout
117 * @param fetchLimit DMaaP fetch limit
118 * @param useHttps does connection use HTTPS?
119 * @param allowSelfSignedCerts are self-signed certificates allow
120 * @throws IllegalArgumentException An invalid parameter passed in
122 public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
123 String apiKey, String apiSecret,
124 String userName, String password,
125 String consumerGroup, String consumerInstance,
126 int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
127 throws IllegalArgumentException {
130 super(servers, topic, apiKey, apiSecret,
131 consumerGroup, consumerInstance,
132 fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
134 this.userName = userName;
135 this.password = password;
139 } catch (Exception e) {
140 logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
141 throw new IllegalArgumentException(e);
147 * Initialize the Cambria or MR Client
150 public void init() throws MalformedURLException {
151 if (this.userName == null || this.userName.isEmpty() ||
152 this.password == null || this.password.isEmpty()) {
154 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
155 this.apiKey, this.apiSecret,
156 this.consumerGroup, this.consumerInstance,
157 this.fetchTimeout, this.fetchLimit,
158 this.useHttps, this.allowSelfSignedCerts);
159 } else if ((this.environment == null || this.environment.isEmpty()) &&
160 (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
161 (this.latitude == null || this.latitude.isEmpty()) &&
162 (this.longitude == null || this.longitude.isEmpty()) &&
163 (this.partner == null || this.partner.isEmpty())) {
165 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
166 this.apiKey, this.apiSecret,
167 this.userName, this.password,
168 this.consumerGroup, this.consumerInstance,
169 this.fetchTimeout, this.fetchLimit,
170 this.useHttps, this.allowSelfSignedCerts);
173 new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,
174 this.apiKey, this.apiSecret,
175 this.userName, this.password,
176 this.consumerGroup, this.consumerInstance,
177 this.fetchTimeout, this.fetchLimit,
178 this.environment, this.aftEnvironment, this.partner,
179 this.latitude, this.longitude, this.additionalProps, this.useHttps);
182 logger.info("{}: INITTED", this);
189 public CommInfrastructure getTopicCommInfrastructure() {
190 return Topic.CommInfrastructure.DMAAP;
194 public String toString() {
195 StringBuilder builder = new StringBuilder();
196 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
197 .append((password == null || password.isEmpty()) ? "-" : password.length())
198 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
199 .append(", toString()=").append(super.toString()).append("]");
200 return builder.toString();