dfdadd1a36d46d7af0fc88d412d0eed88c3fbf64
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import com.google.re2j.Pattern;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28 import org.apache.commons.lang3.StringUtils;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
33 import org.onap.policy.common.endpoints.utils.PropertyUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Factory of DMAAP Reader Topics indexed by topic name.
39  */
40 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
41
42     private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
43     private static final String MISSING_TOPIC = "A topic must be provided";
44
45     /**
46      * Logger.
47      */
48     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
49
50     /**
51      * DMAAP Topic Name Index.
52      */
53     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
54
55     @Override
56     public DmaapTopicSink build(BusTopicParams busTopicParams) {
57
58         if (StringUtils.isBlank(busTopicParams.getTopic())) {
59             throw new IllegalArgumentException(MISSING_TOPIC);
60         }
61
62         synchronized (this) {
63             if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
64                 return dmaapTopicWriters.get(busTopicParams.getTopic());
65             }
66
67             var dmaapTopicSink = makeSink(busTopicParams);
68
69             if (busTopicParams.isManaged()) {
70                 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
71             }
72             return dmaapTopicSink;
73         }
74     }
75
76     @Override
77     public DmaapTopicSink build(List<String> servers, String topic) {
78         return this.build(BusTopicParams.builder()
79                 .servers(servers)
80                 .topic(topic)
81                 .managed(true)
82                 .useHttps(false)
83                 .allowSelfSignedCerts(false)
84                 .build());
85     }
86
87     @Override
88     public List<DmaapTopicSink> build(Properties properties) {
89
90         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
91         if (StringUtils.isBlank(writeTopics)) {
92             logger.info("{}: no topic for DMaaP Sink", this);
93             return new ArrayList<>();
94         }
95
96         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
97         synchronized (this) {
98             for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
99                 addTopic(newDmaapTopicSinks, properties, topic);
100             }
101             return newDmaapTopicSinks;
102         }
103     }
104
105     private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) {
106         if (this.dmaapTopicWriters.containsKey(topic)) {
107             newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
108             return;
109         }
110
111         String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic;
112
113         var props = new PropertyUtils(properties, topicPrefix,
114             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
115
116         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
117         if (StringUtils.isBlank(servers)) {
118             logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
119             return;
120         }
121
122         var dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
123                 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
124                 .build());
125
126         newDmaapTopicSinks.add(dmaapTopicSink);
127     }
128
129     /**
130      * Makes a new sink.
131      *
132      * @param busTopicParams parameters to use to configure the sink
133      * @return a new sink
134      */
135     protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
136         return new InlineDmaapTopicSink(busTopicParams);
137     }
138
139     @Override
140     public void destroy(String topic) {
141
142         if (topic == null || topic.isEmpty()) {
143             throw new IllegalArgumentException(MISSING_TOPIC);
144         }
145
146         DmaapTopicSink dmaapTopicWriter;
147         synchronized (this) {
148             if (!dmaapTopicWriters.containsKey(topic)) {
149                 return;
150             }
151
152             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
153         }
154
155         dmaapTopicWriter.shutdown();
156     }
157
158     @Override
159     public void destroy() {
160         List<DmaapTopicSink> writers = this.inventory();
161         for (DmaapTopicSink writer : writers) {
162             writer.shutdown();
163         }
164
165         synchronized (this) {
166             this.dmaapTopicWriters.clear();
167         }
168     }
169
170     @Override
171     public DmaapTopicSink get(String topic) {
172
173         if (topic == null || topic.isEmpty()) {
174             throw new IllegalArgumentException(MISSING_TOPIC);
175         }
176
177         synchronized (this) {
178             if (dmaapTopicWriters.containsKey(topic)) {
179                 return dmaapTopicWriters.get(topic);
180             } else {
181                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
182             }
183         }
184     }
185
186     @Override
187     public synchronized List<DmaapTopicSink> inventory() {
188         return new ArrayList<>(this.dmaapTopicWriters.values());
189     }
190
191     @Override
192     public String toString() {
193         return "IndexedDmaapTopicSinkFactory " + dmaapTopicWriters.keySet();
194     }
195
196 }