X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=bpmn%2Fso-bpmn-infrastructure-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fso%2Fbpmn%2Finfrastructure%2Fpnf%2Fkafka%2FPnfEventReadyKafkaClient.java;fp=bpmn%2Fso-bpmn-infrastructure-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fso%2Fbpmn%2Finfrastructure%2Fpnf%2Fdmaap%2FPnfEventReadyDmaapClient.java;h=0d3e0e0230054b969bae2bc216ce3978c856ebfe;hb=0720a8a4e336d516ee00c515a392bb48a23404fd;hp=44b16dad281436485988bbe55afd32371e86c172;hpb=0420dbbef6cf04a662dadcd84c6a4682e3a412fc;p=so.git diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java similarity index 85% rename from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java rename to bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java index 44b16dad28..0d3e0e0230 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java @@ -19,7 +19,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import java.io.IOException; import java.util.Collections; @@ -36,12 +36,12 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; @Component -public class PnfEventReadyDmaapClient implements DmaapClient { - private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); +public class PnfEventReadyKafkaClient implements KafkaClient { + private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class); private Map pnfCorrelationIdToThreadMap; private int topicListenerDelayInSeconds; private volatile ScheduledThreadPoolExecutor executor; - private volatile boolean dmaapThreadListenerIsRunning; + private volatile boolean kafkaThreadListenerIsRunning; private KafkaConsumerImpl consumerForPnfReady; private KafkaConsumerImpl consumerForPnfUpdate; private String pnfReadyTopic; @@ -53,9 +53,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Autowired - public PnfEventReadyDmaapClient(Environment env) throws IOException { + public PnfEventReadyKafkaClient(Environment env) throws IOException { pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); - topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); + topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class); executor = null; try { consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); @@ -75,8 +75,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient { public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); - if (!dmaapThreadListenerIsRunning) { - startDmaapThreadListener(); + if (!kafkaThreadListenerIsRunning) { + startKafkaThreadListener(); } } @@ -87,31 +87,31 @@ public class PnfEventReadyDmaapClient implements DmaapClient { if (pnfCorrelationIdToThreadMap.isEmpty()) { consumerForPnfUpdate.close(); consumerForPnfReady.close(); - stopDmaapThreadListener(); + stopKafkaThreadListener(); } return runnable; } - private synchronized void startDmaapThreadListener() { - if (!dmaapThreadListenerIsRunning) { + private synchronized void startKafkaThreadListener() { + if (!kafkaThreadListenerIsRunning) { executor = new ScheduledThreadPoolExecutor(1); executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds, + executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds, TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; + kafkaThreadListenerIsRunning = true; } } - private synchronized void stopDmaapThreadListener() { - if (dmaapThreadListenerIsRunning) { + private synchronized void stopKafkaThreadListener() { + if (kafkaThreadListenerIsRunning) { executor.shutdown(); - dmaapThreadListenerIsRunning = false; + kafkaThreadListenerIsRunning = false; executor = null; } } - class DmaapTopicListenerThread implements Runnable { + class KafkaTopicListenerThread implements Runnable { @Override public void run() { try { @@ -141,7 +141,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient { private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { Runnable runnable = unregister(pnfCorrelationId); if (runnable != null) { - logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); + logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); runnable.run(); } }