2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21 import java.util.LinkedList;
22 import java.util.List;
24 import java.util.Properties;
25 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 public class DMaaPVESMsgConsumerMain implements Runnable {
31 private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
32 private static final String pnfRegClass =
33 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
34 private static final String faultClass =
35 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
36 boolean threadsRunning = false;
37 List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
38 public PNFRegistrationConfig pnfRegistrationConfig;
39 public FaultConfig faultConfig;
40 public GeneralConfig generalConfig;
/**
 * Creates the supervisor and immediately initializes a consumer for every configured domain.
 *
 * @param configMap domain name mapped to that domain's configuration; each entry is handed to
 *        {@link #initialize(String, Configuration)}
 * @param generalConfig general settings passed on to every consumer that gets created
 */
public DMaaPVESMsgConsumerMain(Map<String, Configuration> configMap, GeneralConfig generalConfig) {
    // Must be assigned before the domains are initialized: the consumers created there
    // are constructed with this.generalConfig.
    this.generalConfig = generalConfig;
    configMap.forEach(this::initialize);
}
47 public void initialize(String domain, Configuration domainConfig) {
48 LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
49 String consumerClass = null;
50 Properties consumerProperties = new Properties();
51 if (domain.equalsIgnoreCase("pnfregistration")) {
52 this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
54 consumerClass = pnfRegistrationConfig.getConsumerClass();
55 LOG.debug("Consumer class = " + consumerClass);
57 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
58 pnfRegistrationConfig.getTransportType());
59 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
60 pnfRegistrationConfig.getHostPort());
61 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
62 pnfRegistrationConfig.getContenttype());
63 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
64 pnfRegistrationConfig.getConsumerGroup());
65 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
66 pnfRegistrationConfig.getConsumerId());
67 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
68 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
69 pnfRegistrationConfig.getTimeout());
70 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
71 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
72 pnfRegistrationConfig.getFetchPause());
73 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
74 pnfRegistrationConfig.getProtocol());
75 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
76 pnfRegistrationConfig.getUsername());
77 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
78 pnfRegistrationConfig.getPassword());
79 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
80 pnfRegistrationConfig.getClientReadTimeout());
81 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
82 pnfRegistrationConfig.getClientConnectTimeout());
83 threadsRunning = createConsumer("pnfRegistration", consumerProperties);
84 } else if (domain.equalsIgnoreCase("fault")) {
85 this.faultConfig = (FaultConfig) domainConfig;
86 consumerClass = faultConfig.getConsumerClass();
87 LOG.debug("Consumer class = {}", consumerClass);
88 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
89 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
90 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
91 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
92 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
93 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
94 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
95 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
96 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
97 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
98 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
99 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
100 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
101 faultConfig.getClientReadTimeout());
102 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
103 faultConfig.getClientConnectTimeout());
104 threadsRunning = createConsumer("fault", consumerProperties);
108 private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
109 boolean threadsRunning = false;
110 for (DMaaPVESMsgConsumer consumer : consumers) {
111 if (consumer.isRunning()) {
112 threadsRunning = true;
115 return threadsRunning;
118 public boolean createConsumer(String consumerType, Properties properties) {
119 DMaaPVESMsgConsumerImpl consumer = null;
121 if (consumerType.equalsIgnoreCase("pnfRegistration"))
122 consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
123 else if (consumerType.equalsIgnoreCase("fault"))
124 consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
126 handleConsumer(consumer, properties, consumers);
127 return !consumers.isEmpty();
130 private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
131 List<DMaaPVESMsgConsumer> consumers) {
132 if (consumer != null) {
133 consumer.init(properties);
135 if (consumer.isReady()) {
136 Thread consumerThread = new Thread(consumer);
137 consumerThread.start();
138 consumers.add(consumer);
140 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
143 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
151 while (threadsRunning) {
152 threadsRunning = updateThreadState(consumers);
153 if (!threadsRunning) {
159 } catch (InterruptedException e) {
160 LOG.error(e.getLocalizedMessage(), e);
164 LOG.info("No listener threads running - exiting");
167 public List<DMaaPVESMsgConsumer> getConsumers() {