17446a9440a01af67183d3ff464d3b79b42e45f2
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import com.google.re2j.Pattern;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28 import org.apache.commons.lang3.StringUtils;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.PropertyUtils;
33 import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Factory of UEB Reader Topics indexed by topic name.
39  */
40 class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
41     private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
42     private static final String MISSING_TOPIC = "A topic must be provided";
43
44     /**
45      * Logger.
46      */
47     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
48
49     /**
50      * UEB Topic Name Index.
51      */
52     protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
53
54     @Override
55     public UebTopicSink build(BusTopicParams busTopicParams) {
56
57         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
58             throw new IllegalArgumentException("UEB Server(s) must be provided");
59         }
60
61         if (StringUtils.isBlank(busTopicParams.getTopic())) {
62             throw new IllegalArgumentException(MISSING_TOPIC);
63         }
64
65         synchronized (this) {
66             if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
67                 return uebTopicSinks.get(busTopicParams.getTopic());
68             }
69
70             UebTopicSink uebTopicWriter = makeSink(busTopicParams);
71
72             if (busTopicParams.isManaged()) {
73                 uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
74             }
75
76             return uebTopicWriter;
77         }
78     }
79
80
81     @Override
82     public UebTopicSink build(List<String> servers, String topic) {
83         return this.build(BusTopicParams.builder()
84                 .servers(servers)
85                 .topic(topic)
86                 .managed(true)
87                 .useHttps(false)
88                 .allowSelfSignedCerts(false)
89                 .build());
90     }
91
92
93     @Override
94     public List<UebTopicSink> build(Properties properties) {
95
96         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
97         if (StringUtils.isBlank(writeTopics)) {
98             logger.info("{}: no topic for UEB Sink", this);
99             return new ArrayList<>();
100         }
101
102         List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
103         synchronized (this) {
104             for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
105                 addTopic(newUebTopicSinks, topic, properties);
106             }
107             return newUebTopicSinks;
108         }
109     }
110
111     private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) {
112         if (this.uebTopicSinks.containsKey(topic)) {
113             newUebTopicSinks.add(this.uebTopicSinks.get(topic));
114             return;
115         }
116
117         String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic;
118
119         var props = new PropertyUtils(properties, topicPrefix,
120             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
121
122         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
123         if (StringUtils.isBlank(servers)) {
124             logger.error("{}: no UEB servers configured for sink {}", this, topic);
125             return;
126         }
127
128         UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
129                 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
130                 .build());
131         newUebTopicSinks.add(uebTopicWriter);
132     }
133
134     @Override
135     public void destroy(String topic) {
136
137         if (topic == null || topic.isEmpty()) {
138             throw new IllegalArgumentException(MISSING_TOPIC);
139         }
140
141         UebTopicSink uebTopicWriter;
142         synchronized (this) {
143             if (!uebTopicSinks.containsKey(topic)) {
144                 return;
145             }
146
147             uebTopicWriter = uebTopicSinks.remove(topic);
148         }
149
150         uebTopicWriter.shutdown();
151     }
152
153     @Override
154     public void destroy() {
155         List<UebTopicSink> writers = this.inventory();
156         for (UebTopicSink writer : writers) {
157             writer.shutdown();
158         }
159
160         synchronized (this) {
161             this.uebTopicSinks.clear();
162         }
163     }
164
165     @Override
166     public UebTopicSink get(String topic) {
167
168         if (topic == null || topic.isEmpty()) {
169             throw new IllegalArgumentException(MISSING_TOPIC);
170         }
171
172         synchronized (this) {
173             if (uebTopicSinks.containsKey(topic)) {
174                 return uebTopicSinks.get(topic);
175             } else {
176                 throw new IllegalStateException("UebTopicSink for " + topic + " not found");
177             }
178         }
179     }
180
181     @Override
182     public synchronized List<UebTopicSink> inventory() {
183         return new ArrayList<>(this.uebTopicSinks.values());
184     }
185
186     /**
187      * Makes a new sink.
188      *
189      * @param busTopicParams parameters to use to configure the sink
190      * @return a new sink
191      */
192     protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
193         return new InlineUebTopicSink(busTopicParams);
194     }
195
196
197     @Override
198     public String toString() {
199         return "IndexedUebTopicSinkFactory []";
200     }
201
202 }