* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
import java.io.IOException;
import java.util.Collections;
import org.springframework.stereotype.Component;
@Component
-public class PnfEventReadyDmaapClient implements DmaapClient {
- private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
+public class PnfEventReadyKafkaClient implements KafkaClient {
+ private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class);
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private int topicListenerDelayInSeconds;
private volatile ScheduledThreadPoolExecutor executor;
- private volatile boolean dmaapThreadListenerIsRunning;
+ private volatile boolean kafkaThreadListenerIsRunning;
private KafkaConsumerImpl consumerForPnfReady;
private KafkaConsumerImpl consumerForPnfUpdate;
private String pnfReadyTopic;
@Autowired
- public PnfEventReadyDmaapClient(Environment env) throws IOException {
+ public PnfEventReadyKafkaClient(Environment env) throws IOException {
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
- topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
+ topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class);
executor = null;
try {
consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
- if (!dmaapThreadListenerIsRunning) {
- startDmaapThreadListener();
+ if (!kafkaThreadListenerIsRunning) {
+ startKafkaThreadListener();
}
}
if (pnfCorrelationIdToThreadMap.isEmpty()) {
consumerForPnfUpdate.close();
consumerForPnfReady.close();
- stopDmaapThreadListener();
+ stopKafkaThreadListener();
}
return runnable;
}
- private synchronized void startDmaapThreadListener() {
- if (!dmaapThreadListenerIsRunning) {
+ private synchronized void startKafkaThreadListener() {
+ if (!kafkaThreadListenerIsRunning) {
executor = new ScheduledThreadPoolExecutor(1);
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds,
+ executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds,
TimeUnit.SECONDS);
- dmaapThreadListenerIsRunning = true;
+ kafkaThreadListenerIsRunning = true;
}
}
- private synchronized void stopDmaapThreadListener() {
- if (dmaapThreadListenerIsRunning) {
+ private synchronized void stopKafkaThreadListener() {
+ if (kafkaThreadListenerIsRunning) {
executor.shutdown();
- dmaapThreadListenerIsRunning = false;
+ kafkaThreadListenerIsRunning = false;
executor = null;
}
}
- class DmaapTopicListenerThread implements Runnable {
+ class KafkaTopicListenerThread implements Runnable {
@Override
public void run() {
try {
private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
Runnable runnable = unregister(pnfCorrelationId);
if (runnable != null) {
- logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
+ logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
runnable.run();
}
}