Support HDFS as a data store 87/87587/1
author Guobiao Mo <guobiaomo@chinamobile.com>
Mon, 13 May 2019 18:58:33 +0000 (11:58 -0700)
committer Guobiao Mo <guobiaomo@chinamobile.com>
Mon, 13 May 2019 18:58:33 +0000 (11:58 -0700)
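Add HDFS as a write target for the DataLake feeder. The new HdfsService
buffers Kafka messages per topic in thread-local buffers and writes them
to /datalake/<topic>/<day>/<time>-<thread> files, flushing when a batch
reaches hdfsBatchSize or when a buffer has been idle longer than
hdfsFlushInterval. StoreService routes raw messages to HDFS for any topic
mapped to the new 'HDFS' db entry, which init_db.sql seeds with host
'dlhdfs' and a default topic mapping. The change also tunes the Couchbase
connect timeout (60s), sets a per-thread Kafka client.id, and calls
storeService.flushStall() from the poll loop.

The feeder settings introduced here are sketched below; the values mirror
the defaults added to application.properties in this change and may need
tuning per deployment:

    #####################HDFS
    #write buffer size passed to FileSystem.create(), in bytes
    hdfsBufferSize=4096
    #how often stalled buffers are flushed, in milliseconds
    hdfsFlushInterval=10000
    #messages per topic buffered before a flush
    hdfsBatchSize=250
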
Issue-ID: DCAEGEN2-1498

Change-Id: Id203275bce01bd4a4d6ec131fb9696d78eda82f5
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
20 files changed:
components/datalake-handler/feeder/pom.xml
components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java [new file with mode: 0644]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
components/datalake-handler/feeder/src/main/resources/application.properties
components/datalake-handler/feeder/src/main/resources/logback.xml
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java
components/datalake-handler/feeder/src/test/resources/application.properties
components/datalake-handler/pom.xml

diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml
index 63a2af9..8c285f8 100644 (file)
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.onap.dcaegen2.services.components</groupId>
-        <artifactId>datalake-handler</artifactId>
-        <version>1.0.0-SNAPSHOT</version>
-    </parent>
-
-    <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId>
-    <artifactId>feeder</artifactId>
-    <packaging>jar</packaging>
-    <name>DataLake Feeder</name>
-
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.mariadb.jdbc</groupId>
-            <artifactId>mariadb-java-client</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.json</groupId>
-            <artifactId>json</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-web</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-actuator</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-data-jpa</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-data-couchbase</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-test</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-configuration-processor</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-high-level-client</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.dataformat</groupId>
-            <artifactId>jackson-dataformat-yaml</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.dataformat</groupId>
-            <artifactId>jackson-dataformat-xml</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.druid</groupId>
-            <artifactId>tranquility-core_2.11</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.velocity</groupId>
-            <artifactId>velocity-engine-core</artifactId>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.hibernate</groupId>
-            <artifactId>hibernate-core</artifactId>
-            <version>5.3.7.Final</version>
-        </dependency>
-
-        <!-- jsr303 validation -->
-        <dependency>
-            <groupId>javax.validation</groupId>
-            <artifactId>validation-api</artifactId>
-            <version>2.0.1.Final</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.hibernate</groupId>
-            <artifactId>hibernate-validator</artifactId>
-            <version>6.0.10.Final</version>
-        </dependency>
-
-        <dependency>
-            <groupId>io.springfox</groupId>
-            <artifactId>springfox-swagger2</artifactId>
-            <version>2.9.2</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>io.springfox</groupId>
-            <artifactId>springfox-swagger-ui</artifactId>
-            <version>2.9.2</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.mongodb</groupId>
-            <artifactId>mongo-java-driver</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.couchbase.mock</groupId>
-            <artifactId>CouchbaseMock</artifactId>
-            <version>1.5.22</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-maven-plugin</artifactId>
-                <version>${springboot.version}</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>repackage</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.onap.dcaegen2.services.components</groupId>
+               <artifactId>datalake-handler</artifactId>
+               <version>1.0.0-SNAPSHOT</version>
+       </parent>
+
+       <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId>
+       <artifactId>feeder</artifactId>
+       <packaging>jar</packaging>
+       <name>DataLake Feeder</name>
+
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-client</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mariadb.jdbc</groupId>
+                       <artifactId>mariadb-java-client</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.json</groupId>
+                       <artifactId>json</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.httpcomponents</groupId>
+                       <artifactId>httpclient</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-web</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-actuator</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-data-jpa</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-data-couchbase</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-configuration-processor</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.elasticsearch.client</groupId>
+                       <artifactId>elasticsearch-rest-high-level-client</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>commons-io</groupId>
+                       <artifactId>commons-io</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.fasterxml.jackson.dataformat</groupId>
+                       <artifactId>jackson-dataformat-yaml</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.fasterxml.jackson.dataformat</groupId>
+                       <artifactId>jackson-dataformat-xml</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-databind</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.google.code.gson</groupId>
+                       <artifactId>gson</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.projectlombok</groupId>
+                       <artifactId>lombok</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.druid</groupId>
+                       <artifactId>tranquility-core_2.11</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.velocity</groupId>
+                       <artifactId>velocity-engine-core</artifactId>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>org.hibernate</groupId>
+                       <artifactId>hibernate-core</artifactId>
+                       <version>5.3.7.Final</version>
+               </dependency>
+
+               <!-- jsr303 validation -->
+               <dependency>
+                       <groupId>javax.validation</groupId>
+                       <artifactId>validation-api</artifactId>
+                       <version>2.0.1.Final</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.hibernate</groupId>
+                       <artifactId>hibernate-validator</artifactId>
+                       <version>6.0.10.Final</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.springfox</groupId>
+                       <artifactId>springfox-swagger2</artifactId>
+                       <version>2.9.2</version>
+                       <scope>compile</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.springfox</groupId>
+                       <artifactId>springfox-swagger-ui</artifactId>
+                       <version>2.9.2</version>
+                       <scope>compile</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mongodb</groupId>
+                       <artifactId>mongo-java-driver</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>com.couchbase.mock</groupId>
+                       <artifactId>CouchbaseMock</artifactId>
+                       <version>1.5.22</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.springframework.boot</groupId>
+                               <artifactId>spring-boot-maven-plugin</artifactId>
+                               <version>${springboot.version}</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>repackage</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>integration-test</goal>
+                                                       <goal>verify</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
 
 </project>
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index 6688d68..04299e6 100644 (file)
@@ -82,6 +82,7 @@ insert into db (`name`,`host`,`login`,`pass`,`database_name`) values ('Couchbase
 insert into db (`name`,`host`) values ('Elasticsearch','dl_es');\r
 insert into db (`name`,`host`,`port`,`database_name`) values ('MongoDB','dl_mongodb',27017,'datalake');\r
 insert into db (`name`,`host`) values ('Druid','dl_druid');\r
+insert into db (`name`,`host`) values ('HDFS','dlhdfs');\r
 \r
 \r
 -- in production, default enabled should be off\r
@@ -94,3 +95,4 @@ insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFA
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','_DL_DEFAULT_');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','_DL_DEFAULT_');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','_DL_DEFAULT_');\r
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index d59c0fc..9106185 100644 (file)
@@ -41,6 +41,16 @@ import lombok.Setter;
 @EnableAutoConfiguration
 public class ApplicationConfiguration {
 
+       //App general
+       private boolean async;
+       private boolean enableSSL;
+
+       private String timestampLabel;
+       private String rawDataLabel;
+
+       private String defaultTopicName;
+
+       //DMaaP
        private String dmaapZookeeperHostPort;
        private String dmaapKafkaHostPort;
        private String dmaapKafkaGroup;
@@ -51,13 +61,10 @@ public class ApplicationConfiguration {
 
        private int kafkaConsumerCount;
 
-       private boolean async;
-       private boolean enableSSL;
-
-       private String timestampLabel;
-       private String rawDataLabel;
-       
-       private String defaultTopicName;
-
        private String elasticsearchType;
+
+       //HDFS
+       private int hdfsBufferSize;     
+       private long hdfsFlushInterval;
+       private int hdfsBatchSize;
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 15ffc8a..deaa096 100644 (file)
@@ -60,7 +60,11 @@ public class TopicConfig {
                }
        }
 
-       
+
+       public boolean supportHdfs() {
+               return containDb("HDFS");
+       }
+
        public boolean supportElasticsearch() {
                return containDb("Elasticsearch");//TODO string hard codes
        }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index f5ee5b7..7c23776 100644 (file)
 */
 
 package org.onap.datalake.feeder.service;
+
 import java.util.ArrayList;
 import java.util.List;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+
 import org.json.JSONObject;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,9 @@ import com.couchbase.client.java.Cluster;
 import com.couchbase.client.java.CouchbaseCluster;
 import com.couchbase.client.java.document.JsonDocument;
 import com.couchbase.client.java.document.JsonLongDocument;
-import com.couchbase.client.java.document.json.JsonObject; 
+import com.couchbase.client.java.document.json.JsonObject;
+import com.couchbase.client.java.env.CouchbaseEnvironment;
+import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
 
 import rx.Observable;
 import rx.functions.Func1;
@@ -63,65 +64,69 @@ public class CouchbaseService {
 
        @Autowired
        private DbService dbService;
-       
+
        Bucket bucket;
        private boolean isReady = false;
 
        @PostConstruct
        private void init() {
-        // Initialize Couchbase Connection
-        try {
-            Db couchbase = dbService.getCouchbase();
-            Cluster cluster = CouchbaseCluster.create(couchbase.getHost());
-            cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
-            bucket = cluster.openBucket(couchbase.getDatabase());
-            log.info("Connect to Couchbase {}", couchbase.getHost());
-            // Create a N1QL Primary Index (but ignore if it exists)
-            bucket.bucketManager().createN1qlPrimaryIndex(true, false);
-            isReady = true;
-        }
-        catch(Exception        ex)
-        {
-            isReady = false;
-        }
+               // Initialize Couchbase Connection
+               try {
+                       Db couchbase = dbService.getCouchbase();
+
+                       //this tunes the SDK (to customize connection timeout)
+                       CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s
+                                       .build();
+                       Cluster cluster = CouchbaseCluster.create(env, couchbase.getHost());
+                       cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
+                       bucket = cluster.openBucket(couchbase.getDatabase());
+                       // Create a N1QL Primary Index (but ignore if it exists)
+                       bucket.bucketManager().createN1qlPrimaryIndex(true, false);
+
+                       log.info("Connected to Couchbase {}", couchbase.getHost());
+                       isReady = true;
+               } catch (Exception ex) {
+                       log.error("error connecting to Couchbase.", ex);
+                       isReady = false;
+               }
        }
 
        @PreDestroy
-       public void cleanUp() { 
+       public void cleanUp() {
                bucket.close();
-       } 
+       }
 
-       public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { 
-               List<JsonDocument> documents= new ArrayList<>(jsons.size());
-               for(JSONObject json : jsons) {
+       public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+               List<JsonDocument> documents = new ArrayList<>(jsons.size());
+               for (JSONObject json : jsons) {
                        //convert to Couchbase JsonObject from org.json JSONObject
-                       JsonObject jsonObject = JsonObject.fromJson(json.toString());   
+                       JsonObject jsonObject = JsonObject.fromJson(json.toString());
 
                        long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
 
                        //setup TTL
-                       int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
-                       
+                       int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second
+
                        String id = getId(topic, json);
                        JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
                        documents.add(doc);
                }
                try {
                        saveDocuments(documents);
-               }catch(Exception e) {
+               } catch (Exception e) {
                        log.error("error saving to Couchbase.", e);
                }
-               log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); 
+               log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
        }
 
        public String getId(TopicConfig topic, JSONObject json) {
                //if this topic requires extract id from JSON
                String id = topic.getMessageId(json);
-               if(id != null) {
+               if (id != null) {
                        return id;
                }
-               
-               String topicStr= topic.getName();               
+
+               String topicStr = topic.getName();
                //String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
 
                //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
@@ -129,24 +134,19 @@ public class CouchbaseService {
                // increment by 1, initialize at 0 if counter doc not found
                //TODO how slow is this compared with above UUID approach?
                JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 
-               id = topicStr +":"+ nextIdNumber.content();
-               
+               id = topicStr + ":" + nextIdNumber.content();
+
                return id;
        }
-        
+
        //https://docs.couchbase.com/java-sdk/2.7/document-operations.html
-       private void saveDocuments(List<JsonDocument> documents) { 
-               Observable
-           .from(documents)
-           .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
-               @Override
-               public Observable<JsonDocument> call(final JsonDocument docToInsert) {
-                   return bucket.async().insert(docToInsert);
-               }
-           })
-           .last()
-           .toBlocking()
-           .single();          
+       private void saveDocuments(List<JsonDocument> documents) {
+               Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
+                       @Override
+                       public Observable<JsonDocument> call(final JsonDocument docToInsert) {
+                               return bucket.async().insert(docToInsert);
+                       }
+               }).last().toBlocking().single();
        }
 
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
index e859270..58bb433 100644 (file)
@@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * Service for Dbs 
+ * Service for Dbs
  * 
  * @author Guobiao Mo
  *
@@ -38,11 +38,11 @@ public class DbService {
 
        @Autowired
        private DbRepository dbRepository;
-       
+
        public Db getDb(String name) {
                Optional<Db> ret = dbRepository.findById(name);
                return ret.isPresent() ? ret.get() : null;
-       }       
+       }
 
        public Db getCouchbase() {
                return getDb("Couchbase");
@@ -58,6 +58,10 @@ public class DbService {
 
        public Db getDruid() {
                return getDb("Druid");
-       }       
+       }
+
+       public Db getHdfs() {
+               return getDb("HDFS");
+       }
 
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index de8c9e8..0caec79 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -65,9 +66,7 @@ public class DmaapService {
                        ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
                        List<String> topics = zk.getChildren("/brokers/topics", false);
                        String[] excludes = config.getDmaapKafkaExclude();
-                       for (String exclude : excludes) {
-                               topics.remove(exclude);
-                       }
+                       topics.removeAll(Arrays.asList(excludes));
                        return topics;
                } catch (Exception e) {
                        log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
@@ -81,7 +80,7 @@ public class DmaapService {
                        return Collections.emptyList();
                }
 
-               List<String> ret = new ArrayList<>();
+               List<String> ret = new ArrayList<>(allTopics.size());
                for (String topicStr : allTopics) {
                        TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
                        if (topicConfig.isEnabled()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index c354f17..f1bed60 100644 (file)
@@ -84,7 +84,7 @@ public class ElasticsearchService {
                // Initialize the Connection
                client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
 
-               log.info("Connect to Elasticsearch Host {}", elasticsearchHost);
+               log.info("Connected to Elasticsearch Host {}", elasticsearchHost);
 
                listener = new ActionListener<BulkResponse>() {
                        @Override
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
new file mode 100644 (file)
index 0000000..e8d2910
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.dto.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Service to write data to HDFS
+ * 
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class HdfsService {
+
+       private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+       @Autowired
+       ApplicationConfiguration config;
+
+       @Autowired
+       private DbService dbService;
+
+       FileSystem fileSystem;
+       private boolean isReady = false;
+
+       private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
+       private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
+       private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
+
+       @Setter
+       @Getter
+       private class Buffer {
+               long lastFlush;
+               List<String> data;
+
+               public Buffer() {
+                       lastFlush = Long.MIN_VALUE;
+                       data = new ArrayList<>();
+               }
+
+               public void flush(String topic) {
+                       try {
+                               saveMessages(topic, data);
+                               log.debug("done flush, topic={}, buffer size={}", topic, data.size());
+                               data.clear();
+                               lastFlush = System.currentTimeMillis();
+                       } catch (IOException e) {
+                               log.error("error saving to HDFS, topic=" + topic, e);
+                       }
+               }
+
+               public void flushStall(String topic) {
+                       if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + config.getHdfsFlushInterval()) {
+                               log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
+                               flush(topic);
+                       }
+               }
+
+               private void saveMessages(String topic, List<String> bufferList) throws IOException {
+
+                       String thread = Thread.currentThread().getName();
+                       Date date = new Date();
+                       String day = dayFormat.get().format(date);
+                       String time = timeFormat.get().format(date);
+                       String filePath = String.format("/datalake/%s/%s/%s-%s", topic, day, time, thread);
+                       Path path = new Path(filePath);
+                       log.debug("writing to HDFS {}", filePath);
+
+                       // Create a new file and write data to it.
+                       FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());
+
+                       bufferList.stream().forEach(message -> {
+                               try {
+                                       out.writeUTF(message);
+                                       out.write('\n');
+                               } catch (IOException e) {
+                                       log.error("error writing to HDFS.", e);
+                               }
+                       });
+
+                       out.close();
+               }
+       }
+
+       @PostConstruct
+       private void init() {
+               // Initialize HDFS Connection 
+               try {
+                       Db hdfs = dbService.getHdfs();
+
+                       //Get configuration of Hadoop system
+                       Configuration hdfsConfig = new Configuration();
+
+                       int port = hdfs.getPort() == null ? 8020 : hdfs.getPort();
+
+                       String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
+                       hdfsConfig.set("fs.defaultFS", hdfsuri);
+
+                       log.info("Connecting to HDFS at {}", hdfsuri);
+
+                       fileSystem = FileSystem.get(hdfsConfig);
+
+                       isReady = true;
+               } catch (Exception ex) {
+                       log.error("error connecting to HDFS.", ex);
+                       isReady = false;
+               }
+       }
+
+       @PreDestroy
+       public void cleanUp() {
+               try {
+                       flush();
+                       fileSystem.close();
+               } catch (IOException e) {
+                       log.error("fileSystem.close() at cleanUp.", e);
+               }
+       }
+
+       public void flush() {
+               bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
+       }
+
+       //if no new data comes in for a topic for a while, need to flush its buffer
+       public void flushStall() {
+               bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
+       }
+
+       public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) {
+               String topicStr = topic.getName();
+
+               Map<String, Buffer> bufferMap = bufferLocal.get();
+               final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+               List<String> bufferData = buffer.getData();
+
+               messages.stream().forEach(message -> bufferData.add(message.getRight()));//note that message left is not used
+
+               if (bufferData.size() >= config.getHdfsBatchSize()) {
+                       buffer.flush(topicStr);
+               }
+       }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
index c540895..32d21c6 100644 (file)
@@ -135,7 +135,7 @@ public class MongodbService {
        }
 
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
-               if (dbReady == false)
+               if (dbReady == false) //TODO throw exception
                        return;
                List<Document> documents = new ArrayList<>(jsons.size());
                for (JSONObject json : jsons) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index b3a6d29..1154b3a 100644 (file)
@@ -54,7 +54,7 @@ import org.springframework.stereotype.Service;
  */
 
 @Service
-@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 public class PullThread implements Runnable {
 
        @Autowired
@@ -90,7 +90,8 @@ public class PullThread implements Runnable {
                Properties consumerConfig = new Properties();
 
                consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
-               consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,  config.getDmaapKafkaGroup());
+               consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+               consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(id));
                consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@@ -119,28 +120,29 @@ public class PullThread implements Runnable {
                        while (active.get()) {
 
                                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
-                if (records != null) {
-                    List<Pair<Long, String>> messages = new ArrayList<>(records.count());
-                    for (TopicPartition partition : records.partitions()) {
-                        messages.clear();
-                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
-                        for (ConsumerRecord<String, String> record : partitionRecords) {
-                            messages.add(Pair.of(record.timestamp(), record.value()));
-                            //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
-                        }
-                        storeService.saveMessages(partition.topic(), messages);
-                        log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
-                        if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
-                            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
-                            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
-                        }
-                    }
-
-                    if (async) {//for high Throughput, async commit offset in batch to Kafka
-                        consumer.commitAsync();
-                    }
-                }
+                               if (records != null) {
+                                       List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+                                       for (TopicPartition partition : records.partitions()) {
+                                               messages.clear();
+                                               List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+                                               for (ConsumerRecord<String, String> record : partitionRecords) {
+                                                       messages.add(Pair.of(record.timestamp(), record.value()));
+                                                       //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+                                               }
+                                               storeService.saveMessages(partition.topic(), messages);
+                                               log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+                                               if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+                                                       long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+                                                       consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+                                               }
+                                       }
+
+                                       if (async) {//for high Throughput, async commit offset in batch to Kafka
+                                               consumer.commitAsync();
+                                       }
+                               }
+                               storeService.flushStall();
                        }
                } catch (Exception e) {
                        log.error("Puller {} run():   exception={}", id, e.getMessage());
@@ -153,6 +155,7 @@ public class PullThread implements Runnable {
        public void shutdown() {
                active.set(false);
                consumer.wakeup();
+               consumer.unsubscribe();
        }
 
        private class DummyRebalanceListener implements ConsumerRebalanceListener {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 449dacf..2d00a9b 100644 (file)
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -35,7 +34,6 @@ import org.json.JSONException;
 import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
 import org.slf4j.Logger;
@@ -77,6 +75,9 @@ public class StoreService {
        @Autowired
        private ElasticsearchService elasticsearchService;
 
+       @Autowired
+       private HdfsService hdfsService;
+
        private Map<String, TopicConfig> topicMap = new HashMap<>();
 
        private ObjectMapper yamlReader;
@@ -86,10 +87,6 @@ public class StoreService {
                yamlReader = new ObjectMapper(new YAMLFactory());
        }
 
-       @PreDestroy
-       public void cleanUp() {
-       }
-
        public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
                if (messages == null || messages.isEmpty()) {
                        return;
@@ -109,7 +106,7 @@ public class StoreService {
                        }
                }
 
-               saveJsons(topic, docs);
+               saveJsons(topic, docs, messages);
        }
 
        private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
@@ -161,7 +158,7 @@ public class StoreService {
                return json;
        }
 
-       private void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+       private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
                if (topic.supportMongoDB()) {
                        mongodbService.saveJsons(topic, jsons);
                }
@@ -173,6 +170,13 @@ public class StoreService {
                if (topic.supportElasticsearch()) {
                        elasticsearchService.saveJsons(topic, jsons);
                }
+
+               if (topic.supportHdfs()) {
+                       hdfsService.saveMessages(topic, messages);
+               }
        }
 
+       public void flushStall() {
+               hdfsService.flushStall();
+       }
 }
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index e1b8399..b9d6b9e 100644 (file)
@@ -1,8 +1,20 @@
-
+#####################App general
 server.port = 1680
 server.servlet.context-path = /datalake/v1
 
-# Spring connection to MariaDB for ORM
+#tolerate inconsistency when the system crashes, see PullThread.run()
+async=true
+
+#SSL global flag, if enabled, still need to check each individual DB SSL flag
+enableSSL=false
+
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
+
+#####################Spring connection to MariaDB for ORM
 #spring.jpa.hibernate.ddl-auto=update
 spring.jpa.hibernate.ddl-auto=none
 spring.jpa.show-sql=false
@@ -13,42 +25,32 @@ spring.datasource.username=dl
 spring.datasource.password=dl1234
 
 
-#For Beijing lab
-#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
-#dmaapKafkaHostPort=kafka.mr01.onap.vip:80
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-#couchbaseHost=172.30.1.74
-
-#DMaaP
+#####################DMaaP
 #dmaapZookeeperHostPort=127.0.0.1:2181
 #dmaapKafkaHostPort=127.0.0.1:9092
 dmaapZookeeperHostPort=message-router-zookeeper:2181
 dmaapKafkaHostPort=message-router-kafka:9092
-dmaapKafkaGroup=dlgroup10
+dmaapKafkaGroup=dlgroup19
+#in seconds
 dmaapKafkaTimeout=60
 dmaapKafkaExclude[0]=__consumer_offsets
-dmaapKafkaExclude[1]=msgrtr.apinode.metrics.dmaap
+dmaapKafkaExclude[1]=__transaction_state
+dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
 #check for new topics 
 dmaapCheckNewTopicIntervalInSec=3000
 
-kafkaConsumerCount=1
-
-#tolerate inconsistency when system crash, see PullThread.run()
-async=true
-
-#SSL global flag, if enabled, still need to check each individual DB SSL flag
-enableSSL=false
+kafkaConsumerCount=3 
 
-#names for extra fields that DL adds to each record
-timestampLabel=datalake_ts_
-rawDataLabel=datalake_text_
+#####################Elasticsearch
+elasticsearchType=doc
 
-defaultTopicName=_DL_DEFAULT_
+#####################HDFS
+hdfsBufferSize=4096
+#how often stalled buffers are flushed, in milliseconds
+hdfsFlushInterval=10000
+hdfsBatchSize=250
 
-elasticsearchType=doc
-       
-#Logging
+#####################Logging
 logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
diff --git a/components/datalake-handler/feeder/src/main/resources/logback.xml b/components/datalake-handler/feeder/src/main/resources/logback.xml
index 28fbafc..320e9db 100644 (file)
@@ -9,7 +9,7 @@
                </layout>\r
        </appender>\r
 \r
-       <logger name="com.mkyong.web" level="debug"\r
+       <logger name="org.onap.datalake" level="debug"\r
                additivity="false">\r
                <appender-ref ref="STDOUT" />\r
        </logger>\r
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
index c4a5b1b..7243a8e 100644 (file)
@@ -26,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -60,12 +59,20 @@ public class ApplicationConfigurationTest {
         assertTrue(config.getDmaapCheckNewTopicIntervalInSec() > 0);
 
         assertTrue(config.getKafkaConsumerCount() > 0);
+
+        assertNotNull(config.getDmaapKafkaExclude());
+        
         assertNotNull(config.isAsync());
         assertNotNull(config.isEnableSSL());
         assertNotNull(config.getDefaultTopicName());
         assertNotNull(config.getRawDataLabel());
         assertNotNull(config.getTimestampLabel());
-        assertEquals(null, config.getElasticsearchType());
+        assertNotNull(config.getElasticsearchType());
+        
+        //HDFS
+        assertTrue(config.getHdfsBatchSize()>0);
+        assertTrue(config.getHdfsBufferSize()>0);
+        assertTrue(config.getHdfsFlushInterval()>0);
     }
 
 }
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
index dc9feed..bb31cd7 100644 (file)
@@ -99,6 +99,7 @@ public class TopicConfigTest {
         assertFalse(testTopicConfig.supportCouchbase());
         assertFalse(testTopicConfig.supportDruid());
         assertFalse(testTopicConfig.supportMongoDB());
+        assertFalse(testTopicConfig.supportHdfs());
 
         testTopic.getDbs().remove(new Db("Elasticsearch"));
         testTopicConfig = testTopic.getTopicConfig();
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java
index 4948001..8aa60ab 100644 (file)
@@ -91,4 +91,11 @@ public class DbServiceTest {
                assertEquals(dbService.getDruid(), new Db(name));
        }
 
+       @Test
+       public void testGetHdfs() {
+               String name = "HDFS";
+               when(dbRepository.findById(name)).thenReturn(Optional.of(new Db(name)));
+               assertEquals(dbService.getHdfs(), new Db(name));
+       }
+
 }
diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties
index b907705..75a2618 100644 (file)
@@ -1,35 +1,48 @@
-
+#####################App general
 server.port = 1680
+server.servlet.context-path = /datalake/v1
+
+#tolerate inconsistency when the system crashes, see PullThread.run()
+async=true
+
+#SSL global flag, if enabled, still need to check each individual DB SSL flag
+enableSSL=false
+
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
 
 
-#DMaaP
+
+#####################DMaaP
 #dmaapZookeeperHostPort=127.0.0.1:2181
 #dmaapKafkaHostPort=127.0.0.1:9092
 dmaapZookeeperHostPort=message-router-zookeeper:2181
 dmaapKafkaHostPort=message-router-kafka:9092
-dmaapKafkaGroup=dlgroup10
+dmaapKafkaGroup=dlgroup19
+#in seconds
 dmaapKafkaTimeout=60
+dmaapKafkaExclude[0]=__consumer_offsets
+dmaapKafkaExclude[1]=__transaction_state
+dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
 #check for new topics 
 dmaapCheckNewTopicIntervalInSec=3000
 
 kafkaConsumerCount=1
 
-#tolerate inconsistency when system crash, see PullThread.run()
-async=true
-
-#SSL global flag, if enabled, still need to check each individual DB SSL flag
-enableSSL=false
-
-#names for extra fields that DL adds to each record
-timestampLabel=datalake_ts_
-rawDataLabel=datalake_text_
+#####################Elasticsearch
+elasticsearchType=doc
 
-defaultTopicName=_DL_DEFAULT_
+#####################HDFS
+hdfsBufferSize=4096
+#how often stalled buffers are flushed, in milliseconds
+hdfsFlushInterval=10000
+hdfsBatchSize=250
 
-       
-#Logging
+#####################Logging
 logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
  
diff --git a/components/datalake-handler/pom.xml b/components/datalake-handler/pom.xml
index 671c56c..da02f2e 100644 (file)
@@ -4,23 +4,23 @@
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
 
-    <parent>
-       <groupId>org.onap.oparent</groupId>
-       <artifactId>oparent</artifactId>
-       <version>1.2.3</version>
-    </parent>
-  
+       <parent>
+               <groupId>org.onap.oparent</groupId>
+               <artifactId>oparent</artifactId>
+               <version>1.2.3</version>
+       </parent>
+
        <groupId>org.onap.dcaegen2.services.components</groupId>
        <artifactId>datalake-handler</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <packaging>pom</packaging>
-       
+
        <name>dcaegen2-service-datalake-handler</name>
 
        <modules>
                <module>feeder</module>
-               <module>admin</module> 
-               <module>collector</module> 
+               <module>admin</module>
+               <module>collector</module>
        </modules>
 
        <properties>
@@ -34,6 +34,7 @@
                <jackson.version>2.9.8</jackson.version>
                <kafka.version>2.0.0</kafka.version>
                <elasticsearchjava.version>7.0.0</elasticsearchjava.version>
+               <hadoop.version>3.2.0</hadoop.version>
 
        </properties>
 
                <dependencies>
 
                        <dependency>
-                       <groupId>org.mariadb.jdbc</groupId>
-                       <artifactId>mariadb-java-client</artifactId>
-                       <version>2.4.1</version>
+                               <groupId>org.apache.hadoop</groupId>
+                               <artifactId>hadoop-client</artifactId>
+                               <version>${hadoop.version}</version>
+                       </dependency>
+
+                       <dependency>
+                               <groupId>org.mariadb.jdbc</groupId>
+                               <artifactId>mariadb-java-client</artifactId>
+                               <version>2.4.1</version>
                        </dependency>
 
                        <dependency>
                                <version>${springboot.version}</version>
                        </dependency>
                        <!-- end::actuator[] -->
-                       
-               <dependency>
-               <groupId>org.springframework.boot</groupId>
-                  <artifactId>spring-boot-starter-data-jpa</artifactId>
+
+                       <dependency>
+                               <groupId>org.springframework.boot</groupId>
+                               <artifactId>spring-boot-starter-data-jpa</artifactId>
                                <version>${springboot.version}</version>
-                       </dependency>
-        
+                       </dependency>
+
                        <dependency>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-starter-data-couchbase</artifactId>