2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import java.net.MalformedURLException;
26 import org.onap.policy.common.endpoints.event.comm.Topic;
27 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
/** This topic reader implementation specializes in reading messages over a DMaaP topic and notifying its listeners. */
35 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
37 private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
40 protected final String userName;
41 protected final String password;
43 protected String environment = null;
44 protected String aftEnvironment = null;
45 protected String partner = null;
46 protected String latitude = null;
47 protected String longitude = null;
49 protected Map<String, String> additionalProps = null;
/**
 * Builds the topic source from the supplied parameters and then initializes its consumer.
 *
 * @param busTopicParams Parameters object containing all the required inputs
 * @throws IllegalArgumentException An invalid parameter passed in; any failure raised while
 *         initializing is logged and wrapped as the cause
 */
public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {

    super(busTopicParams);

    this.userName = busTopicParams.getUserName();
    this.password = busTopicParams.getPassword();

    this.environment = busTopicParams.getEnvironment();
    this.aftEnvironment = busTopicParams.getAftEnvironment();
    this.partner = busTopicParams.getPartner();

    this.latitude = busTopicParams.getLatitude();
    this.longitude = busTopicParams.getLongitude();

    this.additionalProps = busTopicParams.getAdditionalProps();

    // NOTE(review): the reviewed text showed the catch clause without its try block. The
    // try/init() call is restored here based on the "ERROR during init" log message and the
    // checked exception declared by init() -- confirm against the full file.
    try {
        this.init();
    } catch (Exception e) {
        // Log with the exception attached so the stack trace is kept, then surface the
        // failure as an unchecked exception that preserves the original cause.
        logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
        throw new IllegalArgumentException(e);
    }
}
82 * Initialize the Cambria or MR Client
85 public void init() throws MalformedURLException {
86 if (anyNullOrEmpty(this.userName, this.password)) {
87 this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
88 this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit,
89 this.useHttps, this.allowSelfSignedCerts);
90 } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
91 this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
92 this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
93 this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
95 this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
96 this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
97 this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner,
98 this.latitude, this.longitude, this.additionalProps, this.useHttps);
101 logger.info("{}: INITTED", this);
108 public CommInfrastructure getTopicCommInfrastructure() {
109 return Topic.CommInfrastructure.DMAAP;
113 public String toString() {
114 StringBuilder builder = new StringBuilder();
115 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
116 .append((password == null || password.isEmpty()) ? "-" : password.length())
117 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
118 .append(super.toString()).append("]");
119 return builder.toString();