2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import org.apache.commons.lang3.StringUtils;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
30 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
31 import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
32 import org.onap.policy.common.endpoints.utils.PropertyUtils;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Factory of DMAAP Reader Topics indexed by topic name.
39 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
41 private static final String MISSING_TOPIC = "A topic must be provided";
46 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
49 * DMAAP Topic Name Index.
51 protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
54 public DmaapTopicSink build(BusTopicParams busTopicParams) {
56 if (StringUtils.isBlank(busTopicParams.getTopic())) {
57 throw new IllegalArgumentException(MISSING_TOPIC);
61 if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
62 return dmaapTopicWriters.get(busTopicParams.getTopic());
65 DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams);
67 if (busTopicParams.isManaged()) {
68 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
70 return dmaapTopicSink;
75 public DmaapTopicSink build(List<String> servers, String topic) {
76 return this.build(BusTopicParams.builder()
81 .allowSelfSignedCerts(false)
86 public List<DmaapTopicSink> build(Properties properties) {
88 String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
89 if (StringUtils.isBlank(writeTopics)) {
90 logger.info("{}: no topic for DMaaP Sink", this);
91 return new ArrayList<>();
94 List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
96 for (String topic : writeTopics.split("\\s*,\\s*")) {
97 addTopic(newDmaapTopicSinks, properties, topic);
99 return newDmaapTopicSinks;
103 private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) {
104 if (this.dmaapTopicWriters.containsKey(topic)) {
105 newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
109 String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic;
111 PropertyUtils props = new PropertyUtils(properties, topicPrefix,
112 (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
114 String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
115 if (StringUtils.isBlank(servers)) {
116 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
120 DmaapTopicSink dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
121 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
124 newDmaapTopicSinks.add(dmaapTopicSink);
130 * @param busTopicParams parameters to use to configure the sink
133 protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
134 return new InlineDmaapTopicSink(busTopicParams);
138 public void destroy(String topic) {
140 if (topic == null || topic.isEmpty()) {
141 throw new IllegalArgumentException(MISSING_TOPIC);
144 DmaapTopicSink dmaapTopicWriter;
145 synchronized (this) {
146 if (!dmaapTopicWriters.containsKey(topic)) {
150 dmaapTopicWriter = dmaapTopicWriters.remove(topic);
153 dmaapTopicWriter.shutdown();
157 public void destroy() {
158 List<DmaapTopicSink> writers = this.inventory();
159 for (DmaapTopicSink writer : writers) {
163 synchronized (this) {
164 this.dmaapTopicWriters.clear();
169 public DmaapTopicSink get(String topic) {
171 if (topic == null || topic.isEmpty()) {
172 throw new IllegalArgumentException(MISSING_TOPIC);
175 synchronized (this) {
176 if (dmaapTopicWriters.containsKey(topic)) {
177 return dmaapTopicWriters.get(topic);
179 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
185 public synchronized List<DmaapTopicSink> inventory() {
186 return new ArrayList<>(this.dmaapTopicWriters.values());
190 public String toString() {
191 return "IndexedDmaapTopicSinkFactory []";