2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import java.net.MalformedURLException;
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
38 private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
41 protected final String userName;
42 protected final String password;
44 protected String environment = null;
45 protected String aftEnvironment = null;
46 protected String partner = null;
47 protected String latitude = null;
48 protected String longitude = null;
50 protected Map<String, String> additionalProps = null;
55 * @param busTopicParams Parameters object containing all the required inputs *
56 * @throws IllegalArgumentException An invalid parameter passed in
58 public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
60 super(busTopicParams);
62 this.userName = busTopicParams.getUserName();
63 this.password = busTopicParams.getPassword();
65 this.environment = busTopicParams.getEnvironment();
66 this.aftEnvironment = busTopicParams.getAftEnvironment();
67 this.partner = busTopicParams.getPartner();
69 this.latitude = busTopicParams.getLatitude();
70 this.longitude = busTopicParams.getLongitude();
72 this.additionalProps = busTopicParams.getAdditionalProps();
75 } catch (Exception e) {
76 logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}",
77 topic, e.getMessage(), e);
78 throw new IllegalArgumentException(e);
84 * Initialize the Cambria or MR Client.
87 public void init() throws MalformedURLException {
88 if (anyNullOrEmpty(this.userName, this.password)) {
89 this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
90 .servers(this.servers)
93 .apiSecret(this.apiSecret)
94 .consumerGroup(this.consumerGroup)
95 .consumerInstance(this.consumerInstance)
96 .fetchTimeout(this.fetchTimeout)
97 .fetchLimit(this.fetchLimit)
98 .useHttps(this.useHttps)
99 .allowSelfSignedCerts(this.allowSelfSignedCerts)
101 } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
102 this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
103 .servers(this.servers)
106 .apiSecret(this.apiSecret)
107 .userName(this.userName)
108 .password(this.password)
109 .consumerGroup(this.consumerGroup)
110 .consumerInstance(this.consumerInstance)
111 .fetchTimeout(this.fetchTimeout)
112 .fetchLimit(this.fetchLimit)
113 .useHttps(this.useHttps)
114 .allowSelfSignedCerts(this.allowSelfSignedCerts)
117 this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder()
118 .servers(this.servers)
121 .apiSecret(this.apiSecret)
122 .userName(this.userName)
123 .password(this.password)
124 .consumerGroup(this.consumerGroup)
125 .consumerInstance(this.consumerInstance)
126 .fetchTimeout(this.fetchTimeout)
127 .fetchLimit(this.fetchLimit)
128 .environment(this.environment)
129 .aftEnvironment(this.aftEnvironment)
130 .partner(this.partner)
131 .latitude(this.latitude)
132 .longitude(this.longitude)
133 .additionalProps(this.additionalProps)
134 .useHttps(this.useHttps).build());
137 logger.info("{}: INITTED", this);
144 public CommInfrastructure getTopicCommInfrastructure() {
145 return Topic.CommInfrastructure.DMAAP;
149 public String toString() {
150 StringBuilder builder = new StringBuilder();
151 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
152 .append((password == null || password.isEmpty()) ? "-" : password.length())
153 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
154 .append(super.toString()).append("]");
155 return builder.toString();