2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import java.net.MalformedURLException;
24 import java.util.List;
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaConsumerWrapper;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmeConsumerWrapper;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
38 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
40 private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
43 protected final String userName;
44 protected final String password;
46 protected String environment = null;
47 protected String aftEnvironment = null;
48 protected String partner = null;
49 protected String latitude = null;
50 protected String longitude = null;
52 protected Map<String, String> additionalProps = null;
57 * @param servers DMaaP servers
58 * @param topic DMaaP Topic to be monitored
59 * @param apiKey DMaaP API Key (optional)
60 * @param apiSecret DMaaP API Secret (optional)
61 * @param consumerGroup DMaaP Reader Consumer Group
62 * @param consumerInstance DMaaP Reader Instance
63 * @param fetchTimeout DMaaP fetch timeout
64 * @param fetchLimit DMaaP fetch limit
65 * @param environment DME2 Environment
66 * @param aftEnvironment DME2 AFT Environment
67 * @param partner DME2 Partner
68 * @param latitude DME2 Latitude
69 * @param longitude DME2 Longitude
70 * @param additionalProps Additional properties to pass to DME2
71 * @param useHttps does connection use HTTPS?
72 * @param allowSelfSignedCerts are self-signed certificates allow
74 * @throws IllegalArgumentException An invalid parameter passed in
76 public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
77 String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
78 int fetchLimit, String environment, String aftEnvironment, String partner, String latitude,
79 String longitude, Map<String, String> additionalProps, boolean useHttps, boolean allowSelfSignedCerts) {
81 super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
82 allowSelfSignedCerts);
84 this.userName = userName;
85 this.password = password;
87 this.environment = environment;
88 this.aftEnvironment = aftEnvironment;
89 this.partner = partner;
91 this.latitude = latitude;
92 this.longitude = longitude;
94 this.additionalProps = additionalProps;
97 } catch (Exception e) {
98 logger.error("ERROR during init of topic {}", this.topic);
99 throw new IllegalArgumentException(e);
105 * @param servers DMaaP servers
106 * @param topic DMaaP Topic to be monitored
107 * @param apiKey DMaaP API Key (optional)
108 * @param apiSecret DMaaP API Secret (optional)
109 * @param consumerGroup DMaaP Reader Consumer Group
110 * @param consumerInstance DMaaP Reader Instance
111 * @param fetchTimeout DMaaP fetch timeout
112 * @param fetchLimit DMaaP fetch limit
113 * @param useHttps does connection use HTTPS?
114 * @param allowSelfSignedCerts are self-signed certificates allow
115 * @throws IllegalArgumentException An invalid parameter passed in
117 public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
118 String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
119 int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
122 super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
123 allowSelfSignedCerts);
125 this.userName = userName;
126 this.password = password;
130 } catch (Exception e) {
131 logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
132 throw new IllegalArgumentException(e);
138 * Initialize the Cambria or MR Client
141 public void init() throws MalformedURLException {
142 if (anyNullOrEmpty(this.userName, this.password)) {
143 this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
144 this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps,
145 this.allowSelfSignedCerts);
146 } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
147 this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
148 this.userName, this.password, this.consumerGroup, this.consumerInstance, this.fetchTimeout,
149 this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
151 this.consumer = new DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
152 this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
153 this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner,
154 this.latitude, this.longitude, this.additionalProps, this.useHttps);
157 logger.info("{}: INITTED", this);
164 public CommInfrastructure getTopicCommInfrastructure() {
165 return Topic.CommInfrastructure.DMAAP;
169 public String toString() {
170 StringBuilder builder = new StringBuilder();
171 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
172 .append((password == null || password.isEmpty()) ? "-" : password.length())
173 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
174 .append(super.toString()).append("]");
175 return builder.toString();