package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
-import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import org.openecomp.mso.jsonpath.JsonPathUtil;
import org.openecomp.mso.logger.MsoLogger;
-public class PnfEventReadyConsumer implements DmaapClient {
+public class PnfEventReadyDmaapClient implements DmaapClient {
private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
private int dmaapClientDelayInSeconds;
private volatile boolean dmaapThreadListenerIsRunning;
- public PnfEventReadyConsumer() {
+ public PnfEventReadyDmaapClient() {
httpClient = HttpClientBuilder.create().build();
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
executor = null;
getRequest = new HttpGet(buildURI());
}
- //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
- @VisibleForTesting
- void sendRequest() {
- try {
- HttpResponse response = httpClient.execute(getRequest);
- getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
- } catch (IOException e) {
- LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
- }
- }
-
@Override
public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
private synchronized void startDmaapThreadListener() {
if (!dmaapThreadListenerIsRunning) {
executor = Executors.newScheduledThreadPool(1);
- executor.scheduleWithFixedDelay(this::sendRequest, 0,
+ executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
dmaapClientDelayInSeconds, TimeUnit.SECONDS);
dmaapThreadListenerIsRunning = true;
}
.path(consumerGroup).path(consumerId).build();
}
- private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
- if (response.getStatusLine().getStatusCode() == 200) {
- String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
- if (responseString != null) {
- return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
- }
- }
- return Optional.empty();
- }
-
-
- private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
- Runnable runnable = unregister(correlationId);
- if (runnable != null) {
- runnable.run();
- }
- }
-
public void setDmaapHost(String dmaapHost) {
this.dmaapHost = dmaapHost;
}
this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
}
+ class DmaapTopicListenerThread implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ HttpResponse response = httpClient.execute(getRequest);
+ getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
+ } catch (IOException e) {
+ LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+ }
+ }
+
+ private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
+ if (response.getStatusLine().getStatusCode() == 200) {
+ String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ if (responseString != null) {
+ return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+ Runnable runnable = unregister(correlationId);
+ if (runnable != null) {
+ runnable.run();
+ }
+ }
+ }
+
}
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
-public class PnfEventReadyConsumerTest {
+public class PnfEventReadyDmaapClientTest {
private static final String CORRELATION_ID = "corrTestId";
private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
private static final String CONSUMER_ID = "consumerTestId";
private static final String CONSUMER_GROUP = "consumerGroupTest";
- private PnfEventReadyConsumer testedObject;
+ private PnfEventReadyDmaapClient testedObject;
+ private DmaapTopicListenerThread testedObjectInnerClassThread;
private HttpClient httpClientMock;
private Runnable threadMockToNotifyCamundaFlow;
private ScheduledExecutorService executorMock;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
- testedObject = new PnfEventReadyConsumer();
+ testedObject = new PnfEventReadyDmaapClient();
testedObject.setDmaapHost(HOST);
testedObject.setDmaapPort(PORT);
testedObject.setDmaapProtocol(PROTOCOL);
testedObject.setConsumerGroup(CONSUMER_GROUP);
testedObject.setDmaapClientDelayInSeconds(1);
testedObject.init();
+ testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
httpClientMock = mock(HttpClient.class);
threadMockToNotifyCamundaFlow = mock(Runnable.class);
executorMock = mock(ScheduledExecutorService.class);
throws IOException {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
- testedObject.sendRequest();
+ testedObjectInnerClassThread.run();
ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
verify(httpClientMock).execute(captor1.capture());
assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(
String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
- testedObject.sendRequest();
+ testedObjectInnerClassThread.run();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}
public void correlationIdIsNotFoundInHttpResponse() throws IOException {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
- testedObject.sendRequest();
+ testedObjectInnerClassThread.run();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}