78bfd46cf6fd7ade5f0e487663190f6d58daa6bb
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.impl;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
30 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSinkFactory;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Factory of noop sinks
37  */
38 public class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
39     private static final String MISSING_TOPIC = "A topic must be provided";
40
41     private static final IndexedNoopTopicSinkFactory instance = new IndexedNoopTopicSinkFactory();
42
43     /**
44      * Logger
45      */
46     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
47
48     /**
49      * noop topic sinks map
50      */
51     protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
52
53     /**
54      * Get the singleton instance.
55      * 
56      * @return the instance
57      */
58     public static IndexedNoopTopicSinkFactory getInstance() {
59         return instance;
60     }
61
62     private IndexedNoopTopicSinkFactory() {}
63
64     @Override
65     public List<NoopTopicSink> build(Properties properties) {
66
67         final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS);
68         if (sinkTopics == null || sinkTopics.isEmpty()) {
69             logger.info("{}: no topic for noop sink", this);
70             return new ArrayList<>();
71         }
72
73         final List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
74         final List<NoopTopicSink> newSinks = new ArrayList<>();
75         synchronized (this) {
76             for (final String topic : sinkTopicList) {
77                 if (this.noopTopicSinks.containsKey(topic)) {
78                     newSinks.add(this.noopTopicSinks.get(topic));
79                     continue;
80                 }
81
82                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic
83                         + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
84
85                 if (servers == null || servers.isEmpty()) {
86                     servers = "noop";
87                 }
88
89                 final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
90
91                 final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "."
92                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
93                 boolean managed = true;
94                 if (managedString != null && !managedString.isEmpty()) {
95                     managed = Boolean.parseBoolean(managedString);
96                 }
97
98                 final NoopTopicSink noopSink = this.build(serverList, topic, managed);
99                 newSinks.add(noopSink);
100             }
101             return newSinks;
102         }
103     }
104
105     @Override
106     public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
107
108         List<String> noopSinkServers = servers;
109         if (noopSinkServers == null) {
110             noopSinkServers = new ArrayList<>();
111         }
112
113         if (noopSinkServers.isEmpty()) {
114             noopSinkServers.add("noop");
115         }
116
117         if (topic == null || topic.isEmpty()) {
118             throw new IllegalArgumentException(MISSING_TOPIC);
119         }
120
121         synchronized (this) {
122             if (this.noopTopicSinks.containsKey(topic)) {
123                 return this.noopTopicSinks.get(topic);
124             }
125
126             final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic);
127
128             if (managed) {
129                 this.noopTopicSinks.put(topic, sink);
130             }
131
132             return sink;
133         }
134     }
135
136     @Override
137     public void destroy(String topic) {
138         if (topic == null || topic.isEmpty()) {
139             throw new IllegalArgumentException(MISSING_TOPIC);
140         }
141
142         NoopTopicSink noopSink;
143         synchronized (this) {
144             if (!this.noopTopicSinks.containsKey(topic)) {
145                 return;
146             }
147
148             noopSink = this.noopTopicSinks.remove(topic);
149         }
150
151         noopSink.shutdown();
152     }
153
154     @Override
155     public void destroy() {
156         final List<NoopTopicSink> sinks = this.inventory();
157         for (final NoopTopicSink sink : sinks) {
158             sink.shutdown();
159         }
160
161         synchronized (this) {
162             this.noopTopicSinks.clear();
163         }
164     }
165
166     @Override
167     public NoopTopicSink get(String topic) {
168         if (topic == null || topic.isEmpty()) {
169             throw new IllegalArgumentException(MISSING_TOPIC);
170         }
171
172         synchronized (this) {
173             if (this.noopTopicSinks.containsKey(topic)) {
174                 return this.noopTopicSinks.get(topic);
175             } else {
176                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
177             }
178         }
179     }
180
181     @Override
182     public List<NoopTopicSink> inventory() {
183         return new ArrayList<>(this.noopTopicSinks.values());
184     }
185 }