2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.sli.northbound.dmaapclient;
25 import java.io.FileInputStream;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Properties;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 public class DmaapListener {
34 private static final String DMAAP_LISTENER_PROPERTIES = "dmaap-listener.properties";
35 private static final String DMAAP_LISTENER_PROPERTIES_DIR = "/opt/onap/ccsdk/data/properties";
36 private static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
37 private static final Logger LOG = LoggerFactory.getLogger(DmaapListener.class);
39 public static void main(String[] args) {
41 Properties properties = new Properties();
42 String propFileName = DMAAP_LISTENER_PROPERTIES;
43 String propPath = null;
44 String propDir = System.getProperty(SDNC_CONFIG_DIR);
46 propDir = System.getenv(SDNC_CONFIG_DIR);
47 LOG.debug(SDNC_CONFIG_DIR + " read from environment variable with value " + propDir);
49 List<SdncDmaapConsumer> consumers = new LinkedList<>();
51 if (args.length > 0) {
52 propFileName = args[0];
55 if (propDir == null) {
56 propDir = DMAAP_LISTENER_PROPERTIES_DIR;
59 if (!propFileName.startsWith("/")) {
60 propPath = propDir + "/" + propFileName;
63 if (propPath != null) {
64 properties = loadProperties(propPath, properties);
66 String subscriptionStr = properties.getProperty("subscriptions");
68 boolean threadsRunning = false;
70 LOG.debug("Dmaap subscriptions : " + subscriptionStr);
72 if (subscriptionStr != null) {
73 threadsRunning = handleSubscriptions(subscriptionStr, propDir, properties, consumers);
76 while (threadsRunning) {
77 threadsRunning = updateThreadState(consumers);
78 if (!threadsRunning) {
84 } catch (InterruptedException e) {
85 LOG.error(e.getLocalizedMessage(), e);
89 LOG.info("No listener threads running - exiting");
93 private static boolean updateThreadState(List<SdncDmaapConsumer> consumers) {
94 boolean threadsRunning = false;
95 for (SdncDmaapConsumer consumer : consumers) {
96 if (consumer.isRunning()) {
97 threadsRunning = true;
100 return threadsRunning;
103 static Properties loadProperties(String propPath, Properties properties) {
104 File propFile = new File(propPath);
106 if (!propFile.canRead()) {
107 LOG.error("Cannot read properties file " + propPath);
111 try (FileInputStream in = new FileInputStream(propFile)) {
113 } catch (Exception e) {
114 LOG.error("Caught exception loading properties from " + propPath, e);
120 static boolean handleSubscriptions(String subscriptionStr, String propDir, Properties properties,
121 List<SdncDmaapConsumer> consumers) {
122 String[] subscriptions = subscriptionStr.split(";");
124 for (String subscription1 : subscriptions) {
125 String[] subscription = subscription1.split(":");
126 String consumerClassName = subscription[0];
127 String propertyPath = subscription[1];
129 LOG.debug(String.format("Handling subscription [%s,%s]", consumerClassName, propertyPath));
131 if (propertyPath == null) {
132 LOG.error(String.format("Invalid subscription (%s) property file missing", subscription1));
136 if (!propertyPath.startsWith("/")) {
137 propertyPath = propDir + "/" + propertyPath;
140 Class<?> consumerClass = null;
143 consumerClass = Class.forName(consumerClassName);
144 } catch (Exception e) {
145 LOG.error("Could not find DMaap consumer class {}", consumerClassName, e);
148 if (consumerClass != null) {
149 handleConsumerClass(consumerClass, consumerClassName, propertyPath,
150 properties, consumers);
153 return !consumers.isEmpty();
156 private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, String propertyPath,
157 Properties properties, List<SdncDmaapConsumer> consumers) {
159 SdncDmaapConsumer consumer = null;
162 consumer = (SdncDmaapConsumer) consumerClass.newInstance();
163 } catch (Exception e) {
164 LOG.error("Could not create consumer from class " + consumerClassName, e);
167 if (consumer != null) {
168 LOG.debug(String.format("Initializing consumer %s(%s)", consumerClassName, propertyPath));
169 consumer.init(properties, propertyPath);
171 if (consumer.isReady()) {
172 Thread consumerThread = new Thread(consumer);
173 consumerThread.start();
174 consumers.add(consumer);
176 LOG.info(String.format("Started consumer thread (%s : %s)", consumerClassName,
180 LOG.debug(String.format("Consumer %s is not ready", consumerClassName));