<junit4.version>4.12</junit4.version>
<jsonschema.version>1.3.0</jsonschema.version>
<xerces.version>2.11.0</xerces.version>
+ <reactor.test>3.1.0.RELEASE</reactor.test>
<!-- Plugin Versions -->
<shade.plugin.version>3.2.0</shade.plugin.version>
<jacoco.version>0.8.2</jacoco.version>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <version>${reactor.test}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.everit.json</groupId>
<artifactId>org.everit.json.schema</artifactId>
import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
Mapper mapper = new Mapper(mappingTemplate);
MeasSplitter splitter = new MeasSplitter(measConverter);
XMLValidator validator = new XMLValidator(xmlSchema);
+ VESPublisher vesPublisher = new VESPublisher(mapperConfig);
flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
.concatMap(event -> App.split(splitter,event, mapperConfig))
.filter(events -> App.filter(filterHandler, events, mapperConfig))
.concatMap(events -> App.map(mapper, events, mapperConfig))
+ .concatMap(vesPublisher::publish)
.subscribe(events -> logger.unwrap().info("Event Processed"));
DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
private JsonObject getBusControllerSubscribeBody(MapperConfig config) {
JsonObject subscriberObj = new JsonObject();
- subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation());
+ subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation());
subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl());
subscriberObj.addProperty("feedId", config.getDmaapDRFeedId());
subscriberObj.addProperty("lastMod", Instant.now().toString());
--- /dev/null
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ * Copyright (C) 2019 Nordix Foundation.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ *\r
+ * SPDX-License-Identifier: Apache-2.0\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+\r
+package org.onap.dcaegen2.services.pmmapper.exceptions;\r
+\r
+public class MRPublisherException extends RuntimeException{\r
+ public MRPublisherException(String message, Throwable cause) {\r
+ super(message, cause);\r
+ }\r
+\r
+}\r
--- /dev/null
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ * Copyright (C) 2019 Nordix Foundation.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ *\r
+ * SPDX-License-Identifier: Apache-2.0\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+\r
+package org.onap.dcaegen2.services.pmmapper.messagerouter;\r
+\r
+import java.util.List;\r
+\r
+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;\r
+import org.onap.dcaegen2.services.pmmapper.model.Event;\r
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;\r
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;\r
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import reactor.core.publisher.Flux;\r
+\r
+public class VESPublisher {\r
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));\r
+ private RequestSender sender;\r
+ private MapperConfig config;\r
+\r
+ public VESPublisher(MapperConfig config) {\r
+ this(config, new RequestSender());\r
+ }\r
+\r
+ public VESPublisher(MapperConfig config, RequestSender sender) {\r
+ this.sender = sender;\r
+ this.config = config;\r
+ }\r
+\r
+ public Flux<Event> publish(List<Event> events) {\r
+ logger.unwrap().info("Publishing VES events to messagerouter.");\r
+ Event event = events.get(0);\r
+ try {\r
+ events.forEach(e -> this.publish(e.getVes()));\r
+ logger.unwrap().info("Successfully published VES events to messagerouter.");\r
+ } catch(MRPublisherException e) {\r
+ logger.unwrap().error("Failed to publish VES event(s) to messagerouter. {}", e.getMessage());\r
+ return Flux.empty();\r
+ }\r
+ return Flux.just(event);\r
+ }\r
+\r
+ private void publish(String ves) {\r
+ try {\r
+ String topicUrl = config.getPublisherTopicUrl();\r
+ sender.send("POST", topicUrl, ves);\r
+ } catch (Exception e) {\r
+ throw new MRPublisherException(e.getMessage(), e);\r
+ }\r
+ }\r
+}\r
return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();\r
}\r
\r
+ public String getSubscriberDcaeLocation() {\r
+ return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();\r
+ }\r
+\r
+ public String getPublisherTopicUrl() {\r
+ return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl();\r
+ }\r
+\r
public boolean dmaapInfoEquals(MapperConfig mapperConfig){\r
return this\r
.getStreamsSubscribes()\r
package org.onap.dcaegen2.services.pmmapper.utils;\r
\r
import java.io.BufferedReader;\r
+import java.io.IOException;\r
import java.io.InputStream;\r
import java.io.InputStreamReader;\r
+import java.io.OutputStream;\r
import java.net.HttpURLConnection;\r
import java.net.URL;\r
+import java.nio.charset.StandardCharsets;\r
import java.util.UUID;\r
import java.util.stream.Collectors;\r
\r
private static final int ERROR_START_RANGE = 300;\r
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));\r
public static final String DELETE = "DELETE";\r
+ public static final String DEFAULT_CONTENT_TYPE = "text/plain";\r
+ public static final int DEFAULT_READ_TIMEOUT = 5000;\r
\r
/**\r
- * Sends an Http GET request to a given endpoint.\r
- *\r
- * @return http response body\r
- * @throws Exception\r
- * @throws InterruptedException\r
+ * Works just like {@link RequestSender#send(method,urlString)}, except {@code method }\r
+ * is set to {@code GET} by default.\r
+ * @see RequestSender#send(String,String,String)\r
*/\r
-\r
public String send(final String urlString) throws Exception {\r
return send("GET", urlString);\r
}\r
\r
+ /**\r
+ * Works just like {@link RequestSender#send(method,urlString,body)}, except {@code body }\r
+ * is set to empty String by default.\r
+ * @see RequestSender#send(String,String,String)\r
+ */\r
+ public String send(String method, final String urlString) throws Exception {\r
+ return send(method,urlString,"");\r
+ }\r
\r
/**\r
- * Sends a request to a given endpoint.\r
+ * Sends an http request to a given endpoint.\r
* @param method of the outbound request\r
* @param urlString representing given endpoint\r
+ * @param body of the request as json\r
* @return http response body\r
* @throws Exception\r
*/\r
- public String send(String method, final String urlString) throws Exception {\r
+ public String send(String method, final String urlString, final String body) throws Exception {\r
final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);\r
final UUID requestID = UUID.randomUUID();\r
String result = "";\r
\r
for (int i = 1; i <= MAX_RETRIES; i++) {\r
- URL url = new URL(urlString);\r
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();\r
- connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());\r
- connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());\r
- connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
- connection.setRequestMethod(method);\r
- logger.unwrap()\r
- .info("Sending:\n{}", connection.getRequestProperties());\r
+ final URL url = new URL(urlString);\r
+ final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);\r
+ if(!body.isEmpty()) {\r
+ setMessageBody(connection, body);\r
+ }\r
+ logger.unwrap().info("Sending {} request to {}.", method, urlString);\r
\r
try (InputStream is = connection.getInputStream();\r
BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {\r
.collect(Collectors.joining("\n"));\r
int responseCode = connection.getResponseCode();\r
if (!(isWithinErrorRange(responseCode))) {\r
- logger.unwrap()\r
- .info("Received:\n{}", result);\r
+ logger.unwrap().info("Server Response Received:\n{}", result);\r
break;\r
}\r
-\r
} catch (Exception e) {\r
if (retryLimitReached(i)) {\r
- logger.unwrap()\r
- .error("Execution error: "+connection.getResponseMessage(), e);\r
+ logger.unwrap().error("Execution error: "+connection.getResponseMessage(), e);\r
throw new Exception(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);\r
}\r
}\r
return result;\r
}\r
\r
+ private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, UUID requestID) throws Exception {\r
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();\r
+ connection.setReadTimeout(DEFAULT_READ_TIMEOUT);\r
+ connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());\r
+ connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());\r
+ connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
+ connection.setRequestMethod(method);\r
+\r
+ return connection;\r
+ }\r
+\r
+ private void setMessageBody(HttpURLConnection connection, String body) throws IOException {\r
+ connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);\r
+ connection.setDoOutput(true);\r
+ OutputStream outputStream = connection.getOutputStream();\r
+ outputStream.write(body.getBytes(StandardCharsets.UTF_8));\r
+ outputStream.flush();\r
+ outputStream.close();\r
+ }\r
+\r
private boolean retryLimitReached(final int retryCount) {\r
return retryCount >= MAX_RETRIES;\r
}\r
--- /dev/null
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ * Copyright (C) 2019 Nordix Foundation.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ *\r
+ * SPDX-License-Identifier: Apache-2.0\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+package org.onap.dcaegen2.pmmapper.messagerouter;\r
+import static org.junit.jupiter.api.Assertions.assertThrows;\r
+import static org.mockito.Mockito.mock;\r
+import static org.mockito.Mockito.times;\r
+import static org.mockito.Mockito.verify;\r
+import static org.mockito.Mockito.when;\r
+import reactor.test.StepVerifier;\r
+import java.util.Arrays;\r
+import java.util.List;\r
+\r
+import org.junit.Before;\r
+import org.junit.Test;\r
+import org.junit.runner.RunWith;\r
+import org.mockito.Mockito;\r
+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;\r
+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;\r
+import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;\r
+import org.onap.dcaegen2.services.pmmapper.model.Event;\r
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;\r
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;\r
+import org.powermock.core.classloader.annotations.PrepareForTest;\r
+import org.powermock.modules.junit4.PowerMockRunner;\r
+import reactor.core.publisher.Flux;\r
+\r
+@RunWith(PowerMockRunner.class)\r
+@PrepareForTest(EnvironmentConfig.class)\r
+public class VESPublisherTest {\r
+\r
+ private static String topicURL = "http://mr/topic";\r
+ private static RequestSender sender;\r
+ private static MapperConfig config;\r
+ private VESPublisher sut;\r
+ private String ves = "{}";\r
+\r
+ @Before\r
+ public void before() throws Exception {\r
+ config = mock(MapperConfig.class);\r
+ sender = mock(RequestSender.class);\r
+ sut = new VESPublisher(config, sender);\r
+ when(config.getPublisherTopicUrl()).thenReturn(topicURL);\r
+ }\r
+\r
+ @Test\r
+ public void publish_multiple_success() throws Exception {\r
+ Event event = mock(Event.class);\r
+ List<Event> events = Arrays.asList(event,event,event);\r
+ when(event.getVes()).thenReturn(ves);\r
+\r
+ Flux<Event> flux = sut.publish(events);\r
+\r
+ verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString());\r
+ StepVerifier.create(flux)\r
+ .expectNextMatches(event::equals)\r
+ .expectComplete()\r
+ .verify();\r
+ }\r
+\r
+ @Test\r
+ public void publish_multiple_fail() throws Exception {\r
+ Event event = mock(Event.class);\r
+ List<Event> events = Arrays.asList(event,event,event);\r
+ when(event.getVes()).thenReturn(ves);\r
+ when(sender.send("POST",topicURL,ves)).thenThrow(Exception.class);\r
+\r
+ Flux<Event> flux = sut.publish(events);\r
+\r
+ StepVerifier.create(flux)\r
+ .verifyComplete();\r
+ }\r
+}\r