2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 
   6  * =================================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   8  * in compliance with the License. You may obtain a copy of the License at
 
  10  * http://www.apache.org/licenses/LICENSE-2.0
 
  12  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  14  * or implied. See the License for the specific language governing permissions and limitations under
 
  16  * ============LICENSE_END==========================================================================
 
  19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
 
  21 import java.util.LinkedList;
 
  22 import java.util.List;
 
  24 import java.util.Properties;
 
  25 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
 
  26 import org.slf4j.Logger;
 
  27 import org.slf4j.LoggerFactory;
 
  29 public class DMaaPVESMsgConsumerMain implements Runnable {
 
  31     private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
 
  32     private static final String pnfRegClass =
 
  33             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
 
  34     private static final String faultClass =
 
  35             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
 
  36     boolean threadsRunning = false;
 
  37     List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
 
  38     public PNFRegistrationConfig pnfRegistrationConfig;
 
  39     public FaultConfig faultConfig;
 
  40     public GeneralConfig generalConfig;
 
  42     public DMaaPVESMsgConsumerMain(Map<String, Configuration> configMap, GeneralConfig generalConfig) {
 
  43         this.generalConfig = generalConfig;
 
  44         configMap.forEach((k, v) -> initialize(k, v));
 
  47     public void initialize(String domain, Configuration domainConfig) {
 
  48         LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
 
  49         String consumerClass = null;
 
  50         Properties consumerProperties = new Properties();
 
  51         if (domain.equalsIgnoreCase("pnfregistration")) {
 
  52             this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
 
  54             consumerClass = pnfRegistrationConfig.getConsumerClass();
 
  55             LOG.debug("Consumer class = " + consumerClass);
 
  57             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
 
  58                     pnfRegistrationConfig.getTransportType());
 
  59             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
 
  60                     pnfRegistrationConfig.getHostPort());
 
  61             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
 
  62                     pnfRegistrationConfig.getContenttype());
 
  63             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
 
  64                     pnfRegistrationConfig.getConsumerGroup());
 
  65             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
 
  66                     pnfRegistrationConfig.getConsumerId());
 
  67             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
 
  68             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
 
  69                     pnfRegistrationConfig.getTimeout());
 
  70             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
 
  71             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
 
  72                     pnfRegistrationConfig.getFetchPause());
 
  73             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
 
  74                     pnfRegistrationConfig.getProtocol());
 
  75             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
 
  76                     pnfRegistrationConfig.getUsername());
 
  77             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
 
  78                     pnfRegistrationConfig.getPassword());
 
  79             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
 
  80                     pnfRegistrationConfig.getClientReadTimeout());
 
  81             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
 
  82                     pnfRegistrationConfig.getClientConnectTimeout());
 
  83             threadsRunning = createConsumer("pnfRegistration", consumerProperties);
 
  84         } else if (domain.equalsIgnoreCase("fault")) {
 
  85             this.faultConfig = (FaultConfig) domainConfig;
 
  86             consumerClass = faultConfig.getConsumerClass();
 
  87             LOG.debug("Consumer class = {}", consumerClass);
 
  88             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
 
  89             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
 
  90             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
 
  91             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
 
  92             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
 
  93             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
 
  94             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
 
  95             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
 
  96             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
 
  97             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
 
  98             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
 
  99             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
 
 100             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
 
 101                     faultConfig.getClientReadTimeout());
 
 102             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
 
 103                     faultConfig.getClientConnectTimeout());
 
 104             threadsRunning = createConsumer("fault", consumerProperties);
 
 108     private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
 
 109         boolean threadsRunning = false;
 
 110         for (DMaaPVESMsgConsumer consumer : consumers) {
 
 111             if (consumer.isRunning()) {
 
 112                 threadsRunning = true;
 
 115         return threadsRunning;
 
 118     public boolean createConsumer(String consumerType, Properties properties) {
 
 119         DMaaPVESMsgConsumerImpl consumer = null;
 
 121         if (consumerType.equalsIgnoreCase("pnfRegistration"))
 
 122             consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
 
 123         else if (consumerType.equalsIgnoreCase("fault"))
 
 124             consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
 
 126         handleConsumer(consumer, properties, consumers);
 
 127         return !consumers.isEmpty();
 
 130     private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
 
 131             List<DMaaPVESMsgConsumer> consumers) {
 
 132         if (consumer != null) {
 
 133             consumer.init(properties);
 
 135             if (consumer.isReady()) {
 
 136                 Thread consumerThread = new Thread(consumer);
 
 137                 consumerThread.start();
 
 138                 consumers.add(consumer);
 
 140                 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
 
 143                 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
 
 151         while (threadsRunning) {
 
 152             threadsRunning = updateThreadState(consumers);
 
 153             if (!threadsRunning) {
 
 159             } catch (InterruptedException e) {
 
 160                 LOG.error(e.getLocalizedMessage(), e);
 
 164         LOG.info("No listener threads running - exiting");
 
 167     public List<DMaaPVESMsgConsumer> getConsumers() {