2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.event.comm.bus.internal;
23 import java.util.List;
26 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
27 import org.openecomp.policy.drools.event.comm.Topic;
28 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
31 * This topic reader implementation specializes in reading messages
32 * over a DMaaP topic and notifying its listeners.
// Bus topic source for DMaaP: extends SingleThreadedBusTopicSource and implements
// DmaapTopicSource and Runnable.
34 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
35 implements DmaapTopicSource, Runnable {
// NOTE(review): no line visible in this file assigns this field. Both constructors forward
// their allowSelfSignedCerts argument to the superclass constructor, yet init() reads
// this.allowSelfSignedCerts. If the superclass declares a field with the same name, this
// declaration hides it and init() would always see the default (false) -- confirm.
38 protected boolean allowSelfSignedCerts;
// Credentials given to the constructors. init() uses the Cambria consumer when either one is
// null or empty, and otherwise passes both to the DMaaP AAF/DME consumer wrappers.
39 protected final String userName;
40 protected final String password;
// Passed as the first argument to PolicyLogger.info(...) in init().
41 private String className = SingleThreadedDmaapTopicSource.class.getName();
// DME2 settings. When all five are null or empty, init() builds the AAF consumer wrapper;
// the DME consumer wrapper is the one that receives these values.
43 protected String environment = null;
44 protected String aftEnvironment = null;
45 protected String partner = null;
46 protected String latitude = null;
47 protected String longitude = null;
// Extra properties handed to the DME consumer wrapper; stays null unless a constructor sets it.
49 protected Map<String,String> additionalProps = null;
54 * @param servers DMaaP servers
55 * @param topic DMaaP Topic to be monitored
56 * @param apiKey DMaaP API Key (optional)
57 * @param apiSecret DMaaP API Secret (optional)
58 * @param consumerGroup DMaaP Reader Consumer Group
59 * @param consumerInstance DMaaP Reader Instance
60 * @param fetchTimeout DMaaP fetch timeout
61 * @param fetchLimit DMaaP fetch limit
62 * @param environment DME2 Environment
63 * @param aftEnvironment DME2 AFT Environment
64 * @param partner DME2 Partner
65 * @param latitude DME2 Latitude
66 * @param longitude DME2 Longitude
67 * @param additionalProps Additional properties to pass to DME2
68 * @param useHttps does connection use HTTPS?
69 * @param allowSelfSignedCerts are self-signed certificates allowed?
71 * @throws IllegalArgumentException An invalid parameter passed in
// Constructor taking DME2 settings. Forwards the bus parameters (including useHttps and
// allowSelfSignedCerts) to the superclass constructor, then stores the credentials, the
// DME2 values and additionalProps on this instance.
// userName and password are not listed in the @param documentation preceding this
// constructor; init() passes them to the DMaaP AAF/DME consumer wrappers.
73 public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
74 String apiKey, String apiSecret,
75 String userName, String password,
76 String consumerGroup, String consumerInstance,
77 int fetchTimeout, int fetchLimit,
78 String environment, String aftEnvironment, String partner,
79 String latitude, String longitude, Map<String,String> additionalProps,
80 boolean useHttps, boolean allowSelfSignedCerts)
81 throws IllegalArgumentException {
83 super(servers, topic, apiKey, apiSecret,
84 consumerGroup, consumerInstance,
85 fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
87 this.userName = userName;
88 this.password = password;
// DME2 routing values, read later by init() to choose the consumer wrapper.
90 this.environment = environment;
91 this.aftEnvironment = aftEnvironment;
92 this.partner = partner;
94 this.latitude = latitude;
95 this.longitude = longitude;
// Stored by reference; no copy of the caller's map is made.
97 this.additionalProps = additionalProps;
// Any exception from the guarded statements is rethrown unchecked, with the original
// exception kept as the cause.
100 } catch (Exception e) {
102 throw new IllegalArgumentException(e);
108 * @param servers DMaaP servers
109 * @param topic DMaaP Topic to be monitored
110 * @param apiKey DMaaP API Key (optional)
111 * @param apiSecret DMaaP API Secret (optional)
112 * @param consumerGroup DMaaP Reader Consumer Group
113 * @param consumerInstance DMaaP Reader Instance
114 * @param fetchTimeout DMaaP fetch timeout
115 * @param fetchLimit DMaaP fetch limit
116 * @param useHttps does connection use HTTPS?
117 * @param allowSelfSignedCerts are self-signed certificates allowed?
118 * @throws IllegalArgumentException An invalid parameter passed in
// Constructor without DME2 settings. Forwards the bus parameters to the superclass
// constructor and stores only the credentials. This overload takes no DME2 arguments, so the
// DME2 fields and additionalProps keep their declared null defaults unless assigned elsewhere.
// userName and password are not listed in the @param documentation preceding this
// constructor; init() passes them to the DMaaP AAF/DME consumer wrappers.
120 public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
121 String apiKey, String apiSecret,
122 String userName, String password,
123 String consumerGroup, String consumerInstance,
124 int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
125 throws IllegalArgumentException {
128 super(servers, topic, apiKey, apiSecret,
129 consumerGroup, consumerInstance,
130 fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
132 this.userName = userName;
133 this.password = password;
// Any exception from the guarded statements is rethrown unchecked, with the original
// exception kept as the cause.
137 } catch (Exception e) {
139 throw new IllegalArgumentException(e);
145 * Initialize the Cambria or MR Client
// Creates the bus consumer wrapper that matches the configured credentials and DME2
// settings, then logs the creation of this source. Declared to throw Exception.
148 public void init() throws Exception {
// User name or password missing/empty: build the plain Cambria consumer, which is the only
// wrapper here that receives allowSelfSignedCerts.
149 if (this.userName == null || this.userName.isEmpty() ||
150 this.password == null || this.password.isEmpty()) {
152 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
153 this.apiKey, this.apiSecret,
154 this.consumerGroup, this.consumerInstance,
155 this.fetchTimeout, this.fetchLimit,
156 this.useHttps, this.allowSelfSignedCerts);
// Credentials present and every DME2 setting (environment, aftEnvironment, latitude,
// longitude, partner) null or empty: build the DMaaP AAF consumer.
157 } else if ((this.environment == null || this.environment.isEmpty()) &&
158 (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
159 (this.latitude == null || this.latitude.isEmpty()) &&
160 (this.longitude == null || this.longitude.isEmpty()) &&
161 (this.partner == null || this.partner.isEmpty())) {
163 new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic,
164 this.apiKey, this.apiSecret,
165 this.userName, this.password,
166 this.consumerGroup, this.consumerInstance,
167 this.fetchTimeout, this.fetchLimit, this.useHttps);
// DMaaP DME consumer, given the DME2 settings and additionalProps in addition to the
// credentials. Presumably the remaining case (at least one DME2 setting present) -- confirm
// against the branch structure.
170 new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,
171 this.apiKey, this.apiSecret,
172 this.userName, this.password,
173 this.consumerGroup, this.consumerInstance,
174 this.fetchTimeout, this.fetchLimit,
175 this.environment, this.aftEnvironment, this.partner,
176 this.latitude, this.longitude, this.additionalProps, this.useHttps);
// Logs this source (rendered through toString()) under this class's name.
179 PolicyLogger.info(className, "CREATION: " + this);
// Reports the communication infrastructure of this topic source; always DMAAP.
186 public CommInfrastructure getTopicCommInfrastructure() {
187 return Topic.CommInfrastructure.DMAAP;
// Builds a text description: the user name, the password's length ("-" when the password is
// null or empty; the password text itself is never appended), the communication
// infrastructure, and the superclass's toString() output.
191 public String toString() {
192 StringBuilder builder = new StringBuilder();
193 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
194 .append((password == null || password.isEmpty()) ? "-" : password.length())
195 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
196 .append(", toString()=").append(super.toString()).append("]");
197 return builder.toString();