Flatten and Aggregate features in JSON array processing 93/89293/2
author: Guobiao Mo <guobiaomo@chinamobile.com>
Tue, 4 Jun 2019 22:19:45 +0000 (15:19 -0700)
committer: Guobiao Mo <guobiaomo@chinamobile.com>
Tue, 4 Jun 2019 23:29:42 +0000 (16:29 -0700)
Issue-ID: DCAEGEN2-1598
Change-Id: I9f563bcfa18285daf7b48878e8427bfdb1aff21f
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
15 files changed:
components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java [new file with mode: 0644]
components/datalake-handler/feeder/src/main/resources/application.properties
components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt [new file with mode: 0644]
components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json [new file with mode: 0644]
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
components/datalake-handler/feeder/src/test/resources/application.properties
components/datalake-handler/pom.xml

index fd9b3dc..ad142dc 100644 (file)
@@ -7,6 +7,8 @@ CREATE TABLE `topic` (
   `enabled` bit(1) DEFAULT 0,\r
   `login` varchar(255) DEFAULT NULL,\r
   `message_id_path` varchar(255) DEFAULT NULL,\r
+  `aggregate_array_path` varchar(2000) DEFAULT NULL,\r
+  `flatten_array_path` varchar(2000) DEFAULT NULL,\r
   `pass` varchar(255) DEFAULT NULL,\r
   `save_raw` bit(1) DEFAULT NULL,\r
   `ttl` int(11) DEFAULT NULL,\r
@@ -86,6 +88,12 @@ insert into db (`name`,`host`,`login`) values ('HDFS','dlhdfs','dl');
 \r
 -- in production, default enabled should be off\r
 insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');\r
+insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');\r
+insert into `topic`(`name`,`enabled`, aggregate_array_path,flatten_array_path,`data_format`) \r
+values ('unauthenticated.VES_MEASUREMENT_OUTPUT',1,\r
+'/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray',\r
+'/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface',\r
+'JSON');\r
 \r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','_DL_DEFAULT_');\r
@@ -93,14 +101,17 @@ insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAUL
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','_DL_DEFAULT_');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','_DL_DEFAULT_');\r
 \r
-insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');\r
-\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.SEC_FAULT_OUTPUT');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.SEC_FAULT_OUTPUT');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.SEC_FAULT_OUTPUT');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.SEC_FAULT_OUTPUT');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.SEC_FAULT_OUTPUT');\r
 \r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.VES_MEASUREMENT_OUTPUT');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.VES_MEASUREMENT_OUTPUT');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.VES_MEASUREMENT_OUTPUT');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.VES_MEASUREMENT_OUTPUT');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.VES_MEASUREMENT_OUTPUT');\r
 \r
 insert into portal (`name`,`related_db`, host) values ('Kibana', 'Elasticsearch', 'dl_es');\r
 insert into portal (`name`,`related_db`) values ('Elasticsearch', 'Elasticsearch');\r
index 3073716..acb48ae 100644 (file)
@@ -100,6 +100,15 @@ public class Topic {
        @Column(name = "`message_id_path`")
        private String messageIdPath;
 
+       //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
+       @Column(name = "`aggregate_array_path`") 
+       private String aggregateArrayPath;
+
+       //paths to the elements in arrays that need flattening; each element's value is used as a label, comma separated, 
+       //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
+       @Column(name = "`flatten_array_path`") 
+       private String flattenArrayPath;
+       
        public Topic() {
        }
 
@@ -149,6 +158,8 @@ public class Topic {
                tConfig.setSaveRaw(isSaveRaw());
                tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
                tConfig.setMessageIdPath(getMessageIdPath());
+               tConfig.setAggregateArrayPath(getAggregateArrayPath());
+               tConfig.setFlattenArrayPath(getFlattenArrayPath());
                tConfig.setTtl(getTtl());
                Set<Db> topicDb = getDbs();
                List<String> dbList = new ArrayList<>();
index deaa096..8dfe1b1 100644 (file)
@@ -51,6 +51,8 @@ public class TopicConfig {
        private int ttl;
        private boolean correlateClearedMessage;
        private String messageIdPath;
+       private String aggregateArrayPath;
+       private String flattenArrayPath;
 
        public DataFormat getDataFormat2() {
                if (dataFormat != null) {
@@ -60,7 +62,6 @@ public class TopicConfig {
                }
        }
 
-
        public boolean supportHdfs() {
                return containDb("HDFS");
        }
@@ -105,6 +106,26 @@ public class TopicConfig {
                return id;
        }
 
+       public String[] getAggregateArrayPath2() {
+               String[] ret = null;
+
+               if (StringUtils.isNotBlank(aggregateArrayPath)) {
+                       ret = aggregateArrayPath.split(",");
+               }
+
+               return ret;
+       }
+
+       public String[] getFlattenArrayPath2() {
+               String[] ret = null;
+
+               if (StringUtils.isNotBlank(flattenArrayPath)) {
+                       ret = flattenArrayPath.split(",");
+               }
+
+               return ret;
+       }
+
        @Override
        public String toString() {
                return name;
index 03faeb8..2a2f997 100644 (file)
@@ -27,12 +27,14 @@ import java.util.List;
 import javax.annotation.PostConstruct;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -100,7 +102,7 @@ public class StoreService {
                saveJsons(topicConfig, docs, messages);
        }
 
-       private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException {
+       private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
 
                long timestamp = pair.getLeft();
                String text = pair.getRight();
@@ -111,11 +113,11 @@ public class StoreService {
                //              log.debug("{} ={}", topicStr, text);
                //}
 
-               boolean storeRaw = topic.isSaveRaw();
+               boolean storeRaw = topicConfig.isSaveRaw();
 
                JSONObject json = null;
 
-               DataFormat dataFormat = topic.getDataFormat2();
+               DataFormat dataFormat = topicConfig.getDataFormat2();
 
                switch (dataFormat) {
                case JSON:
@@ -146,6 +148,20 @@ public class StoreService {
                        json.put(config.getRawDataLabel(), text);
                }
 
+               if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
+                       String[] paths = topicConfig.getAggregateArrayPath2();
+                       for (String path : paths) {
+                               JsonUtil.arrayAggregate(path, json);
+                       }
+               }
+
+               if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
+                       String[] paths = topicConfig.getFlattenArrayPath2();
+                       for (String path : paths) {
+                               JsonUtil.flattenArray(path, json);
+                       }
+               }
+
                return json;
        }
 
@@ -168,9 +184,9 @@ public class StoreService {
        }
 
        public void flush() { //force flush all buffer 
-               hdfsService.flush(); 
+               hdfsService.flush();
        }
-       
+
        public void flushStall() { //flush stall buffer
                hdfsService.flushStall();
        }
index f0b000b..64e8b8b 100644 (file)
@@ -123,6 +123,8 @@ public class TopicService {
                topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
                topic.setDataFormat(tConfig.getDataFormat());
                topic.setMessageIdPath(tConfig.getMessageIdPath());
+               topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
+               topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());
 
                if(tConfig.getSinkdbs() != null) {
                        for (String item : tConfig.getSinkdbs()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
new file mode 100644 (file)
index 0000000..db4dcfa
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.util;
+
+import java.util.HashMap;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import lombok.Getter;
+
+/**
+ * Utilities for JSON manipulation: flattening and aggregating JSON arrays.
+ * 
+ * @author Guobiao Mo
+ *
+ */
+public class JsonUtil {
+
+	private JsonUtil() {
+	}
+
+	@Getter
+	enum AggregateType {
+		ALL("aggregate"), AVERAGE("average"), SUM("sum"), MAX("max"), MIN("min"), COUNT("count");
+		private final String name;
+
+		AggregateType(String name) {
+			this.name = name;
+		}
+
+		//label = last path segment + "_" + aggregation name, e.g. "cpuUsageArray_sum"
+		public String getLabel(String path) {
+			return path.substring(path.lastIndexOf('/') + 1) + "_" + name;
+		}
+	}
+
+	/**
+	 * Flatten the JSON array under the given path: each element of the array is
+	 * also attached to the array's parent object, keyed by
+	 * arrayName_tagName_tagValue. The json object is modified in place.
+	 * 
+	 * @param path pointer to the label element inside the array, e.g.
+	 *             /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface
+	 * @param json the JSON document to modify
+	 */
+	public static void flattenArray(String path, JSONObject json) {
+		int index1 = path.lastIndexOf('/');
+		String arrayPath = path.substring(0, index1);//path of the array itself
+
+		Object obj;
+		try {
+			obj = json.query(arrayPath);
+		} catch (org.json.JSONPointerException e) {
+			return;//path does not exist in this message
+		}
+		if (!(obj instanceof JSONArray)) {//also rejects null
+			return;
+		}
+		JSONArray subJsonArray = (JSONArray) obj;
+
+		String tagName = path.substring(index1 + 1);//e.g. astriInterface
+
+		int index2 = path.lastIndexOf('/', index1 - 1);
+		String arrayName = path.substring(index2 + 1, index1);//e.g. astriDPMeasurementArray
+
+		Object parentObj = json.query(path.substring(0, index2));//array's parent
+		if (!(parentObj instanceof JSONObject)) {
+			return;
+		}
+		JSONObject parent = (JSONObject) parentObj;
+
+		for (Object e : subJsonArray) {
+			if (!(e instanceof JSONObject)) {
+				continue;
+			}
+			JSONObject element = (JSONObject) e;
+			Object tagValue = element.opt(tagName);//opt() returns null instead of throwing when the tag is absent
+			if (tagValue == null) {
+				continue;
+			}
+			String label = arrayName + "_" + tagName + "_" + tagValue;
+			parent.put(label, element);
+		}
+	}
+
+	/**
+	 * Aggregate numeric fields of the JSON array at the given path: sum, average,
+	 * count, max and min are attached to the array's parent object. The json
+	 * object is modified in place.
+	 * 
+	 * @param path pointer to the JSON array, e.g.
+	 *             /event/measurementsForVfScalingFields/cpuUsageArray
+	 * @param json the JSON document to modify
+	 */
+	public static void arrayAggregate(String path, JSONObject json) {
+		HashMap<String, Double> sumHashMap = new HashMap<>();
+		HashMap<String, Double> maxHashMap = new HashMap<>();
+		HashMap<String, Double> minHashMap = new HashMap<>();
+
+		Object obj;
+		try {
+			obj = json.query(path);
+		} catch (org.json.JSONPointerException e) {
+			return;//path does not exist in this message
+		}
+		if (!(obj instanceof JSONArray)) {//also rejects null
+			return;
+		}
+		JSONArray subJsonArray = (JSONArray) obj;
+
+		int count = 0;
+		for (Object e : subJsonArray) {
+			if (!(e instanceof JSONObject)) {
+				continue;
+			}
+			JSONObject element = (JSONObject) e;
+			String[] names = JSONObject.getNames(element);//null for an empty object
+			if (names != null) {
+				for (String name : names) {
+					Number value = element.optNumber(name);
+					if (value != null) {
+						double d = value.doubleValue();
+						sumHashMap.merge(name, d, Double::sum);
+						maxHashMap.merge(name, d, Math::max);
+						minHashMap.merge(name, d, Math::min);
+					}
+				}
+			}
+			count++;
+		}
+
+		if (count == 0) {
+			return;
+		}
+
+		Object parentObj = json.query(path.substring(0, path.lastIndexOf('/')));
+		if (!(parentObj instanceof JSONObject)) {
+			return;
+		}
+		JSONObject parentJson = (JSONObject) parentObj;
+
+		//sum
+		JSONObject aggJson = new JSONObject(sumHashMap);
+		parentJson.put(AggregateType.SUM.getLabel(path), aggJson);
+
+		//average
+		final int c = count;//lambda needs an effectively final copy
+		sumHashMap.replaceAll((k, v) -> v / c);
+		aggJson = new JSONObject(sumHashMap);
+		parentJson.put(AggregateType.AVERAGE.getLabel(path), aggJson);
+
+		//count
+		parentJson.put(AggregateType.COUNT.getLabel(path), count);
+
+		//max
+		aggJson = new JSONObject(maxHashMap);
+		parentJson.put(AggregateType.MAX.getLabel(path), aggJson);
+
+		//min
+		aggJson = new JSONObject(minHashMap);
+		parentJson.put(AggregateType.MIN.getLabel(path), aggJson);
+	}
+
+}
index a105473..7bbbac0 100644 (file)
@@ -42,7 +42,7 @@ dmaapCheckNewTopicInterval=60000
 kafkaConsumerCount=3
 
 #####################Elasticsearch
-elasticsearchType=doc
+elasticsearchType=_doc
 
 #####################HDFS
 hdfsBufferSize=4096
diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt
new file mode 100644 (file)
index 0000000..8851353
--- /dev/null
@@ -0,0 +1,4 @@
+Before creating the index, issue:\r
+PUT http://dl_es:9200/unauthenticated.ves_measurement_output\r
+with Content-Type: application/json\r
+and the request body taken from unauthenticated.ves_measurement_output.json
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json
new file mode 100644 (file)
index 0000000..9a53b70
--- /dev/null
@@ -0,0 +1,31 @@
+{ \r
+       "mappings": { \r
+                       "properties": {\r
+                               "datalake_ts_": {\r
+                                       "type": "date",\r
+                                       "format": "epoch_millis"\r
+                               },                              \r
+                               "event.commonEventHeader.internalHeaderFields.collectorTimeStamp": {\r
+                                       "type": "date",\r
+                                       "format":"EEE, MM dd yyyy HH:mm:ss z"\r
+                               },\r
+                               "event.commonEventHeader.startEpochMicrosec": {\r
+                                       "type": "date",\r
+                                       "format": "epoch_millis"\r
+                               },\r
+                               "event.commonEventHeader.lastEpochMicrosec": {\r
+                                       "type": "date",\r
+                                       "format": "epoch_millis"\r
+                               },\r
+                               "event.measurementsForVfScalingFields.diskUsageArray": {\r
+                                       "type": "nested"\r
+                               },\r
+                               "event.measurementsForVfScalingFields.cpuUsageArray": {\r
+                                       "type": "nested"\r
+                               },\r
+                               "event.measurementsForVfScalingFields.vNicPerformanceArray": {\r
+                                       "type": "nested"\r
+                               }\r
+                       } \r
+       }\r
+}
\ No newline at end of file
index 617b50e..0c56d5a 100644 (file)
@@ -68,6 +68,7 @@ public class ApplicationConfigurationTest {
         assertNotNull(config.getRawDataLabel());
         assertNotNull(config.getTimestampLabel());
         assertNotNull(config.getElasticsearchType());
+        assertNotNull(config.getDatalakeVersion());
         
       //HDFS
         assertTrue(config.getHdfsBatchSize()>0);
index bb31cd7..f52332a 100644 (file)
@@ -76,6 +76,23 @@ public class TopicConfigTest {
 
     }
 
+    @Test
+    public void testArrayPath() {
+        Topic topic = new Topic("testArrayPath");
+        topic.setAggregateArrayPath("/data/data2/value,/data/data3");
+        topic.setFlattenArrayPath("/data/data2/value,/data/data3");
+
+        TopicConfig topicConfig = topic.getTopicConfig();
+
+        String[] value = topicConfig.getAggregateArrayPath2();
+        assertEquals(value[0], "/data/data2/value");
+        assertEquals(value[1], "/data/data3");
+
+        value = topicConfig.getFlattenArrayPath2();
+        assertEquals(value[0], "/data/data2/value");
+        assertEquals(value[1], "/data/data3");
+    }
+
     @Test
     public void testIs() {
         Topic testTopic = new Topic("test");
index 44e7632..fc05d1d 100644 (file)
@@ -93,6 +93,8 @@ public class StoreServiceTest {
                testInit();
 
                TopicConfig topicConfig = createTopicConfig("test1", "JSON");
+               topicConfig.setAggregateArrayPath("/test");
+               topicConfig.setFlattenArrayPath("/test");
 
                topicConfig = createTopicConfig("test2", "XML");
                topicConfig.setSaveRaw(false);
index 265ec96..774cd22 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.datalake.feeder.service;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -70,6 +71,8 @@ public class TopicServiceTest {
                String name = "a";
                when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name)));
                assertEquals(topicService.getTopic(name), new Topic(name));
+               
+               assertFalse(topicService.istDefaultTopic(new Topic(name)));
        }
 
        @Test
index 189adec..64ecdee 100644 (file)
@@ -14,36 +14,37 @@ rawDataLabel=datalake_text_
 
 defaultTopicName=_DL_DEFAULT_
 
-#how often do we check topic setting update, in millisecond
-topicCheckInterval=60000
 
 #####################DMaaP
 #dmaapZookeeperHostPort=127.0.0.1:2181
 #dmaapKafkaHostPort=127.0.0.1:9092
 dmaapZookeeperHostPort=message-router-zookeeper:2181
 dmaapKafkaHostPort=message-router-kafka:9092
-dmaapKafkaGroup=dlgroup19
+dmaapKafkaGroup=dlgroup44
 #in second
 dmaapKafkaTimeout=60
 dmaapKafkaExclude[0]=__consumer_offsets
 dmaapKafkaExclude[1]=__transaction_state
-dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
+#dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
 #check for new topics , in millisecond
-dmaapCheckNewTopicInterval=300000
+dmaapCheckNewTopicInterval=60000
 
-kafkaConsumerCount=1
+kafkaConsumerCount=3
 
 #####################Elasticsearch
-elasticsearchType=doc
+elasticsearchType=_doc
 
 #####################HDFS
 hdfsBufferSize=4096
 #how often we flush stall updates, in millisecond
-hdfsFlushInterval=10000
-hdfsBatchSize=250
+hdfsFlushInterval=30000
+hdfsBatchSize=500
 
 #####################Logging
 logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
+
+#####################Version
+datalakeVersion=0.0.1
  
index 0a48af6..d8caee8 100644 (file)
@@ -33,7 +33,7 @@
                <springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
                <jackson.version>2.9.8</jackson.version>
                <kafka.version>2.0.0</kafka.version>
-               <elasticsearchjava.version>7.0.0</elasticsearchjava.version>
+               <elasticsearchjava.version>7.1.1</elasticsearchjava.version>
                <hadoop.version>3.2.0</hadoop.version>
 
        </properties>