26afd19aa9bd45cab7e0c1ed05ecc77f5b810e5e
[ccsdk/features.git] /
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
20
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Properties;
25
26 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 public class DMaaPVESMsgConsumerMain implements Runnable {
31
32         private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
33
34         boolean threadsRunning = false;
35         static List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
36         public GeneralConfig config;
37         public PNFRegistrationConfig pnfRegistrationConfig;
38         public FaultConfig faultConfig;
39
40         public DMaaPVESMsgConsumerMain(Map<String, Configuration> configMap) {
41                 configMap.forEach((k, v) -> initialize(k, v));
42         }
43
44         public void initialize(String domain, Configuration domainConfig) {
45                 LOG.debug("In initialize method : Domain = {} and domainConfig = {}",domain,domainConfig);
46                 String consumerClass = null;
47                 Properties consumerProperties = new Properties();
48                 if (domain.equalsIgnoreCase("pnfregistration")) {
49                         this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
50
51                         consumerClass = pnfRegistrationConfig.getConsumerClass();
52                         LOG.debug("Consumer class = "+consumerClass);
53
54                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, pnfRegistrationConfig.getTransportType());
55                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, pnfRegistrationConfig.getHostPort());
56                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, pnfRegistrationConfig.getContenttype());
57                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP, pnfRegistrationConfig.getConsumerGroup());
58                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID, pnfRegistrationConfig.getConsumerId());
59                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
60                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, pnfRegistrationConfig.getTimeout());
61                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
62                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, pnfRegistrationConfig.getFetchPause());
63                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, pnfRegistrationConfig.getProtocol());
64                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME, pnfRegistrationConfig.getUsername());
65                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD, pnfRegistrationConfig.getPassword());
66                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, pnfRegistrationConfig.getClientReadTimeout());
67                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, pnfRegistrationConfig.getClientConnectTimeout());
68                 } else if (domain.equalsIgnoreCase("fault")) {
69                         this.faultConfig = (FaultConfig) domainConfig;
70                         consumerClass = faultConfig.getConsumerClass();
71                         LOG.debug("Consumer class = {}",consumerClass);
72                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
73                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
74                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
75                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
76                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
77                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
78                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
79                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
80                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
81                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
82                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
83                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
84                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, faultConfig.getClientReadTimeout());
85                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, faultConfig.getClientConnectTimeout());
86                 }
87
88                 if (consumerClass != null) {
89                         LOG.info("Calling createConsumer : {}",consumerClass);
90                         threadsRunning = createConsumer(consumerClass, consumerProperties);
91                 }
92
93         }
94
95         private static boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
96                 boolean threadsRunning = false;
97                 for (DMaaPVESMsgConsumer consumer : consumers) {
98                         if (consumer.isRunning()) {
99                                 threadsRunning = true;
100                         }
101                 }
102                 return threadsRunning;
103         }
104
105         static boolean createConsumer(String consumerClassName, Properties properties) {
106                 Class<?> consumerClass = null;
107
108                 try {
109                         consumerClass = Class.forName(consumerClassName);
110                 } catch (Exception e) {
111                         LOG.error("Could not find DMaap VES Message consumer class {}", consumerClassName, e);
112                 }
113
114                 if (consumerClass != null) {
115                         LOG.debug("Calling handleConsumerClass");
116                         handleConsumerClass(consumerClass, consumerClassName, properties, consumers);
117                 }
118                 return !consumers.isEmpty();
119         }
120
121         private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, Properties properties, List<DMaaPVESMsgConsumer> consumers) {
122                 DMaaPVESMsgConsumer consumer = null;
123
124                 try {
125                         consumer = (DMaaPVESMsgConsumer) consumerClass.newInstance();
126                         LOG.debug("Successfully created an instance of consumerClass : {}",consumerClassName);
127                 } catch (Exception e) {
128                         LOG.error("Could not create consumer from class {}",consumerClassName, e);
129                 }
130
131                 if (consumer != null) {
132                         LOG.info("Initializing consumer {}({})", consumerClassName, properties);
133                         consumer.init(properties);
134
135                         if (consumer.isReady()) {
136                                 Thread consumerThread = new Thread(consumer);
137                                 consumerThread.start();
138                                 consumers.add(consumer);
139
140                                 LOG.info("Started consumer thread ({} : {})", consumerClassName,
141                                                 properties);
142                                 return true;
143                         } else {
144                                 LOG.debug("Consumer {} is not ready", consumerClassName);
145                         }
146                 }
147                 return false;
148         }
149
150         public void run() {
151                 while (threadsRunning) {
152                         threadsRunning = updateThreadState(consumers);
153                         if (!threadsRunning) {
154                                 break;
155                         }
156
157                         try {
158                                 Thread.sleep(10000);
159                         } catch (InterruptedException e) {
160                                 LOG.error(e.getLocalizedMessage(), e);
161                         }
162                 }
163
164                 LOG.info("No listener threads running - exiting");
165         }
166
167         public static List<DMaaPVESMsgConsumer> getConsumers() {
168                 return consumers;
169         }
170
171 }