150a02c02185d59bb4654ef8976326d4c368fe80
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import org.apache.commons.lang3.StringUtils;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
30 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
31 import org.onap.policy.common.endpoints.utils.PropertyUtils;
32 import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of UEB Reader Topics indexed by topic name.
38  */
39 class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
40     private static final String MISSING_TOPIC = "A topic must be provided";
41
42     /**
43      * Logger.
44      */
45     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
46
47     /**
48      * UEB Topic Name Index.
49      */
50     protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
51
52     @Override
53     public UebTopicSink build(BusTopicParams busTopicParams) {
54
55         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
56             throw new IllegalArgumentException("UEB Server(s) must be provided");
57         }
58
59         if (StringUtils.isBlank(busTopicParams.getTopic())) {
60             throw new IllegalArgumentException(MISSING_TOPIC);
61         }
62
63         synchronized (this) {
64             if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
65                 return uebTopicSinks.get(busTopicParams.getTopic());
66             }
67
68             UebTopicSink uebTopicWriter = makeSink(busTopicParams);
69
70             if (busTopicParams.isManaged()) {
71                 uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
72             }
73
74             return uebTopicWriter;
75         }
76     }
77
78
79     @Override
80     public UebTopicSink build(List<String> servers, String topic) {
81         return this.build(BusTopicParams.builder()
82                 .servers(servers)
83                 .topic(topic)
84                 .managed(true)
85                 .useHttps(false)
86                 .allowSelfSignedCerts(false)
87                 .build());
88     }
89
90
91     @Override
92     public List<UebTopicSink> build(Properties properties) {
93
94         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
95         if (StringUtils.isBlank(writeTopics)) {
96             logger.info("{}: no topic for UEB Sink", this);
97             return new ArrayList<>();
98         }
99
100         List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
101         synchronized (this) {
102             for (String topic : writeTopics.split("\\s*,\\s*")) {
103                 addTopic(newUebTopicSinks, topic, properties);
104             }
105             return newUebTopicSinks;
106         }
107     }
108
109     private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) {
110         if (this.uebTopicSinks.containsKey(topic)) {
111             newUebTopicSinks.add(this.uebTopicSinks.get(topic));
112             return;
113         }
114
115         String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic;
116
117         PropertyUtils props = new PropertyUtils(properties, topicPrefix,
118             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
119
120         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
121         if (StringUtils.isBlank(servers)) {
122             logger.error("{}: no UEB servers configured for sink {}", this, topic);
123             return;
124         }
125
126         UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
127                 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
128                 .build());
129         newUebTopicSinks.add(uebTopicWriter);
130     }
131
132     @Override
133     public void destroy(String topic) {
134
135         if (topic == null || topic.isEmpty()) {
136             throw new IllegalArgumentException(MISSING_TOPIC);
137         }
138
139         UebTopicSink uebTopicWriter;
140         synchronized (this) {
141             if (!uebTopicSinks.containsKey(topic)) {
142                 return;
143             }
144
145             uebTopicWriter = uebTopicSinks.remove(topic);
146         }
147
148         uebTopicWriter.shutdown();
149     }
150
151     @Override
152     public void destroy() {
153         List<UebTopicSink> writers = this.inventory();
154         for (UebTopicSink writer : writers) {
155             writer.shutdown();
156         }
157
158         synchronized (this) {
159             this.uebTopicSinks.clear();
160         }
161     }
162
163     @Override
164     public UebTopicSink get(String topic) {
165
166         if (topic == null || topic.isEmpty()) {
167             throw new IllegalArgumentException(MISSING_TOPIC);
168         }
169
170         synchronized (this) {
171             if (uebTopicSinks.containsKey(topic)) {
172                 return uebTopicSinks.get(topic);
173             } else {
174                 throw new IllegalStateException("UebTopicSink for " + topic + " not found");
175             }
176         }
177     }
178
179     @Override
180     public synchronized List<UebTopicSink> inventory() {
181         return new ArrayList<>(this.uebTopicSinks.values());
182     }
183
184     /**
185      * Makes a new sink.
186      *
187      * @param busTopicParams parameters to use to configure the sink
188      * @return a new sink
189      */
190     protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
191         return new InlineUebTopicSink(busTopicParams);
192     }
193
194
195     @Override
196     public String toString() {
197         StringBuilder builder = new StringBuilder();
198         builder.append("IndexedUebTopicSinkFactory []");
199         return builder.toString();
200     }
201
202 }