Remove policy-endpoints checkstyle suppressions 22/75822/1
authorJim Hahn <jrh3@att.com>
Tue, 15 Jan 2019 14:43:13 +0000 (09:43 -0500)
committerJim Hahn <jrh3@att.com>
Tue, 15 Jan 2019 14:43:13 +0000 (09:43 -0500)
Split class files so each file only contains one class.

Change-Id: Iaf94c4c079380dc7db3aff2affbe48f50e3cc0a2
Issue-ID: POLICY-1134
Signed-off-by: Jim Hahn <jrh3@att.com>
14 files changed:
policy-endpoints/checkstyle-suppressions.xml [deleted file]
policy-endpoints/pom.xml
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java [new file with mode: 0644]

diff --git a/policy-endpoints/checkstyle-suppressions.xml b/policy-endpoints/checkstyle-suppressions.xml
deleted file mode 100644 (file)
index 0b65969..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  ============LICENSE_START=======================================================
-   Copyright (C) 2018 AT&T Technologies. All rights reserved.
-  ================================================================================
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-  
-       http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  
-  SPDX-License-Identifier: Apache-2.0
-  ============LICENSE_END=========================================================
--->
-<!DOCTYPE suppressions PUBLIC
-     "-//Puppy Crawl//DTD Suppressions 1.0//EN"
-     "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
-<suppressions>
-  <suppress checks="OneTopLevelClass"
-    files="DmaapTopicSinkFactory.java|DmaapTopicSourceFactory.java|HttpServletServerFactory.java|HttpClientFactory.java|UebTopicSinkFactory.java|NoopTopicSinkFactory.java|UebTopicSourceFactory.java|TopicEndpoint.java|HttpServletServerFactory.java"
-    lines="1-9999"/>
-</suppressions>
index 87139bc..202c9c2 100644 (file)
                             <includeTestResources>true</includeTestResources>
                             <excludes>
                             </excludes>
-                            <suppressionsLocation>${project.basedir}/checkstyle-suppressions.xml</suppressionsLocation>
                             <consoleOutput>true</consoleOutput>
                             <failsOnViolation>true</failsOnViolation>
                             <violationSeverity>warning</violationSeverity>
index 4c36a39..a3eb4df 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
 
 package org.onap.policy.common.endpoints.event.comm.bus;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * DMAAP Topic Sink Factory.
@@ -121,296 +112,3 @@ public interface DmaapTopicSinkFactory {
      */
     List<DmaapTopicSink> inventory();
 }
-
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of DMAAP Reader Topics indexed by topic name.
- */
-class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
-
-    private static final String MISSING_TOPIC = "A topic must be provided";
-
-    /**
-     * Logger.
-     */
-    private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
-
-    /**
-     * DMAAP Topic Name Index.
-     */
-    protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
-
-    @Override
-    public DmaapTopicSink build(BusTopicParams busTopicParams) {
-
-        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        synchronized (this) {
-            if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
-                return dmaapTopicWriters.get(busTopicParams.getTopic());
-            }
-
-            DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams);
-
-            if (busTopicParams.isManaged()) {
-                dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
-            }
-            return dmaapTopicSink;
-        }
-    }
-
-    @Override
-    public DmaapTopicSink build(List<String> servers, String topic) {
-        return this.build(BusTopicParams.builder()
-                .servers(servers)
-                .topic(topic)
-                .managed(true)
-                .useHttps(false)
-                .allowSelfSignedCerts(false)
-                .build());
-    }
-
-    @Override
-    public List<DmaapTopicSink> build(Properties properties) {
-
-        String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
-        if (writeTopics == null || writeTopics.isEmpty()) {
-            logger.info("{}: no topic for DMaaP Sink", this);
-            return new ArrayList<>();
-        }
-
-        List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
-        List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
-        synchronized (this) {
-            for (String topic : writeTopicList) {
-                if (this.dmaapTopicWriters.containsKey(topic)) {
-                    newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
-                    continue;
-                }
-                String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
-                List<String> serverList;
-                if (servers != null && !servers.isEmpty()) {
-                    serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-                } else {
-                    serverList = new ArrayList<>();
-                }
-
-                final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
-                final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
-                final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
-                final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
-
-                final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
-
-                final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
-
-                /* DME2 Properties */
-
-                final String dme2Environment = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
-
-                final String dme2AftEnvironment = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
-
-                final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
-
-                final String dme2RouteOffer = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
-
-                final String dme2Latitude = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
-                                + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
-
-                final String dme2Longitude = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
-
-                final String dme2EpReadTimeoutMs = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
-
-                final String dme2EpConnTimeout = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
-
-                final String dme2RoundtripTimeoutMs =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
-
-                final String dme2Version = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
-                                + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
-
-                final String dme2SubContextPath = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
-
-                final String dme2SessionStickinessRequired =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
-                                + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
-
-                Map<String, String> dme2AdditionalProps = new HashMap<>();
-
-                if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
-                }
-                if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
-                }
-                if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
-                }
-                if (dme2Version != null && !dme2Version.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
-                }
-                if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
-                }
-                if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
-                }
-                if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
-                }
-
-                if (servers == null || servers.isEmpty()) {
-                    logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
-                    continue;
-                }
-
-                boolean managed = true;
-                if (managedString != null && !managedString.isEmpty()) {
-                    managed = Boolean.parseBoolean(managedString);
-                }
-
-                String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
-                // default is to use HTTP if no https property exists
-                boolean useHttps = false;
-                if (useHttpsString != null && !useHttpsString.isEmpty()) {
-                    useHttps = Boolean.parseBoolean(useHttpsString);
-                }
-
-                String allowSelfSignedCertsString =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
-                // default is to disallow self-signed certs
-                boolean allowSelfSignedCerts = false;
-                if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
-                    allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
-                }
-
-                DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
-                        .servers(serverList)
-                        .topic(topic)
-                        .apiKey(apiKey)
-                        .apiSecret(apiSecret)
-                        .userName(aafMechId)
-                        .password(aafPassword)
-                        .partitionId(partitionKey)
-                        .environment(dme2Environment)
-                        .aftEnvironment(dme2AftEnvironment)
-                        .partner(dme2Partner)
-                        .latitude(dme2Latitude)
-                        .longitude(dme2Longitude)
-                        .additionalProps(dme2AdditionalProps)
-                        .managed(managed)
-                        .useHttps(useHttps)
-                        .allowSelfSignedCerts(allowSelfSignedCerts)
-                        .build());
-
-                newDmaapTopicSinks.add(dmaapTopicSink);
-            }
-            return newDmaapTopicSinks;
-        }
-    }
-
-    /**
-     * Makes a new sink.
-     *
-     * @param busTopicParams parameters to use to configure the sink
-     * @return a new sink
-     */
-    protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
-        return new InlineDmaapTopicSink(busTopicParams);
-    }
-
-    @Override
-    public void destroy(String topic) {
-
-        if (topic == null || topic.isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        DmaapTopicSink dmaapTopicWriter;
-        synchronized (this) {
-            if (!dmaapTopicWriters.containsKey(topic)) {
-                return;
-            }
-
-            dmaapTopicWriter = dmaapTopicWriters.remove(topic);
-        }
-
-        dmaapTopicWriter.shutdown();
-    }
-
-    @Override
-    public void destroy() {
-        List<DmaapTopicSink> writers = this.inventory();
-        for (DmaapTopicSink writer : writers) {
-            writer.shutdown();
-        }
-
-        synchronized (this) {
-            this.dmaapTopicWriters.clear();
-        }
-    }
-
-    @Override
-    public DmaapTopicSink get(String topic) {
-
-        if (topic == null || topic.isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        synchronized (this) {
-            if (dmaapTopicWriters.containsKey(topic)) {
-                return dmaapTopicWriters.get(topic);
-            } else {
-                throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
-            }
-        }
-    }
-
-    @Override
-    public synchronized List<DmaapTopicSink> inventory() {
-        return new ArrayList<>(this.dmaapTopicWriters.values());
-    }
-
-    @Override
-    public String toString() {
-        return "IndexedDmaapTopicSinkFactory []";
-    }
-
-}
index ae6c6c3..35a79bf 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
 
 package org.onap.policy.common.endpoints.event.comm.bus;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * DMAAP Topic Source Factory.
@@ -119,349 +110,3 @@ public interface DmaapTopicSourceFactory {
      */
     List<DmaapTopicSource> inventory();
 }
-
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of DMAAP Source Topics indexed by topic name.
- */
-
-class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
-    private static final String MISSING_TOPIC = "A topic must be provided";
-
-    /**
-     * Logger.
-     */
-    private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
-
-    /**
-     * DMaaP Topic Name Index.
-     */
-    protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
-
-    @Override
-    public DmaapTopicSource build(BusTopicParams busTopicParams) {
-
-        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        synchronized (this) {
-            if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
-                return dmaapTopicSources.get(busTopicParams.getTopic());
-            }
-
-            DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams);
-
-            if (busTopicParams.isManaged()) {
-                dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
-            }
-            return dmaapTopicSource;
-        }
-    }
-
-    @Override
-    public List<DmaapTopicSource> build(Properties properties) {
-
-        String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
-        if (readTopics == null || readTopics.isEmpty()) {
-            logger.info("{}: no topic for DMaaP Source", this);
-            return new ArrayList<>();
-        }
-        List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
-
-        List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
-        synchronized (this) {
-            for (String topic : readTopicList) {
-                if (this.dmaapTopicSources.containsKey(topic)) {
-                    dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
-                    continue;
-                }
-
-                String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
-                List<String> serverList;
-                if (servers != null && !servers.isEmpty()) {
-                    serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-                } else {
-                    serverList = new ArrayList<>();
-                }
-
-                final String apiKey = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
-
-                final String apiSecret = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
-                final String aafMechId = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
-
-                final String aafPassword = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
-
-                final String consumerGroup = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
-
-                final String consumerInstance = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
-
-                final String fetchTimeoutString = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
-
-                /* DME2 Properties */
-
-                final String dme2Environment = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
-
-                final String dme2AftEnvironment = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
-
-                final String dme2Partner = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
-
-                final String dme2RouteOffer = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
-
-                final String dme2Latitude = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
-
-                final String dme2Longitude = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
-
-                final String dme2EpReadTimeoutMs =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
-
-                final String dme2EpConnTimeout = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
-
-                final String dme2RoundtripTimeoutMs =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
-
-                final String dme2Version = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
-
-                final String dme2SubContextPath = properties.getProperty(
-                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
-
-                final String dme2SessionStickinessRequired =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
-
-                Map<String, String> dme2AdditionalProps = new HashMap<>();
-
-                if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
-                }
-                if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
-                }
-                if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
-                }
-                if (dme2Version != null && !dme2Version.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
-                }
-                if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
-                }
-                if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
-                }
-                if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
-                    dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
-                }
-
-
-                if (servers == null || servers.isEmpty()) {
-
-                    logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
-                    continue;
-                }
-
-                int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
-                if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
-                    try {
-                        fetchTimeout = Integer.parseInt(fetchTimeoutString);
-                    } catch (NumberFormatException nfe) {
-                        logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
-                                topic);
-                    }
-                }
-
-                String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
-                int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
-                if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
-                    try {
-                        fetchLimit = Integer.parseInt(fetchLimitString);
-                    } catch (NumberFormatException nfe) {
-                        logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
-                                topic);
-                    }
-                }
-
-                String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
-                boolean managed = true;
-                if (managedString != null && !managedString.isEmpty()) {
-                    managed = Boolean.parseBoolean(managedString);
-                }
-
-                String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
-                // default is to use HTTP if no https property exists
-                boolean useHttps = false;
-                if (useHttpsString != null && !useHttpsString.isEmpty()) {
-                    useHttps = Boolean.parseBoolean(useHttpsString);
-                }
-
-                String allowSelfSignedCertsString =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
-                // default is to disallow self-signed certs
-                boolean allowSelfSignedCerts = false;
-                if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
-                    allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
-                }
-
-
-                DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
-                        .servers(serverList)
-                        .topic(topic)
-                        .apiKey(apiKey)
-                        .apiSecret(apiSecret)
-                        .userName(aafMechId)
-                        .password(aafPassword)
-                        .consumerGroup(consumerGroup)
-                        .consumerInstance(consumerInstance)
-                        .fetchTimeout(fetchTimeout)
-                        .fetchLimit(fetchLimit)
-                        .environment(dme2Environment)
-                        .aftEnvironment(dme2AftEnvironment)
-                        .partner(dme2Partner)
-                        .latitude(dme2Latitude)
-                        .longitude(dme2Longitude)
-                        .additionalProps(dme2AdditionalProps)
-                        .managed(managed)
-                        .useHttps(useHttps)
-                        .allowSelfSignedCerts(allowSelfSignedCerts)
-                        .build());
-
-                dmaapTopicSourceLst.add(uebTopicSource);
-            }
-        }
-        return dmaapTopicSourceLst;
-    }
-
-    @Override
-    public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
-        return this.build(BusTopicParams.builder()
-                .servers(servers)
-                .topic(topic)
-                .apiKey(apiKey)
-                .apiSecret(apiSecret)
-                .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
-                .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
-                .managed(true)
-                .useHttps(false)
-                .allowSelfSignedCerts(false)
-                .build());
-    }
-
-    @Override
-    public DmaapTopicSource build(List<String> servers, String topic) {
-        return this.build(servers, topic, null, null);
-    }
-
-    /**
-     * Makes a new source.
-     * 
-     * @param busTopicParams parameters to use to configure the source
-     * @return a new source
-     */
-    protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
-        return new SingleThreadedDmaapTopicSource(busTopicParams);
-    }
-
-    @Override
-    public void destroy(String topic) {
-
-        if (topic == null || topic.isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        DmaapTopicSource uebTopicSource;
-
-        synchronized (this) {
-            if (!dmaapTopicSources.containsKey(topic)) {
-                return;
-            }
-
-            uebTopicSource = dmaapTopicSources.remove(topic);
-        }
-
-        uebTopicSource.shutdown();
-    }
-
-    @Override
-    public void destroy() {
-        List<DmaapTopicSource> readers = this.inventory();
-        for (DmaapTopicSource reader : readers) {
-            reader.shutdown();
-        }
-
-        synchronized (this) {
-            this.dmaapTopicSources.clear();
-        }
-    }
-
-    @Override
-    public DmaapTopicSource get(String topic) {
-
-        if (topic == null || topic.isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        synchronized (this) {
-            if (dmaapTopicSources.containsKey(topic)) {
-                return dmaapTopicSources.get(topic);
-            } else {
-                throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found");
-            }
-        }
-    }
-
-    @Override
-    public synchronized List<DmaapTopicSource> inventory() {
-        return new ArrayList<>(this.dmaapTopicSources.values());
-    }
-
-    @Override
-    public String toString() {
-        return "IndexedDmaapTopicSourceFactory []";
-    }
-
-}
-
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java
new file mode 100644 (file)
index 0000000..659833c
--- /dev/null
@@ -0,0 +1,324 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory of DMAAP Reader Topics indexed by topic name.
+ */
+class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
+
+    private static final String MISSING_TOPIC = "A topic must be provided";
+
+    /**
+     * Logger.
+     */
+    private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
+
+    /**
+     * DMAAP Topic Name Index.
+     */
+    protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
+
+    @Override
+    public DmaapTopicSink build(BusTopicParams busTopicParams) {
+
+        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
+                return dmaapTopicWriters.get(busTopicParams.getTopic());
+            }
+
+            DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams);
+
+            if (busTopicParams.isManaged()) {
+                dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
+            }
+            return dmaapTopicSink;
+        }
+    }
+
+    @Override
+    public DmaapTopicSink build(List<String> servers, String topic) {
+        return this.build(BusTopicParams.builder()
+                .servers(servers)
+                .topic(topic)
+                .managed(true)
+                .useHttps(false)
+                .allowSelfSignedCerts(false)
+                .build());
+    }
+
+    @Override
+    public List<DmaapTopicSink> build(Properties properties) {
+
+        String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
+        if (writeTopics == null || writeTopics.isEmpty()) {
+            logger.info("{}: no topic for DMaaP Sink", this);
+            return new ArrayList<>();
+        }
+
+        List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
+        List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
+        synchronized (this) {
+            for (String topic : writeTopicList) {
+                if (this.dmaapTopicWriters.containsKey(topic)) {
+                    newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
+                    continue;
+                }
+                String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+
+                List<String> serverList;
+                if (servers != null && !servers.isEmpty()) {
+                    serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+                } else {
+                    serverList = new ArrayList<>();
+                }
+
+                final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
+                final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
+
+                final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
+                final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
+
+                final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
+
+                final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
+
+                /* DME2 Properties */
+
+                final String dme2Environment = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+
+                final String dme2AftEnvironment = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+
+                final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
+
+                final String dme2RouteOffer = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
+
+                final String dme2Latitude = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+                                + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+
+                final String dme2Longitude = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+
+                final String dme2EpReadTimeoutMs = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
+
+                final String dme2EpConnTimeout = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
+
+                final String dme2RoundtripTimeoutMs =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
+
+                final String dme2Version = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+                                + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
+
+                final String dme2SubContextPath = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
+
+                final String dme2SessionStickinessRequired =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
+                                + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
+
+                Map<String, String> dme2AdditionalProps = new HashMap<>();
+
+                if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
+                }
+                if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
+                }
+                if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
+                }
+                if (dme2Version != null && !dme2Version.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
+                }
+                if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+                }
+                if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
+                }
+                if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
+                }
+
+                if (servers == null || servers.isEmpty()) {
+                    logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
+                    continue;
+                }
+
+                boolean managed = true;
+                if (managedString != null && !managedString.isEmpty()) {
+                    managed = Boolean.parseBoolean(managedString);
+                }
+
+                String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+                // default is to use HTTP if no https property exists
+                boolean useHttps = false;
+                if (useHttpsString != null && !useHttpsString.isEmpty()) {
+                    useHttps = Boolean.parseBoolean(useHttpsString);
+                }
+
+                String allowSelfSignedCertsString =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+                // default is to disallow self-signed certs
+                boolean allowSelfSignedCerts = false;
+                if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
+                    allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+                }
+
+                DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
+                        .servers(serverList)
+                        .topic(topic)
+                        .apiKey(apiKey)
+                        .apiSecret(apiSecret)
+                        .userName(aafMechId)
+                        .password(aafPassword)
+                        .partitionId(partitionKey)
+                        .environment(dme2Environment)
+                        .aftEnvironment(dme2AftEnvironment)
+                        .partner(dme2Partner)
+                        .latitude(dme2Latitude)
+                        .longitude(dme2Longitude)
+                        .additionalProps(dme2AdditionalProps)
+                        .managed(managed)
+                        .useHttps(useHttps)
+                        .allowSelfSignedCerts(allowSelfSignedCerts)
+                        .build());
+
+                newDmaapTopicSinks.add(dmaapTopicSink);
+            }
+            return newDmaapTopicSinks;
+        }
+    }
+
+    /**
+     * Makes a new sink.
+     *
+     * @param busTopicParams parameters to use to configure the sink
+     * @return a new sink
+     */
+    protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
+        return new InlineDmaapTopicSink(busTopicParams);
+    }
+
+    @Override
+    public void destroy(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        DmaapTopicSink dmaapTopicWriter;
+        synchronized (this) {
+            if (!dmaapTopicWriters.containsKey(topic)) {
+                return;
+            }
+
+            dmaapTopicWriter = dmaapTopicWriters.remove(topic);
+        }
+
+        dmaapTopicWriter.shutdown();
+    }
+
+    @Override
+    public void destroy() {
+        List<DmaapTopicSink> writers = this.inventory();
+        for (DmaapTopicSink writer : writers) {
+            writer.shutdown();
+        }
+
+        synchronized (this) {
+            this.dmaapTopicWriters.clear();
+        }
+    }
+
+    @Override
+    public DmaapTopicSink get(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (dmaapTopicWriters.containsKey(topic)) {
+                return dmaapTopicWriters.get(topic);
+            } else {
+                throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
+            }
+        }
+    }
+
+    @Override
+    public synchronized List<DmaapTopicSink> inventory() {
+        return new ArrayList<>(this.dmaapTopicWriters.values());
+    }
+
+    @Override
+    public String toString() {
+        return "IndexedDmaapTopicSinkFactory []";
+    }
+
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
new file mode 100644 (file)
index 0000000..0c008f1
--- /dev/null
@@ -0,0 +1,376 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory of DMAAP Source Topics indexed by topic name.
+ */
+
+class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
+    private static final String MISSING_TOPIC = "A topic must be provided";
+
+    /**
+     * Logger.
+     */
+    private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
+
+    /**
+     * DMaaP Topic Name Index.
+     */
+    protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
+
+    @Override
+    public DmaapTopicSource build(BusTopicParams busTopicParams) {
+
+        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
+                return dmaapTopicSources.get(busTopicParams.getTopic());
+            }
+
+            DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams);
+
+            if (busTopicParams.isManaged()) {
+                dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
+            }
+            return dmaapTopicSource;
+        }
+    }
+
+    @Override
+    public List<DmaapTopicSource> build(Properties properties) {
+
+        String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+        if (readTopics == null || readTopics.isEmpty()) {
+            logger.info("{}: no topic for DMaaP Source", this);
+            return new ArrayList<>();
+        }
+        List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
+
+        List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
+        synchronized (this) {
+            for (String topic : readTopicList) {
+                if (this.dmaapTopicSources.containsKey(topic)) {
+                    dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
+                    continue;
+                }
+
+                String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+
+                List<String> serverList;
+                if (servers != null && !servers.isEmpty()) {
+                    serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+                } else {
+                    serverList = new ArrayList<>();
+                }
+
+                final String apiKey = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
+
+                final String apiSecret = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
+
+                final String aafMechId = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
+
+                final String aafPassword = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
+
+                final String consumerGroup = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
+
+                final String consumerInstance = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
+
+                final String fetchTimeoutString = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+
+                /* DME2 Properties */
+
+                final String dme2Environment = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+
+                final String dme2AftEnvironment = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+
+                final String dme2Partner = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
+
+                final String dme2RouteOffer = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
+
+                final String dme2Latitude = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+
+                final String dme2Longitude = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+
+                final String dme2EpReadTimeoutMs =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
+
+                final String dme2EpConnTimeout = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
+
+                final String dme2RoundtripTimeoutMs =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
+
+                final String dme2Version = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
+
+                final String dme2SubContextPath = properties.getProperty(
+                                PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
+
+                final String dme2SessionStickinessRequired =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
+
+                Map<String, String> dme2AdditionalProps = new HashMap<>();
+
+                if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
+                }
+                if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
+                }
+                if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
+                }
+                if (dme2Version != null && !dme2Version.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
+                }
+                if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+                }
+                if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
+                }
+                if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
+                    dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
+                }
+
+
+                if (servers == null || servers.isEmpty()) {
+
+                    logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
+                    continue;
+                }
+
+                int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
+                if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
+                    try {
+                        fetchTimeout = Integer.parseInt(fetchTimeoutString);
+                    } catch (NumberFormatException nfe) {
+                        logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
+                                topic);
+                    }
+                }
+
+                String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
+                int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
+                if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
+                    try {
+                        fetchLimit = Integer.parseInt(fetchLimitString);
+                    } catch (NumberFormatException nfe) {
+                        logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
+                                topic);
+                    }
+                }
+
+                String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
+                boolean managed = true;
+                if (managedString != null && !managedString.isEmpty()) {
+                    managed = Boolean.parseBoolean(managedString);
+                }
+
+                String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+                // default is to use HTTP if no https property exists
+                boolean useHttps = false;
+                if (useHttpsString != null && !useHttpsString.isEmpty()) {
+                    useHttps = Boolean.parseBoolean(useHttpsString);
+                }
+
+                String allowSelfSignedCertsString =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+                // default is to disallow self-signed certs
+                boolean allowSelfSignedCerts = false;
+                if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
+                    allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+                }
+
+
+                DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
+                        .servers(serverList)
+                        .topic(topic)
+                        .apiKey(apiKey)
+                        .apiSecret(apiSecret)
+                        .userName(aafMechId)
+                        .password(aafPassword)
+                        .consumerGroup(consumerGroup)
+                        .consumerInstance(consumerInstance)
+                        .fetchTimeout(fetchTimeout)
+                        .fetchLimit(fetchLimit)
+                        .environment(dme2Environment)
+                        .aftEnvironment(dme2AftEnvironment)
+                        .partner(dme2Partner)
+                        .latitude(dme2Latitude)
+                        .longitude(dme2Longitude)
+                        .additionalProps(dme2AdditionalProps)
+                        .managed(managed)
+                        .useHttps(useHttps)
+                        .allowSelfSignedCerts(allowSelfSignedCerts)
+                        .build());
+
+                dmaapTopicSourceLst.add(uebTopicSource);
+            }
+        }
+        return dmaapTopicSourceLst;
+    }
+
+    @Override
+    public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
+        return this.build(BusTopicParams.builder()
+                .servers(servers)
+                .topic(topic)
+                .apiKey(apiKey)
+                .apiSecret(apiSecret)
+                .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
+                .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
+                .managed(true)
+                .useHttps(false)
+                .allowSelfSignedCerts(false)
+                .build());
+    }
+
+    @Override
+    public DmaapTopicSource build(List<String> servers, String topic) {
+        return this.build(servers, topic, null, null);
+    }
+
+    /**
+     * Makes a new source.
+     * 
+     * @param busTopicParams parameters to use to configure the source
+     * @return a new source
+     */
+    protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
+        return new SingleThreadedDmaapTopicSource(busTopicParams);
+    }
+
+    @Override
+    public void destroy(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        DmaapTopicSource uebTopicSource;
+
+        synchronized (this) {
+            if (!dmaapTopicSources.containsKey(topic)) {
+                return;
+            }
+
+            uebTopicSource = dmaapTopicSources.remove(topic);
+        }
+
+        uebTopicSource.shutdown();
+    }
+
+    @Override
+    public void destroy() {
+        List<DmaapTopicSource> readers = this.inventory();
+        for (DmaapTopicSource reader : readers) {
+            reader.shutdown();
+        }
+
+        synchronized (this) {
+            this.dmaapTopicSources.clear();
+        }
+    }
+
+    @Override
+    public DmaapTopicSource get(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (dmaapTopicSources.containsKey(topic)) {
+                return dmaapTopicSources.get(topic);
+            } else {
+                throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found");
+            }
+        }
+    }
+
+    @Override
+    public synchronized List<DmaapTopicSource> inventory() {
+        return new ArrayList<>(this.dmaapTopicSources.values());
+    }
+
+    @Override
+    public String toString() {
+        return "IndexedDmaapTopicSourceFactory []";
+    }
+
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java
new file mode 100644 (file)
index 0000000..5b3fc66
--- /dev/null
@@ -0,0 +1,237 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory of UEB Reader Topics indexed by topic name.
+ */
+class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
+    private static final String MISSING_TOPIC = "A topic must be provided";
+
+    /**
+     * Logger.
+     */
+    private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
+
+    /**
+     * UEB Topic Name Index.
+     */
+    protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
+
+    @Override
+    public UebTopicSink build(BusTopicParams busTopicParams) {
+
+        if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
+            throw new IllegalArgumentException("UEB Server(s) must be provided");
+        }
+
+        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
+                return uebTopicSinks.get(busTopicParams.getTopic());
+            }
+
+            UebTopicSink uebTopicWriter = makeSink(busTopicParams);
+
+            if (busTopicParams.isManaged()) {
+                uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
+            }
+
+            return uebTopicWriter;
+        }
+    }
+
+
+    @Override
+    public UebTopicSink build(List<String> servers, String topic) {
+        return this.build(BusTopicParams.builder()
+                .servers(servers)
+                .topic(topic)
+                .managed(true)
+                .useHttps(false)
+                .allowSelfSignedCerts(false)
+                .build());
+    }
+
+
+    @Override
+    public List<UebTopicSink> build(Properties properties) {
+
+        String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
+        if (writeTopics == null || writeTopics.isEmpty()) {
+            logger.info("{}: no topic for UEB Sink", this);
+            return new ArrayList<>();
+        }
+
+        List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
+        List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
+        synchronized (this) {
+            for (String topic : writeTopicList) {
+                if (this.uebTopicSinks.containsKey(topic)) {
+                    newUebTopicSinks.add(this.uebTopicSinks.get(topic));
+                    continue;
+                }
+
+                String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+                        + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+                if (servers == null || servers.isEmpty()) {
+                    logger.error("{}: no UEB servers configured for sink {}", this, topic);
+                    continue;
+                }
+
+                final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+
+                final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS 
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
+                final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS 
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
+                final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS 
+                                + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
+
+                String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
+                boolean managed = true;
+                if (managedString != null && !managedString.isEmpty()) {
+                    managed = Boolean.parseBoolean(managedString);
+                }
+
+                String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+                // default is to use HTTP if no https property exists
+                boolean useHttps = false;
+                if (useHttpsString != null && !useHttpsString.isEmpty()) {
+                    useHttps = Boolean.parseBoolean(useHttpsString);
+                }
+
+
+                String allowSelfSignedCertsString =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+                // default is to disallow self-signed certs
+                boolean allowSelfSignedCerts = false;
+                if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
+                    allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+                }
+
+                UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder()
+                        .servers(serverList)
+                        .topic(topic)
+                        .apiKey(apiKey)
+                        .apiSecret(apiSecret)
+                        .partitionId(partitionKey)
+                        .managed(managed)
+                        .useHttps(useHttps)
+                        .allowSelfSignedCerts(allowSelfSignedCerts)
+                        .build());
+                newUebTopicSinks.add(uebTopicWriter);
+            }
+            return newUebTopicSinks;
+        }
+    }
+
+    @Override
+    public void destroy(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        UebTopicSink uebTopicWriter;
+        synchronized (this) {
+            if (!uebTopicSinks.containsKey(topic)) {
+                return;
+            }
+
+            uebTopicWriter = uebTopicSinks.remove(topic);
+        }
+
+        uebTopicWriter.shutdown();
+    }
+
+    @Override
+    public void destroy() {
+        List<UebTopicSink> writers = this.inventory();
+        for (UebTopicSink writer : writers) {
+            writer.shutdown();
+        }
+
+        synchronized (this) {
+            this.uebTopicSinks.clear();
+        }
+    }
+
+    @Override
+    public UebTopicSink get(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (uebTopicSinks.containsKey(topic)) {
+                return uebTopicSinks.get(topic);
+            } else {
+                throw new IllegalStateException("UebTopicSink for " + topic + " not found");
+            }
+        }
+    }
+
+    @Override
+    public synchronized List<UebTopicSink> inventory() {
+        return new ArrayList<>(this.uebTopicSinks.values());
+    }
+
+    /**
+     * Makes a new sink.
+     * 
+     * @param busTopicParams parameters to use to configure the sink
+     * @return a new sink
+     */
+    protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
+        return new InlineUebTopicSink(busTopicParams);
+    }
+
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("IndexedUebTopicSinkFactory []");
+        return builder.toString();
+    }
+
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
new file mode 100644 (file)
index 0000000..88a472c
--- /dev/null
@@ -0,0 +1,274 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory of UEB Source Topics indexed by topic name.
+ */
+class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
+    private static final String MISSING_TOPIC = "A topic must be provided";
+
+    /**
+     * Logger.
+     */
+    private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
+
+    /**
+     * UEB Topic Name Index.
+     */
+    protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
+
+    @Override
+    public UebTopicSource build(BusTopicParams busTopicParams) {
+        if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
+            throw new IllegalArgumentException("UEB Server(s) must be provided");
+        }
+
+        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
+                return uebTopicSources.get(busTopicParams.getTopic());
+            }
+
+            UebTopicSource uebTopicSource = makeSource(busTopicParams);
+
+            if (busTopicParams.isManaged()) {
+                uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
+            }
+
+            return uebTopicSource;
+        }
+    }
+
+    @Override
+    public List<UebTopicSource> build(Properties properties) {
+
+        String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
+        if (readTopics == null || readTopics.isEmpty()) {
+            logger.info("{}: no topic for UEB Source", this);
+            return new ArrayList<>();
+        }
+        List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
+
+        List<UebTopicSource> newUebTopicSources = new ArrayList<>();
+        synchronized (this) {
+            for (String topic : readTopicList) {
+                if (this.uebTopicSources.containsKey(topic)) {
+                    newUebTopicSources.add(this.uebTopicSources.get(topic));
+                    continue;
+                }
+
+                String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+
+                if (servers == null || servers.isEmpty()) {
+                    logger.error("{}: no UEB servers configured for sink {}", this, topic);
+                    continue;
+                }
+
+                final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+
+                final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
+
+                final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
+
+                final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
+
+                final String consumerInstance = properties.getProperty(
+                        PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
+
+                String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+                int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
+                if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
+                    try {
+                        fetchTimeout = Integer.parseInt(fetchTimeoutString);
+                    } catch (NumberFormatException nfe) {
+                        logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
+                                topic);
+                    }
+                }
+
+                String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
+                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
+                int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
+                if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
+                    try {
+                        fetchLimit = Integer.parseInt(fetchLimitString);
+                    } catch (NumberFormatException nfe) {
+                        logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
+                                topic);
+                    }
+                }
+
+                String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
+                boolean managed = true;
+                if (managedString != null && !managedString.isEmpty()) {
+                    managed = Boolean.parseBoolean(managedString);
+                }
+
+                String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
+                        + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+                // default is to use HTTP if no https property exists
+                boolean useHttps = false;
+                if (useHttpsString != null && !useHttpsString.isEmpty()) {
+                    useHttps = Boolean.parseBoolean(useHttpsString);
+                }
+
+                String allowSelfSignedCertsString =
+                        properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+                // default is to disallow self-signed certs
+                boolean allowSelfSignedCerts = false;
+                if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
+                    allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+                }
+
+                UebTopicSource uebTopicSource = this.build(BusTopicParams.builder()
+                        .servers(serverList)
+                        .topic(topic)
+                        .apiKey(apiKey)
+                        .apiSecret(apiSecret)
+                        .consumerGroup(consumerGroup)
+                        .consumerInstance(consumerInstance)
+                        .fetchTimeout(fetchTimeout)
+                        .fetchLimit(fetchLimit)
+                        .managed(managed)
+                        .useHttps(useHttps)
+                        .allowSelfSignedCerts(allowSelfSignedCerts).build());
+                newUebTopicSources.add(uebTopicSource);
+            }
+        }
+        return newUebTopicSources;
+    }
+
+    @Override
+    public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
+
+        return this.build(BusTopicParams.builder()
+                .servers(servers)
+                .topic(topic)
+                .apiKey(apiKey)
+                .apiSecret(apiSecret)
+                .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
+                .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH)
+                .managed(true)
+                .useHttps(false)
+                .allowSelfSignedCerts(true).build());
+    }
+
+    @Override
+    public UebTopicSource build(List<String> servers, String topic) {
+        return this.build(servers, topic, null, null);
+    }
+
+    /**
+     * Makes a new source.
+     * 
+     * @param busTopicParams parameters to use to configure the source
+     * @return a new source
+     */
+    protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
+        return new SingleThreadedUebTopicSource(busTopicParams);
+    }
+
+    @Override
+    public void destroy(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        UebTopicSource uebTopicSource;
+
+        synchronized (this) {
+            if (!uebTopicSources.containsKey(topic)) {
+                return;
+            }
+
+            uebTopicSource = uebTopicSources.remove(topic);
+        }
+
+        uebTopicSource.shutdown();
+    }
+
+    @Override
+    public void destroy() {
+        List<UebTopicSource> readers = this.inventory();
+        for (UebTopicSource reader : readers) {
+            reader.shutdown();
+        }
+
+        synchronized (this) {
+            this.uebTopicSources.clear();
+        }
+    }
+
+    @Override
+    public UebTopicSource get(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (uebTopicSources.containsKey(topic)) {
+                return uebTopicSources.get(topic);
+            } else {
+                throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
+            }
+        }
+    }
+
+    @Override
+    public synchronized List<UebTopicSource> inventory() {
+        return new ArrayList<>(this.uebTopicSources.values());
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("IndexedUebTopicSourceFactory []");
+        return builder.toString();
+    }
+}
index c200af5..0cf095f 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
 
 package org.onap.policy.common.endpoints.event.comm.bus;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
-
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * UEB Topic Sink Factory.
@@ -98,210 +90,3 @@ public interface UebTopicSinkFactory {
      */
     List<UebTopicSink> inventory();
 }
-
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of UEB Reader Topics indexed by topic name.
- */
-class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
-    private static final String MISSING_TOPIC = "A topic must be provided";
-
-    /**
-     * Logger.
-     */
-    private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
-
-    /**
-     * UEB Topic Name Index.
-     */
-    protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
-
-    @Override
-    public UebTopicSink build(BusTopicParams busTopicParams) {
-
-        if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
-            throw new IllegalArgumentException("UEB Server(s) must be provided");
-        }
-
-        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        synchronized (this) {
-            if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
-                return uebTopicSinks.get(busTopicParams.getTopic());
-            }
-
-            UebTopicSink uebTopicWriter = makeSink(busTopicParams);
-
-            if (busTopicParams.isManaged()) {
-                uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
-            }
-
-            return uebTopicWriter;
-        }
-    }
-
-
-    @Override
-    public UebTopicSink build(List<String> servers, String topic) {
-        return this.build(BusTopicParams.builder()
-                .servers(servers)
-                .topic(topic)
-                .managed(true)
-                .useHttps(false)
-                .allowSelfSignedCerts(false)
-                .build());
-    }
-
-
-    @Override
-    public List<UebTopicSink> build(Properties properties) {
-
-        String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
-        if (writeTopics == null || writeTopics.isEmpty()) {
-            logger.info("{}: no topic for UEB Sink", this);
-            return new ArrayList<>();
-        }
-
-        List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
-        List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
-        synchronized (this) {
-            for (String topic : writeTopicList) {
-                if (this.uebTopicSinks.containsKey(topic)) {
-                    newUebTopicSinks.add(this.uebTopicSinks.get(topic));
-                    continue;
-                }
-
-                String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
-                        + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-                if (servers == null || servers.isEmpty()) {
-                    logger.error("{}: no UEB servers configured for sink {}", this, topic);
-                    continue;
-                }
-
-                final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
-                final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS 
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
-                final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS 
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-                final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS 
-                                + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
-
-                String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
-                boolean managed = true;
-                if (managedString != null && !managedString.isEmpty()) {
-                    managed = Boolean.parseBoolean(managedString);
-                }
-
-                String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
-                // default is to use HTTP if no https property exists
-                boolean useHttps = false;
-                if (useHttpsString != null && !useHttpsString.isEmpty()) {
-                    useHttps = Boolean.parseBoolean(useHttpsString);
-                }
-
-
-                String allowSelfSignedCertsString =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
-                // default is to disallow self-signed certs
-                boolean allowSelfSignedCerts = false;
-                if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
-                    allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
-                }
-
-                UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder()
-                        .servers(serverList)
-                        .topic(topic)
-                        .apiKey(apiKey)
-                        .apiSecret(apiSecret)
-                        .partitionId(partitionKey)
-                        .managed(managed)
-                        .useHttps(useHttps)
-                        .allowSelfSignedCerts(allowSelfSignedCerts)
-                        .build());
-                newUebTopicSinks.add(uebTopicWriter);
-            }
-            return newUebTopicSinks;
-        }
-    }
-
-    @Override
-    public void destroy(String topic) {
-
-        if (topic == null || topic.isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        UebTopicSink uebTopicWriter;
-        synchronized (this) {
-            if (!uebTopicSinks.containsKey(topic)) {
-                return;
-            }
-
-            uebTopicWriter = uebTopicSinks.remove(topic);
-        }
-
-        uebTopicWriter.shutdown();
-    }
-
-    @Override
-    public void destroy() {
-        List<UebTopicSink> writers = this.inventory();
-        for (UebTopicSink writer : writers) {
-            writer.shutdown();
-        }
-
-        synchronized (this) {
-            this.uebTopicSinks.clear();
-        }
-    }
-
-    @Override
-    public UebTopicSink get(String topic) {
-
-        if (topic == null || topic.isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        synchronized (this) {
-            if (uebTopicSinks.containsKey(topic)) {
-                return uebTopicSinks.get(topic);
-            } else {
-                throw new IllegalStateException("UebTopicSink for " + topic + " not found");
-            }
-        }
-    }
-
-    @Override
-    public synchronized List<UebTopicSink> inventory() {
-        return new ArrayList<>(this.uebTopicSinks.values());
-    }
-
-    /**
-     * Makes a new sink.
-     * 
-     * @param busTopicParams parameters to use to configure the sink
-     * @return a new sink
-     */
-    protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
-        return new InlineUebTopicSink(busTopicParams);
-    }
-
-
-    @Override
-    public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("IndexedUebTopicSinkFactory []");
-        return builder.toString();
-    }
-
-}
index 9631588..beacee3 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
 
 package org.onap.policy.common.endpoints.event.comm.bus;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
-
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * UEB Topic Source Factory.
@@ -110,247 +102,3 @@ public interface UebTopicSourceFactory {
      */
     List<UebTopicSource> inventory();
 }
-
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of UEB Source Topics indexed by topic name.
- */
-class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
-    private static final String MISSING_TOPIC = "A topic must be provided";
-
-    /**
-     * Logger.
-     */
-    private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
-
-    /**
-     * UEB Topic Name Index.
-     */
-    protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
-
-    @Override
-    public UebTopicSource build(BusTopicParams busTopicParams) {
-        if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
-            throw new IllegalArgumentException("UEB Server(s) must be provided");
-        }
-
-        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        synchronized (this) {
-            if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
-                return uebTopicSources.get(busTopicParams.getTopic());
-            }
-
-            UebTopicSource uebTopicSource = makeSource(busTopicParams);
-
-            if (busTopicParams.isManaged()) {
-                uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
-            }
-
-            return uebTopicSource;
-        }
-    }
-
-    @Override
-    public List<UebTopicSource> build(Properties properties) {
-
-        String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
-        if (readTopics == null || readTopics.isEmpty()) {
-            logger.info("{}: no topic for UEB Source", this);
-            return new ArrayList<>();
-        }
-        List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
-
-        List<UebTopicSource> newUebTopicSources = new ArrayList<>();
-        synchronized (this) {
-            for (String topic : readTopicList) {
-                if (this.uebTopicSources.containsKey(topic)) {
-                    newUebTopicSources.add(this.uebTopicSources.get(topic));
-                    continue;
-                }
-
-                String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
-                if (servers == null || servers.isEmpty()) {
-                    logger.error("{}: no UEB servers configured for sink {}", this, topic);
-                    continue;
-                }
-
-                final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
-                final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
-
-                final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
-                final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
-
-                final String consumerInstance = properties.getProperty(
-                        PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
-
-                String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
-                int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
-                if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
-                    try {
-                        fetchTimeout = Integer.parseInt(fetchTimeoutString);
-                    } catch (NumberFormatException nfe) {
-                        logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
-                                topic);
-                    }
-                }
-
-                String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
-                        + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
-                int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
-                if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
-                    try {
-                        fetchLimit = Integer.parseInt(fetchLimitString);
-                    } catch (NumberFormatException nfe) {
-                        logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
-                                topic);
-                    }
-                }
-
-                String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
-                boolean managed = true;
-                if (managedString != null && !managedString.isEmpty()) {
-                    managed = Boolean.parseBoolean(managedString);
-                }
-
-                String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
-                        + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
-                // default is to use HTTP if no https property exists
-                boolean useHttps = false;
-                if (useHttpsString != null && !useHttpsString.isEmpty()) {
-                    useHttps = Boolean.parseBoolean(useHttpsString);
-                }
-
-                String allowSelfSignedCertsString =
-                        properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
-                                + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
-                // default is to disallow self-signed certs
-                boolean allowSelfSignedCerts = false;
-                if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
-                    allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
-                }
-
-                UebTopicSource uebTopicSource = this.build(BusTopicParams.builder()
-                        .servers(serverList)
-                        .topic(topic)
-                        .apiKey(apiKey)
-                        .apiSecret(apiSecret)
-                        .consumerGroup(consumerGroup)
-                        .consumerInstance(consumerInstance)
-                        .fetchTimeout(fetchTimeout)
-                        .fetchLimit(fetchLimit)
-                        .managed(managed)
-                        .useHttps(useHttps)
-                        .allowSelfSignedCerts(allowSelfSignedCerts).build());
-                newUebTopicSources.add(uebTopicSource);
-            }
-        }
-        return newUebTopicSources;
-    }
-
-    @Override
-    public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
-
-        return this.build(BusTopicParams.builder()
-                .servers(servers)
-                .topic(topic)
-                .apiKey(apiKey)
-                .apiSecret(apiSecret)
-                .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
-                .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH)
-                .managed(true)
-                .useHttps(false)
-                .allowSelfSignedCerts(true).build());
-    }
-
-    @Override
-    public UebTopicSource build(List<String> servers, String topic) {
-        return this.build(servers, topic, null, null);
-    }
-
-    /**
-     * Makes a new source.
-     * 
-     * @param busTopicParams parameters to use to configure the source
-     * @return a new source
-     */
-    protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
-        return new SingleThreadedUebTopicSource(busTopicParams);
-    }
-
-    @Override
-    public void destroy(String topic) {
-
-        if (topic == null || topic.isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        UebTopicSource uebTopicSource;
-
-        synchronized (this) {
-            if (!uebTopicSources.containsKey(topic)) {
-                return;
-            }
-
-            uebTopicSource = uebTopicSources.remove(topic);
-        }
-
-        uebTopicSource.shutdown();
-    }
-
-    @Override
-    public void destroy() {
-        List<UebTopicSource> readers = this.inventory();
-        for (UebTopicSource reader : readers) {
-            reader.shutdown();
-        }
-
-        synchronized (this) {
-            this.uebTopicSources.clear();
-        }
-    }
-
-    @Override
-    public UebTopicSource get(String topic) {
-
-        if (topic == null || topic.isEmpty()) {
-            throw new IllegalArgumentException(MISSING_TOPIC);
-        }
-
-        synchronized (this) {
-            if (uebTopicSources.containsKey(topic)) {
-                return uebTopicSources.get(topic);
-            } else {
-                throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
-            }
-        }
-    }
-
-    @Override
-    public synchronized List<UebTopicSource> inventory() {
-        return new ArrayList<>(this.uebTopicSources.values());
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("IndexedUebTopicSourceFactory []");
-        return builder.toString();
-    }
-}
index ca10680..133dfae 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,18 +23,9 @@ package org.onap.policy.common.endpoints.http.client;
 
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
-
-import org.apache.commons.lang3.StringUtils;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.http.client.internal.JerseyClient;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Http Client Factory.
@@ -76,149 +67,3 @@ public interface HttpClientFactory {
 
     public void destroy();
 }
-
-
-/**
- * HTTP client factory implementation indexed by name.
- */
-class IndexedHttpClientFactory implements HttpClientFactory {
-
-    /**
-     * Logger.
-     */
-    private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class);
-
-    protected HashMap<String, HttpClient> clients = new HashMap<>();
-
-    @Override
-    public synchronized HttpClient build(BusTopicParams busTopicParams)
-            throws KeyManagementException, NoSuchAlgorithmException {
-        if (clients.containsKey(busTopicParams.getClientName())) {
-            return clients.get(busTopicParams.getClientName());
-        }
-
-        JerseyClient client =
-                new JerseyClient(busTopicParams);
-
-        if (busTopicParams.isManaged()) {
-            clients.put(busTopicParams.getClientName(), client);
-        }
-
-        return client;
-    }
-
-    @Override
-    public synchronized List<HttpClient> build(Properties properties)
-            throws KeyManagementException, NoSuchAlgorithmException {
-        ArrayList<HttpClient> clientList = new ArrayList<>();
-
-        String clientNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES);
-        if (clientNames == null || clientNames.isEmpty()) {
-            return clientList;
-        }
-
-        List<String> clientNameList = new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*")));
-
-        for (String clientName : clientNameList) {
-            String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
-                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-            boolean https = false;
-            if (StringUtils.isNotBlank(httpsString)) {
-                https = Boolean.parseBoolean(httpsString);
-            }
-
-            String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
-                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
-
-            String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES
-                    + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX);
-            int port;
-            try {
-                if (servicePortString == null || servicePortString.isEmpty()) {
-                    continue;
-                }
-                port = Integer.parseInt(servicePortString);
-            } catch (NumberFormatException nfe) {
-                logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe);
-                continue;
-            }
-
-            String baseUrl = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
-                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX);
-
-            String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
-                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
-
-            String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
-                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
-
-            String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
-                    + clientName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
-            boolean managed = true;
-            if (managedString != null && !managedString.isEmpty()) {
-                managed = Boolean.parseBoolean(managedString);
-            }
-
-            try {
-                HttpClient client =
-                        this.build(BusTopicParams.builder()
-                                .clientName(clientName)
-                                .useHttps(https)
-                                .allowSelfSignedCerts(https)
-                                .hostname(hostName)
-                                .port(port)
-                                .basePath(baseUrl)
-                                .userName(userName)
-                                .password(password)
-                                .managed(managed)
-                                .build());
-                clientList.add(client);
-            } catch (Exception e) {
-                logger.error("http-client-factory: cannot build client {}", clientName, e);
-            }
-        }
-
-        return clientList;
-    }
-
-    @Override
-    public synchronized HttpClient get(String name) {
-        if (clients.containsKey(name)) {
-            return clients.get(name);
-        }
-
-        throw new IllegalArgumentException("Http Client " + name + " not found");
-    }
-
-    @Override
-    public synchronized List<HttpClient> inventory() {
-        return new ArrayList<>(this.clients.values());
-    }
-
-    @Override
-    public synchronized void destroy(String name) {
-        if (!clients.containsKey(name)) {
-            return;
-        }
-
-        HttpClient client = clients.remove(name);
-        try {
-            client.shutdown();
-        } catch (IllegalStateException e) {
-            logger.error("http-client-factory: cannot shutdown client {}", client, e);
-        }
-    }
-
-    @Override
-    public void destroy() {
-        List<HttpClient> clientsInventory = this.inventory();
-        for (HttpClient client : clientsInventory) {
-            client.shutdown();
-        }
-
-        synchronized (this) {
-            this.clients.clear();
-        }
-    }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java
new file mode 100644 (file)
index 0000000..d4d4a28
--- /dev/null
@@ -0,0 +1,181 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.http.client;
+
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.http.client.internal.JerseyClient;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HTTP client factory implementation indexed by name.
+ */
+class IndexedHttpClientFactory implements HttpClientFactory {
+
+    /**
+     * Logger.
+     */
+    private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class);
+
+    protected HashMap<String, HttpClient> clients = new HashMap<>();
+
+    @Override
+    public synchronized HttpClient build(BusTopicParams busTopicParams)
+            throws KeyManagementException, NoSuchAlgorithmException {
+        if (clients.containsKey(busTopicParams.getClientName())) {
+            return clients.get(busTopicParams.getClientName());
+        }
+
+        JerseyClient client =
+                new JerseyClient(busTopicParams);
+
+        if (busTopicParams.isManaged()) {
+            clients.put(busTopicParams.getClientName(), client);
+        }
+
+        return client;
+    }
+
+    @Override
+    public synchronized List<HttpClient> build(Properties properties)
+            throws KeyManagementException, NoSuchAlgorithmException {
+        ArrayList<HttpClient> clientList = new ArrayList<>();
+
+        String clientNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES);
+        if (clientNames == null || clientNames.isEmpty()) {
+            return clientList;
+        }
+
+        List<String> clientNameList = new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*")));
+
+        for (String clientName : clientNameList) {
+            String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
+                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+            boolean https = false;
+            if (StringUtils.isNotBlank(httpsString)) {
+                https = Boolean.parseBoolean(httpsString);
+            }
+
+            String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
+                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
+
+            String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES
+                    + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX);
+            int port;
+            try {
+                if (servicePortString == null || servicePortString.isEmpty()) {
+                    continue;
+                }
+                port = Integer.parseInt(servicePortString);
+            } catch (NumberFormatException nfe) {
+                logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe);
+                continue;
+            }
+
+            String baseUrl = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
+                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX);
+
+            String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
+                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
+
+            String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
+                    + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
+
+            String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
+                    + clientName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
+            boolean managed = true;
+            if (managedString != null && !managedString.isEmpty()) {
+                managed = Boolean.parseBoolean(managedString);
+            }
+
+            try {
+                HttpClient client =
+                        this.build(BusTopicParams.builder()
+                                .clientName(clientName)
+                                .useHttps(https)
+                                .allowSelfSignedCerts(https)
+                                .hostname(hostName)
+                                .port(port)
+                                .basePath(baseUrl)
+                                .userName(userName)
+                                .password(password)
+                                .managed(managed)
+                                .build());
+                clientList.add(client);
+            } catch (Exception e) {
+                logger.error("http-client-factory: cannot build client {}", clientName, e);
+            }
+        }
+
+        return clientList;
+    }
+
+    @Override
+    public synchronized HttpClient get(String name) {
+        if (clients.containsKey(name)) {
+            return clients.get(name);
+        }
+
+        throw new IllegalArgumentException("Http Client " + name + " not found");
+    }
+
+    @Override
+    public synchronized List<HttpClient> inventory() {
+        return new ArrayList<>(this.clients.values());
+    }
+
+    @Override
+    public synchronized void destroy(String name) {
+        if (!clients.containsKey(name)) {
+            return;
+        }
+
+        HttpClient client = clients.remove(name);
+        try {
+            client.shutdown();
+        } catch (IllegalStateException e) {
+            logger.error("http-client-factory: cannot shutdown client {}", client, e);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        List<HttpClient> clientsInventory = this.inventory();
+        for (HttpClient client : clientsInventory) {
+            client.shutdown();
+        }
+
+        synchronized (this) {
+            this.clients.clear();
+        }
+    }
+
+}
index 527ada9..0c30e3e 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.common.endpoints.http.server;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
-import org.onap.policy.common.endpoints.http.server.internal.JettyJerseyServer;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Factory of HTTP Servlet-Enabled Servlets.
@@ -101,208 +94,3 @@ public interface HttpServletServerFactory {
      */
     void destroy();
 }
-
-/**
- * Indexed factory implementation.
- */
-class IndexedHttpServletServerFactory implements HttpServletServerFactory {
-
-    private static final String SPACES_COMMA_SPACES = "\\s*,\\s*";
-
-    /**
-     * logger.
-     */
-    protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class);
-
-    /**
-     * servers index.
-     */
-    protected HashMap<Integer, HttpServletServer> servers = new HashMap<>();
-
-    @Override
-    public synchronized HttpServletServer build(String name, boolean https, String host, int port, String contextPath,
-        boolean swagger, boolean managed) {
-
-        if (servers.containsKey(port)) {
-            return servers.get(port);
-        }
-
-        JettyJerseyServer server = new JettyJerseyServer(name, https, host, port, contextPath, swagger);
-        if (managed) {
-            servers.put(port, server);
-        }
-
-        return server;
-    }
-
-    @Override
-    public synchronized HttpServletServer build(String name, String host, int port, String contextPath, boolean swagger,
-        boolean managed) {
-        return build(name, false, host, port, contextPath, swagger, managed);
-    }
-
-    @Override
-    public synchronized List<HttpServletServer> build(Properties properties) {
-
-        ArrayList<HttpServletServer> serviceList = new ArrayList<>();
-
-        String serviceNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES);
-        if (serviceNames == null || serviceNames.isEmpty()) {
-            logger.warn("No topic for HTTP Service: {}", properties);
-            return serviceList;
-        }
-
-        List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES));
-
-        for (String serviceName : serviceNameList) {
-            String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
-                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX);
-
-            int servicePort;
-            try {
-                if (servicePortString == null || servicePortString.isEmpty()) {
-                    if (logger.isWarnEnabled()) {
-                        logger.warn("No HTTP port for service in {}", serviceName);
-                    }
-                    continue;
-                }
-                servicePort = Integer.parseInt(servicePortString);
-            } catch (NumberFormatException nfe) {
-                if (logger.isWarnEnabled()) {
-                    logger.warn("No HTTP port for service in {}", serviceName);
-                }
-                continue;
-            }
-
-            final String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
-                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
-
-            final String contextUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
-                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX);
-
-            final String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
-                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
-
-            final String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
-                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
-
-            final String authUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
-                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX);
-
-            final String restClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
-                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX);
-
-            final String filterClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
-                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX);
-
-            final String restPackages = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
-                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX);
-
-            final String restUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
-                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX);
-
-            final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
-                + "." + serviceName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
-            boolean managed = true;
-            if (managedString != null && !managedString.isEmpty()) {
-                managed = Boolean.parseBoolean(managedString);
-            }
-
-            String swaggerString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
-                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX);
-            boolean swagger = false;
-            if (swaggerString != null && !swaggerString.isEmpty()) {
-                swagger = Boolean.parseBoolean(swaggerString);
-            }
-
-            String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
-                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-            boolean https = false;
-            if (httpsString != null && !httpsString.isEmpty()) {
-                https = Boolean.parseBoolean(httpsString);
-            }
-
-            String aafString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
-                + serviceName + PolicyEndPointProperties.PROPERTY_AAF_SUFFIX);
-            boolean aaf = false;
-            if (aafString != null && !aafString.isEmpty()) {
-                aaf = Boolean.parseBoolean(aafString);
-            }
-
-            HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger,
-                managed);
-
-            /* authentication method either AAF or HTTP Basic Auth */
-
-            if (aaf) {
-                service.setAafAuthentication(contextUriPath);
-            } else if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) {
-                service.setBasicAuthentication(userName, password, authUriPath);
-            }
-
-            if (filterClasses != null && !filterClasses.isEmpty()) {
-                List<String> filterClassesList = Arrays.asList(filterClasses.split(SPACES_COMMA_SPACES));
-                for (String filterClass : filterClassesList) {
-                    service.addFilterClass(restUriPath, filterClass);
-                }
-            }
-
-            if (restClasses != null && !restClasses.isEmpty()) {
-                List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES));
-                for (String restClass : restClassesList) {
-                    service.addServletClass(restUriPath, restClass);
-                }
-            }
-
-            if (restPackages != null && !restPackages.isEmpty()) {
-                List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES));
-                for (String restPackage : restPackageList) {
-                    service.addServletPackage(restUriPath, restPackage);
-                }
-            }
-
-            serviceList.add(service);
-        }
-
-        return serviceList;
-    }
-
-    @Override
-    public synchronized HttpServletServer get(int port) {
-
-        if (servers.containsKey(port)) {
-            return servers.get(port);
-        }
-
-        throw new IllegalArgumentException("Http Server for " + port + " not found");
-    }
-
-    @Override
-    public synchronized List<HttpServletServer> inventory() {
-        return new ArrayList<>(this.servers.values());
-    }
-
-    @Override
-    public synchronized void destroy(int port) {
-
-        if (!servers.containsKey(port)) {
-            return;
-        }
-
-        HttpServletServer server = servers.remove(port);
-        server.shutdown();
-    }
-
-    @Override
-    public synchronized void destroy() {
-        List<HttpServletServer> httpServletServers = this.inventory();
-        for (HttpServletServer server : httpServletServers) {
-            server.shutdown();
-        }
-
-        synchronized (this) {
-            this.servers.clear();
-        }
-    }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java
new file mode 100644 (file)
index 0000000..ad8ef99
--- /dev/null
@@ -0,0 +1,236 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.http.server;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import org.onap.policy.common.endpoints.http.server.internal.JettyJerseyServer;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Indexed factory implementation.
+ */
+class IndexedHttpServletServerFactory implements HttpServletServerFactory {
+
+    private static final String SPACES_COMMA_SPACES = "\\s*,\\s*";
+
+    /**
+     * logger.
+     */
+    protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class);
+
+    /**
+     * servers index.
+     */
+    protected HashMap<Integer, HttpServletServer> servers = new HashMap<>();
+
+    @Override
+    public synchronized HttpServletServer build(String name, boolean https, String host, int port, String contextPath,
+        boolean swagger, boolean managed) {
+
+        if (servers.containsKey(port)) {
+            return servers.get(port);
+        }
+
+        JettyJerseyServer server = new JettyJerseyServer(name, https, host, port, contextPath, swagger);
+        if (managed) {
+            servers.put(port, server);
+        }
+
+        return server;
+    }
+
+    @Override
+    public synchronized HttpServletServer build(String name, String host, int port, String contextPath, boolean swagger,
+        boolean managed) {
+        return build(name, false, host, port, contextPath, swagger, managed);
+    }
+
+    @Override
+    public synchronized List<HttpServletServer> build(Properties properties) {
+
+        ArrayList<HttpServletServer> serviceList = new ArrayList<>();
+
+        String serviceNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES);
+        if (serviceNames == null || serviceNames.isEmpty()) {
+            logger.warn("No topic for HTTP Service: {}", properties);
+            return serviceList;
+        }
+
+        List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES));
+
+        for (String serviceName : serviceNameList) {
+            String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
+                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX);
+
+            int servicePort;
+            try {
+                if (servicePortString == null || servicePortString.isEmpty()) {
+                    if (logger.isWarnEnabled()) {
+                        logger.warn("No HTTP port for service in {}", serviceName);
+                    }
+                    continue;
+                }
+                servicePort = Integer.parseInt(servicePortString);
+            } catch (NumberFormatException nfe) {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("No HTTP port for service in {}", serviceName);
+                }
+                continue;
+            }
+
+            final String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
+
+            final String contextUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
+                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX);
+
+            final String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
+
+            final String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
+
+            final String authUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
+                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX);
+
+            final String restClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
+                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX);
+
+            final String filterClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
+                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX);
+
+            final String restPackages = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
+                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX);
+
+            final String restUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
+                + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX);
+
+            final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
+                + "." + serviceName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
+            boolean managed = true;
+            if (managedString != null && !managedString.isEmpty()) {
+                managed = Boolean.parseBoolean(managedString);
+            }
+
+            String swaggerString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX);
+            boolean swagger = false;
+            if (swaggerString != null && !swaggerString.isEmpty()) {
+                swagger = Boolean.parseBoolean(swaggerString);
+            }
+
+            String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+                + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+            boolean https = false;
+            if (httpsString != null && !httpsString.isEmpty()) {
+                https = Boolean.parseBoolean(httpsString);
+            }
+
+            String aafString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+                + serviceName + PolicyEndPointProperties.PROPERTY_AAF_SUFFIX);
+            boolean aaf = false;
+            if (aafString != null && !aafString.isEmpty()) {
+                aaf = Boolean.parseBoolean(aafString);
+            }
+
+            HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger,
+                managed);
+
+            /* authentication method either AAF or HTTP Basic Auth */
+
+            if (aaf) {
+                service.setAafAuthentication(contextUriPath);
+            } else if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) {
+                service.setBasicAuthentication(userName, password, authUriPath);
+            }
+
+            if (filterClasses != null && !filterClasses.isEmpty()) {
+                List<String> filterClassesList = Arrays.asList(filterClasses.split(SPACES_COMMA_SPACES));
+                for (String filterClass : filterClassesList) {
+                    service.addFilterClass(restUriPath, filterClass);
+                }
+            }
+
+            if (restClasses != null && !restClasses.isEmpty()) {
+                List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES));
+                for (String restClass : restClassesList) {
+                    service.addServletClass(restUriPath, restClass);
+                }
+            }
+
+            if (restPackages != null && !restPackages.isEmpty()) {
+                List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES));
+                for (String restPackage : restPackageList) {
+                    service.addServletPackage(restUriPath, restPackage);
+                }
+            }
+
+            serviceList.add(service);
+        }
+
+        return serviceList;
+    }
+
+    @Override
+    public synchronized HttpServletServer get(int port) {
+
+        if (servers.containsKey(port)) {
+            return servers.get(port);
+        }
+
+        throw new IllegalArgumentException("Http Server for " + port + " not found");
+    }
+
+    @Override
+    public synchronized List<HttpServletServer> inventory() {
+        return new ArrayList<>(this.servers.values());
+    }
+
+    @Override
+    public synchronized void destroy(int port) {
+
+        if (!servers.containsKey(port)) {
+            return;
+        }
+
+        HttpServletServer server = servers.remove(port);
+        server.shutdown();
+    }
+
+    @Override
+    public synchronized void destroy() {
+        List<HttpServletServer> httpServletServers = this.inventory();
+        for (HttpServletServer server : httpServletServers) {
+            server.shutdown();
+        }
+
+        synchronized (this) {
+            this.servers.clear();
+        }
+    }
+
+}