6f0753b388d42dbc8000368f7c79a13716e33df2
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import org.apache.commons.lang3.StringUtils;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
30 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
31 import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
32 import org.onap.policy.common.endpoints.utils.PropertyUtils;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of DMAAP Reader Topics indexed by topic name.
38  */
39 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
40
41     private static final String MISSING_TOPIC = "A topic must be provided";
42
43     /**
44      * Logger.
45      */
46     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
47
48     /**
49      * DMAAP Topic Name Index.
50      */
51     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
52
53     @Override
54     public DmaapTopicSink build(BusTopicParams busTopicParams) {
55
56         if (StringUtils.isBlank(busTopicParams.getTopic())) {
57             throw new IllegalArgumentException(MISSING_TOPIC);
58         }
59
60         synchronized (this) {
61             if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
62                 return dmaapTopicWriters.get(busTopicParams.getTopic());
63             }
64
65             DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams);
66
67             if (busTopicParams.isManaged()) {
68                 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
69             }
70             return dmaapTopicSink;
71         }
72     }
73
74     @Override
75     public DmaapTopicSink build(List<String> servers, String topic) {
76         return this.build(BusTopicParams.builder()
77                 .servers(servers)
78                 .topic(topic)
79                 .managed(true)
80                 .useHttps(false)
81                 .allowSelfSignedCerts(false)
82                 .build());
83     }
84
85     @Override
86     public List<DmaapTopicSink> build(Properties properties) {
87
88         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
89         if (StringUtils.isBlank(writeTopics)) {
90             logger.info("{}: no topic for DMaaP Sink", this);
91             return new ArrayList<>();
92         }
93
94         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
95         synchronized (this) {
96             for (String topic : writeTopics.split("\\s*,\\s*")) {
97                 addTopic(newDmaapTopicSinks, properties, topic);
98             }
99             return newDmaapTopicSinks;
100         }
101     }
102
103     private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) {
104         if (this.dmaapTopicWriters.containsKey(topic)) {
105             newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
106             return;
107         }
108
109         String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic;
110
111         PropertyUtils props = new PropertyUtils(properties, topicPrefix,
112             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
113
114         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
115         if (StringUtils.isBlank(servers)) {
116             logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
117             return;
118         }
119
120         DmaapTopicSink dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
121                 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
122                 .build());
123
124         newDmaapTopicSinks.add(dmaapTopicSink);
125     }
126
127     /**
128      * Makes a new sink.
129      *
130      * @param busTopicParams parameters to use to configure the sink
131      * @return a new sink
132      */
133     protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
134         return new InlineDmaapTopicSink(busTopicParams);
135     }
136
137     @Override
138     public void destroy(String topic) {
139
140         if (topic == null || topic.isEmpty()) {
141             throw new IllegalArgumentException(MISSING_TOPIC);
142         }
143
144         DmaapTopicSink dmaapTopicWriter;
145         synchronized (this) {
146             if (!dmaapTopicWriters.containsKey(topic)) {
147                 return;
148             }
149
150             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
151         }
152
153         dmaapTopicWriter.shutdown();
154     }
155
156     @Override
157     public void destroy() {
158         List<DmaapTopicSink> writers = this.inventory();
159         for (DmaapTopicSink writer : writers) {
160             writer.shutdown();
161         }
162
163         synchronized (this) {
164             this.dmaapTopicWriters.clear();
165         }
166     }
167
168     @Override
169     public DmaapTopicSink get(String topic) {
170
171         if (topic == null || topic.isEmpty()) {
172             throw new IllegalArgumentException(MISSING_TOPIC);
173         }
174
175         synchronized (this) {
176             if (dmaapTopicWriters.containsKey(topic)) {
177                 return dmaapTopicWriters.get(topic);
178             } else {
179                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
180             }
181         }
182     }
183
184     @Override
185     public synchronized List<DmaapTopicSink> inventory() {
186         return new ArrayList<>(this.dmaapTopicWriters.values());
187     }
188
189     @Override
190     public String toString() {
191         return "IndexedDmaapTopicSinkFactory []";
192     }
193
194 }