/*
 * ============LICENSE_START========================================================================
 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
 * =================================================================================================
 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
 * =================================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 * ============LICENSE_END==========================================================================
 */
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
29 public class DMaaPVESMsgConsumerMain implements Runnable {
31 private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
32 private static final String _PNFREG_CLASS =
33 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
34 private static final String _FAULT_CLASS =
35 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
36 private static final String _CM_CLASS =
37 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
38 private static final String _PNFREG_DOMAIN = "pnfRegistration";
39 private static final String _FAULT_DOMAIN = "fault";
40 private static final String _CM_DOMAIN = "provisioning";
42 boolean threadsRunning = false;
43 List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
44 private PNFRegistrationConfig pnfRegistrationConfig;
45 private FaultConfig faultConfig;
46 private GeneralConfig generalConfig;
47 private ProvisioningConfig provisioningConfig;
/**
 * Stores the general configuration and initializes a consumer for every configured domain.
 *
 * @param configMap domain name mapped to that domain's message configuration; each entry is
 *        handed to {@link #initialize(String, MessageConfig)}
 * @param generalConfig configuration passed on to every consumer that gets created
 */
public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
    // Must be assigned before initialize() runs: the consumers it creates receive this object.
    this.generalConfig = generalConfig;
    configMap.forEach((domain, domainConfig) -> initialize(domain, domainConfig));
}
54 public void initialize(String domain, MessageConfig domainConfig) {
55 LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
57 Properties consumerProperties = new Properties();
58 if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
59 this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
60 consumerClass = _PNFREG_CLASS;
61 LOG.debug("Consumer class = {}", consumerClass);
63 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
64 pnfRegistrationConfig.getTransportType());
65 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
66 pnfRegistrationConfig.getHostPort());
67 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
68 pnfRegistrationConfig.getContenttype());
69 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
70 pnfRegistrationConfig.getConsumerGroup());
71 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
72 pnfRegistrationConfig.getConsumerId());
73 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
74 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
75 pnfRegistrationConfig.getTimeout());
76 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
77 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
78 pnfRegistrationConfig.getFetchPause());
79 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
80 pnfRegistrationConfig.getProtocol());
81 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
82 pnfRegistrationConfig.getUsername());
83 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
84 pnfRegistrationConfig.getPassword());
85 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
86 pnfRegistrationConfig.getClientReadTimeout());
87 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
88 pnfRegistrationConfig.getClientConnectTimeout());
89 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
90 pnfRegistrationConfig.getHTTPProxyURI());
91 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
92 pnfRegistrationConfig.getHTTPProxyUsername());
93 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
94 pnfRegistrationConfig.getHTTPProxyPassword());
96 threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties);
97 } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
98 this.faultConfig = (FaultConfig) domainConfig;
99 consumerClass = _FAULT_CLASS;
100 LOG.debug("Consumer class = {}", consumerClass);
101 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
102 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
103 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
104 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
105 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
106 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
107 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
108 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
109 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
110 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
111 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
112 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
113 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
114 faultConfig.getClientReadTimeout());
115 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
116 faultConfig.getClientConnectTimeout());
117 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
118 faultConfig.getHTTPProxyURI());
119 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
120 faultConfig.getHTTPProxyUsername());
121 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
122 faultConfig.getHTTPProxyPassword());
123 threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
124 } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
125 this.provisioningConfig = (ProvisioningConfig) domainConfig;
126 consumerClass = _CM_CLASS;
127 LOG.debug("Consumer class = {}", consumerClass);
128 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
129 provisioningConfig.getTransportType());
130 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
131 provisioningConfig.getHostPort());
132 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
133 provisioningConfig.getContenttype());
134 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
135 provisioningConfig.getConsumerGroup());
136 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
137 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
138 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
139 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
140 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
141 provisioningConfig.getFetchPause());
142 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol());
143 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername());
144 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword());
145 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
146 provisioningConfig.getClientReadTimeout());
147 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
148 provisioningConfig.getClientConnectTimeout());
149 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
150 provisioningConfig.getHTTPProxyURI());
151 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
152 provisioningConfig.getHTTPProxyUsername());
153 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
154 provisioningConfig.getHTTPProxyPassword());
155 threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties);
159 private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
160 boolean threadsRunning = false;
161 for (DMaaPVESMsgConsumer consumer : consumers) {
162 if (consumer.isRunning()) {
163 threadsRunning = true;
166 return threadsRunning;
169 public boolean createConsumer(String consumerType, Properties properties) {
170 DMaaPVESMsgConsumerImpl consumer = null;
172 if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
173 consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
174 else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
175 consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
176 else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
177 consumer = new DMaaPCMVESMsgConsumer(generalConfig);
179 handleConsumer(consumer, properties, consumers);
180 return !consumers.isEmpty();
183 private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
184 List<DMaaPVESMsgConsumer> consumers) {
185 if (consumer != null) {
186 consumer.init(properties);
188 if (consumer.isReady()) {
189 Thread consumerThread = new Thread(consumer);
190 consumerThread.start();
191 consumers.add(consumer);
193 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
196 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
// Polling loop: repeats for as long as threadsRunning is true.
204 while (threadsRunning) {
// Refresh the flag from the current state of the registered consumers.
205 threadsRunning = updateThreadState(consumers);
// NOTE(review): the statements guarded by this check are not visible here; presumably the
// loop is left once no consumer is running - confirm against the full method.
206 if (!threadsRunning) {
// An interruption is logged with its stack trace and the thread's interrupt status is set again.
212 } catch (InterruptedException e) {
213 LOG.error(e.getLocalizedMessage(), e);
214 Thread.currentThread().interrupt();
// Final message once the loop has ended.
218 LOG.info("No listener threads running - exiting");
221 public List<DMaaPVESMsgConsumer> getConsumers() {