VES Collector - Event Ordering 36/87336/19
authorZlatko Murgoski <zlatko.murgoski@nokia.com>
Thu, 9 May 2019 09:21:14 +0000 (11:21 +0200)
committerZlatko Murgoski <zlatko.murgoski@nokia.com>
Fri, 14 Jun 2019 09:56:34 +0000 (11:56 +0200)
https://jira.onap.org/browse/DCAEGEN2-1483

Change-Id: I28b0e871ce570a3cf4c0d2e08d040b66eb6db3aa
Issue-ID: DCAEGEN2-1483
Signed-off-by: Zlatko Murgoski <zlatko.murgoski@nokia.com>
23 files changed:
dpo/blueprint/blueprint_ves.yaml
dpo/data-formats/ConsulConfig.json
dpo/spec/vescollector-componentspec.json
dpo/tosca_model/schema.yaml
dpo/tosca_model/template.yaml
dpo/tosca_model/translate.yaml
etc/collector.properties
src/main/java/org/onap/dcae/ApplicationSettings.java
src/main/java/org/onap/dcae/VesApplication.java
src/main/java/org/onap/dcae/common/EventProcessor.java [deleted file]
src/main/java/org/onap/dcae/common/EventSender.java
src/main/java/org/onap/dcae/common/EventUpdater.java [new file with mode: 0644]
src/main/java/org/onap/dcae/restapi/EventValidator.java [new file with mode: 0644]
src/main/java/org/onap/dcae/restapi/HealthCheckController.java
src/main/java/org/onap/dcae/restapi/SwaggerConfig.java
src/main/java/org/onap/dcae/restapi/VesRestController.java
src/main/java/org/onap/dcae/restapi/WebMvcConfig.java
src/test/java/org/onap/dcae/ApplicationSettingsTest.java
src/test/java/org/onap/dcae/TLSTestBase.java
src/test/java/org/onap/dcae/common/EventSenderTest.java
src/test/resources/controller-config_dmaap_ip.json
src/test/resources/controller-config_singleline_ip.json
src/test/resources/test_collector_ip_op.properties

index 2dbc5a6..43158f1 100644 (file)
 # ============LICENSE_END=========================================================
 #
 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
-tosca_definitions_version: cloudify_dsl_1_3
-
-description: >
-  This handcrafted blueprint will install the ves collector and provision the needed message router topics. This blueprint can be used to verify that a platform installation is operational and working correctly.
 
+tosca_definitions_version: cloudify_dsl_1_3
 imports:
-- http://www.getcloudify.org/spec/cloudify/3.4/types.yaml
-- https://NEXUS_REPO_HOST:8443/repository/NEXUS_RAW/type_files/docker/2.2.0/node-type.yaml
-- https://NEXUS_REPO_HOST:8443/repository/NEXUS_RAW/type_files/relationship/1.0.0/node-type.yaml
-- http://NEXUS_REPO_HOST:8081/repository/NEXUS_RAW/type_files/dmaap/dmaap_mr.yaml
-
+  - http://www.getcloudify.org/spec/cloudify/3.4/types.yaml
+  - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.5/k8splugin_types.yaml
+  - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/dcaepolicyplugin/2.3.0/dcaepolicyplugin_types.yaml
 inputs:
-
-  service_id:
-    description: Unique id used for an instance of this DCAE service. Use deployment id
-    default: 'foobar'
-  location_id:
-    default: 'solutioning-central'
-  docker_host_override:
-    default: 'component_dockerhost'
-
-  topic00_aaf_username:
-  topic00_aaf_password:
-  topic00_location:
-    default: mtc5
-  topic00_client_role:
-    default: com.att.dcae.member
-
-  topic01_aaf_username:
-  topic01_aaf_password:
-  topic01_location:
-    default: mtc5
-  topic01_client_role:
-    default: com.att.dcae.member
-
-  topic02_aaf_username:
-  topic02_aaf_password:
-  topic02_location:
-    default: mtc5
-  topic02_client_role:
-    default: com.att.dcae.member
-
-  topic03_aaf_username:
-  topic03_aaf_password:
-  topic03_location:
-    default: mtc5
-  topic03_client_role:
-    default: com.att.dcae.member
-
+  collector.dmaap.streamid:
+    type: string
+    default: "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary|notification=ves-notification,ves-notification-secondary|pnfRegistration=ves-pnfRegistration,ves-pnfRegistration-secondary"
+  external_port:
+    type: string
+    description: Kubernetes node port on which collector is exposed
+    default: "30235"
+  header.authlist:
+    type: string
+    default: "sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy"
+  log_directory:
+    type: string
+    default: "/opt/app/VESCollector/logs"
+  replicas:
+    type: integer
+    description: number of instances
+    default: 1
+  tag_version:
+    type: string
+    default: "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.3"
+  ves_fault_publish_url:
+    type: string
+  ves_fault_secondary_publish_url:
+    type: string
+  ves_heartbeat_publish_url:
+    type: string
+  ves_heartbeat_secondary_publish_url:
+    type: string
+  ves_measurement_publish_url:
+    type: string
+  ves_measurement_secondary_publish_url:
+    type: string
+  ves_mobileflow_publish_url:
+    type: string
+  ves_mobileflow_secondary_publish_url:
+    type: string
+  ves_notification_publish_url:
+    type: string
+  ves_notification_secondary_publish_url:
+    type: string
+  ves_other_publish_url:
+    type: string
+  ves_other_secondary_publish_url:
+    type: string
+  ves_pnfRegistration_publish_url:
+    type: string
+  ves_pnfRegistration_secondary_publish_url:
+    type: string
+  ves_sipsignaling_publish_url:
+    type: string
+  ves_sipsignaling_secondary_publish_url:
+    type: string
+  ves_statechange_publish_url:
+    type: string
+  ves_statechange_secondary_publish_url:
+    type: string
+  ves_syslog_publish_url:
+    type: string
+  ves_syslog_secondary_publish_url:
+    type: string
+  ves_thresholdCrossingAlert_publish_url:
+    type: string
+  ves_thresholdCrossingAlert_secondary_publish_url:
+    type: string
+  ves_voicequality_publish_url:
+    type: string
+  ves_voicequality_secondary_publish_url:
+    type: string
 node_templates:
-
-  topic00:
-    type: dcae.nodes.Topic
-    properties:
-      topic_name: sec-fault-unsecure
-
-  topic01:
-    type: dcae.nodes.Topic
-    properties:
-      topic_name: sec-measurement
-
-  topic02:
-    type: dcae.nodes.Topic
-    properties:
-      topic_name: sec-measurement-unsecure
-
-  topic03:
-    type: dcae.nodes.Topic
-    properties:
-      topic_name: sec-fault
-
-  component00:
-    type: dcae.nodes.DockerContainerForComponentsUsingDmaap
-    properties:
-      service_component_type:
-        'dcae-controller-ves-collector'
-      service_id:
-        { get_input: service_id }
-      location_id:
-        { get_input: location_id }
-      application_config:
-        collector.keystore.passwordfile: "/opt/app/dcae-certificate/.password"
-        collector.service.secure.port: -1
-        tomcat.maxthreads: '200'
-        collector.keystore.file.location: "/opt/app/dcae-certificate/keystore.jks"
-        auth.method: "noAuth"
-        collector.service.port: 8080
-        streams_publishes:
-          sec_fault_unsecure:
-            aaf_password: { get_input: topic00_aaf_password }
-            dmaap_info: "<<topic00>>"
-            type: message_router
-            aaf_username: { get_input: topic00_aaf_username }
-          sec_measurement:
-            aaf_password: { get_input: topic01_aaf_password }
-            aaf_username: { get_input: topic01_aaf_username }
-            type: message_router
-            dmaap_info: "<<topic01>>"
-          sec_measurement_unsecure:
-            aaf_password: { get_input: topic02_aaf_password }
-            aaf_username: { get_input: topic02_aaf_username }
-            dmaap_info: "<<topic02>>"
-            type: message_router
-          sec_fault:
-            aaf_password: { get_input: topic03_aaf_password }
-            aaf_username: { get_input: topic03_aaf_username }
-            dmaap_info: "<<topic03>>"
-            type: message_router
-        services_calls: {}
-        collector.schema.checkflag: 1
-        collector.dmaap.streamid: fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert
-        header.authlist: userid1,base64encodepwd1|userid2,base64encodepwd2
-        streams_subscribes: {}
-        collector.inputQueue.maxPending: 8096
-        collector.schema.file: "./etc/CommonEventFormat_27.2.json"
-      image:
-        NEXUS_REPO_HOST:18443/dcae-dev-raw/dcae-controller-ves-collector:1.1.3
-      docker_config:
-        healthcheck:
-          type: "http"
-          interval: "15s"
-          timeout: "1s"
-          endpoint: "/"
-      streams_publishes:
-      - name: topic00
-        location: { get_input: topic00_location }
-        client_role: { get_input: topic00_client_role }
-        type: message_router
-      - name: topic01
-        location: { get_input: topic01_location }
-        client_role: { get_input: topic01_client_role }
-        type: message_router
-      - name: topic02
-        location: { get_input: topic02_location }
-        client_role: { get_input: topic02_client_role }
-        type: message_router
-      - name: topic03
-        location: { get_input: topic03_location }
-        client_role: { get_input: topic03_client_role }
-        type: message_router
-      streams_subscribes: []
-    relationships:
-    - type: dcae.relationships.component_contained_in
-      target: docker_host
-    - type: dcae.relationships.publish_events
-      target: topic00
-    - type: dcae.relationships.publish_events
-      target: topic01
-    - type: dcae.relationships.publish_events
-      target: topic02
-    - type: dcae.relationships.publish_events
-      target: topic03
+  dcae-ves-collector:
+    type: dcae.nodes.ContainerizedPlatformComponent
     interfaces:
       cloudify.interfaces.lifecycle:
-        stop:
+        start:
           inputs:
-            cleanup_image:
-              True
-
-  docker_host:
-    type: dcae.nodes.SelectedDockerHost
+            ports:
+              - concat: ["8443:", {get_input: external_port }]
     properties:
-      location_id:
-        { get_input: location_id }
-      docker_host_override:
-        { get_input: docker_host_override }
+      application_config:
+        service_calls: []
+        stream_publishes:
+          ves-fault:
+            dmaap_info:
+              topic_url:
+                get_input: ves_fault_publish_url
+            type: message router
+          ves-fault-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_fault_secondary_publish_url
+            type: message router
+          ves-heartbeat:
+            dmaap_info:
+              topic_url:
+                get_input: ves_heartbeat_publish_url
+            type: message router
+          ves-heartbeat-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_heartbeat_secondary_publish_url
+            type: message router
+          ves-measurement:
+            dmaap_info:
+              topic_url:
+                get_input: ves_measurement_publish_url
+            type: message router
+          ves-measurement-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_measurement_secondary_publish_url
+            type: message router
+          ves-mobileflow:
+            dmaap_info:
+              topic_url:
+                get_input: ves_mobileflow_publish_url
+            type: message router
+          ves-mobileflow-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_mobileflow_secondary_publish_url
+            type: message router
+          ves-notification:
+            dmaap_info:
+              topic_url:
+                get_input: ves_notification_publish_url
+            type: message router
+          ves-notification-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_notification_secondary_publish_url
+            type: message router
+          ves-other:
+            dmaap_info:
+              topic_url:
+                get_input: ves_other_publish_url
+            type: message router
+          ves-other-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_other_secondary_publish_url
+            type: message router
+          ves-pnfRegistration:
+            dmaap_info:
+              topic_url:
+                get_input: ves_pnfRegistration_publish_url
+            type: message router
+          ves-pnfRegistration-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_pnfRegistration_secondary_publish_url
+            type: message router
+          ves-sipsignaling:
+            dmaap_info:
+              topic_url:
+                get_input: ves_sipsignaling_publish_url
+            type: message router
+          ves-sipsignaling-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_sipsignaling_secondary_publish_url
+            type: message router
+          ves-statechange:
+            dmaap_info:
+              topic_url:
+                get_input: ves_statechange_publish_url
+            type: message router
+          ves-statechange-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_statechange_secondary_publish_url
+            type: message router
+          ves-syslog:
+            dmaap_info:
+              topic_url:
+                get_input: ves_syslog_publish_url
+            type: message router
+          ves-syslog-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_syslog_secondary_publish_url
+            type: message router
+          ves-thresholdCrossingAlert:
+            dmaap_info:
+              topic_url:
+                get_input: ves_thresholdCrossingAlert_publish_url
+            type: message router
+          ves-thresholdCrossingAlert-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_thresholdCrossingAlert_secondary_publish_url
+            type: message router
+          ves-voicequality:
+            dmaap_info:
+              topic_url:
+                get_input: ves_voicequality_publish_url
+            type: message router
+          ves-voicequality-secondary:
+            dmaap_info:
+              topic_url:
+                get_input: ves_voicequality_secondary_publish_url
+            type: message router
+        stream_subcribes: {}
+        auth.method: noAuth
+        collector.dmaap.streamid:
+          get_input: collector.dmaap.streamid
+        collector.keystore.file.location: /opt/app/dcae-certificate/keystore.jks
+        collector.keystore.passwordfile: /opt/app/dcae-certificate/.password
+        collector.schema.checkflag: 1
+        collector.schema.file: {"v1":"./etc/CommonEventFormat_27.2.json","v2":"./etc/CommonEventFormat_27.2.json","v3":"./etc/CommonEventFormat_27.2.json","v4":"./etc/CommonEventFormat_27.2.json","v5":"./etc/CommonEventFormat_28.4.1.json","v7":"./etc/CommonEventFormat_30.json"}
+        collector.service.port: 8080
+        collector.service.secure.port: 8443
+        collector.truststore.file.location: /opt/app/dcae-certificate/truststore.jks
+        collector.truststore.passwordfile: /opt/app/dcae-certificate/.trustpassword
+        event.transform.flag: 1
+        header.authlist:
+          get_input: header.authlist
+        tomcat.maxthreads: 200
+      docker_config:
+        interval: 15s
+        timeout: 1s
+        type: https
+        endpoint: /healthcheck
+      image:
+        get_input: tag_version
+      log_info:
+        get_input: log_directory
+      dns_name: dcae-ves-collector
+      replicas:
+        get_input: replicas
+      name: dcae-ves-collector
index ea65522..89348bf 100644 (file)
@@ -6,7 +6,6 @@
   "collector.service.port": "8080",
   "collector.schema.file": "{\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\",\"v7\":\"./etc/CommonEventFormat_30.0.1.json\"}",
   "collector.keystore.passwordfile": "/opt/app/VESCollector/etc/passwordfile",
-  "collector.inputQueue.maxPending": "8096",
   "streams_publishes": {
     "ves-measurement": {
       "type": "message_router",
index 4e2eb97..1b47268 100644 (file)
@@ -1,6 +1,6 @@
 {
   "self": {
-    "version": "1.3.0",
+    "version": "1.5.0",
     "name": "dcae-ves-collector",
     "description": "Collector for receiving VES events through restful interface",
     "component_type": "docker"
       "policy_editable": false,
       "designer_editable": false
     },
-    {
-      "name": "collector.inputQueue.maxPending",
-      "value": 8096,
-      "description": "Maximum queue limit across domains collector will queue before event is published",
-      "sourced_at_deployment": false,
-      "policy_editable": false,
-      "designer_editable": false
-    },
     {
       "name": "collector.dmaap.streamid",
       "value": "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary|notification=ves-notification,ves-notification-secondary|pnfRegistration=ves-pnfRegistration,ves-pnfRegistration-secondary",
     {
       "name": "auth.method",
       "value": "noAuth",
-      "description": "Basic Authentication flag; when enabled only secure port will be supported.",
+      "description": "Property to manage application mode, possible configurations: noAuth - default option - no security (http) , certOnly - auth by certificate (https), basicAuth - auth by basic auth username and password (https),certBasicAuth - auth by certificate and basic auth username / password (https),",
       "sourced_at_deployment": false,
       "policy_editable": false,
       "designer_editable": false
       }
     ],
     "ports": [
+      "8080:8080",
       "8443:8443"
     ]
   },
   "artifacts": [
     {
       "type": "docker image",
-      "uri": "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.3"
+      "uri": "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:latest"
     }
   ]
 }
index 6c1b275..c44a4f7 100644 (file)
@@ -195,8 +195,6 @@ node_types:
     properties:
       docker_collector.dmaap.streamid:
         type: string
-      docker_collector.inputQueue.maxPending:
-        type: string
       docker_collector.keystore.file.location:
         type: string
       docker_collector.keystore.passwordfile:
index 73b4ad3..2f132e1 100644 (file)
@@ -26,7 +26,6 @@ topology_template:
       type: dcae.nodes.dockerApp.ves
       properties:
         docker_collector.dmaap.streamid: fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert
-        docker_collector.inputQueue.maxPending: '8096'
         docker_collector.keystore.file.location: /opt/app/dcae-certificate/keystore.jks
         docker_collector.keystore.passwordfile: /opt/app/dcae-certificate/.password
         docker_collector.schema.checkflag: '1'
index 284f34b..f6b7a23 100644 (file)
@@ -24,8 +24,6 @@ topology_template:
   inputs:
     docker_collector.dmaap.streamid:
       type: string
-    docker_collector.inputQueue.maxPending:
-      type: string
     docker_collector.keystore.file.location:
       type: string
     docker_collector.keystore.passwordfile:
@@ -105,8 +103,6 @@ topology_template:
         application_config:
           collector.dmaap.streamid:
             get_input: docker_collector.dmaap.streamid
-          collector.inputQueue.maxPending:
-            get_input: docker_collector.inputQueue.maxPending
           collector.keystore.file.location:
             get_input: docker_collector.keystore.file.location
           collector.keystore.passwordfile:
index 82ba595..ae15cd9 100755 (executable)
@@ -46,15 +46,6 @@ collector.cert.subject.matcher=etc/certSubjectMatcher.properties
 collector.truststore.file.location=etc/truststore\r
 collector.truststore.passwordfile=etc/trustpasswordfile\r
 \r
-## Processing\r
-##\r
-## If there's a problem that prevents the collector from processing alarms,\r
-## it's normally better to apply back pressure to the caller than to try to\r
-## buffer beyond a reasonable size limit. With a limit, the server won't crash\r
-## due to being out of memory, and the caller will get a 5xx reply saying the\r
-## server is in trouble.\r
-collector.inputQueue.maxPending=8096\r
-\r
 ## Schema Validation checkflag\r
 ## default no validation checkflag (-1)\r
 ## If enabled (1) - schemafile location must be specified\r
index 61bcf4b..205659c 100644 (file)
@@ -84,38 +84,14 @@ public class ApplicationSettings {
             throw new ApplicationException(ex);
         }
     }
-    public void loadPropertiesFromFile() {
-        try {
-            properties.load(configurationFileLocation);
-        } catch (ConfigurationException ex) {
-            log.error("Cannot load properties cause:", ex);
-            throw new ApplicationException(ex);
-        }
-    }
-
     public Map<String, String> validAuthorizationCredentials() {
         return prepareUsersMap(properties.getString("header.authlist", null));
     }
 
-    private Map<String, String> prepareUsersMap(@Nullable String allowedUsers) {
-        return allowedUsers == null ? HashMap.empty()
-                : List.of(allowedUsers.split("\\|"))
-                .map(t->t.split(","))
-                .toMap(t-> t[0].trim(), t -> t[1].trim());
-    }
-
-    private String findOutConfigurationFileLocation(Map<String, String> parsedArgs) {
-        return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties"));
-    }
-
     public Path configurationFileLocation() {
         return Paths.get(configurationFileLocation);
     }
 
-    public int maximumAllowedQueuedEvents() {
-        return properties.getInt("collector.inputQueue.maxPending", 1024 * 4);
-    }
-
     public boolean jsonSchemaValidationEnabled() {
         return properties.getInt("collector.schema.checkflag", -1) > 0;
     }
@@ -126,22 +102,8 @@ public class ApplicationSettings {
                 .getOrElseThrow(() -> new IllegalStateException("No fallback schema present in application."));
     }
 
-    private Map<String, JsonSchema> loadJsonSchemas() {
-        return jsonSchema().toMap().entrySet().stream()
-                .map(this::readSchemaForVersion)
-                .collect(HashMap.collector());
-    }
-
-    private Tuple2<String, JsonSchema> readSchemaForVersion(java.util.Map.Entry<String, Object> versionToFilePath) {
-        try {
-            String schemaContent = new String(
-                    readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
-            JsonNode schemaNode = JsonLoader.fromString(schemaContent);
-            JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
-            return Tuple(versionToFilePath.getKey(), schema);
-        } catch (IOException | ProcessingException e) {
-            throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e);
-        }
+    public boolean isVersionSupported(String version){
+       return loadedJsonSchemas.containsKey(version);
     }
 
     public int httpPort() {
@@ -183,6 +145,7 @@ public class ApplicationSettings {
     public String exceptionConfigFileLocation() {
         return properties.getString("exceptionConfig", null);
     }
+
     public String dMaaPConfigurationFileLocation() {
         return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json"));
     }
@@ -204,7 +167,16 @@ public class ApplicationSettings {
         }
     }
 
-    public void addOrUpdate(String key, String value) {
+    private void loadPropertiesFromFile() {
+        try {
+            properties.load(configurationFileLocation);
+        } catch (ConfigurationException ex) {
+            log.error("Cannot load properties cause:", ex);
+            throw new ApplicationException(ex);
+        }
+    }
+
+    private void addOrUpdate(String key, String value) {
         if (properties.containsKey(key)) {
             properties.setProperty(key, value);
         } else {
@@ -212,6 +184,35 @@ public class ApplicationSettings {
         }
     }
 
+    private String findOutConfigurationFileLocation(Map<String, String> parsedArgs) {
+        return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties"));
+    }
+
+    private Map<String, JsonSchema> loadJsonSchemas() {
+        return jsonSchema().toMap().entrySet().stream()
+            .map(this::readSchemaForVersion)
+            .collect(HashMap.collector());
+    }
+
+    private Tuple2<String, JsonSchema> readSchemaForVersion(java.util.Map.Entry<String, Object> versionToFilePath) {
+        try {
+            String schemaContent = new String(
+                readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
+            JsonNode schemaNode = JsonLoader.fromString(schemaContent);
+            JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
+            return Tuple(versionToFilePath.getKey(), schema);
+        } catch (IOException | ProcessingException e) {
+            throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e);
+        }
+    }
+
+    private Map<String, String> prepareUsersMap(@Nullable String allowedUsers) {
+        return allowedUsers == null ? HashMap.empty()
+            : List.of(allowedUsers.split("\\|"))
+                .map(t->t.split(","))
+                .toMap(t-> t[0].trim(), t -> t[1].trim());
+    }
+
     private JSONObject jsonSchema() {
         return new JSONObject(properties.getString("collector.schema.file",
                 format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION)));
index d658b4a..e334082 100644 (file)
@@ -22,14 +22,9 @@ package org.onap.dcae;
 
 import io.vavr.collection.Map;
 import java.nio.file.Paths;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.json.JSONObject;
-import org.onap.dcae.common.EventProcessor;
 import org.onap.dcae.common.EventSender;
 import org.onap.dcae.common.publishing.DMaaPConfigurationParser;
 import org.onap.dcae.common.publishing.EventPublisher;
@@ -49,21 +44,16 @@ import org.springframework.context.annotation.Lazy;
 @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class})
 public class VesApplication {
 
-    private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
     private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.common.input");
     private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output");
     private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.common.error");
-    private static final int MAX_THREADS = 20;
-    public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
     private static ApplicationSettings properties;
     private static ConfigurableApplicationContext context;
     private static ConfigLoader configLoader;
-    private static EventProcessor eventProcessor;
     private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
     private static SpringApplication app;
     private static EventPublisher eventPublisher;
     private static ScheduledFuture<?> scheduleFeatures;
-    private static ExecutorService executor;
 
     public static void main(String[] args) {
       app = new SpringApplication(VesApplication.class);
@@ -73,7 +63,6 @@ public class VesApplication {
       app.setAddCommandLineProperties(true);
       context = app.run();
       configLoader.updateConfig();
-
     }
 
     public static void restartApplication() {
@@ -89,7 +78,6 @@ public class VesApplication {
     }
 
     private static void init() {
-      fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents());
       createConfigLoader();
       createSchedulePoolExecutor();
       createExecutors();
@@ -97,12 +85,6 @@ public class VesApplication {
 
     private static void createExecutors() {
       eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig());
-      eventProcessor = new EventProcessor(new EventSender(eventPublisher, properties));
-
-      executor = Executors.newFixedThreadPool(MAX_THREADS);
-      for (int i = 0; i < MAX_THREADS; ++i) {
-        executor.execute(eventProcessor);
-      }
     }
 
     private static void createSchedulePoolExecutor() {
@@ -141,12 +123,6 @@ public class VesApplication {
         return incomingRequestsLogger;
     }
 
-    @Bean
-    @Qualifier("metricsLog")
-    public Logger incomingRequestsMetricsLogger() {
-        return metriclog;
-    }
-
     @Bean
     @Qualifier("errorLog")
     public Logger errorLogger() {
@@ -154,8 +130,9 @@ public class VesApplication {
     }
 
     @Bean
-    public LinkedBlockingQueue<JSONObject> inputQueue() {
-        return fProcessingInputQueue;
+    @Qualifier("eventSender")
+    public EventSender eventSender() {
+        return new EventSender(eventPublisher,properties);
     }
 
 }
diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java
deleted file mode 100644 (file)
index bf3bf70..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-/*-\r
- * ============LICENSE_START=======================================================\r
- * PROJECT\r
- * ================================================================================\r
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- *      http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============LICENSE_END=========================================================\r
- */\r
-\r
-package org.onap.dcae.common;\r
-\r
-import com.att.nsa.clock.SaClock;\r
-import com.att.nsa.logging.LoggingContext;\r
-import com.att.nsa.logging.log4j.EcompFields;\r
-import org.json.JSONObject;\r
-import org.onap.dcae.VesApplication;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-public class EventProcessor implements Runnable {\r
-\r
-    private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);\r
-    private EventSender eventSender;\r
-\r
-    public EventProcessor(EventSender eventSender) {\r
-        this.eventSender = eventSender;\r
-    }\r
-\r
-    @Override\r
-    public void run() {\r
-        try {\r
-          while (true){\r
-            JSONObject event = VesApplication.fProcessingInputQueue.take();\r
-            log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);\r
-            setLoggingContext(event);\r
-            log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + eventSender.getDomain(event));\r
-            eventSender.send(event);\r
-            log.debug("Message published" + event);\r
-          }\r
-        } catch (InterruptedException e) {\r
-            log.error("EventProcessor InterruptedException" + e.getMessage());\r
-            Thread.currentThread().interrupt();\r
-        }\r
-    }\r
-\r
-  private void setLoggingContext(JSONObject event) {\r
-        LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get("VESuniqueId").toString());\r
-        localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());\r
-    }\r
-}
\ No newline at end of file
index 48268d6..c1002af 100644 (file)
  */
 package org.onap.dcae.common;
 
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
 import io.vavr.collection.Map;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
+import org.json.JSONArray;
 import org.json.JSONObject;
-import org.onap.dcae.ApplicationException;
 import org.onap.dcae.ApplicationSettings;
 import org.onap.dcae.common.publishing.EventPublisher;
 import org.slf4j.Logger;
@@ -38,88 +33,47 @@ import org.slf4j.LoggerFactory;
 
 public class EventSender {
 
-  private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json";
+  private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
   private Map<String, String[]> streamidHash;
-  private ApplicationSettings properties;
   private EventPublisher eventPublisher;
-
-  private static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
+  private static final String VES_UNIQUE_ID = "VESuniqueId";
   private static final Logger log = LoggerFactory.getLogger(EventSender.class);
   private static final String EVENT_LITERAL = "event";
   private static final String COMMON_EVENT_HEADER = "commonEventHeader";
-  private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
 
   public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) {
     this.eventPublisher = eventPublisher;
     this.streamidHash = properties.dMaaPStreamsMapping();
-    this.properties = properties;
-
   }
 
-  public void send(JSONObject event) {
-    streamidHash.get(getDomain(event))
-        .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event))
-        .forEach(streamIds -> sendEventsToStreams(event, streamIds));
+  public void send(JSONArray arrayOfEvents) {
+    for (int i = 0; i < arrayOfEvents.length(); i++) {
+      metriclog.info("EVENT_PUBLISH_START");
+      JSONObject object = (JSONObject) arrayOfEvents.get(i);
+      setLoggingContext(object);
+      streamidHash.get(getDomain(object))
+          .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + object))
+          .forEach(streamIds -> sendEventsToStreams(object, streamIds));
+      log.debug("Message published" + object);
+    }
+    log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+    metriclog.info("EVENT_PUBLISH_END");
   }
 
-  public static String getDomain(JSONObject event) {
+  private static String getDomain(JSONObject event) {
     return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
   }
 
   private void sendEventsToStreams(JSONObject event, String[] streamIdList) {
     for (String aStreamIdList : streamIdList) {
       log.info("Invoking publisher for streamId:" + aStreamIdList);
-      eventPublisher.sendEvent(overrideEvent(event), aStreamIdList);
+      eventPublisher.sendEvent(event, aStreamIdList);
     }
   }
 
-  private JSONObject overrideEvent(JSONObject event) {
-    JSONObject jsonObject = addCurrentTimeToEvent(event);
-    if (properties.eventTransformingEnabled()) {
-      try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
-        log.info("parse eventTransform.json");
-        List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE);
-        parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject)));
-      } catch (IOException e) {
-        log.error(COULD_NOT_FIND_FILE, e);
-        throw new ApplicationException(COULD_NOT_FIND_FILE, e);
-      }
-    }
-    if (jsonObject.has("VESversion"))
-      jsonObject.remove("VESversion");
-
-    log.debug("Modified event:" + jsonObject);
-    return jsonObject;
-  }
-
-  private JSONObject addCurrentTimeToEvent(JSONObject event) {
-    final Date currentTime = new Date();
-    JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime));
-    JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER);
-    commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
-    event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
-    return event;
-  }
-
-  private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) {
-    for (Event eventTransform : eventsTransform) {
-      JSONObject filterObj = new JSONObject(eventTransform.filter.toString());
-      if (configProcessorAdapter.isFilterMet(filterObj)) {
-        callProcessorsMethod(configProcessorAdapter, eventTransform.processors);
-      }
-    }
-  }
-
-  private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) {
-    for (Processor processor : processors) {
-      final String functionName = processor.functionName;
-      final JSONObject args = new JSONObject(processor.args.toString());
-      log.info(String.format("functionName==%s | args==%s", functionName, args));
-      try {
-        configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args);
-      } catch (ReflectiveOperationException e) {
-        log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause());
-      }
-    }
+  private void setLoggingContext(JSONObject event) {
+    LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get(VES_UNIQUE_ID).toString());
+    localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+    log.debug("event.VESuniqueId" + event.get(VES_UNIQUE_ID) + "event.commonEventHeader.domain:" + getDomain(event));
   }
 }
diff --git a/src/main/java/org/onap/dcae/common/EventUpdater.java b/src/main/java/org/onap/dcae/common/EventUpdater.java
new file mode 100644 (file)
index 0000000..1caa4f1
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dcae.ApplicationException;
+import org.onap.dcae.ApplicationSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventUpdater {
+
+  private static final String EVENT_LIST = "eventList";
+  private static final String EVENT = "event";
+  private static final String VES_UNIQUE_ID = "VESuniqueId";
+  private static final String VES_VERSION = "VESversion";
+  private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json";
+  private static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
+  private static final Logger log = LoggerFactory.getLogger(EventSender.class);
+  private static final String EVENT_LITERAL = "event";
+  private static final String COMMON_EVENT_HEADER = "commonEventHeader";
+  private static final String EVENT_TRANSFORM = "./etc/eventTransform.json";
+  private ApplicationSettings settings;
+  private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
+
+  public EventUpdater(ApplicationSettings settings) {
+    this.settings = settings;
+  }
+
+  public JSONArray convert(JSONObject jsonObject, String version, UUID uuid, String type){
+    if(type.equalsIgnoreCase(EVENT_LIST)){
+      return convertEvents(jsonObject, uuid.toString(), version);
+    }
+    else {
+      return convertEvent(jsonObject, uuid.toString(), version);
+    }
+  }
+
+  private JSONArray convertEvents(JSONObject jsonObject,
+      String uuid, String version) {
+    JSONArray asArrayEvents = new JSONArray();
+
+    JSONArray events = jsonObject.getJSONArray(EVENT_LIST);
+    for (int i = 0; i < events.length(); i++) {
+      JSONObject event = new JSONObject().put(EVENT, events.getJSONObject(i));
+      event.put(VES_UNIQUE_ID, uuid + "-" + i);
+      event.put(VES_VERSION, version);
+      asArrayEvents.put(overrideEvent(event));
+    }
+    return asArrayEvents;
+  }
+
+  private JSONArray convertEvent(JSONObject jsonObject, String uuid, String version) {
+    jsonObject.put(VES_UNIQUE_ID, uuid);
+    jsonObject.put(VES_VERSION, version);
+    return new JSONArray().put(overrideEvent(jsonObject));
+  }
+
+  private JSONObject overrideEvent(JSONObject event) {
+    JSONObject jsonObject = addCurrentTimeToEvent(event);
+    if (settings.eventTransformingEnabled()) {
+      try (FileReader fr = new FileReader(EVENT_TRANSFORM)) {
+        log.info("parse " + EVENT_TRANSFORM + " file");
+        List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE);
+        parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject)));
+      } catch (IOException e) {
+        log.error(COULD_NOT_FIND_FILE, e);
+        throw new ApplicationException(COULD_NOT_FIND_FILE, e);
+      }
+    }
+    if (jsonObject.has(VES_VERSION))
+       jsonObject.remove(VES_VERSION);
+    log.debug("Modified event:" + jsonObject);
+    return jsonObject;
+  }
+
+  private JSONObject addCurrentTimeToEvent(JSONObject event) {
+    final Date currentTime = new Date();
+    JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime));
+    JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER);
+    commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
+    event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
+    return event;
+  }
+
+  private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) {
+    for (Event eventTransform : eventsTransform) {
+      JSONObject filterObj = new JSONObject(eventTransform.filter.toString());
+      if (configProcessorAdapter.isFilterMet(filterObj)) {
+        callProcessorsMethod(configProcessorAdapter, eventTransform.processors);
+      }
+    }
+  }
+
+  private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) {
+    for (Processor processor : processors) {
+      //TODO try to remove refection
+      final String functionName = processor.functionName;
+      final JSONObject args = new JSONObject(processor.args.toString());
+      log.info(String.format("functionName==%s | args==%s", functionName, args));
+      try {
+        configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args);
+      } catch (ReflectiveOperationException e) {
+        log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause());
+      }
+    }
+  }
+}
diff --git a/src/main/java/org/onap/dcae/restapi/EventValidator.java b/src/main/java/org/onap/dcae/restapi/EventValidator.java
new file mode 100644 (file)
index 0000000..f119b50
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.restapi;
+
+import static java.util.stream.StreamSupport.stream;
+
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.report.ProcessingReport;
+import com.github.fge.jsonschema.main.JsonSchema;
+import java.util.Optional;
+import org.json.JSONObject;
+import org.onap.dcae.ApplicationException;
+import org.onap.dcae.ApplicationSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
+
+public class EventValidator {
+
+  private static final Logger log = LoggerFactory.getLogger(EventValidator.class);
+
+  private ApplicationSettings applicationSettings;
+
+  public EventValidator(ApplicationSettings applicationSettings) {
+    this.applicationSettings = applicationSettings;
+  }
+
+  public Optional<ResponseEntity<String>> validate(JSONObject jsonObject, String type, String version){
+    if (applicationSettings.jsonSchemaValidationEnabled()) {
+      if (jsonObject.has(type)) {
+        if (!conformsToSchema(jsonObject, version)) {
+          return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+        }
+      } else {
+        return errorResponse(ApiException.INVALID_JSON_INPUT);
+      }
+    }
+    return Optional.empty();
+  }
+
+  private boolean conformsToSchema(JSONObject payload, String version) {
+    try {
+      JsonSchema schema = applicationSettings.jsonSchema(version);
+      ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
+      if (report.isSuccess()) {
+        return true;
+      }
+      log.warn("Schema validation failed for event: " + payload);
+      stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage()));
+      return false;
+    } catch (Exception e) {
+      throw new ApplicationException("Unable to validate against schema", e);
+    }
+  }
+
+  private Optional<ResponseEntity<String>> errorResponse(ApiException noServerResources) {
+    return Optional.of(ResponseEntity.status(noServerResources.httpStatusCode)
+        .body(noServerResources.toJSON().toString()));
+  }
+}
index 9c65619..77c6802 100644 (file)
 
 package org.onap.dcae.restapi;
 
-import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
 
-
-@Controller
+@RestController
 public class HealthCheckController {
 
+    @GetMapping("/")
+    public String main() {
+        return "Welcome to VESCollector";
+    }
+
     @GetMapping("/healthcheck")
     public String healthCheck() {
-        return "hello";
+        return "I'm good";
     }
 
 }
index 60740a8..267db05 100644 (file)
@@ -31,6 +31,7 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
 @Configuration
 @EnableSwagger2
 public class SwaggerConfig{
+
   @Bean
   public Docket api() {
     return new Docket(DocumentationType.SWAGGER_2)
@@ -39,5 +40,4 @@ public class SwaggerConfig{
         .paths(PathSelectors.any())
         .build();
   }
-
 }
index 3102c31..b18eb7b 100644 (file)
 
 package org.onap.dcae.restapi;
 
-import static java.util.stream.StreamSupport.stream;
 import static org.springframework.http.ResponseEntity.accepted;
+import static org.springframework.http.ResponseEntity.badRequest;
 
 import com.att.nsa.clock.SaClock;
 import com.att.nsa.logging.LoggingContext;
 import com.att.nsa.logging.log4j.EcompFields;
-import com.github.fge.jackson.JsonLoader;
-import com.github.fge.jsonschema.core.report.ProcessingReport;
-import com.github.fge.jsonschema.main.JsonSchema;
-
+import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
 import javax.servlet.http.HttpServletRequest;
-
 import org.json.JSONArray;
 import org.json.JSONObject;
-import org.onap.dcae.ApplicationException;
 import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.common.EventSender;
 import org.onap.dcae.common.VESLogger;
+import org.onap.dcae.common.EventUpdater;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
@@ -54,152 +49,64 @@ import org.springframework.web.bind.annotation.RestController;
 @RestController
 public class VesRestController {
 
-    private static final Logger log = LoggerFactory.getLogger(VesRestController.class);
-    private static final String INVALID_JSON = ApiException.INVALID_JSON_INPUT.toJSON().toString();
-    private final ApplicationSettings applicationSettings;
-    private final LinkedBlockingQueue<JSONObject> inputQueue;
-    private final Logger metricsLog;
-    private final Logger errorLog;
-    private final Logger incomingRequestsLogger;
+    private static final String VES_EVENT_MESSAGE = "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'";
+    private static final String EVENT_LIST = "eventList";
+    private static final String EVENT = "event";
+    private final ApplicationSettings settings;
+    private final Logger requestLogger;
+    private EventSender eventSender;
 
     @Autowired
-    VesRestController(ApplicationSettings applicationSettings,
-                      @Qualifier("metricsLog") Logger metricsLog,
-                      @Qualifier("errorLog") Logger errorLog,
+    VesRestController(ApplicationSettings settings,
                       @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
-                      @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
-        this.applicationSettings = applicationSettings;
-        this.metricsLog = metricsLog;
-        this.errorLog = errorLog;
-        this.incomingRequestsLogger = incomingRequestsLogger;
-        this.inputQueue = inputQueue;
-    }
-
-    @GetMapping("/")
-    String mainPage() {
-        return "Welcome to VESCollector";
+                      @Qualifier("eventSender") EventSender eventSender) {
+        this.settings = settings;
+        this.requestLogger = incomingRequestsLogger;
+        this.eventSender = eventSender;
     }
 
-    //refactor in next iteration
-    @PostMapping(value = {"/eventListener/v1",
-            "/eventListener/v1/eventBatch",
-            "/eventListener/v2",
-            "/eventListener/v2/eventBatch",
-            "/eventListener/v3",
-            "/eventListener/v3/eventBatch",
-            "/eventListener/v4",
-            "/eventListener/v4/eventBatch",
-            "/eventListener/v5",
-            "/eventListener/v5/eventBatch",
-            "/eventListener/v7",
-            "/eventListener/v7/eventBatch"}, consumes = "application/json")
-    ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
-        String request = httpServletRequest.getRequestURI();
-        String version = extractVersion(request);
-
-        JSONObject jsonObject;
-        try {
-            jsonObject = new JSONObject(jsonPayload);
-        } catch (Exception e) {
-            log.error(INVALID_JSON);
-            return ResponseEntity.badRequest().body(INVALID_JSON);
-        }
-
-        String uuid = setUpECOMPLoggingForRequest();
-        incomingRequestsLogger.info(String.format(
-                "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
-                jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
-
-        if (applicationSettings.jsonSchemaValidationEnabled()) {
-            if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
-                if (!conformsToSchema(jsonObject, version)) {
-                    return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
-                }
-            } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
-                if (!conformsToSchema(jsonObject, version)) {
-                    return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
-                }
-            } else {
-                return errorResponse(ApiException.INVALID_JSON_INPUT);
-            }
+    @PostMapping(value = {"/eventListener/{version}"}, consumes = "application/json")
+    ResponseEntity<String> event(@RequestBody String event, @PathVariable String version, HttpServletRequest request) {
+        if (settings.isVersionSupported(version)) {
+            return process(event, version, request, EVENT);
         }
+        return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version));
+    }
 
-        JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
 
-        if (!putEventsOnProcessingQueue(commonlyFormatted)) {
-            errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
-            return errorResponse(ApiException.NO_SERVER_RESOURCES);
+    @PostMapping(value = {"/eventListener/{version}/eventBatch"}, consumes = "application/json")
+    ResponseEntity<String> events(@RequestBody String events, @PathVariable String version, HttpServletRequest request) {
+        if (settings.isVersionSupported(version)) {
+            return process(events, version, request, EVENT_LIST);
         }
-        return accepted()
-                .contentType(MediaType.APPLICATION_JSON)
-                .body("Accepted");
+        return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version));
     }
 
-    private String extractVersion(String httpServletRequest) {
-        return httpServletRequest.split("/")[2];
-    }
+    private ResponseEntity<String> process(String events, String version, HttpServletRequest request, String type) {
 
-    private ResponseEntity<String> errorResponse(ApiException noServerResources) {
-        return ResponseEntity.status(noServerResources.httpStatusCode)
-                .body(noServerResources.toJSON().toString());
-    }
+        JSONObject jsonObject = new JSONObject(events);
 
-    private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
-        for (int i = 0; i < arrayOfEvents.length(); i++) {
-            metricsLog.info("EVENT_PUBLISH_START");
-            if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
-                return false;
-            }
-        }
-        log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
-        metricsLog.info("EVENT_PUBLISH_END");
-        return true;
-    }
+        EventValidator eventValidator = new EventValidator(settings);
+        Optional<ResponseEntity<String>> validationResult = eventValidator.validate(jsonObject, type, version);
 
-    private boolean conformsToSchema(JSONObject payload, String version) {
-        try {
-            JsonSchema schema = applicationSettings.jsonSchema(version);
-            ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
-            if (!report.isSuccess()) {
-                log.warn("Schema validation failed for event: " + payload);
-                stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage()));
-                return false;
-            }
-            return report.isSuccess();
-        } catch (Exception e) {
-            throw new ApplicationException("Unable to validate against schema", e);
+        if (validationResult.isPresent()){
+            return validationResult.get();
         }
+        JSONArray arrayOfEvents = new EventUpdater(settings).convert(jsonObject,version, generateUUID(version, request.getRequestURI(), jsonObject), type);
+        eventSender.send(arrayOfEvents);
+        // TODO call service and return status, replace CambriaClient, split event to single object and list of them
+        return accepted().contentType(MediaType.APPLICATION_JSON).body("Accepted");
     }
 
-    private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request,
-                                                            String uuid, String version) {
-        JSONArray asArrayEvents = new JSONArray();
-        String vesUniqueIdKey = "VESuniqueId";
-        String vesVersionKey = "VESversion";
-        if (isBatchRequest(request)) {
-            JSONArray events = jsonObject.getJSONArray("eventList");
-            for (int i = 0; i < events.length(); i++) {
-                JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
-                event.put(vesUniqueIdKey, uuid + "-" + i);
-                event.put(vesVersionKey, version);
-                asArrayEvents.put(event);
-            }
-        } else {
-            jsonObject.put(vesUniqueIdKey, uuid);
-            jsonObject.put(vesVersionKey, version);
-            asArrayEvents = new JSONArray().put(jsonObject);
-        }
-        return asArrayEvents;
+    private UUID generateUUID(String version, String uri, JSONObject jsonObject) {
+        UUID uuid = UUID.randomUUID();
+        setUpECOMPLoggingForRequest(uuid);
+        requestLogger.info(String.format(VES_EVENT_MESSAGE, jsonObject, uuid, version, uri));
+        return uuid;
     }
 
-    private static String setUpECOMPLoggingForRequest() {
-        final UUID uuid = UUID.randomUUID();
+    private static void setUpECOMPLoggingForRequest(UUID uuid) {
         LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
         localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-        return uuid.toString();
-    }
-
-    private static boolean isBatchRequest(String request) {
-        return request.contains("eventBatch");
     }
 }
\ No newline at end of file
index 7059c4e..c3e2a5d 100644 (file)
@@ -52,5 +52,4 @@ public class WebMvcConfig extends WebMvcConfigurationSupport {
         resolver.setSuffix(".html");
         return resolver;
     }
-
 }
index 60287ae..6b0023f 100644 (file)
@@ -234,25 +234,6 @@ public class ApplicationSettingsTest {
         assertEquals(sanitizePath("etc/DmaapConfig.json"), dmaapConfigFileLocation);
     }
 
-    @Test
-    public void shouldReturnMaximumAllowedQueuedEvents() throws IOException {
-        // when
-        int maximumAllowedQueuedEvents = fromTemporaryConfiguration("collector.inputQueue.maxPending=10000")
-            .maximumAllowedQueuedEvents();
-
-        // then
-        assertEquals(10000, maximumAllowedQueuedEvents);
-    }
-
-    @Test
-    public void shouldReturnDefaultMaximumAllowedQueuedEvents() throws IOException {
-        // when
-        int maximumAllowedQueuedEvents = fromTemporaryConfiguration().maximumAllowedQueuedEvents();
-
-        // then
-        assertEquals(1024 * 4, maximumAllowedQueuedEvents);
-    }
-
     @Test
     public void shouldTellIfSchemaValidationIsEnabled() throws IOException {
         // when
index 4dada12..df10ead 100644 (file)
@@ -24,6 +24,7 @@ package org.onap.dcae;
 import org.json.JSONObject;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mockito;
+import org.onap.dcae.common.EventSender;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
@@ -69,8 +70,8 @@ public class TLSTestBase {
     protected abstract class TestClassBase {
 
         @MockBean
-        @Qualifier("inputQueue")
-        protected LinkedBlockingQueue<JSONObject> queue;
+        @Qualifier("eventSender")
+        protected EventSender eventSender;
 
         @LocalServerPort
         private int port;
index aba3c2a..f49d3cd 100644 (file)
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import io.vavr.collection.HashMap;
 import io.vavr.collection.Map;
+import org.json.JSONArray;
 import org.json.JSONObject;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,7 +40,6 @@ import org.onap.dcae.common.publishing.EventPublisher;
 @RunWith(MockitoJUnitRunner.Silent.class)
 public class EventSenderTest {
 
-
   private String event = "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1544016106000000,\"eventId\":\"fault33\",\"timeZoneOffset\":\"UTC+00.00\",\"priority\":\"Normal\",\"version\":\"4.0.1\",\"nfVendorName\":\"Ericsson\",\"reportingEntityName\":\"1\",\"sequence\":1,\"domain\":\"fault\",\"lastEpochMicrosec\":1544016106000000,\"eventName\":\"Fault_KeyFileFault\",\"vesEventListenerVersion\":\"7.0.1\",\"sourceName\":\"1\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"KeyFileFault\",\"faultFieldsVersion\":\"4.0\",\"eventCategory\":\"PROCESSINGERRORALARM\",\"specificProblem\":\"License Key File Fault_1\",\"alarmAdditionalInformation\":{\"probableCause\":\"ConfigurationOrCustomizationError\",\"additionalText\":\"test_1\",\"source\":\"ManagedElement=1,SystemFunctions=1,Lm=1\"},\"eventSourceType\":\"Lm\",\"vfStatus\":\"Active\"}}}\n";
 
   @Mock
@@ -54,7 +54,10 @@ public class EventSenderTest {
   public void shouldntSendEventWhenStreamIdsIsEmpty() {
     when(settings.dMaaPStreamsMapping()).thenReturn(HashMap.empty());
     eventSender = new EventSender(eventPublisher, settings );
-    eventSender.send(new JSONObject(event));
+    JSONObject jsonObject = new JSONObject(event);
+    JSONArray jsonArray = new JSONArray();
+    jsonArray.put(jsonObject);
+    eventSender.send(jsonArray);
     verify(eventPublisher,never()).sendEvent(any(),any());
   }
 
@@ -63,7 +66,10 @@ public class EventSenderTest {
     Map<String, String[]> streams = HashMap.of("fault", new String[]{"ves-fault", "fault-ves"});
     when(settings.dMaaPStreamsMapping()).thenReturn(streams);
     eventSender = new EventSender(eventPublisher, settings );
-    eventSender.send(new JSONObject(event));
+    JSONObject jsonObject = new JSONObject(event);
+    JSONArray jsonArray = new JSONArray();
+    jsonArray.put(jsonObject);
+    eventSender.send(jsonArray);
     verify(eventPublisher, times(2)).sendEvent(any(),any());
   }
 }
\ No newline at end of file
index 1cc6576..f148db5 100644 (file)
@@ -1,6 +1,5 @@
 {
        "auth.method": "noAuth",
-       "collector.inputQueue.maxPending": 8096,
        "collector.schema.checkflag": 1,
        "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks",
        "tomcat.maxthreads": "200",
index c3a8d06..a3974e0 100644 (file)
@@ -5,7 +5,6 @@
   "tomcat.maxthreads": "200",
   "collector.dmaap.streamid": "fault=ves-fault|syslog=ves-syslog|heartbeat=ves-heartbeat|measurementsForVfScaling=ves-measurement|mobileFlow=ves-mobileflow|other=ves-other|stateChange=ves-statechange|thresholdCrossingAlert=ves-thresholdCrossingAlert|voiceQuality=ves-voicequality|sipSignaling=ves-sipsignaling",
   "streams_subscribes": {},
-  "collector.inputQueue.maxPending": "8096",
   "streams_publishes": {
     "ves-mobileflow": {
       "type": "message_router",
index 9450067..0916211 100644 (file)
@@ -9,7 +9,6 @@ collector.dmaapfile=./etc/DmaapConfig.json
 auth.method=noAuth
 header.authlist=sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy
 event.transform.flag=1
-collector.inputQueue.maxPending = 8096
 streams_subscribes = {}
 services_calls = {}
 tomcat.maxthreads = 200