2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.common.endpoints.event.comm.bus;
 
  23 import java.util.ArrayList;
 
  24 import java.util.HashMap;
 
  25 import java.util.List;
 
  26 import java.util.Properties;
 
  27 import org.apache.commons.lang3.StringUtils;
 
  28 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 
  29 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
 
  30 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 
  31 import org.onap.policy.common.endpoints.utils.PropertyUtils;
 
  32 import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
 
  33 import org.slf4j.Logger;
 
  34 import org.slf4j.LoggerFactory;
 
  37  * Factory of UEB Sink (writer) Topics indexed by topic name.
 
  39 class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
 
  40     private static final String MISSING_TOPIC = "A topic must be provided";
 
  45     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
 
  48      * UEB Topic Name Index.
 
  50     protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
 
  53     public UebTopicSink build(BusTopicParams busTopicParams) {
 
  55         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
 
  56             throw new IllegalArgumentException("UEB Server(s) must be provided");
 
  59         if (StringUtils.isBlank(busTopicParams.getTopic())) {
 
  60             throw new IllegalArgumentException(MISSING_TOPIC);
 
  64             if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
 
  65                 return uebTopicSinks.get(busTopicParams.getTopic());
 
  68             UebTopicSink uebTopicWriter = makeSink(busTopicParams);
 
  70             if (busTopicParams.isManaged()) {
 
  71                 uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
 
  74             return uebTopicWriter;
 
  80     public UebTopicSink build(List<String> servers, String topic) {
 
  81         return this.build(BusTopicParams.builder()
 
  86                 .allowSelfSignedCerts(false)
 
  92     public List<UebTopicSink> build(Properties properties) {
 
  94         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
 
  95         if (StringUtils.isBlank(writeTopics)) {
 
  96             logger.info("{}: no topic for UEB Sink", this);
 
  97             return new ArrayList<>();
 
 100         List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
 
 101         synchronized (this) {
 
 102             for (String topic : writeTopics.split("\\s*,\\s*")) {
 
 103                 addTopic(newUebTopicSinks, topic, properties);
 
 105             return newUebTopicSinks;
 
 109     private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) {
 
 110         if (this.uebTopicSinks.containsKey(topic)) {
 
 111             newUebTopicSinks.add(this.uebTopicSinks.get(topic));
 
 115         String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic;
 
 117         PropertyUtils props = new PropertyUtils(properties, topicPrefix,
 
 118             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
 
 120         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
 
 121         if (StringUtils.isBlank(servers)) {
 
 122             logger.error("{}: no UEB servers configured for sink {}", this, topic);
 
 126         UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
 
 127                 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
 
 129         newUebTopicSinks.add(uebTopicWriter);
 
 133     public void destroy(String topic) {
 
 135         if (topic == null || topic.isEmpty()) {
 
 136             throw new IllegalArgumentException(MISSING_TOPIC);
 
 139         UebTopicSink uebTopicWriter;
 
 140         synchronized (this) {
 
 141             if (!uebTopicSinks.containsKey(topic)) {
 
 145             uebTopicWriter = uebTopicSinks.remove(topic);
 
 148         uebTopicWriter.shutdown();
 
 152     public void destroy() {
 
 153         List<UebTopicSink> writers = this.inventory();
 
 154         for (UebTopicSink writer : writers) {
 
 158         synchronized (this) {
 
 159             this.uebTopicSinks.clear();
 
 164     public UebTopicSink get(String topic) {
 
 166         if (topic == null || topic.isEmpty()) {
 
 167             throw new IllegalArgumentException(MISSING_TOPIC);
 
 170         synchronized (this) {
 
 171             if (uebTopicSinks.containsKey(topic)) {
 
 172                 return uebTopicSinks.get(topic);
 
 174                 throw new IllegalStateException("UebTopicSink for " + topic + " not found");
 
 180     public synchronized List<UebTopicSink> inventory() {
 
 181         return new ArrayList<>(this.uebTopicSinks.values());
 
 187      * @param busTopicParams parameters to use to configure the sink
 
 190     protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
 
 191         return new InlineUebTopicSink(busTopicParams);
 
 196     public String toString() {
 
 197         StringBuilder builder = new StringBuilder();
 
 198         builder.append("IndexedUebTopicSinkFactory []");
 
 199         return builder.toString();