X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=bpmn%2Fso-bpmn-infrastructure-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fso%2Fbpmn%2Finfrastructure%2Fpnf%2Fdmaap%2FPnfEventReadyDmaapClient.java;h=1a253887dd727d05e9c45c4b6e4a414179a3bf68;hb=7a475e244b329f9f179d30c8fc96aed6045037ce;hp=a2c73ca639ae2c89f3bc10aed0d2d45d39699fa2;hpb=d04f54dedba3e4ade3ac92282c1e3a408a29e70b;p=so.git diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java index a2c73ca639..1a253887dd 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -19,7 +19,6 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.onap.so.bpmn.infrastructure.pnf.dmaap; import java.io.IOException; @@ -43,15 +42,15 @@ import org.springframework.stereotype.Component; @Component public class PnfEventReadyDmaapClient implements DmaapClient { - private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); - private HttpClient httpClient; private Map pnfCorrelationIdToThreadMap; - private HttpGet getRequest; + private HttpGet getRequestForpnfReady; + private HttpGet getRequestForPnfUpdate; private int topicListenerDelayInSeconds; private volatile ScheduledThreadPoolExecutor executor; private volatile boolean dmaapThreadListenerIsRunning; + private String topicName; @Autowired public PnfEventReadyDmaapClient(Environment env) { @@ -59,9 +58,27 @@ public class PnfEventReadyDmaapClient implements DmaapClient { pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); executor = null; - getRequest = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix")) + topicName = env.getProperty("pnf.dmaap.topicName"); + String[] topic = topicName.split("\\s"); + String pnf_ready = null; + String pnf_update = null; + for (String t : topic) { + if (t.matches("(.*)PNF_READY(.*)")) { + pnf_ready = t; + } else if (t.matches("(.*)PNF_UPDATE(.*)")) { + pnf_update = t; + } else { + return; + } + } + getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix")) .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host")) - .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(env.getProperty("pnf.dmaap.topicName")) + .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(pnf_ready) + .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId")) + .build()); + getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix")) + .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host")) + .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(pnf_update) .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId")) .build()); } @@ -105,17 +122,24 @@ public class PnfEventReadyDmaapClient implements DmaapClient { } class DmaapTopicListenerThread implements Runnable { - @Override public void run() { try { - logger.debug("dmaap listener starts listening pnf ready dmaap topic"); - HttpResponse response = httpClient.execute(getRequest); - getPnfCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + HttpResponse response; + response = httpClient.execute(getRequestForPnfUpdate); + List pnfUpdateResponse = getPnfCorrelationIdListFromResponse(response); + if (pnfUpdateResponse.isEmpty()) { + response = httpClient.execute(getRequestForpnfReady); + getPnfCorrelationIdListFromResponse(response) + .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + } else { + pnfUpdateResponse.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + } } catch (IOException e) { logger.error("Exception caught during sending rest request to dmaap for listening event topic", e); } finally { - getRequest.reset(); + getRequestForpnfReady.reset(); + getRequestForPnfUpdate.reset(); } } @@ -137,5 +161,4 @@ public class PnfEventReadyDmaapClient implements DmaapClient { } } } - }