Remove subscription through Bus Controller 48/84248/3
authordfarrelly <david.farrelly@est.tech>
Mon, 8 Apr 2019 10:24:10 +0000 (10:24 +0000)
committerdfarrelly <david.farrelly@est.tech>
Mon, 8 Apr 2019 10:24:10 +0000 (10:24 +0000)
*Remove DR subscription through Bus Controller
*Add remaining data consumed indicators
*Blueprint and Config updates to remove subscription

Issue-ID: DCAEGEN2-1038
Change-Id: I8cee463a27156fa656b0e66ef3ee231daeeda8bc
Signed-off-by: dfarrelly <david.farrelly@est.tech>
20 files changed:
dpo/blueprints/k8s-pm-mapper.yaml
src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java
src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java [deleted file]
src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java [new file with mode: 0644]
src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
src/main/resources/logback.xml
src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java
src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java [deleted file]
src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandlerTest.java [new file with mode: 0644]
src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
src/test/resources/datarouter_subscriber_test/valid_bc_response.json [deleted file]
src/test/resources/multiple_filter_mapper_config.json
src/test/resources/no_filter_mapper_config.json
src/test/resources/valid_mapper_config.json
src/test/resources/valid_metadata.json

index 0944da3..cd52e76 100644 (file)
@@ -25,10 +25,6 @@ imports:
   - "https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.5/k8splugin_types.yaml"
 
 inputs:
-  service_name:
-    type: string
-    description: Name of the serice
-    default: "dcae-pm-mapper"
   tag_version:
     type: string
     description: Docker image to be used
@@ -48,7 +44,7 @@ inputs:
   client_role:
     type: string
     description: Client role to request secure access to topic
-    default: "ves-publisher"
+    default: "org.onap.dmaap.mr.PM_MAPPER.pub"
   client_id:
     type: string
     description: Client id for given AAF client
@@ -64,11 +60,7 @@ inputs:
   dcae_location:
     type: string
     description: DCAE location for the subscriber, used to set up routing
-    default: "location"
-  subscriber_id:
-    type: string
-    description: Subscriber id in Data Router
-    default: ""
+    default: "san-francisco"
   pm_mapper_service_protocol:
     type: string
     description: PM Mapper protocol
@@ -77,18 +69,6 @@ inputs:
     type: string
     description: PM Mapper host port
     default: "8443"
-  dmaap_buscontroller_service_host:
-    type: string
-    description: DMAAP Bus Controller host address
-    default: "dmaap-bc.onap.svc.cluster.local"
-  dmaap_buscontroller_service_port:
-    type: string
-    description: DMAAP Bus Controller host port
-    default: "8080"
-  dmaap_dr_feed_id:
-    type: string
-    description: ID of the Data Router feed that the PM Mapper will subscribe to
-    default: "1"
   dmaap_dr_service_host:
     type: string
     description: DMAAP Data Router host address
@@ -105,10 +85,6 @@ inputs:
     type: string
     description: DMAAP Message Router host port
     default: "3905"
-  dmaap_mr_topic_name:
-    type: string
-    description: Name of Message Router topic events will be published to
-    default: "pm-mapper-ves"
   filter:
     type: string
     description: PM Mapper filter on measInfo, measInfoId, measType, instanceId
@@ -140,9 +116,10 @@ node_templates:
                      ":", { get_input: dmaap_buscontroller_service_port}, "/webapi/dr_subs"]}
         dmaap_dr_feed_id:
           get_input: dmaap_dr_feed_id
+        dmaap_dr_feed_name: "bulk_pm_feed"
         dmaap_dr_delete_endpoint:
           { concat: ["https://", { get_input: dmaap_dr_service_host },
-                     ":", { get_input: dmaap_dr_service_port}, "/delete"]}
+                     ":", { get_input: dmaap_dr_service_port},"/delete"]}
         pm-mapper-filter:
           get_input: filter
         streams_subscribes:
@@ -156,11 +133,9 @@ node_templates:
                 get_input: dmaap_dr_password
               location:
                 get_input: dcae_location
-              subscriber_id:
-                get_input: subscriber_id
+              subscriber_id: "1"
               delivery_url:
-                { concat: [{ get_input: pm_mapper_service_protocol },"://", { get_input: service_name }, ".onap.svc.cluster.local",
-                           ":", { get_input: pm_mapper_service_port }, "/delivery"]}
+                { concat: [{ get_input: pm_mapper_service_protocol },"://dcae-pm-mapper:",{ get_input: pm_mapper_service_port },"/delivery"]}
         streams_publishes:
           dmaap_publisher:
             aaf_username:
@@ -175,8 +150,8 @@ node_templates:
               client_id:
                 get_input: client_id
               topic_url:
-                { concat: ["https://", { get_input: dmaap_mr_service_host },
-                           ":", { get_input: dmaap_mr_service_port }, "/events/", { get_input: dmaap_mr_topic_name }]}
+                { concat: [{ get_input: pm_mapper_service_protocol },"://",{ get_input: dmaap_mr_service_host },
+                           ":",{ get_input: dmaap_mr_service_port },"/events/PM_MAPPER"]}
               location:
                 get_input: dcae_location
       docker_config:
@@ -188,8 +163,8 @@ node_templates:
       image:
         get_input: tag_version
       replicas: { get_input: replicas }
-      name: { get_input: service_name }
-      dns_name: { get_input: service_name }
+      name: "dcae-pm-mapper"
+      dns_name: "dcae-pm-mapper"
       log_info:
         log_directory: "/var/log/ONAP/dcaegen2/services/pm-mapper"
       tls_info:
index 25e3918..a5eb68d 100644 (file)
@@ -28,13 +28,11 @@ import lombok.NonNull;
 import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
 import org.onap.dcaegen2.services.pmmapper.config.Configurable;
 import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
-import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
+import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
 import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
 import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
 import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
 import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
 import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
 import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
 import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
@@ -67,7 +65,7 @@ public class App {
     private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd");
     private static FluxSink<Event> fluxSink;
 
-    public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException, IOException {
+    public static void main(String[] args) throws EnvironmentConfigException, CBSServerError, MapperConfigException, IOException {
         Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
         HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
         MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
@@ -86,18 +84,16 @@ public class App {
                 .runOn(Schedulers.newParallel(""), 1)
                 .doOnNext(event -> MDC.setContextMap(event.getMdc()))
                 .filter(metadataFilter::filter)
-                .filter(filterHandler::filterByFileType)
-                .filter(validator::validate)
+                .filter(event -> App.filterByFileType(filterHandler, event, mapperConfig))
+                .filter(event -> App.validate(validator, event, mapperConfig))
                 .concatMap(event -> App.split(splitter,event, mapperConfig))
                 .filter(events -> App.filter(filterHandler, events, mapperConfig))
                 .concatMap(events -> App.map(mapper, events, mapperConfig))
                 .concatMap(vesPublisher::publish)
                 .subscribe(event -> App.sendEventProcessed(mapperConfig, event));
 
-        DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
-        dataRouterSubscriber.start();
+        DeliveryHandler deliveryHandler = new DeliveryHandler(fluxSink::next);
         ArrayList<Configurable> configurables = new ArrayList<>();
-        configurables.add(dataRouterSubscriber);
         configurables.add(mapperConfig);
         DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig);
 
@@ -113,12 +109,40 @@ public class App {
 
         builder.addHttpsListener(8443, "0.0.0.0", sslContext)
                 .setHandler(Handlers.routing()
-                        .add("put", "/delivery/{filename}", dataRouterSubscriber)
+                        .add("put", "/delivery/{filename}", deliveryHandler)
                         .add("get", "/healthcheck", healthCheckHandler)
                         .add("get", "/reconfigure", dynamicConfiguration))
                 .build().start();
     }
 
+    public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
+        boolean hasValidFileName = false;
+        try {
+            hasValidFileName = filterHandler.filterByFileType(event);
+            if(!hasValidFileName) {
+                sendEventProcessed(config,event);
+            }
+        } catch (Exception exception) {
+            logger.unwrap().error("Unable to filter by file type", exception);
+            sendEventProcessed(config,event);
+        }
+        return hasValidFileName;
+    }
+
+    public static boolean validate(XMLValidator validator, Event event, MapperConfig config) {
+        boolean isValidXML = false;
+        try {
+            isValidXML = validator.validate(event);
+            if(!isValidXML) {
+                sendEventProcessed(config,event);
+            }
+        } catch (Exception exception) {
+            logger.unwrap().error("Unable to validate XML",exception);
+            sendEventProcessed(config,event);
+        }
+        return isValidXML;
+    }
+
     public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) {
         Event event = events.get(0);
         boolean hasMatchingFilter = false;
@@ -128,7 +152,7 @@ public class App {
                 sendEventProcessed(config,event);
             }
         } catch (Exception exception) {
-            logger.unwrap().error(exception.getMessage(),exception);
+            logger.unwrap().error("Unable to filter by Meas Types",exception);
             sendEventProcessed(config,event);
         }
         return hasMatchingFilter;
@@ -139,7 +163,7 @@ public class App {
         try {
             mappedEvents = mapper.mapEvents(events);
         } catch (Exception exception) {
-            logger.unwrap().error(exception.getMessage(),exception);
+            logger.unwrap().error("Unable to map XML to VES",exception);
             sendEventProcessed(config,events.get(0));
             return Flux.<List<Event>>empty();
         }
@@ -151,7 +175,7 @@ public class App {
         try {
             splitEvents = splitter.split(event);
         } catch (Exception exception) {
-            logger.unwrap().error(exception.getMessage(),exception);
+            logger.unwrap().error("Unable to split MeasCollecFile",exception);
             sendEventProcessed(config,event);
             return Flux.<List<Event>>empty();
         }
index fe2f247..e50ec6c 100644 (file)
@@ -59,14 +59,14 @@ public class ConfigHandler {
 \r
     /**\r
      * Retrieves PM-Mapper Configuration from DCAE's ConfigBinding Service.\r
-     *
+     *\r
      * @throws EnvironmentConfigException\r
      * @throws ConsulServerError\r
      * @throws CBSConfigException\r
      * @throws CBSServerError\r
      * @throws MapperConfigException\r
-     */
-    public MapperConfig getMapperConfig() throws CBSConfigException, EnvironmentConfigException,\r
+     */\r
+    public MapperConfig getMapperConfig() throws EnvironmentConfigException,\r
             CBSServerError, MapperConfigException {\r
         String mapperConfigJson = "";\r
         String cbsSocketAddress = EnvironmentConfig.getCBSHostName() + ":" + EnvironmentConfig.getCBSPort();\r
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
deleted file mode 100644 (file)
index a0a8eaf..0000000
+++ /dev/null
@@ -1,278 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.pmmapper.datarouter;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import io.undertow.util.HeaderValues;
-import lombok.Data;
-import lombok.NonNull;
-
-import org.onap.dcaegen2.services.pmmapper.config.Configurable;
-import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
-import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
-import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.model.Event;
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.StatusCodes;
-
-import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
-import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
-import org.onap.logging.ref.slf4j.ONAPLogAdapter;
-import org.onap.logging.ref.slf4j.ONAPLogConstants;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-/**
- * Subscriber for events sent from data router
- * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
- */
-@Data
-public class DataRouterSubscriber implements HttpHandler, Configurable {
-    public static final String METADATA_HEADER = "X-DMAAP-DR-META";
-    public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
-
-    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class));
-    private static final int NUMBER_OF_ATTEMPTS = 5;
-    private static final int DEFAULT_TIMEOUT = 2000;
-    private static final int MAX_JITTER = 50;
-
-    private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
-    private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
-
-    private boolean limited = false;
-    private Random jitterGenerator;
-    private Gson metadataBuilder;
-    private MapperConfig config;
-    public static String subscriberId;
-    @NonNull
-    private EventReceiver eventReceiver;
-
-    /**
-     * @param eventReceiver receiver for any inbound events.
-     */
-    public DataRouterSubscriber(EventReceiver eventReceiver, MapperConfig config) {
-        this.eventReceiver = eventReceiver;
-        this.jitterGenerator = new Random();
-        this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
-                .create();
-        this.config = config;
-        this.subscriberId="";
-    }
-
-    /**
-     * Starts data flow by subscribing to data router through bus controller.
-     *
-     * @throws TooManyTriesException in the event that timeout has occurred several times.
-     */
-    public void start() throws TooManyTriesException, InterruptedException {
-        try {
-            logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
-            subscribe();
-            logger.unwrap().info("Successfully started DR Subscriber");
-        } finally {
-            logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
-        }
-    }
-
-    private HttpURLConnection getBusControllerConnection(String method, URL resource, int timeout) throws IOException {
-        HttpURLConnection connection = (HttpURLConnection) resource.openConnection();
-        connection.setRequestMethod(method);
-        connection.setConnectTimeout(timeout);
-        connection.setReadTimeout(timeout);
-        connection.setRequestProperty("Content-Type", "application/json");
-        connection.setDoOutput(true);
-
-        final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);
-        final UUID requestID = UUID.randomUUID();
-        connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
-        connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
-        connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
-
-        return connection;
-    }
-
-    private JsonObject getBusControllerSubscribeBody(MapperConfig config) {
-        JsonObject subscriberObj = new JsonObject();
-        subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation());
-        subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl());
-        subscriberObj.addProperty("feedId", config.getDmaapDRFeedId());
-        subscriberObj.addProperty("lastMod", Instant.now().toString());
-        subscriberObj.addProperty("username", config.getBusControllerUserName());
-        subscriberObj.addProperty("userpwd", config.getBusControllerPassword());
-        subscriberObj.addProperty("privilegedSubscriber", true);
-        return subscriberObj;
-    }
-
-    private void processResponse(HttpURLConnection connection) throws IOException {
-        try (BufferedReader responseBody = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
-            String body = responseBody.lines().collect(Collectors.joining(""));
-            updateSubscriberId(body);
-        } catch (IOException | JsonSyntaxException | IllegalStateException e) {
-            throw new IOException("Failed to process response", e);
-        }
-    }
-
-    private void updateSubscriberId(String responseBody) {
-            JsonParser parser = new JsonParser();
-            JsonObject responseObject = parser.parse(responseBody).getAsJsonObject();
-            this.subscriberId = responseObject.get("subId").getAsString();
-    }
-
-    private void subscribe() throws TooManyTriesException, InterruptedException {
-        try {
-            URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
-            JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
-            request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "POST", subscribeResource, subscribeBody);
-        } catch (MalformedURLException e) {
-            throw new IllegalStateException("Subscription URL is malformed", e);
-        }
-
-    }
-    private void updateSubscriber() throws TooManyTriesException, InterruptedException {
-        try {
-            URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
-            URL updateResource = new URL(String.format("%s/%s", subscribeResource, subscriberId));
-            JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
-            request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "PUT", updateResource, subscribeBody);
-        } catch (MalformedURLException e) {
-            throw new IllegalStateException("Subscription URL is malformed", e);
-        }
-    }
-
-    private void request(int attempts, int timeout, String method, URL resource, JsonObject subscribeBody) throws TooManyTriesException, InterruptedException {
-        int subResponse = 504;
-        String subMessage = "";
-        boolean processFailure = false;
-        try {
-            HttpURLConnection connection = getBusControllerConnection(method, resource, timeout);
-            try (OutputStream bodyStream = connection.getOutputStream();
-                 OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) {
-                bodyWriter.write(subscribeBody.toString());
-            }
-            subResponse = connection.getResponseCode();
-            subMessage = connection.getResponseMessage();
-            if (subResponse < 300) {
-                processResponse(connection);
-            }
-        } catch (IOException e) {
-            logger.unwrap().error("Failure to process response", e);
-            processFailure = true;
-        }
-        logger.unwrap().info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage);
-        if ((subResponse >= 300 || processFailure) && attempts > 1 ) {
-            Thread.sleep(timeout);
-            request(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), method, resource, subscribeBody);
-        } else if (subResponse >= 300 || processFailure) {
-            throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts");
-        }
-    }
-
-    private EventMetadata getMetadata(HttpServerExchange httpServerExchange) throws NoMetadataException {
-        String metadata = Optional.ofNullable(httpServerExchange.getRequestHeaders()
-                .get(METADATA_HEADER))
-                .map((HeaderValues headerValues) -> headerValues.get(0))
-                .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
-        return metadataBuilder.fromJson(metadata, EventMetadata.class);
-    }
-
-    /**
-     * Receives inbound requests, verifies that required headers are valid
-     * and passes an Event onto the eventReceiver.
-     * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
-     *
-     * @param httpServerExchange inbound http server exchange.
-     */
-    @Override
-    public void handleRequest(HttpServerExchange httpServerExchange) {
-        try{
-            logger.entering(new HttpServerExchangeAdapter(httpServerExchange));
-            if (limited) {
-                httpServerExchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE)
-                        .getResponseSender()
-                        .send(StatusCodes.SERVICE_UNAVAILABLE_STRING);
-            } else {
-                try {
-
-                    Map<String,String> mdc = MDC.getCopyOfContextMap();
-                    EventMetadata metadata = getMetadata(httpServerExchange);
-                    String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
-                    httpServerExchange.getRequestReceiver()
-                            .receiveFullString((callbackExchange, body) ->
-                                httpServerExchange.dispatch(() ->
-                                        eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity)))
-                            );
-                } catch (NoMetadataException exception) {
-                    logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
-                    httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
-                            .getResponseSender()
-                            .send(NO_METADATA_MESSAGE);
-                } catch (JsonParseException exception) {
-                    logger.unwrap().info("Bad Request: Failure to parse metadata", exception);
-                    httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
-                            .getResponseSender()
-                            .send(BAD_METADATA_MESSAGE);
-                }
-            }
-        } finally {
-            logger.exiting();
-        }
-    }
-
-    @Override
-    public void reconfigure(MapperConfig config) throws ReconfigurationException {
-        logger.unwrap().info("Checking new Configuration against existing.");
-        if(!this.config.dmaapInfoEquals(config) || !this.config.getDmaapDRFeedId().equals(config.getDmaapDRFeedId())){
-            logger.unwrap().info("DMaaP Info changes found, reconfiguring.");
-            try {
-                this.config = config;
-                this.updateSubscriber();
-            } catch (TooManyTriesException | InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new ReconfigurationException("Failed to reconfigure DataRouter subscriber.", e);
-            }
-        }
-
-    }
-}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java
new file mode 100644 (file)
index 0000000..4d6af29
--- /dev/null
@@ -0,0 +1,119 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParseException;
+import io.undertow.util.HeaderValues;
+import lombok.Data;
+import lombok.NonNull;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
+import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
+ */
+@Data
+public class DeliveryHandler implements HttpHandler {
+
+    public static final String METADATA_HEADER = "X-DMAAP-DR-META";
+    public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
+
+    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DeliveryHandler.class));
+
+    private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
+    private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
+
+    private Gson metadataBuilder;
+
+    @NonNull
+    private EventReceiver eventReceiver;
+
+    /**
+     * @param eventReceiver receiver for any inbound events.
+     */
+    public DeliveryHandler(EventReceiver eventReceiver) {
+        this.eventReceiver = eventReceiver;
+        this.metadataBuilder = new GsonBuilder()
+                .registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
+                .create();
+    }
+
+    private EventMetadata getMetadata(HttpServerExchange httpServerExchange) throws NoMetadataException {
+        String metadata = Optional.ofNullable(httpServerExchange.getRequestHeaders()
+                .get(METADATA_HEADER))
+                .map((HeaderValues headerValues) -> headerValues.get(0))
+                .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
+        return metadataBuilder.fromJson(metadata, EventMetadata.class);
+    }
+
+    /**
+     * Receives inbound requests, verifies that required headers are valid
+     * and passes an Event onto the eventReceiver.
+     * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
+     *
+     * @param httpServerExchange inbound http server exchange.
+     */
+    @Override
+    public void handleRequest(HttpServerExchange httpServerExchange) {
+        try{
+            logger.entering(new HttpServerExchangeAdapter(httpServerExchange));
+            try {
+                Map<String,String> mdc = MDC.getCopyOfContextMap();
+                EventMetadata metadata = getMetadata(httpServerExchange);
+                String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
+                httpServerExchange.getRequestReceiver()
+                        .receiveFullString((callbackExchange, body) ->
+                                httpServerExchange.dispatch(() ->
+                                        eventReceiver.receive(new Event(
+                                                callbackExchange, body, metadata, mdc, publishIdentity)))
+                        );
+            } catch (NoMetadataException exception) {
+                logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
+                httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+                        .getResponseSender()
+                        .send(NO_METADATA_MESSAGE);
+            } catch (JsonParseException exception) {
+                logger.unwrap().info("Bad Request: Failure to parse metadata", exception);
+                httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+                        .getResponseSender()
+                        .send(BAD_METADATA_MESSAGE);
+            }
+        } finally {
+            logger.exiting();
+        }
+    }
+}
index 20c8a64..1fb6019 100644 (file)
@@ -22,7 +22,6 @@ package org.onap.dcaegen2.services.pmmapper.filtering;
 
 import lombok.NonNull;
 import org.onap.dcaegen2.services.pmmapper.exceptions.*;
-import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -48,7 +47,6 @@ public class MetadataFilter {
      * @param event inbound event
      */
     public boolean filter(@NonNull Event event) {
-        String decompressionStatus;
         logger.unwrap().info("Filtering event metadata");
         EventMetadata metadata = event.getMetadata();
 
@@ -56,11 +54,6 @@ public class MetadataFilter {
 
         List<MeasFilterConfig.Filter> filters = measFilterConfig.getFilters();
 
-        if(metadata.getDecompressionStatus() != null) {
-            decompressionStatus = metadata.getDecompressionStatus();
-            logger.unwrap().debug("Decompression Status: {}", decompressionStatus);
-        }
-
         if(filters.isEmpty()) {
             logger.unwrap().info("No filter specified in config: {}", filters);
             return true;
index 8a0977d..601b00f 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcaegen2.services.pmmapper.model;
 
-import com.google.gson.annotations.SerializedName;
 import lombok.Data;
 import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
 
@@ -48,6 +47,4 @@ public class EventMetadata {
     private String fileFormatType;
     @GSONRequired
     private String fileFormatVersion;
-    @SerializedName("decompression_status")
-    private String decompressionStatus;
 }
index b9d58ee..390fa0d 100644 (file)
@@ -19,9 +19,6 @@
  */
 package org.onap.dcaegen2.services.pmmapper.model;
 
-import java.net.MalformedURLException;
-import java.net.URL;
-
 import org.onap.dcaegen2.services.pmmapper.config.Configurable;
 import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
 import com.google.gson.annotations.SerializedName;
@@ -67,14 +64,6 @@ public class MapperConfig implements Configurable{
     @SerializedName("streams_publishes")
     private StreamsPublishes streamsPublishes;
 
-    @GSONRequired
-    @SerializedName("buscontroller_feed_subscription_endpoint")
-    private String busControllerSubscriptionEndpoint;
-
-    @GSONRequired
-    @SerializedName("dmaap_dr_feed_id")
-    private String dmaapDRFeedId;
-
     @GSONRequired
     @SerializedName("dmaap_dr_delete_endpoint")
     private String dmaapDRDeleteEndpoint;
@@ -83,34 +72,10 @@ public class MapperConfig implements Configurable{
     @SerializedName("pm-mapper-filter")
     private MeasFilterConfig filterConfig;
 
-    public String getBusControllerDeliveryUrl() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getDeliveryUrl();
-    }
-
-    public String getDcaeLocation() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();
-    }
-
-    public String getBusControllerUserName() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getUsername();
-    }
-
-    public String getBusControllerPassword() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getPassword();
-    }
-
-    public URL getBusControllerSubscriptionUrl() throws MalformedURLException {
-        return new URL(this.getBusControllerSubscriptionEndpoint());
-    }
-
     public String getSubscriberIdentity(){
         return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
     }
 
-    public String getSubscriberDcaeLocation() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();
-    }
-
     public String getPublisherTopicUrl() {
         return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl();
     }
@@ -187,10 +152,9 @@ public class MapperConfig implements Configurable{
     @Override
     public void reconfigure(MapperConfig mapperConfig) {
         if(!this.equals(mapperConfig)) {
+            this.filterConfig = mapperConfig.getFilterConfig();
             this.streamsSubscribes = mapperConfig.getStreamsSubscribes();
             this.streamsPublishes = mapperConfig.getStreamsPublishes();
-            this.busControllerSubscriptionEndpoint = mapperConfig.getBusControllerSubscriptionEndpoint();
-            this.dmaapDRFeedId = mapperConfig.getDmaapDRFeedId();
             this.dmaapDRDeleteEndpoint = mapperConfig.getDmaapDRDeleteEndpoint();
         }
     }
index 5147863..23e8d71 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.onap.dcaegen2.services.pmmapper.utils;
 
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
 import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -42,12 +41,11 @@ public class DataRouterUtils {
     public static String processEvent(MapperConfig config, Event event){
         logger.unwrap().info("Sending processed to DataRouter");
         String baseDelete = config.getDmaapDRDeleteEndpoint();
-        String subscriberIdentity = DataRouterSubscriber.subscriberId;
+        String subscriberIdentity = config.getSubscriberIdentity();
         String delete = String.format("%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity());
         try {
             return new RequestSender().send("DELETE", delete);
         } catch (Exception exception) {
-            logger.unwrap().error("Process event failure", exception);
             throw new ProcessEventException("Process event failure", exception);
         }
     }
index 0d5d83c..dff2f8b 100644 (file)
@@ -15,7 +15,7 @@
     <property name="p_thr" value="%thread"/>\r
     <property name="pattern" value="%nopexception${p_tim}\t${p_thr}\t${p_lvl}\t${p_log}\t${p_mdc}\t${p_msg}\t${p_exc}\t${p_mak}\t%n"/>\r
 \r
-    <variable name="logLevel" value="${LOG_LEVEL:-INFO}"/>\r
+    <variable name="logLevel" value="${LOG_LEVEL:-DEBUG}"/>\r
 \r
     <logger name="org.mockserver" level="${mockserver.logLevel:-OFF}"/>\r
 \r
index b4dc178..7c5340a 100644 (file)
@@ -23,18 +23,21 @@ package org.onap.dcaegen2.services.pmmapper;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 import static org.mockserver.integration.ClientAndServer.startClientAndServer;
 import static org.mockserver.model.HttpResponse.response;
 
+import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.gson.Gson;
+import io.undertow.server.HttpServerExchange;
 import io.undertow.util.StatusCodes;
+import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
 import reactor.core.publisher.Flux;
 
 import org.junit.jupiter.api.AfterAll;
@@ -53,6 +56,7 @@ import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
 import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
 import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
+import utils.EventUtils;
 
 
 @ExtendWith(MockitoExtension.class)
@@ -61,6 +65,13 @@ class AppTest {
     static ClientAndServer mockServer;
     static MockServerClient client;
 
+    private static EventMetadata eventMetadata;
+
+    private static final Path dataDirectory = Paths.get("src/test/resources/mapper_test/mapping_data/");
+    private static final Path metadata = Paths.get("src/test/resources/valid_metadata.json");
+    private static final Path schema = Paths.get("src/main/resources/measCollec_plusString.xsd");
+
+
     @BeforeAll
     public static void setup() {
         mockServer =  startClientAndServer(1080);
@@ -98,6 +109,88 @@ class AppTest {
         verify(event.getHttpServerExchange(), times(1)).unDispatch();
     }
 
+    @Test
+    public void testFilterByFileType_success() {
+        Event mockEvent = Mockito.mock(Event.class);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        HttpServerExchange exchange = Mockito.mock(HttpServerExchange.class);
+        when(mockEvent.getHttpServerExchange()).thenReturn(exchange);
+        when(exchange.getRequestPath()).thenReturn("ATEST.xml");
+
+        boolean result = App.filterByFileType(new MeasFilterHandler(new MeasConverter()), mockEvent, mockConfig);
+        assertTrue(result);
+    }
+
+    @Test
+    public void testFilterByFileType_NonXML() {
+        Event mockEvent = Mockito.mock(Event.class);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        HttpServerExchange exchange = Mockito.mock(HttpServerExchange.class);
+        when(mockEvent.getHttpServerExchange()).thenReturn(exchange);
+        when(exchange.getRequestPath()).thenReturn("ATEST.png");
+
+        boolean result = App.filterByFileType(new MeasFilterHandler(new MeasConverter()), mockEvent, mockConfig);
+        assertFalse(result);
+    }
+
+    @Test
+    public void testFilterByFileType_throwException() {
+        Event mockEvent = Mockito.mock(Event.class);
+        MeasFilterHandler mockFilter = Mockito.mock(MeasFilterHandler.class);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        Mockito.when(mockFilter.filterByFileType(mockEvent)).thenThrow(RuntimeException.class);
+
+        boolean result = App.filterByFileType(mockFilter, mockEvent, mockConfig);
+        assertFalse(result);
+    }
+
+    @Test
+    public void testValidateXML_success() throws IOException {
+        XMLValidator mockValidator = new XMLValidator(schema);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        String metadataFileContents = new String(Files.readAllBytes(metadata));
+        eventMetadata = new Gson().fromJson(metadataFileContents, EventMetadata.class);
+
+        Path testFile = Paths.get(dataDirectory + "/valid_data/meas_results.xml");
+        Event mockEvent = EventUtils.makeMockEvent(EventUtils.fileContentsToString(testFile), eventMetadata);
+
+        boolean result = App.validate(mockValidator, mockEvent, mockConfig);
+
+        assertTrue(result);
+    }
+
+    @Test
+    public void testValidateXML_failure() throws IOException {
+        XMLValidator mockValidator = new XMLValidator(schema);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        String metadataFileContents = new String(Files.readAllBytes(metadata));
+        eventMetadata = new Gson().fromJson(metadataFileContents, EventMetadata.class);
+
+        Path testFile = Paths.get(dataDirectory + "/invalid_data/no_managed_element.xml");
+        Event mockEvent = EventUtils.makeMockEvent(EventUtils.fileContentsToString(testFile), eventMetadata);
+
+        boolean result = App.validate(mockValidator, mockEvent, mockConfig);
+
+        assertFalse(result);
+    }
+
+    @Test
+    public void testValidateXML_throwException() {
+        Event mockEvent = Mockito.mock(Event.class);
+        XMLValidator mockValidator = Mockito.mock(XMLValidator.class);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        Mockito.when(mockValidator.validate(mockEvent)).thenThrow(RuntimeException.class);
+        boolean result = App.validate(mockValidator, mockEvent, mockConfig);
+
+        assertFalse(result);
+    }
+
     @Test
     public void testFilter_success() {
         Event mockEvent = Mockito.mock(Event.class);
index 8840825..c900942 100644 (file)
@@ -105,7 +105,7 @@ public class DynamicConfigurationTest {
         Configurable configurable = mock(Configurable.class);
         configurables.add(configurable);
         JsonObject modifiedConfig = new JsonParser().parse(config).getAsJsonObject();
-        modifiedConfig.addProperty("dmaap_dr_feed_id","3");
+        modifiedConfig.addProperty("dmaap_dr_delete_endpoint","http://modified-delete-endpoint/1");
         when(sender.send(any())).thenReturn(modifiedConfig.toString());
         MapperConfig modifiedMapperConfig = configHandler.getMapperConfig();
 
@@ -137,7 +137,7 @@ public class DynamicConfigurationTest {
         Configurable configurable = mock(Configurable.class);
         configurables.add(configurable);
         JsonObject modifiedConfig = new JsonParser().parse(config).getAsJsonObject();
-        modifiedConfig.addProperty("dmaap_dr_feed_id","3");
+        modifiedConfig.addProperty("dmaap_dr_delete_endpoint","http://modified-delete-endpoint/1");
 
         when(sender.send(any())).thenReturn(modifiedConfig.toString());
         MapperConfig modifiedMapperConfig = configHandler.getMapperConfig();
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
deleted file mode 100644 (file)
index dbb95a7..0000000
+++ /dev/null
@@ -1,387 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.pmmapper.datarouter;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.undertow.io.Receiver;
-import io.undertow.io.Sender;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.HeaderMap;
-import io.undertow.util.StatusCodes;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
-import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
-import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;
-import org.onap.dcaegen2.services.pmmapper.model.Event;
-import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import utils.LoggingUtils;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({DataRouterSubscriber.class, EnvironmentConfig.class})
-public class DataRouterSubscriberTest {
-
-    private Path VALID_BC_RESPONSE_PATH = Paths.get("src/test/resources/datarouter_subscriber_test/valid_bc_response.json");
-    private Path VALID_METADATA_PATH = Paths.get("src/test/resources/valid_metadata.json");
-    private Path INVALID_METADATA_PATH = Paths.get("src/test/resources/invalid_metadata.json");
-    private Path VALID_CONFIG_PATH = Paths.get("src/test/resources/valid_mapper_config.json");
-
-    @Mock
-    private EventReceiver eventReceiver;
-    @Mock
-    private MapperConfig config;
-
-    private DataRouterSubscriber objUnderTest;
-
-    @Before
-    public void setUp() {
-        objUnderTest = new DataRouterSubscriber(eventReceiver, config);
-    }
-
-    @Test
-    public void testStartTooManyTriesWithResponse() throws IOException {
-        PowerMockito.mockStatic(Thread.class);
-
-        URL subEndpoint = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        when(subEndpoint.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(300);
-        Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
-    }
-
-    @Test
-    public void testStartImmediateSuccess() throws IOException, TooManyTriesException, InterruptedException {
-        URL subEndpoint = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        when(subEndpoint.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(200);
-        objUnderTest.start();
-        verify(huc, times(1)).getResponseCode();
-    }
-
-    @Test
-    public void testStartDelayedSuccess() throws IOException, TooManyTriesException, InterruptedException {
-        PowerMockito.mockStatic(Thread.class);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        URL subEndpoint = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
-
-        when(subEndpoint.openConnection()).thenReturn(huc);
-        doAnswer(new Answer() {
-            boolean forceRetry = true;
-
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                if (forceRetry) {
-                    forceRetry = false;
-                    throw new IOException();
-                }
-                return 200;
-            }
-        }).when(huc).getResponseCode();
-        objUnderTest.start();
-        verify(huc, times(2)).getResponseCode();
-    }
-
-    @Test
-    public void testStartReadTimeout() throws IOException {
-        PowerMockito.mockStatic(Thread.class);
-
-        URL subEndpoint = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        when(subEndpoint.openConnection()).thenReturn(huc);
-        doThrow(new IOException()).when(huc).getResponseCode();
-        Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
-    }
-
-    @Test
-    public void testRequestInboundLimitedStateServiceUnavailable() throws Exception {
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
-        HttpServerExchangeAdapter adapterMock = PowerMockito.mock(HttpServerExchangeAdapter.class);
-        PowerMockito.whenNew(HttpServerExchangeAdapter.class).withAnyArguments().thenReturn(adapterMock);
-
-        Sender responseSender = mock(Sender.class);
-        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
-        when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
-        objUnderTest.setLimited(true);
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(httpServerExchange).setStatusCode(StatusCodes.SERVICE_UNAVAILABLE);
-    }
-
-    @Test
-    public void testRequestInboundLimitedStateServiceNoEmission() throws Exception {
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
-        HttpServerExchangeAdapter adapterMock = PowerMockito.mock(HttpServerExchangeAdapter.class);
-        PowerMockito.whenNew(HttpServerExchangeAdapter.class).withAnyArguments().thenReturn(adapterMock);
-
-        Sender responseSender = mock(Sender.class);
-        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
-        when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
-        objUnderTest.setLimited(true);
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(eventReceiver, times(0)).receive(any());
-    }
-
-    @Test
-    public void testStartPositiveResponseCodeInvalidResponseBody() throws Exception{
-        PowerMockito.mockStatic(EnvironmentConfig.class);
-        PowerMockito.mockStatic(Thread.class);
-        PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
-        PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
-        PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
-
-        URL mockURL = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = "not a valid response";
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        when(mockURL.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(200);
-        Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
-    }
-
-    @Test
-    public void testRequestInboundInvalidMetadata() throws Exception {
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
-        JsonObject metadata = new JsonParser().parse(new String(Files
-                .readAllBytes(INVALID_METADATA_PATH))).getAsJsonObject();
-        when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt()))
-                .thenReturn(metadata.toString());
-        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
-        verify(httpServerExchange.getResponseSender(), times(1)).send("Malformed Metadata.");
-
-    }
-
-    @Test
-    public void testRequestInboundNoMetadata() throws Exception {
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
-        Receiver receiver = mock(Receiver.class);
-        HeaderMap headers = mock(HeaderMap.class);
-        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
-        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
-        when(httpServerExchange.getRequestHeaders()).thenReturn(headers);
-        when(headers.get(any(String.class))).thenReturn(null);
-
-        doAnswer((Answer<Void>) invocationOnMock -> {
-            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
-            callback.handle(httpServerExchange, "");
-            return null;
-        }).when(receiver).receiveFullString(any());
-        doAnswer((Answer<Void>) invocationOnMock -> {
-            Runnable runnable = invocationOnMock.getArgument(0);
-            runnable.run();
-            return null;
-        }).when(httpServerExchange).dispatch(any(Runnable.class));
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
-        verify(httpServerExchange.getResponseSender(), times(1)).send("Missing Metadata.");
-
-    }
-
-    @Test
-    public void testRequestInboundSuccess() throws Exception {
-        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterSubscriber.class);
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
-        Receiver receiver = mock(Receiver.class);
-        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
-        String testString = "MESSAGE BODY";
-        JsonObject metadata = new JsonParser().parse(
-                new String(Files.readAllBytes(VALID_METADATA_PATH))).getAsJsonObject();
-        when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.METADATA_HEADER).get(anyInt()))
-                .thenReturn(metadata.toString());
-        when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.PUB_ID_HEADER).getFirst()).thenReturn("");
-        doAnswer((Answer<Void>) invocationOnMock -> {
-            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
-            callback.handle(httpServerExchange, testString);
-            return null;
-        }).when(receiver).receiveFullString(any());
-
-        doAnswer((Answer<Void>) invocationOnMock -> {
-            Runnable runnable = invocationOnMock.getArgument(0);
-            runnable.run();
-            return null;
-        }).when(httpServerExchange).dispatch(any(Runnable.class));
-
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(eventReceiver, times(1)).receive(any(Event.class));
-
-        assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
-        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
-        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
-        assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT");
-        logAppender.stop();
-    }
-
-    @Test
-    public void testConfigThrowsMalformedURLException() throws MalformedURLException {
-        when(config.getBusControllerSubscriptionUrl()).thenThrow(MalformedURLException.class);
-        Assertions.assertThrows(IllegalStateException.class, () -> objUnderTest.start());
-    }
-    @Test
-    public void testReconfigurationSameConfig() throws Exception {
-        PowerMockito.mockStatic(EnvironmentConfig.class);
-        PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
-        PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
-        PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
-
-        RequestSender sender = mock(RequestSender.class);
-        String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
-        when(sender.send(any())).thenReturn(mapperConfig);
-        MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
-
-        DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
-        objUnderTest.reconfigure(originalMapperConfig);
-        assertEquals(originalMapperConfig, objUnderTest.getConfig());
-    }
-
-    @Test
-    public void testReconfigurationModifiedFeedId() throws Exception {
-        PowerMockito.mockStatic(EnvironmentConfig.class);
-        PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
-        PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
-        PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
-
-        URL mockURL = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        when(mockURL.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(200);
-
-        PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
-
-        RequestSender sender = mock(RequestSender.class);
-        String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
-        when(sender.send(any())).thenReturn(mapperConfig);
-        MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
-        JsonObject modifiedMapperConfigObj = new JsonParser().parse(mapperConfig).getAsJsonObject();
-        modifiedMapperConfigObj.addProperty("dmaap_dr_feed_id", "3");
-        when(sender.send(any())).thenReturn(modifiedMapperConfigObj.toString());
-        MapperConfig modifiedMapperConfig = new ConfigHandler(sender).getMapperConfig();
-
-        DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
-        objUnderTest.reconfigure(modifiedMapperConfig);
-        assertEquals(modifiedMapperConfig, objUnderTest.getConfig());
-    }
-
-    @Test
-    public void testReconfigurationModifiedUsername() throws Exception {
-        PowerMockito.mockStatic(EnvironmentConfig.class);
-        PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
-        PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
-        PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
-
-        URL mockURL = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        when(mockURL.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(200);
-
-        PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
-
-        RequestSender sender = mock(RequestSender.class);
-        String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
-        when(sender.send(any())).thenReturn(mapperConfig);
-        MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
-        JsonObject modifiedMapperConfigObj = new JsonParser().parse(mapperConfig).getAsJsonObject();
-        modifiedMapperConfigObj.get("streams_subscribes")
-                .getAsJsonObject().get("dmaap_subscriber")
-                .getAsJsonObject().get("dmaap_info")
-                .getAsJsonObject()
-                .addProperty("username", "bob");
-        when(sender.send(any())).thenReturn(modifiedMapperConfigObj.toString());
-        MapperConfig modifiedMapperConfig = new ConfigHandler(sender).getMapperConfig();
-
-        DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
-        objUnderTest.reconfigure(modifiedMapperConfig);
-        assertEquals(modifiedMapperConfig, objUnderTest.getConfig());
-    }
-
-    @Test
-    public void testReconfigurationMalformedURL() throws Exception {
-        when(config.getBusControllerSubscriptionUrl()).thenThrow(MalformedURLException.class);
-        Assertions.assertThrows(IllegalStateException.class, () -> objUnderTest.reconfigure(config));
-    }
-    @Test
-    public void testReconfigurationException() throws Exception {
-        PowerMockito.mockStatic(Thread.class);
-        URL url = mock(URL.class);
-        when(url.toString()).thenReturn("http://valid:8080/");
-        when(url.openConnection()).thenThrow(IOException.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(url);
-        Assertions.assertThrows(ReconfigurationException.class, () -> objUnderTest.reconfigure(config));
-    }
-}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandlerTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandlerTest.java
new file mode 100644 (file)
index 0000000..94a2c7d
--- /dev/null
@@ -0,0 +1,148 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.undertow.io.Receiver;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.HeaderMap;
+import io.undertow.util.StatusCodes;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.stubbing.Answer;
+import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import utils.LoggingUtils;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DeliveryHandler.class, EnvironmentConfig.class})
+public class DeliveryHandlerTest {
+
+    private Path VALID_METADATA_PATH = Paths.get("src/test/resources/valid_metadata.json");
+    private Path INVALID_METADATA_PATH = Paths.get("src/test/resources/invalid_metadata.json");
+
+    @Mock
+    private EventReceiver eventReceiver;
+
+    private DeliveryHandler objUnderTest;
+
+    @Before
+    public void setUp() {
+        objUnderTest = new DeliveryHandler(eventReceiver);
+    }
+
+    @Test
+    public void testRequestInboundInvalidMetadata() throws Exception {
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        JsonObject metadata = new JsonParser().parse(new String(Files
+                .readAllBytes(INVALID_METADATA_PATH))).getAsJsonObject();
+        when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt()))
+                .thenReturn(metadata.toString());
+        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+        verify(httpServerExchange.getResponseSender(), times(1)).send("Malformed Metadata.");
+
+    }
+
+    @Test
+    public void testRequestInboundNoMetadata() throws Exception {
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        Receiver receiver = mock(Receiver.class);
+        HeaderMap headers = mock(HeaderMap.class);
+        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+        when(httpServerExchange.getRequestHeaders()).thenReturn(headers);
+        when(headers.get(any(String.class))).thenReturn(null);
+
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+            callback.handle(httpServerExchange, "");
+            return null;
+        }).when(receiver).receiveFullString(any());
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Runnable runnable = invocationOnMock.getArgument(0);
+            runnable.run();
+            return null;
+        }).when(httpServerExchange).dispatch(any(Runnable.class));
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+        verify(httpServerExchange.getResponseSender(), times(1)).send("Missing Metadata.");
+
+    }
+
+    @Test
+    public void testRequestInboundSuccess() throws Exception {
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DeliveryHandler.class);
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        Receiver receiver = mock(Receiver.class);
+        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+        String testString = "MESSAGE BODY";
+        JsonObject metadata = new JsonParser().parse(
+                new String(Files.readAllBytes(VALID_METADATA_PATH))).getAsJsonObject();
+        when(httpServerExchange.getRequestHeaders().get(DeliveryHandler.METADATA_HEADER).get(anyInt()))
+                .thenReturn(metadata.toString());
+        when(httpServerExchange.getRequestHeaders().get(DeliveryHandler.PUB_ID_HEADER).getFirst()).thenReturn("");
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+            callback.handle(httpServerExchange, testString);
+            return null;
+        }).when(receiver).receiveFullString(any());
+
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Runnable runnable = invocationOnMock.getArgument(0);
+            runnable.run();
+            return null;
+        }).when(httpServerExchange).dispatch(any(Runnable.class));
+
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(eventReceiver, times(1)).receive(any(Event.class));
+
+        assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
+        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
+        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
+        assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT");
+        logAppender.stop();
+    }
+}
\ No newline at end of file
index 9975849..b2e6308 100644 (file)
@@ -40,7 +40,6 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
 import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
@@ -57,7 +56,7 @@ import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
 
 @PowerMockIgnore({"org.apache.http.conn.ssl.*", "javax.net.ssl.*" , "javax.crypto.*"})
-@PrepareForTest({RequestSender.class,DataRouterSubscriber.class})
+@PrepareForTest(RequestSender.class)
 @RunWith(PowerMockRunner.class)
 public class DataRouterUtilsTest {
 
diff --git a/src/test/resources/datarouter_subscriber_test/valid_bc_response.json b/src/test/resources/datarouter_subscriber_test/valid_bc_response.json
deleted file mode 100644 (file)
index 201b786..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-  "type": "dr_Sub",
-  "lastMod": "2019-03-11T14:29:39.659",
-  "status": "VALID",
-  "dcaeLocationName": "location",
-  "deliveryURL": "delivery_url",
-  "feedId": "2",
-  "logURL": "https://dmaap-dr-prov/sublog/2",
-  "owner": "DGL",
-  "subId": "1",
-  "suspended": false,
-  "use100": false,
-  "username": "username",
-  "userpwd": "password"
-}
\ No newline at end of file
index 89bca57..251beb2 100644 (file)
       "aaf_username": null
     }
   },
-  "dmaap_dr_feed_id": "2",
-  "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",
   "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",
-  "services_calls": {}
+  "services_calls": {},
+  "key_store_path": "src/test/resources/testkeystore.jks.b64",
+  "key_store_pass_path": "src/test/resources/password",
+  "trust_store_path": "src/test/resources/testkeystore.jks.b64",
+  "trust_store_pass_path": "src/test/resources/password",
+  "enable_http": false
 }
\ No newline at end of file
index 3f855cf..87fc021 100644 (file)
       "aaf_username": null
     }
   },
-  "dmaap_dr_feed_id": "2",
-  "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",
   "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",
-  "services_calls": {}
+  "services_calls": {},
+  "key_store_path": "src/test/resources/testkeystore.jks.b64",
+  "key_store_pass_path": "src/test/resources/password",
+  "trust_store_path": "src/test/resources/testkeystore.jks.b64",
+  "trust_store_pass_path": "src/test/resources/password",
+  "enable_http": false
 }
\ No newline at end of file
index e37b77e..3d9d707 100644 (file)
@@ -27,8 +27,6 @@
             "aaf_username": null\r
         }\r
     },\r
-    "dmaap_dr_feed_id": "2",\r
-    "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",\r
     "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",\r
     "services_calls": {},\r
     "key_store_path": "src/test/resources/testkeystore.jks.b64",\r
index cf21437..21de3fb 100644 (file)
@@ -8,6 +8,5 @@
   "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
   "compression": "gzip",
   "fileFormatType": "org.3GPP.32.435#measCollec",
-  "fileFormatVersion": "V9",
-  "decompression_status": "false"
+  "fileFormatVersion": "V9"
 }
\ No newline at end of file