* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Copyright (C) 2018 Nokia.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
package org.onap.so.bpmn.infrastructure.pnf.dmaap;
import java.io.IOException;
-import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import javax.ws.rs.core.UriBuilder;
-
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
-import org.onap.so.logger.MsoLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Scope;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Component
-@Scope("prototype")
public class PnfEventReadyDmaapClient implements DmaapClient {
- private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA, PnfEventReadyDmaapClient.class);
+ private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
- private final Environment env;
private HttpClient httpClient;
- private String dmaapHost;
- private int dmaapPort;
- private String dmaapProtocol;
- private String dmaapUriPathPrefix;
- private String dmaapTopicName;
- private String consumerId;
- private String consumerGroup;
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private HttpGet getRequest;
- private int dmaapClientDelayInSeconds;
+ private int topicListenerDelayInSeconds;
private volatile ScheduledThreadPoolExecutor executor;
private volatile boolean dmaapThreadListenerIsRunning;
@Autowired
public PnfEventReadyDmaapClient(Environment env) {
- this.env = env;
- }
-
- public void init() {
httpClient = HttpClientBuilder.create().build();
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
- dmaapHost = env.getProperty("pnf.dmaap.host");
- dmaapPort = env.getProperty("pnf.dmaap.port", Integer.class);
+ topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
executor = null;
- getRequest = new HttpGet(buildURI());
+ getRequest = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
+ .scheme(env.getProperty("pnf.dmaap.protocol"))
+ .host(env.getProperty("pnf.dmaap.host"))
+ .port(env.getProperty("pnf.dmaap.port", Integer.class))
+ .path(env.getProperty("pnf.dmaap.topicName"))
+ .path(env.getProperty("pnf.dmaap.consumerGroup"))
+ .path(env.getProperty("pnf.dmaap.consumerId")).build());
}
@Override
- public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
- LOGGER.debug("registering for pnf ready dmaap event for correlation id: " + correlationId);
- pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
+ public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
+ logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+ pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
if (!dmaapThreadListenerIsRunning) {
startDmaapThreadListener();
}
}
@Override
- public synchronized Runnable unregister(String correlationId) {
- LOGGER.debug("unregistering from pnf ready dmaap event for correlation id: " + correlationId);
- Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId);
+ public synchronized Runnable unregister(String pnfCorrelationId) {
+ logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+ Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId);
if (pnfCorrelationIdToThreadMap.isEmpty()) {
stopDmaapThreadListener();
}
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
- dmaapClientDelayInSeconds, TimeUnit.SECONDS);
+ topicListenerDelayInSeconds, TimeUnit.SECONDS);
dmaapThreadListenerIsRunning = true;
}
}
}
}
- private URI buildURI() {
- return UriBuilder.fromUri(dmaapUriPathPrefix)
- .scheme(dmaapProtocol)
- .host(dmaapHost)
- .port(dmaapPort).path(dmaapTopicName)
- .path(consumerGroup).path(consumerId).build();
- }
-
- public void setDmaapProtocol(String dmaapProtocol) {
- this.dmaapProtocol = dmaapProtocol;
- }
-
- public void setDmaapUriPathPrefix(String dmaapUriPathPrefix) {
- this.dmaapUriPathPrefix = dmaapUriPathPrefix;
- }
-
- public void setDmaapTopicName(String dmaapTopicName) {
- this.dmaapTopicName = dmaapTopicName;
- }
-
- public void setConsumerId(String consumerId) {
- this.consumerId = consumerId;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
- public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) {
- this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
- }
-
class DmaapTopicListenerThread implements Runnable {
@Override
public void run() {
try {
+ logger.debug("dmaap listener starts listening pnf ready dmaap topic");
HttpResponse response = httpClient.execute(getRequest);
- getCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfCorrelationIdFound);
+ getPnfCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
} catch (IOException e) {
- LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+ logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+ }
+ finally {
+ getRequest.reset();
}
}
- private List<String> getCorrelationIdListFromResponse(HttpResponse response) throws IOException {
+ private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException {
if (response.getStatusLine().getStatusCode() == 200) {
String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
if (responseString != null) {
- return JsonUtilForCorrelationId.parseJsonToGelAllCorrelationId(responseString);
+ return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString);
}
}
return Collections.emptyList();
}
- private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
- Runnable runnable = unregister(correlationId);
+ private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
+ Runnable runnable = unregister(pnfCorrelationId);
if (runnable != null) {
- LOGGER.debug("pnf ready event got from dmaap for correlationId: " + correlationId);
+ logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
runnable.run();
}
}