package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
import org.openecomp.mso.jsonpath.JsonPathUtil;
import org.openecomp.mso.logger.MsoLogger;
-public class PnfEventReadyConsumer implements Runnable, DmaapClient {
+public class PnfEventReadyConsumer implements DmaapClient {
private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private HttpGet getRequest;
private ScheduledExecutorService executor;
- private int dmaapClientInitialDelayInSeconds;
private int dmaapClientDelayInSeconds;
- private boolean dmaapThreadListenerIsRunning;
+ private volatile boolean dmaapThreadListenerIsRunning;
public PnfEventReadyConsumer() {
httpClient = HttpClientBuilder.create().build();
getRequest = new HttpGet(buildURI());
}
- @Override
- public void run() {
+ //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
+ @VisibleForTesting
+ void sendRequest() {
try {
HttpResponse response = httpClient.execute(getRequest);
getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
}
@Override
- public void registerForUpdate(String correlationId, Runnable informConsumer) {
+ public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
if (!dmaapThreadListenerIsRunning) {
startDmaapThreadListener();
}
}
- private void startDmaapThreadListener() {
- executor = Executors.newScheduledThreadPool(1);
- executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds,
- dmaapClientDelayInSeconds, TimeUnit.SECONDS);
- dmaapThreadListenerIsRunning = true;
+ private synchronized void startDmaapThreadListener() {
+ if (!dmaapThreadListenerIsRunning) {
+ executor = Executors.newScheduledThreadPool(1);
+ executor.scheduleWithFixedDelay(this::sendRequest, 0,
+ dmaapClientDelayInSeconds, TimeUnit.SECONDS);
+ dmaapThreadListenerIsRunning = true;
+ }
}
- private void stopDmaapThreadListener() {
+ private synchronized void stopDmaapThreadListener() {
if (dmaapThreadListenerIsRunning) {
executor.shutdownNow();
dmaapThreadListenerIsRunning = false;
}
- private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
- pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny()
- .ifPresent(this::informAboutPnfReady);
- }
-
- private void informAboutPnfReady(String correlationId) {
- pnfCorrelationIdToThreadMap.get(correlationId).run();
- pnfCorrelationIdToThreadMap.remove(correlationId);
+ private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+ Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId);
+ if (runnable != null) {
+ runnable.run();
- if (pnfCorrelationIdToThreadMap.isEmpty()) {
- stopDmaapThreadListener();
+ if (pnfCorrelationIdToThreadMap.isEmpty()) {
+ stopDmaapThreadListener();
+ }
}
}
this.consumerGroup = consumerGroup;
}
- public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) {
- this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds;
- }
-
public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) {
this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
}
testedObject.setDmaapTopicName(EVENT_TOPIC_TEST);
testedObject.setConsumerId(CONSUMER_ID);
testedObject.setConsumerGroup(CONSUMER_GROUP);
- testedObject.setDmaapClientInitialDelayInSeconds(1);
testedObject.setDmaapClientDelayInSeconds(1);
testedObject.init();
httpClientMock = mock(HttpClient.class);
throws IOException {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
- testedObject.run();
+ testedObject.sendRequest();
ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
verify(httpClientMock).execute(captor1.capture());
assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(
String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
- testedObject.run();
+ testedObject.sendRequest();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}
public void correlationIdIsNotFoundInHttpResponse() throws IOException {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
- testedObject.run();
+ testedObject.sendRequest();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}