2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.openecomp.sdnc.dmaapclient;
25 import java.io.FileInputStream;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Properties;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 public class DmaapListener {
35 private static final String DMAAP_LISTENER_PROPERTIES = "dmaap-listener.properties";
36 private static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
37 private static final Logger LOG = LoggerFactory
38 .getLogger(DmaapListener.class);
40 public static void main(String[] args) {
42 Properties properties = new Properties();
45 String propFileName = DMAAP_LISTENER_PROPERTIES;
47 if (args.length > 0) {
48 propFileName = args[0];
51 String propPath = null;
52 String propDir = System.getenv(SDNC_CONFIG_DIR);
54 List<SdncDmaapConsumer> consumers = new LinkedList();
56 if (propDir == null) {
58 propDir = "/opt/sdnc/data/properties";
61 if (!propFileName.startsWith("/")) {
62 propPath = propDir + "/" + propFileName;
65 File propFile = new File(propPath);
67 if (!propFile.canRead()) {
68 LOG.error("Cannot read properties file "+propPath);
73 properties.load(new FileInputStream(propFile));
74 } catch (Exception e) {
75 LOG.error("Caught exception loading properties from "+propPath, e);
79 String subscriptionStr = properties.getProperty("subscriptions");
81 boolean threadsRunning = false;
83 LOG.debug("Dmaap subscriptions : "+subscriptionStr);
85 if (subscriptionStr != null) {
86 String[] subscriptions = subscriptionStr.split(";");
88 for (int i = 0; i < subscriptions.length; i++) {
89 String[] subscription = subscriptions[i].split(":");
90 String consumerClassName = subscription[0];
91 String propertyPath = subscription[1];
93 LOG.debug("Handling subscription [" + consumerClassName + "," + propertyPath + "]");
95 if (propertyPath == null) {
96 LOG.error("Invalid subscription (" + subscriptions[i] + ") property file missing");
100 if (!propertyPath.startsWith("/")) {
101 propertyPath = propDir + "/" + propertyPath;
104 Class<?> consumerClass = null;
107 consumerClass = Class.forName(consumerClassName);
108 } catch (Exception e) {
109 LOG.error("Could not find DMaap consumer class " + consumerClassName);
112 if (consumerClass != null) {
114 SdncDmaapConsumer consumer = null;
117 consumer = (SdncDmaapConsumer) consumerClass.newInstance();
118 } catch (Exception e) {
119 LOG.error("Could not create consumer from class " + consumerClassName, e);
122 if (consumer != null) {
123 LOG.debug("Initializing consumer " + consumerClassName + "(" + propertyPath + ")");
124 consumer.init(properties, propertyPath);
126 if (consumer.isReady()) {
127 Thread consumerThread = new Thread(consumer);
128 consumerThread.start();
129 consumers.add(consumer);
130 threadsRunning = true;
131 LOG.info("Started consumer thread (" + consumerClassName + " : " + propertyPath + ")");
133 LOG.debug("Consumer " + consumerClassName + " is not ready");
142 while (threadsRunning) {
144 threadsRunning = false;
145 for (SdncDmaapConsumer consumer : consumers) {
146 if (consumer.isRunning()) {
147 threadsRunning = true;
151 if (!threadsRunning) {
157 } catch (InterruptedException e) {
162 LOG.info("No listener threads running - exitting");