<goals>
<goal>prepare-agent</goal>
</goals>
+ <configuration>
+ <excludes>
+ <exclude>*</exclude>
+ </excludes>
+ </configuration>
</execution>
<execution>
<id>report</id>
</goals>
</execution>
</executions>
- <configuration>
- <excludes>
- <exclude>**/*App.*</exclude>
- </excludes>
- </configuration>
</plugin>
</plugins>
</build>
Undertow.builder()
.addHttpListener(8081, "0.0.0.0")
- .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber)
+ .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber)
.add("get", "/healthcheck", healthCheckHandler))
.build().start();
}
*/
@Data
public class DataRouterSubscriber implements HttpHandler {
+ public static final String METADATA_HEADER = "X-DMAAP-DR-META";
+ public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
+
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class));
private static final int NUMBER_OF_ATTEMPTS = 5;
private static final int DEFAULT_TIMEOUT = 2000;
private static final int MAX_JITTER = 50;
- private static final String METADATA_HEADER = "X-ATT-DR-META";
private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
*/
public void start(MapperConfig config) throws TooManyTriesException, InterruptedException {
try {
- logger.unwrap().info(ONAPLogConstants.Markers.ENTRY, "Starting subscription to DataRouter");
+ logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+ logger.unwrap().info("Successfully started DR Subscriber");
} finally {
- logger.unwrap().info(ONAPLogConstants.Markers.EXIT, "");
+ logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
}
}
subscriberObj.addProperty("lastMod", Instant.now().toString());
subscriberObj.addProperty("username", config.getBusControllerUserName());
subscriberObj.addProperty("userpwd", config.getBusControllerPassword());
+ subscriberObj.addProperty("privilegedSubscriber", true);
return subscriberObj;
}
Map<String,String> mdc = MDC.getCopyOfContextMap();
EventMetadata metadata = getMetadata(httpServerExchange);
+ String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
httpServerExchange.getRequestReceiver()
.receiveFullString((callbackExchange, body) ->
- httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc)))
+ httpServerExchange.dispatch(() ->
+ eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity)))
);
} catch (NoMetadataException exception) {
logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class ProcessEventException extends RuntimeException{
+ public ProcessEventException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
private EventMetadata metadata;
@NonNull
private Map<String, String> mdc;
+ @NonNull
+ private String publishIdentity;
}
return new URL(this.getBusControllerSubscriptionEndpoint());\r
}\r
\r
+ public String getSubscriberIdentity(){\r
+ return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();\r
+ }\r
@Getter\r
@EqualsAndHashCode\r
private class StreamsSubscribes {\r
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+public class DataRouterUtils {
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterUtils.class));
+
+ private DataRouterUtils(){
+ throw new IllegalStateException("Utility class;shouldn't be constructed");
+ }
+
+ /**
+ * Sends Delete to DR required as part of the new guaranteed delivery mechanism.
+ * @param config used to determine subscriber id and target endpoint
+ * @param event event to be processed
+ */
+ public static String processEvent(MapperConfig config, Event event){
+ logger.unwrap().info("Sending processed to DataRouter");
+ String baseDelete = config.getDmaapDRDeleteEndpoint();
+ String subscriberIdentity = config.getSubscriberIdentity();
+ String delete = String.format("https://%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity());
+ try {
+ return new RequestSender().send("DELETE", delete);
+ } catch (Exception exception) {
+ logger.unwrap().error("Process event failure", exception);
+ throw new ProcessEventException("Process event failure", exception);
+ }
+ }
+}
import org.onap.logging.ref.slf4j.ONAPLogConstants;\r
import org.slf4j.LoggerFactory;\r
\r
-import lombok.extern.slf4j.Slf4j;\r
-\r
public class RequestSender {\r
private static final int MAX_RETRIES = 5;\r
private static final int RETRY_INTERVAL = 1000;\r
private static final String SERVER_ERROR_MESSAGE = "Error on Server";\r
private static final int ERROR_START_RANGE = 300;\r
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));\r
+ public static final String DELETE = "DELETE";\r
\r
/**\r
* Sends an Http GET request to a given endpoint.\r
*/\r
\r
public String send(final String urlString) throws Exception {\r
+ return send("GET", urlString);\r
+ }\r
+\r
+\r
+ /**\r
+ * Sends a request to a given endpoint.\r
+ * @param method of the outbound request\r
+ * @param urlString representing given endpoint\r
+ * @return http response body\r
+ * @throws Exception\r
+ */\r
+ public String send(String method, final String urlString) throws Exception {\r
final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);\r
final UUID requestID = UUID.randomUUID();\r
String result = "";\r
connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());\r
connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());\r
connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
+ connection.setRequestMethod(method);\r
logger.unwrap()\r
.info("Sending:\n{}", connection.getRequestProperties());\r
\r
package org.onap.dcaegen2.services.pmmapper.datarouter;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
+
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import io.undertow.io.Receiver;
import io.undertow.io.Sender;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderMap;
import io.undertow.util.StatusCodes;
-import utils.LoggingUtils;
import java.io.IOException;
import java.net.HttpURLConnection;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
-import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import utils.LoggingUtils;
@RunWith(PowerMockRunner.class)
@PrepareForTest(DataRouterSubscriber.class)
String testString = "MESSAGE BODY";
JsonObject metadata = new JsonParser().parse(
new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject();
- when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt()))
+ when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.METADATA_HEADER).get(anyInt()))
.thenReturn(metadata.toString());
+ when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.PUB_ID_HEADER).getFirst()).thenReturn("");
doAnswer((Answer<Void>) invocationOnMock -> {
Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
callback.handle(httpServerExchange, testString);
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+import utils.EventUtils;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RequestSender.class)
+public class DataRouterUtilsTest {
+
+ @Test
+ public void processEventSuccessful() throws Exception {
+ String serviceResponse = "I'm a service response ;)";
+ String publishIdentity = "12";
+ PowerMockito.mockStatic(Thread.class);
+ MapperConfig mockMapperConfig = mock(MapperConfig.class);
+ URL mockURL = mock(URL.class);
+ HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(mockConnection.getResponseCode()).thenReturn(200);
+ when(mockConnection.getInputStream()).thenReturn(new ByteArrayInputStream(serviceResponse.getBytes()));
+
+ when(mockURL.openConnection()).thenReturn(mockConnection);
+ when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/");
+ when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12");
+
+ PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
+
+ Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
+ assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
+ verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE);
+ }
+
+ @Test
+ public void testNegativeResponse() throws Exception {
+ String serviceResponse = "I'm a negative service response ;)";
+ String publishIdentity = "12";
+ PowerMockito.mockStatic(Thread.class);
+ MapperConfig mockMapperConfig = mock(MapperConfig.class);
+ URL mockURL = mock(URL.class);
+ HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(mockConnection.getResponseCode()).thenReturn(503);
+ when(mockConnection.getInputStream())
+ .thenAnswer(invocationOnMock -> new ByteArrayInputStream(serviceResponse.getBytes()));
+
+ when(mockURL.openConnection()).thenReturn(mockConnection);
+ when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/");
+ when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12");
+
+ PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
+ Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
+ assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
+ verify(mockConnection, times(5)).setRequestMethod(RequestSender.DELETE);
+ }
+
+ @Test
+ public void testConstructionException() {
+ assertThrows(IllegalStateException.class, () -> Whitebox.invokeConstructor(DataRouterUtils.class));
+ }
+
+ @Test
+ public void testProcessEventFailure() throws Exception {
+ PowerMockito.mockStatic(Thread.class);
+ MapperConfig mockMapperConfig = mock(MapperConfig.class);
+ URL mockURL = mock(URL.class);
+ HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(mockConnection.getResponseCode()).thenReturn(503);
+
+ when(mockURL.openConnection()).thenReturn(mockConnection);
+ when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/");
+ when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12");
+
+ PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
+ Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class));
+ assertThrows(ProcessEventException.class, () -> DataRouterUtils.processEvent(mockMapperConfig, testEvent));
+ }
+}
}
/**
+ * Makes an event with a mock http server exchange, empty mdc and publish identity
* @param body body for the event.
* @param eventMetadata metadata for the event.
* @return event with mock HttpServerExchange
*/
public static Event makeMockEvent(String body, EventMetadata eventMetadata) {
- return new Event(mock(HttpServerExchange.class, RETURNS_DEEP_STUBS), body, eventMetadata, new HashMap<>());
+ return new Event(mock(HttpServerExchange.class, RETURNS_DEEP_STUBS), body, eventMetadata, new HashMap<>(), "");
}
+
+
+ /**
+ * Makes an event with a mock http server exchange and empty mdc
+ * @param body body for the event.
+ * @param eventMetadata metadata for the event.
+ * @return event with mock HttpServerExchange
+ */
+ public static Event makeMockEvent(String body, EventMetadata eventMetadata, String publishIdentity) {
+ HttpServerExchange mockHttpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ return new Event(mockHttpServerExchange, body, eventMetadata, new HashMap<>(), publishIdentity);
+ }
+
+
}