## [1.6.0] - 25/03/2021
### Changed
+- Utilize DMaaP-Client in PM-Mapper
- Switched Dockerfile to integration image (alpine-based)
## [1.5.2] - 18/03/2021
<logback.version>1.2.3</logback.version>
<reactor.version>3.4.0</reactor.version>
<undertow.version>2.2.3.Final</undertow.version>
- <gson.version>2.8.5</gson.version>
+ <gson.version>2.8.6</gson.version>
<freemarker.version>2.3.28</freemarker.version>
<commons.io.version>2.8.0</commons.io.version>
<xml.version>2.3.1</xml.version>
<jaxb.version>2.3.0.1</jaxb.version>
+ <dmaap-clinet.version>1.8.2</dmaap-clinet.version>
<!-- Testing Test Dependencies -->
<junit.version>5.3.2</junit.version>
<mockito.version>2.23.4</mockito.version>
<mockito-ju5-ext.version>2.23.4</mockito-ju5-ext.version>
<powermock.version>2.0.7</powermock.version>
- <mockserver.version>3.10.8</mockserver.version>
+ <mockserver.version>5.11.2</mockserver.version>
<junit4.version>4.12</junit4.version>
- <jsonschema.version>1.3.0</jsonschema.version>
+ <jsonschema.version>1.5.1</jsonschema.version>
+ <json.version>20210307</json.version>
+ <jackson.version>2.12.2</jackson.version>
<xerces.version>2.11.0</xerces.version>
<reactor.test>3.4.0</reactor.test>
<!-- Plugin Versions -->
<artifactId>jaxb-core</artifactId>
<version>${jaxb.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>dmaap-client</artifactId>
+ <version>${dmaap-clinet.version}</version>
+ </dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>${mockserver.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>${json.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
/*-\r
* ============LICENSE_START=======================================================\r
* Copyright (C) 2019 Nordix Foundation.\r
+ * Copyright (C) 2021 Nokia.\r
* ================================================================================\r
* Licensed under the Apache License, Version 2.0 (the "License");\r
* you may not use this file except in compliance with the License.\r
package org.onap.dcaegen2.services.pmmapper.exceptions;\r
\r
public class MRPublisherException extends RuntimeException{\r
- public MRPublisherException(String message, Throwable cause) {\r
- super(message, cause);\r
+ public MRPublisherException(String message) {\r
+ super(message);\r
}\r
-\r
}\r
/*-\r
* ============LICENSE_START=======================================================\r
* Copyright (C) 2019 Nordix Foundation.\r
+ * Copyright (C) 2021 Nokia.\r
* ================================================================================\r
* Licensed under the Apache License, Version 2.0 (the "License");\r
* you may not use this file except in compliance with the License.\r
\r
package org.onap.dcaegen2.services.pmmapper.messagerouter;\r
\r
-import java.nio.charset.StandardCharsets;\r
-import java.util.Base64;\r
-import java.util.List;\r
-\r
import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;\r
import org.onap.dcaegen2.services.pmmapper.model.Event;\r
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;\r
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;\r
+import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender;\r
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;\r
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;\r
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;\r
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;\r
import org.onap.logging.ref.slf4j.ONAPLogAdapter;\r
import org.slf4j.LoggerFactory;\r
-\r
import reactor.core.publisher.Flux;\r
\r
+import java.util.List;\r
+import java.util.stream.Collectors;\r
+\r
public class VESPublisher {\r
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));\r
- private RequestSender sender;\r
- private MapperConfig config;\r
+ private final DmaapRequestSender sender;\r
+ private final MapperConfig config;\r
\r
public VESPublisher(MapperConfig config) {\r
- this(config, new RequestSender());\r
+ this(config, new DmaapRequestSender());\r
}\r
\r
- public VESPublisher(MapperConfig config, RequestSender sender) {\r
+ public VESPublisher(MapperConfig config, DmaapRequestSender sender) {\r
this.sender = sender;\r
this.config = config;\r
}\r
\r
public Flux<Event> publish(List<Event> events) {\r
logger.unwrap().info("Publishing VES events to messagerouter.");\r
- Event event = events.get(0);\r
- try {\r
- events.forEach(e -> this.publish(e.getVes()));\r
- logger.unwrap().info("Successfully published VES events to messagerouter.");\r
- } catch (MRPublisherException e) {\r
- logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", e);\r
- return Flux.empty();\r
- }\r
- return Flux.just(event);\r
+ Event first = events.get(0);\r
+ List<String> vesEvents = minifiedVesEvents(events);\r
+ return publishEvents(vesEvents)\r
+ .filter(DmaapResponse::failed)\r
+ .takeLast(1)\r
+ .flatMap(this::toFluxError)\r
+ .defaultIfEmpty(first)\r
+ .doOnComplete(() -> logger.unwrap().info("Successfully published VES events to messagerouter."))\r
+ .onErrorResume(this::resume);\r
+ }\r
+\r
+ private List<String> minifiedVesEvents(List<Event> events) {\r
+ return events.stream()\r
+ .map(Event::getVes)\r
+ .map(vesEvent -> vesEvent.replace("\n", ""))\r
+ .collect(Collectors.toList());\r
+ }\r
+\r
+ private Flux<MessageRouterPublishResponse> publishEvents(List<String> vesEvents) {\r
+ String topicUrl = config.getPublisherTopicUrl();\r
+ AafCredentials credentials = aafCredentials();\r
+ return sender.send(topicUrl, vesEvents, credentials);\r
+ }\r
+\r
+ private Flux<Event> toFluxError(MessageRouterPublishResponse response) {\r
+ return Flux.error(new MRPublisherException(response.failReason()));\r
+ }\r
+\r
+ private Flux<Event> resume(Throwable t) {\r
+ logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", t);\r
+ return Flux.empty();\r
}\r
\r
- private void publish(String ves) {\r
- try {\r
- String topicUrl = config.getPublisherTopicUrl();\r
- ves = ves.replaceAll("\n", "");\r
- String userCredentials = Base64.getEncoder()\r
- .encodeToString((this.config.getPublisherUserName() + ":" +\r
- this.config.getPublisherPassword())\r
- .getBytes(StandardCharsets.UTF_8));\r
- sender.send("POST", topicUrl, ves, userCredentials);\r
- } catch (Exception e) {\r
- throw new MRPublisherException(e.getMessage(), e);\r
- }\r
+ private AafCredentials aafCredentials() {\r
+ return ImmutableAafCredentials.builder()\r
+ .username(config.getPublisherUserName())\r
+ .password(config.getPublisherPassword())\r
+ .build();\r
}\r
}\r
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import io.vavr.control.Try;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.MAX_RETRIES;
+import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.RETRY_INTERVAL;
+
+public class DmaapRequestSender {
+ private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(DmaapRequestSender.class));
+
+ private static final DmaapRetryConfig RETRY_CONFIG = ImmutableDmaapRetryConfig.builder()
+ .retryCount(MAX_RETRIES)
+ .retryIntervalInSeconds((int) RETRY_INTERVAL.getSeconds())
+ .build();
+ private static final MessageRouterPublisherConfig CLIENT_CONFIGURATION =
+ ImmutableMessageRouterPublisherConfig.builder()
+ .retryConfig(RETRY_CONFIG)
+ .build();
+ private static final DmaapTimeoutConfig READ_TIMEOUT = ImmutableDmaapTimeoutConfig.builder()
+ .timeout(SendersConfig.READ_TIMEOUT)
+ .build();
+ private static final MessageRouterPublisher PUBLISHER =
+ DmaapClientFactory.createMessageRouterPublisher(CLIENT_CONFIGURATION);
+
+ /**
+ * Sends an http request to a given dmaap-mr topic.
+ *
+ * @param topicUrl representing given topic
+ * @param vesEvents of the requests as json
+ * @param credentials base64-encoded username password credentials
+ * @return dmaap-mr response
+ */
+ public Flux<MessageRouterPublishResponse> send(final String topicUrl, final List<String> vesEvents, final AafCredentials credentials) {
+ MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+ .contentType(ContentType.TEXT_PLAIN)
+ .sinkDefinition(sink(topicUrl, credentials))
+ .timeoutConfig(READ_TIMEOUT)
+ .diagnosticContext(diagnosticContext())
+ .build();
+
+ return PUBLISHER.put(request, jsonBatch(vesEvents));
+ }
+
+ private static MessageRouterSink sink(String topicUrl, AafCredentials credentials) {
+ return ImmutableMessageRouterSink.builder()
+ .aafCredentials(credentials)
+ .topicUrl(topicUrl)
+ .build();
+ }
+
+ private static RequestDiagnosticContext diagnosticContext() {
+ UUID invocationId = uuid(LoggingUtils.invocationID(LOGGER));
+ UUID requestId = uuid(LoggingUtils.requestID());
+ return ImmutableRequestDiagnosticContext.builder()
+ .invocationId(invocationId)
+ .requestId(requestId)
+ .build();
+ }
+
+ private static Flux<JsonElement> jsonBatch(List<String> events) {
+ return Flux.fromIterable(getAsJsonElements(events));
+ }
+
+ private static List<JsonElement> getAsJsonElements(List<String> events) {
+ return events.stream()
+ .map(JsonParser::parseString)
+ .collect(Collectors.toList());
+ }
+
+ private static UUID uuid(String s) {
+ return Try.of(() -> UUID.fromString(s))
+ .getOrElse(UUID::randomUUID);
+ }
+}
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.jboss.logging.MDC;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.onap.logging.ref.slf4j.ONAPLogConstants;
+
+import java.util.Optional;
+import java.util.UUID;
+
+final class LoggingUtils {
+
+ private LoggingUtils() {
+ throw new IllegalStateException("Utility class;shouldn't be constructed");
+ }
+
+ static String invocationID(ONAPLogAdapter logger) {
+ return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID))
+ .orElseGet(()-> logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString());
+ }
+
+ static String requestID() {
+ return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))
+ .orElseGet(() -> UUID.randomUUID().toString());
+ }
+}
/*-\r
* ============LICENSE_START=======================================================\r
* Copyright (C) 2019 Nordix Foundation.\r
+ * Copyright (C) 2021 Nokia.\r
* ================================================================================\r
* Licensed under the Apache License, Version 2.0 (the "License");\r
* you may not use this file except in compliance with the License.\r
import java.net.URL;\r
import java.nio.charset.StandardCharsets;\r
import java.security.NoSuchAlgorithmException;\r
-import java.util.Optional;\r
-import java.util.UUID;\r
import java.util.stream.Collectors;\r
\r
import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure;\r
import org.onap.dcaegen2.services.pmmapper.exceptions.ServerResponseException;\r
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;\r
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;\r
import org.onap.logging.ref.slf4j.ONAPLogAdapter;\r
import org.onap.logging.ref.slf4j.ONAPLogConstants;\r
import org.slf4j.LoggerFactory;\r
\r
import javax.net.ssl.HttpsURLConnection;\r
import javax.net.ssl.SSLContext;\r
-import org.jboss.logging.MDC;\r
\r
public class RequestSender {\r
- private static final int MAX_RETRIES = 5;\r
- private static final int RETRY_INTERVAL = 1000;\r
private static final String SERVER_ERROR_MESSAGE = "Error on Server";\r
private static final int ERROR_START_RANGE = 300;\r
- private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));\r
- public static final String DELETE = "DELETE";\r
- public static final String DEFAULT_CONTENT_TYPE = "text/plain";\r
- public static final int DEFAULT_READ_TIMEOUT = 5000;\r
+ private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));\r
\r
/**\r
* Works just like {@link RequestSender#send(method,urlString)}, except {@code method }\r
*/\r
public String send(String method, final String urlString, final String body, final String encodedCredentials)\r
throws InterruptedException {\r
- String invocationID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID))\r
- .orElse(logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString());\r
- String requestID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))\r
- .orElse(UUID.randomUUID().toString());\r
String result = "";\r
boolean status = false;\r
- int attempts = 1;\r
+ int attempts = 0;\r
try {\r
- while (!status && attempts <= MAX_RETRIES) {\r
- if(attempts != 1) {\r
- Thread.sleep(RETRY_INTERVAL);\r
+ while (!status && attempts <= SendersConfig.MAX_RETRIES) {\r
+ if(attempts != 0) {\r
+ Thread.sleep(SendersConfig.RETRY_INTERVAL.toMillis());\r
}\r
final URL url = new URL(urlString);\r
- final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);\r
+ final HttpURLConnection connection = getHttpURLConnection(method, url);\r
\r
if ("https".equalsIgnoreCase(url.getProtocol())) {\r
HttpsURLConnection.setDefaultSSLSocketFactory(SSLContext.getDefault().getSocketFactory());\r
attempts++;\r
}\r
} catch (IOException | NoSuchAlgorithmException ex) {\r
- logger.unwrap().warn("Request failure", ex);\r
+ LOGGER.unwrap().warn("Request failure", ex);\r
throw new RequestFailure(ex);\r
}\r
return result;\r
}\r
\r
- private HttpURLConnection getHttpURLConnection(String method, URL url, String invocationID, String requestID)\r
+ private HttpURLConnection getHttpURLConnection(String method, URL url)\r
throws IOException {\r
HttpURLConnection connection = (HttpURLConnection) url.openConnection();\r
- connection.setReadTimeout(DEFAULT_READ_TIMEOUT);\r
- connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID);\r
- connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID);\r
+ connection.setReadTimeout((int) SendersConfig.READ_TIMEOUT.toMillis());\r
+ connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, LoggingUtils.requestID());\r
+ connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, LoggingUtils.invocationID(LOGGER));\r
connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
connection.setRequestMethod(method);\r
-\r
return connection;\r
}\r
\r
private void setMessageBody(HttpURLConnection connection, String body) throws IOException {\r
- connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);\r
+ connection.setRequestProperty("Content-Type", ContentType.TEXT_PLAIN.toString());\r
connection.setDoOutput(true);\r
OutputStream outputStream = connection.getOutputStream();\r
outputStream.write(body.getBytes(StandardCharsets.UTF_8));\r
}\r
\r
private boolean retryLimitReached(final int retryCount) {\r
- return retryCount >= MAX_RETRIES;\r
+ return retryCount >= SendersConfig.MAX_RETRIES;\r
}\r
\r
private boolean isWithinErrorRange(final int responseCode) {\r
}\r
\r
private String getResult(int attemptNumber, HttpURLConnection connection) throws IOException {\r
- logger.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL());\r
+ LOGGER.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL());\r
String result = "";\r
try (InputStream is = connection.getInputStream();\r
BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {\r
result = reader.lines().collect(Collectors.joining("\n"));\r
int responseCode = connection.getResponseCode();\r
if (!(isWithinErrorRange(responseCode))) {\r
- logger.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result);\r
+ LOGGER.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result);\r
}\r
} catch (Exception e) {\r
if (retryLimitReached(attemptNumber)) {\r
- logger.unwrap().error("Execution error: {}", connection.getResponseMessage(), e);\r
+ LOGGER.unwrap().error("Execution error: {}", connection.getResponseMessage(), e);\r
throw new ServerResponseException(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);\r
}\r
}\r
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import java.time.Duration;
+
+final class SendersConfig {
+
+ private SendersConfig() {
+ throw new IllegalStateException("SendersConfig class;shouldn't be constructed");
+ }
+
+ static final int MAX_RETRIES = 4;
+ static final Duration RETRY_INTERVAL = Duration.ofSeconds(1);
+ static final Duration READ_TIMEOUT = Duration.ofSeconds(5);
+}
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.pmmapper.messagerouter;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import reactor.test.StepVerifier;
-import java.util.Arrays;
-import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.Mockito;
import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
-import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(EnvironmentConfig.class)
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
public class VESPublisherTest {
- private static String topicURL = "http://mr/topic";
- private static RequestSender sender;
- private static MapperConfig config;
+ private static final String TOPIC_URL = "http://mr/topic";
+ private static final String VES = "{}";
+ private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder()
+ .username("")
+ .password("")
+ .build();
+
+ private static final MessageRouterPublishResponse SUCCESSFUL =
+ ImmutableMessageRouterPublishResponse.builder().build();
+ private static final MessageRouterPublishResponse FAILED =
+ ImmutableMessageRouterPublishResponse.builder()
+ .failReason("failReason")
+ .build();
+
+ private DmaapRequestSender sender;
private VESPublisher sut;
- private String ves = "{}";
@Before
- public void before() throws Exception {
- config = mock(MapperConfig.class);
- sender = mock(RequestSender.class);
+ public void before() {
+ MapperConfig config = mock(MapperConfig.class);
+ when(config.getPublisherTopicUrl()).thenReturn(TOPIC_URL);
+ when(config.getPublisherUserName()).thenReturn("");
+ when(config.getPublisherPassword()).thenReturn("");
+ sender = mock(DmaapRequestSender.class);
sut = new VESPublisher(config, sender);
- when(config.getPublisherTopicUrl()).thenReturn(topicURL);
}
@Test
- public void publish_multiple_success() throws Exception {
+ public void publish_multiple_success() {
+ Event event = mock(Event.class);
+ List<Event> events = Arrays.asList(event, event, event);
+ when(event.getVes()).thenReturn(VES);
+ MessageRouterPublishResponse successfulResponse = ImmutableMessageRouterPublishResponse.builder().build();
+ when(sender.send(any(), any(), any())).thenReturn(Flux.just(successfulResponse, successfulResponse));
+
+ Flux<Event> flux = sut.publish(events);
+
+ verify(sender, times(1)).send(anyString(), any(), any());
+ StepVerifier.create(flux)
+ .expectNextMatches(event::equals)
+ .verifyComplete();
+ }
+
+ @Test
+ public void publish_multiple_fail_sender_exceptions() {
+ Event event = mock(Event.class);
+ List<Event> events = Arrays.asList(event, event, event);
+ when(event.getVes()).thenReturn(VES);
+ when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+ .thenReturn(Flux.error(new RuntimeException()));
+
+ Flux<Event> flux = sut.publish(events);
+
+ StepVerifier.create(flux)
+ .verifyComplete();
+ }
+
+ @Test
+ public void publish_multiple_fail_mr_responses_failed() {
Event event = mock(Event.class);
- List<Event> events = Arrays.asList(event,event,event);
- when(event.getVes()).thenReturn(ves);
+ List<Event> events = Arrays.asList(event, event, event);
+ when(event.getVes()).thenReturn(VES);
+ when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+ .thenReturn(Flux.just(FAILED, FAILED, FAILED));
Flux<Event> flux = sut.publish(events);
- verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
StepVerifier.create(flux)
- .expectNextMatches(event::equals)
- .expectComplete()
- .verify();
+ .verifyComplete();
}
@Test
- public void publish_multiple_fail() throws Exception {
+ public void publish_multiple_fail_and_multiple_success() {
Event event = mock(Event.class);
- List<Event> events = Arrays.asList(event,event,event);
- when(event.getVes()).thenReturn(ves);
- when(sender.send("POST",topicURL,ves,"base64encoded")).thenThrow(RequestFailure.class);
+ when(event.getVes()).thenReturn(VES);
+ List<Event> events = Arrays.asList(event, event, event, event);
+ when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+ .thenReturn(Flux.just(SUCCESSFUL, FAILED, SUCCESSFUL, FAILED));
Flux<Event> flux = sut.publish(events);
StepVerifier.create(flux)
- .expectNext(events.get(0))
- .verifyComplete();
+ .verifyComplete();
}
}
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockserver.client.server.MockServerClient;
+import org.mockserver.client.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.HttpRequest;
import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019-2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.everit.json.schema.Schema;
import org.everit.json.schema.loader.SchemaLoader;
+import org.json.JSONException;
import org.json.JSONObject;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@BeforeAll
- static void classSetup() throws IOException {
+ static void classSetup() throws IOException, JSONException {
JSONObject ves = new JSONObject(new String(Files.readAllBytes(schema)));
vesSchema = SchemaLoader.load(ves);
@ParameterizedTest
@MethodSource("getValidEvents")
- void testValidEvent(Event testEvent) {
+ void testValidEvent(Event testEvent) throws JSONException {
vesSchema.validate(new JSONObject(objUnderTest.map(testEvent)));
}
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 - 2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
private static MapperConfig validConfig;
private SSLContextFactory sslContextFactory;
private static final Path validConfigPath = Paths.get("src/test/resources/valid_mapper_config.json");
+ private static final String DELETE = "DELETE";
@Test
public void processEventSuccessful() throws Exception {
Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
- verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE);
+ verify(mockConnection, times(1)).setRequestMethod(DELETE);
}
@Test
Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
- verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE);
+ verify(mockConnection, times(1)).setRequestMethod(DELETE);
}
@Test
PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
- verify(mockConnection, times(5)).setRequestMethod(RequestSender.DELETE);
+ verify(mockConnection, times(5)).setRequestMethod(DELETE);
}
@Test
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpStatusCode;
+import org.mockserver.verify.VerificationTimes;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class DmaapRequestSenderTest {
+
+ private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder()
+ .username("")
+ .password("")
+ .build();
+ private static final List<String> SINGLE = Collections.singletonList("any");
+
+ private static ClientAndServer mockServer;
+ private final MockServerClient client = mockClient();
+
+ @BeforeClass
+ public static void setup() {
+ mockServer = startClientAndServer(35454);
+ }
+
+ @AfterClass
+ public static void teardown() {
+ mockServer.stop();
+ }
+
+ @Before
+ public void setUp() {
+ client.reset();
+ }
+
+ @Test
+ public void send_success() {
+ client.when(request()).respond(response()
+ .withStatusCode(HttpStatusCode.OK_200.code())
+ .withBody("ResponseBody"));
+
+ Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+ .send("http://127.0.0.1:35454/once", SINGLE, AAF_CREDENTIALS);
+
+ StepVerifier.create(result)
+ .expectNextMatches(DmaapResponse::successful)
+ .verifyComplete();
+ client.verify(request(), VerificationTimes.once());
+ }
+
+ @Test
+ public void host_unavailable_retry_mechanism() {
+ client.when(request())
+ .respond(response().withStatusCode(HttpStatusCode.SERVICE_UNAVAILABLE_503.code()));
+
+ Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+ .send("http://127.0.0.1:35454/anypath", SINGLE, AAF_CREDENTIALS);
+
+ StepVerifier.create(result)
+ .expectNextMatches(DmaapResponse::failed)
+ .verifyComplete();
+ client.verify(request(), VerificationTimes.exactly(5));
+ }
+
+ @Test
+ public void host_unknown() {
+ Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+ .send("http://unknown-host:35454/host-is-unknown", SINGLE, AAF_CREDENTIALS);
+
+ StepVerifier.create(result)
+ .verifyError();
+ client.verify(request(), VerificationTimes.exactly(0));
+ }
+
+ private MockServerClient mockClient() {
+ return new MockServerClient("127.0.0.1", 35454);
+ }
+}
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019-2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.net.UnknownHostException;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockserver.client.server.MockServerClient;
+import org.mockserver.client.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpStatusCode;
import org.mockserver.verify.VerificationTimes;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
public class RequestSenderTests {
private static ClientAndServer mockServer;
- private MockServerClient client = mockClient();
+ private final MockServerClient client = mockClient();
@BeforeClass
public static void setup() {
mockServer.stop();
}
+ @Before
+ public void setUp() {
+ client.reset();
+ }
+
@Test
public void send_success() throws Exception {
String url = "http://127.0.0.1:35454/once";
assertTrue(logAppender.list.get(1).getMessage().contains("Sending"));
assertTrue(logAppender.list.get(2).getMessage().contains("Received"));
logAppender.stop();
- client.clear(req);
}
@Test
- public void host_unavailable_retry_mechanism() throws Exception {
+ public void host_unavailable_retry_mechanism() {
PowerMockito.mockStatic(Thread.class);
client.when(request())
});
client.verify(request(), VerificationTimes.exactly(5));
- client.clear(request());
}
@Test
});
client.verify(request(), VerificationTimes.exactly(0));
- client.clear(request());
}
private MockServerClient mockClient() {
return new MockServerClient("127.0.0.1", 35454);
}
-}
\ No newline at end of file
+}