* limitations under the License.
* ============LICENSE_END=========================================================
*/
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-public class PnfEventReadyDmaapClient implements DmaapClient {
- private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
+public class PnfEventReadyKafkaClient implements KafkaClient {
+ private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class);
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private int topicListenerDelayInSeconds;
private volatile ScheduledThreadPoolExecutor executor;
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private int topicListenerDelayInSeconds;
private volatile ScheduledThreadPoolExecutor executor;
private KafkaConsumerImpl consumerForPnfReady;
private KafkaConsumerImpl consumerForPnfUpdate;
private String pnfReadyTopic;
private KafkaConsumerImpl consumerForPnfReady;
private KafkaConsumerImpl consumerForPnfUpdate;
private String pnfReadyTopic;
public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
if (pnfCorrelationIdToThreadMap.isEmpty()) {
consumerForPnfUpdate.close();
consumerForPnfReady.close();
if (pnfCorrelationIdToThreadMap.isEmpty()) {
consumerForPnfUpdate.close();
consumerForPnfReady.close();
executor = new ScheduledThreadPoolExecutor(1);
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor = new ScheduledThreadPoolExecutor(1);
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds,
+ executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds,
private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
Runnable runnable = unregister(pnfCorrelationId);
if (runnable != null) {
private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
Runnable runnable = unregister(pnfCorrelationId);
if (runnable != null) {