d418bfacdbaef57ee85914ba696d7c1f52de9d5b
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Noop Topic Sink Factory.
35  */
36 public interface NoopTopicSinkFactory {
37
38     /**
39      * Creates noop topic sinks based on properties files.
40      * 
41      * @param properties Properties containing initialization values
42      * 
43      * @return a noop topic sink
44      * @throws IllegalArgumentException if invalid parameters are present
45      */
46     List<NoopTopicSink> build(Properties properties);
47
48     /**
49      * builds a noop sink.
50      * 
51      * @param servers list of servers
52      * @param topic topic name
53      * @param managed is this sink endpoint managed?
54      * @return a noop topic sink
55      * @throws IllegalArgumentException if invalid parameters are present
56      */
57     NoopTopicSink build(List<String> servers, String topic, boolean managed);
58
59     /**
60      * Destroys a sink based on the topic.
61      * 
62      * @param topic topic name
63      * @throws IllegalArgumentException if invalid parameters are present
64      */
65     void destroy(String topic);
66
67     /**
68      * Destroys all sinks.
69      */
70     void destroy();
71
72     /**
73      * gets a sink based on topic name.
74      * 
75      * @param topic the topic name
76      * 
77      * @return a sink with topic name
78      * @throws IllegalArgumentException if an invalid topic is provided
79      * @throws IllegalStateException if the sink is in an incorrect state
80      */
81     NoopTopicSink get(String topic);
82
83     /**
84      * Provides a snapshot of the UEB Topic Writers.
85      * 
86      * @return a list of the UEB Topic Writers
87      */
88     List<NoopTopicSink> inventory();
89
90 }
91
92
93 /* ------------- implementation ----------------- */
94
95 /**
96  * Factory of noop sinks.
97  */
98 class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
99     private static final String MISSING_TOPIC = "A topic must be provided";
100
101     /**
102      * Logger.
103      */
104     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
105
106     /**
107      * noop topic sinks map.
108      */
109     protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
110
111     @Override
112     public List<NoopTopicSink> build(Properties properties) {
113
114         final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS);
115         if (sinkTopics == null || sinkTopics.isEmpty()) {
116             logger.info("{}: no topic for noop sink", this);
117             return new ArrayList<>();
118         }
119
120         final List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
121         final List<NoopTopicSink> newSinks = new ArrayList<>();
122         synchronized (this) {
123             for (final String topic : sinkTopicList) {
124                 if (this.noopTopicSinks.containsKey(topic)) {
125                     newSinks.add(this.noopTopicSinks.get(topic));
126                     continue;
127                 }
128
129                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic
130                         + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
131
132                 if (servers == null || servers.isEmpty()) {
133                     servers = "noop";
134                 }
135
136                 final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
137
138                 final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS
139                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
140                 boolean managed = true;
141                 if (managedString != null && !managedString.isEmpty()) {
142                     managed = Boolean.parseBoolean(managedString);
143                 }
144
145                 final NoopTopicSink noopSink = this.build(serverList, topic, managed);
146                 newSinks.add(noopSink);
147             }
148             return newSinks;
149         }
150     }
151
152     @Override
153     public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
154
155         List<String> noopSinkServers = servers;
156         if (noopSinkServers == null) {
157             noopSinkServers = new ArrayList<>();
158         }
159
160         if (noopSinkServers.isEmpty()) {
161             noopSinkServers.add("noop");
162         }
163
164         if (topic == null || topic.isEmpty()) {
165             throw new IllegalArgumentException(MISSING_TOPIC);
166         }
167
168         synchronized (this) {
169             if (this.noopTopicSinks.containsKey(topic)) {
170                 return this.noopTopicSinks.get(topic);
171             }
172
173             final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic);
174
175             if (managed) {
176                 this.noopTopicSinks.put(topic, sink);
177             }
178
179             return sink;
180         }
181     }
182
183     @Override
184     public void destroy(String topic) {
185         if (topic == null || topic.isEmpty()) {
186             throw new IllegalArgumentException(MISSING_TOPIC);
187         }
188
189         NoopTopicSink noopSink;
190         synchronized (this) {
191             if (!this.noopTopicSinks.containsKey(topic)) {
192                 return;
193             }
194
195             noopSink = this.noopTopicSinks.remove(topic);
196         }
197
198         noopSink.shutdown();
199     }
200
201     @Override
202     public void destroy() {
203         final List<NoopTopicSink> sinks = this.inventory();
204         for (final NoopTopicSink sink : sinks) {
205             sink.shutdown();
206         }
207
208         synchronized (this) {
209             this.noopTopicSinks.clear();
210         }
211     }
212
213     @Override
214     public NoopTopicSink get(String topic) {
215         if (topic == null || topic.isEmpty()) {
216             throw new IllegalArgumentException(MISSING_TOPIC);
217         }
218
219         synchronized (this) {
220             if (this.noopTopicSinks.containsKey(topic)) {
221                 return this.noopTopicSinks.get(topic);
222             } else {
223                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
224             }
225         }
226     }
227
228     @Override
229     public List<NoopTopicSink> inventory() {
230         return new ArrayList<>(this.noopTopicSinks.values());
231     }
232 }