f8ee7a48f6ac32ae9e0ab595a5843334905987c6
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
20
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Properties;
25 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 public class DMaaPVESMsgConsumerMain implements Runnable {
30
31         private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
32         private static final String _PNFREG_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
33         private static final String _FAULT_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
34         private static final String _PNFREG_DOMAIN = "pnfRegistration";
35         private static final String _FAULT_DOMAIN = "fault";
36
37         boolean threadsRunning = false;
38         List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
39         private PNFRegistrationConfig pnfRegistrationConfig;
40         private FaultConfig faultConfig;
41         private GeneralConfig generalConfig;
42
43         public DMaaPVESMsgConsumerMain(Map<String, Configuration> configMap, GeneralConfig generalConfig) {
44                 this.generalConfig = generalConfig;
45                 configMap.forEach((k, v) -> initialize(k, v));
46         }
47
48         public void initialize(String domain, Configuration domainConfig) {
49                 LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
50                 String consumerClass = null;
51                 Properties consumerProperties = new Properties();
52                 if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
53                         this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
54
55                         consumerClass = _PNFREG_CLASS;
56                         LOG.debug("Consumer class = {}", consumerClass);
57
58                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
59                                         pnfRegistrationConfig.getTransportType());
60                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
61                                         pnfRegistrationConfig.getHostPort());
62                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
63                                         pnfRegistrationConfig.getContenttype());
64                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
65                                         pnfRegistrationConfig.getConsumerGroup());
66                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
67                                         pnfRegistrationConfig.getConsumerId());
68                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
69                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
70                                         pnfRegistrationConfig.getTimeout());
71                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
72                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
73                                         pnfRegistrationConfig.getFetchPause());
74                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
75                                         pnfRegistrationConfig.getProtocol());
76                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
77                                         pnfRegistrationConfig.getUsername());
78                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
79                                         pnfRegistrationConfig.getPassword());
80                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
81                                         pnfRegistrationConfig.getClientReadTimeout());
82                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
83                                         pnfRegistrationConfig.getClientConnectTimeout());
84                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
85                                         pnfRegistrationConfig.getHTTPProxyURI());
86                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
87                                         pnfRegistrationConfig.getHTTPProxyUsername());
88                         consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
89                                         pnfRegistrationConfig.getHTTPProxyPassword());
90
91                         threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties);
92                 } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
93                         this.faultConfig = (FaultConfig) domainConfig;
94                         consumerClass = _FAULT_CLASS;
95                         LOG.debug("Consumer class = {}", consumerClass);
96                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
97                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
98                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
99                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
100                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
101                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
102                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
103                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
104                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
105                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
106                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
107                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
108                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
109                                         faultConfig.getClientReadTimeout());
110                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
111                                         faultConfig.getClientConnectTimeout());
112                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
113                                         faultConfig.getHTTPProxyURI());
114                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
115                                         faultConfig.getHTTPProxyUsername());
116                         consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
117                                         faultConfig.getHTTPProxyPassword());
118                         threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
119                 }
120         }
121
122         private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
123                 boolean threadsRunning = false;
124                 for (DMaaPVESMsgConsumer consumer : consumers) {
125                         if (consumer.isRunning()) {
126                                 threadsRunning = true;
127                         }
128                 }
129                 return threadsRunning;
130         }
131
132         public boolean createConsumer(String consumerType, Properties properties) {
133                 DMaaPVESMsgConsumerImpl consumer = null;
134
135                 if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
136                         consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
137                 else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
138                         consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
139
140                 handleConsumer(consumer, properties, consumers);
141                 return !consumers.isEmpty();
142         }
143
144         private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
145                         List<DMaaPVESMsgConsumer> consumers) {
146                 if (consumer != null) {
147                         consumer.init(properties);
148
149                         if (consumer.isReady()) {
150                                 Thread consumerThread = new Thread(consumer);
151                                 consumerThread.start();
152                                 consumers.add(consumer);
153
154                                 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
155                                 return true;
156                         } else {
157                                 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
158                         }
159                 }
160                 return false;
161         }
162
163         @Override
164         public void run() {
165                 while (threadsRunning) {
166                         threadsRunning = updateThreadState(consumers);
167                         if (!threadsRunning) {
168                                 break;
169                         }
170
171                         try {
172                                 Thread.sleep(10000);
173                         } catch (InterruptedException e) {
174                                 LOG.error(e.getLocalizedMessage(), e);
175                         }
176                 }
177
178                 LOG.info("No listener threads running - exiting");
179         }
180
181         public List<DMaaPVESMsgConsumer> getConsumers() {
182                 return consumers;
183         }
184
185 }