Release version 1.1.0 of sli/northbound
[ccsdk/sli/northbound.git] / dmaap-listener / src / main / java / org / onap / ccsdk / sli / northbound / dmaapclient / DmaapListener.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : SDN-C
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
6  *                      reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.ccsdk.sli.northbound.dmaapclient;
23
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Properties;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public class DmaapListener {
33
34     private static final String DMAAP_LISTENER_PROPERTIES = "dmaap-listener.properties";
35     private static final String DMAAP_LISTENER_PROPERTIES_DIR = "/opt/onap/ccsdk/data/properties";
36     private static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
37     private static final Logger LOG = LoggerFactory.getLogger(DmaapListener.class);
38
39     public static void main(String[] args) {
40
41         Properties properties = new Properties();
42         String propFileName = DMAAP_LISTENER_PROPERTIES;
43         String propPath = null;
44         String propDir = System.getProperty(SDNC_CONFIG_DIR);
45         if(propDir == null) {
46                 propDir = System.getenv(SDNC_CONFIG_DIR);
47                 LOG.debug(SDNC_CONFIG_DIR + " read from environment variable with value " + propDir);
48         }
49         List<SdncDmaapConsumer> consumers = new LinkedList<>();
50
51         if (args.length > 0) {
52             propFileName = args[0];
53         }
54
55         if (propDir == null) {
56             propDir = DMAAP_LISTENER_PROPERTIES_DIR;
57         }
58
59         if (!propFileName.startsWith("/")) {
60             propPath = propDir + "/" + propFileName;
61         }
62
63         if (propPath != null) {
64             properties = loadProperties(propPath, properties);
65
66             String subscriptionStr = properties.getProperty("subscriptions");
67
68             boolean threadsRunning = false;
69
70             LOG.debug("Dmaap subscriptions : " + subscriptionStr);
71
72             if (subscriptionStr != null) {
73                 threadsRunning = handleSubscriptions(subscriptionStr, propDir, properties, consumers);
74             }
75
76             while (threadsRunning) {
77                 threadsRunning = updateThreadState(consumers);
78                 if (!threadsRunning) {
79                     break;
80                 }
81
82                 try {
83                     Thread.sleep(10000);
84                 } catch (InterruptedException e) {
85                     LOG.error(e.getLocalizedMessage(), e);
86                 }
87             }
88
89             LOG.info("No listener threads running - exiting");
90         }
91     }
92
93     private static boolean updateThreadState(List<SdncDmaapConsumer> consumers) {
94         boolean threadsRunning = false;
95         for (SdncDmaapConsumer consumer : consumers) {
96             if (consumer.isRunning()) {
97                 threadsRunning = true;
98             }
99         }
100         return threadsRunning;
101     }
102
103     static Properties loadProperties(String propPath, Properties properties) {
104         File propFile = new File(propPath);
105
106         if (!propFile.canRead()) {
107             LOG.error("Cannot read properties file " + propPath);
108             System.exit(1);
109         }
110
111         try (FileInputStream in = new FileInputStream(propFile)) {
112             properties.load(in);
113         } catch (Exception e) {
114             LOG.error("Caught exception loading properties from " + propPath, e);
115             System.exit(1);
116         }
117         return properties;
118     }
119
120     static boolean handleSubscriptions(String subscriptionStr, String propDir, Properties properties,
121         List<SdncDmaapConsumer> consumers) {
122         String[] subscriptions = subscriptionStr.split(";");
123
124         for (String subscription1 : subscriptions) {
125             String[] subscription = subscription1.split(":");
126             String consumerClassName = subscription[0];
127             String propertyPath = subscription[1];
128
129             LOG.debug(String.format("Handling subscription [%s,%s]", consumerClassName, propertyPath));
130
131             if (propertyPath == null) {
132                 LOG.error(String.format("Invalid subscription (%s) property file missing", subscription1));
133                 continue;
134             }
135
136             if (!propertyPath.startsWith("/")) {
137                 propertyPath = propDir + "/" + propertyPath;
138             }
139
140             Class<?> consumerClass = null;
141
142             try {
143                 consumerClass = Class.forName(consumerClassName);
144             } catch (Exception e) {
145                 LOG.error("Could not find DMaap consumer class {}", consumerClassName, e);
146             }
147
148             if (consumerClass != null) {
149                 handleConsumerClass(consumerClass, consumerClassName, propertyPath,
150                     properties, consumers);
151             }
152         }
153         return !consumers.isEmpty();
154     }
155
156     private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, String propertyPath,
157         Properties properties, List<SdncDmaapConsumer> consumers) {
158
159         SdncDmaapConsumer consumer = null;
160
161         try {
162             consumer = (SdncDmaapConsumer) consumerClass.newInstance();
163         } catch (Exception e) {
164             LOG.error("Could not create consumer from class " + consumerClassName, e);
165         }
166
167         if (consumer != null) {
168             LOG.debug(String.format("Initializing consumer %s(%s)", consumerClassName, propertyPath));
169             consumer.init(properties, propertyPath);
170
171             if (consumer.isReady()) {
172                 Thread consumerThread = new Thread(consumer);
173                 consumerThread.start();
174                 consumers.add(consumer);
175
176                 LOG.info(String.format("Started consumer thread (%s : %s)", consumerClassName,
177                     propertyPath));
178                 return true;
179             } else {
180                 LOG.debug(String.format("Consumer %s is not ready", consumerClassName));
181             }
182         }
183         return false;
184     }
185 }