Add metadata filtering 37/81837/3
authordfarrelly <david.farrelly@est.tech>
Wed, 13 Mar 2019 12:02:20 +0000 (12:02 +0000)
committerdfarrelly <david.farrelly@est.tech>
Wed, 13 Mar 2019 12:02:20 +0000 (12:02 +0000)
Issue-ID: DCAEGEN2-1286
Change-Id: Icfee7f24cb97b429e8b0db2d67da2f21e413cea0
Signed-off-by: dfarrelly <david.farrelly@est.tech>
src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java [new file with mode: 0644]
src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java [new file with mode: 0644]
src/test/resources/incorrect_metadata.json [new file with mode: 0644]
src/test/resources/multiple_filter_mapper_config.json [new file with mode: 0644]
src/test/resources/no_filter_mapper_config.json [new file with mode: 0644]
src/test/resources/valid_mapper_config.json
src/test/resources/valid_metadata.json

index 11767e6..09d8975 100644 (file)
@@ -32,6 +32,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
 import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
 import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
 import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
 import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -56,6 +57,10 @@ public class App {
     public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException {
         Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
         HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
+
+        MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
+
+        MetadataFilter metadataFilter = new MetadataFilter(mapperConfig);
         Mapper mapper = new Mapper(mappingTemplate);
         XMLValidator validator = new XMLValidator(xmlSchema);
         flux.onBackpressureDrop(App::handleBackPressure)
@@ -64,12 +69,12 @@ public class App {
                 .parallel()
                 .runOn(Schedulers.newParallel(""), 1)
                 .doOnNext(event -> MDC.setContextMap(event.getMdc()))
+                .filter(metadataFilter::filter)
                 .filter(validator::validate)
                 .map(mapper::map)
                 .subscribe(event -> logger.unwrap().info("Event Processed"));
 
         DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next);
-        MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
         dataRouterSubscriber.start(mapperConfig);
 
         Undertow.builder()
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
new file mode 100644 (file)
index 0000000..20c8a64
--- /dev/null
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.filtering;
+
+import lombok.NonNull;
+import org.onap.dcaegen2.services.pmmapper.exceptions.*;
+import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter;
+import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class MetadataFilter {
+    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(MetadataFilter.class));
+    MapperConfig config;
+
+    public MetadataFilter(MapperConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Filters events by their metadata against filter object in configuration
+     * @param event inbound event
+     */
+    public boolean filter(@NonNull Event event) {
+        String decompressionStatus;
+        logger.unwrap().info("Filtering event metadata");
+        EventMetadata metadata = event.getMetadata();
+
+        MeasFilterConfig measFilterConfig = config.getFilterConfig();
+
+        List<MeasFilterConfig.Filter> filters = measFilterConfig.getFilters();
+
+        if(metadata.getDecompressionStatus() != null) {
+            decompressionStatus = metadata.getDecompressionStatus();
+            logger.unwrap().debug("Decompression Status: {}", decompressionStatus);
+        }
+
+        if(filters.isEmpty()) {
+            logger.unwrap().info("No filter specified in config: {}", filters);
+            return true;
+        }
+
+        for(Filter filter : filters) {
+            if(compareObjects(filter, metadata)) {
+                logger.unwrap().info("Metadata matches filter: {}", filter);
+
+                event.setFilter(filter);
+
+                return true;
+            } else {
+                logger.unwrap().debug("Metadata does not match filter: {}", filter);
+            }
+        }
+        logger.unwrap().info("Metadata does not match any filters, sending process event indicator to DR");
+        try {
+            DataRouterUtils.processEvent(config, event);
+        }catch (ProcessEventException exception) {
+            logger.unwrap().error("Process event failure", exception);
+        }
+        return false;
+
+    }
+
+    /**
+     * Compares event metadata against filter object
+     * @param filter filter object received from configuration
+     * @param metadata metadata from event
+     */
+     private boolean compareObjects(Filter filter, EventMetadata metadata) {
+        List<Validator<Filter, EventMetadata>> validators = Arrays.asList(
+                new VendorValidator(),
+                new TypeValidator()
+        );
+
+        for(Validator<Filter, EventMetadata> validation : validators) {
+            if (! validation.validate(filter, metadata)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    interface Validator<A, B> {
+        boolean validate(A filter, B metadata);
+    }
+
+    class VendorValidator implements Validator<Filter, EventMetadata> {
+        @Override
+        public boolean validate(Filter filter, EventMetadata metadata) {
+            return filter.getVendor().equals(metadata.getVendorName());
+        }
+    }
+
+    class TypeValidator implements Validator<Filter, EventMetadata> {
+        @Override
+        public boolean validate(Filter filter, EventMetadata metadata) {
+            return filter.getNfType().equals(metadata.getProductName());
+        }
+    }
+}
\ No newline at end of file
index 0e82119..fa41142 100644 (file)
@@ -24,6 +24,7 @@ import lombok.Data;
 import lombok.NonNull;
 
 import java.util.Map;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter;
 
 /**
  * Class used to pass around relevant inbound event data.
@@ -41,5 +42,7 @@ public class Event {
     @NonNull
     private String publishIdentity;
 
-    MeasCollecFile measCollecFile;
+    private MeasCollecFile measCollecFile;
+
+    private Filter filter;
 }
index 601b00f..8a0977d 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcaegen2.services.pmmapper.model;
 
+import com.google.gson.annotations.SerializedName;
 import lombok.Data;
 import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
 
@@ -47,4 +48,6 @@ public class EventMetadata {
     private String fileFormatType;
     @GSONRequired
     private String fileFormatVersion;
+    @SerializedName("decompression_status")
+    private String decompressionStatus;
 }
index 458a6cd..0f1aaa9 100644 (file)
@@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName;
 import lombok.Data;\r
 import lombok.EqualsAndHashCode;\r
 import lombok.NoArgsConstructor;\r
+import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;\r
 \r
 @Data\r
 @EqualsAndHashCode\r
@@ -37,17 +38,20 @@ public class MeasFilterConfig {
 \r
       @Data\r
       public class Filter {\r
+          @GSONRequired\r
           @SerializedName("pmDefVsn")\r
           private String dictionaryVersion;\r
 \r
+          @GSONRequired\r
           @SerializedName("nfType")\r
           private String nfType;\r
 \r
+          @GSONRequired\r
           @SerializedName("vendor")\r
           private String vendor;\r
 \r
+          @GSONRequired\r
           @SerializedName("measTypes")\r
           private List<String> measTypes;\r
-\r
       }\r
 }\r
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java
new file mode 100644 (file)
index 0000000..47c187a
--- /dev/null
@@ -0,0 +1,119 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.filtering;
+
+import com.google.gson.Gson;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import utils.EventUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+@PrepareForTest(MapperConfig.class)
+public class MetadataFilterTest {
+
+    private MetadataFilter metadataFilter;
+
+    private static MapperConfig validConfig;
+    private static MapperConfig noFilterConfig;
+    private static MapperConfig multipleFilterConfig;
+
+    private static String validConfigFileContents;
+    private static String noFilterConfigFileContents;
+    private static String multipleFilterConfigFileContents;
+
+    private static final Path validMetadata = Paths.get("src/test/resources/valid_metadata.json");
+    private static final Path incorrectMetadata = Paths.get("src/test/resources/incorrect_metadata.json");
+
+    private static final Path validConfigPath = Paths.get("src/test/resources/valid_mapper_config.json");
+    private static final Path noFilterConfigPath = Paths.get("src/test/resources/no_filter_mapper_config.json");
+    private static final Path multipleFilterConfigPath = Paths.get("src/test/resources/multiple_filter_mapper_config.json");
+
+    private static final Path dataDirectory = Paths.get("src/test/resources/xml_validator_test/test_data/");
+
+
+    @BeforeEach
+    void setup() throws Exception {
+        validConfigFileContents = new String(Files.readAllBytes(validConfigPath));
+        noFilterConfigFileContents = new String(Files.readAllBytes(noFilterConfigPath));
+        multipleFilterConfigFileContents = new String(Files.readAllBytes(multipleFilterConfigPath));
+
+        validConfig = new Gson().fromJson(validConfigFileContents, MapperConfig.class);
+        noFilterConfig = new Gson().fromJson(noFilterConfigFileContents, MapperConfig.class);
+        multipleFilterConfig = new Gson().fromJson(multipleFilterConfigFileContents, MapperConfig.class);
+
+
+        metadataFilter = new MetadataFilter(this.validConfig);
+    }
+
+
+    @ParameterizedTest
+    @MethodSource("getEventsWithValidMetadata")
+    void testValidMetadataPass(Event testEvent) {
+        assertTrue(metadataFilter.filter(testEvent));
+    }
+
+    @ParameterizedTest
+    @MethodSource("getEventsWithValidMetadata")
+    void testEmptyFilterPass(Event testEvent) {
+        metadataFilter.config = noFilterConfig;
+        assertTrue(metadataFilter.filter(testEvent));
+    }
+
+    @ParameterizedTest
+    @MethodSource("getEventsWithValidMetadata")
+    void testMultipleFilterPass(Event testEvent) {
+        metadataFilter.config = multipleFilterConfig;
+        assertTrue(metadataFilter.filter(testEvent));
+    }
+
+    @ParameterizedTest
+    @MethodSource("getEventsWithInvalidMetadata")
+    void testInvalidMetadataFail(Event testEvent) {
+        assertFalse(metadataFilter.filter(testEvent));
+    }
+
+
+
+    private static List<Event> getEventsWithValidMetadata() throws IOException {
+        Path validDataDirectory = Paths.get(dataDirectory.toString() + "/valid/");
+        return EventUtils.eventsFromDirectory(validDataDirectory, validMetadata);
+    }
+
+    private static List<Event> getEventsWithInvalidMetadata() throws IOException {
+        Path validDataDirectory = Paths.get(dataDirectory.toString() + "/valid/");
+        return EventUtils.eventsFromDirectory(validDataDirectory, incorrectMetadata);
+    }
+}
\ No newline at end of file
diff --git a/src/test/resources/incorrect_metadata.json b/src/test/resources/incorrect_metadata.json
new file mode 100644 (file)
index 0000000..e61e94f
--- /dev/null
@@ -0,0 +1,12 @@
+{
+  "productName": "LTE",
+  "vendorName": "Ericsson",
+  "lastEpochMicrosec": "1538478000000",
+  "sourceName": "oteNB5309",
+  "startEpochMicrosec": "1538478900000",
+  "timeZoneOffset": "UTC+05.00",
+  "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
+  "compression": "gzip",
+  "fileFormatType": "org.3GPP.32.435#measCollec",
+  "fileFormatVersion": "V8"
+}
\ No newline at end of file
diff --git a/src/test/resources/multiple_filter_mapper_config.json b/src/test/resources/multiple_filter_mapper_config.json
new file mode 100644 (file)
index 0000000..89bca57
--- /dev/null
@@ -0,0 +1,34 @@
+{
+  "pm-mapper-filter": {"filters":[{"pmDefVsn": "V8","nfType": "LTE","vendor": "Ericsson","measTypes": [ "A", "B" ]},{"pmDefVsn": "V9","nfType": "NrRadio","vendor": "Ericsson","measTypes": [ "A", "B" ]}]},
+  "streams_subscribes": {
+    "dmaap_subscriber": {
+      "type": "data_router",
+      "aaf_username": null,
+      "aaf_password": null,
+      "dmaap_info": {
+        "location": "location",
+        "delivery_url": "delivery_url",
+        "username": "username",
+        "password": "password",
+        "subscriber_id": "subscriber_id"
+      }
+    }
+  },
+  "streams_publishes": {
+    "dmaap_publisher": {
+      "type": "message_router",
+      "aaf_password": null,
+      "dmaap_info": {
+        "topic_url": "https://message-router.onap.svc.cluster.local:3904/events/some-topic",
+        "client_role": null,
+        "location": null,
+        "client_id": null
+      },
+      "aaf_username": null
+    }
+  },
+  "dmaap_dr_feed_id": "2",
+  "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",
+  "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",
+  "services_calls": {}
+}
\ No newline at end of file
diff --git a/src/test/resources/no_filter_mapper_config.json b/src/test/resources/no_filter_mapper_config.json
new file mode 100644 (file)
index 0000000..3f855cf
--- /dev/null
@@ -0,0 +1,34 @@
+{
+  "pm-mapper-filter": {"filters":[]},
+  "streams_subscribes": {
+    "dmaap_subscriber": {
+      "type": "data_router",
+      "aaf_username": null,
+      "aaf_password": null,
+      "dmaap_info": {
+        "location": "location",
+        "delivery_url": "delivery_url",
+        "username": "username",
+        "password": "password",
+        "subscriber_id": "subscriber_id"
+      }
+    }
+  },
+  "streams_publishes": {
+    "dmaap_publisher": {
+      "type": "message_router",
+      "aaf_password": null,
+      "dmaap_info": {
+        "topic_url": "https://message-router.onap.svc.cluster.local:3904/events/some-topic",
+        "client_role": null,
+        "location": null,
+        "client_id": null
+      },
+      "aaf_username": null
+    }
+  },
+  "dmaap_dr_feed_id": "2",
+  "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",
+  "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",
+  "services_calls": {}
+}
\ No newline at end of file
index 6cd76bd..040406f 100644 (file)
@@ -1,5 +1,5 @@
 {\r
-    "pm-mapper-filter": {"filters":[]},\r
+    "pm-mapper-filter": {"filters":[{"pmDefVsn": "V9","nfType": "NrRadio","vendor": "Ericsson","measTypes": [ "A", "B" ]}]},\r
     "streams_subscribes": {\r
         "dmaap_subscriber": {\r
             "type": "data_router",\r
index 21de3fb..cf21437 100644 (file)
@@ -8,5 +8,6 @@
   "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
   "compression": "gzip",
   "fileFormatType": "org.3GPP.32.435#measCollec",
-  "fileFormatVersion": "V9"
+  "fileFormatVersion": "V9",
+  "decompression_status": "false"
 }
\ No newline at end of file