Initial working draft of kafka2hdfs writer source code. 98/87098/1
author Rajamohan Raj <rajamohan.raj@intel.com>
Tue, 7 May 2019 01:05:25 +0000 (01:05 +0000)
committer Rajamohan Raj <rajamohan.raj@intel.com>
Tue, 7 May 2019 01:05:25 +0000 (01:05 +0000)
Change-Id: I29576fb6d1e2700de5b1097e4ddfc261ca477ee9
Issue-ID: ONAPARC-451
Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com>
vnfs/DAaaS/.gitignore [new file with mode: 0644]
vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md [new file with mode: 0644]
vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml [new file with mode: 0644]
vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java [new file with mode: 0644]
vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java [new file with mode: 0644]
vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java [new file with mode: 0644]
vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java [new file with mode: 0644]
vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java [new file with mode: 0644]
vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml [new file with mode: 0644]

diff --git a/vnfs/DAaaS/.gitignore b/vnfs/DAaaS/.gitignore
new file mode 100644 (file)
index 0000000..b13f7a2
--- /dev/null
@@ -0,0 +1,7 @@
+*.iml
+.idea
+target/
+dependency-reduced-pom.xml
+File2hdfsApp.java
+copyScript.sh
+sample_configs.yaml
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md
new file mode 100644 (file)
index 0000000..4de7d0f
--- /dev/null
@@ -0,0 +1,11 @@
+# HDFS-writer
+
+HDFS writer reads messages from a Kafka topic and persists them to the
+given HDFS file system. This is a work in progress and will be moved
+to a separate source code repo later.
+
+## Usage
+
+## Config items
+
+## Troubleshooting
\ No newline at end of file
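A usage sketch for the stubbed sections above, assuming only what this commit sets up: fill in src/main/resources/configs.yaml (a sample with assumed values follows the configs.yaml diff at the end of this change), build with mvn clean package, and launch the assembly artifact, which carries the Main-Class manifest entry:

    java -jar target/hdfs-writer-1.0-jar-with-dependencies.jar configs.yaml

Orchestrator resolves the argument as a classpath resource, so the bare file name works once the YAML is packaged into the jar.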
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml
new file mode 100644 (file)
index 0000000..20c11fe
--- /dev/null
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.intel.onap</groupId>
+    <artifactId>hdfs-writer</artifactId>
+    <version>1.0</version>
+
+    <!--Begin: compile and build the fat jar -->
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                            </transformers>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>kafka2hdfsApp</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <!--End: compile and build the fat jar -->
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>3.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>3.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+            <version>2.9.8</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.9.8</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
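A few sentences on the build design above: both plugins are bound to the package phase and both produce fat jars. maven-shade-plugin rewrites the main artifact (target/hdfs-writer-1.0.jar) in place, with the ServicesResourceTransformer concatenating META-INF/services files from all dependencies and the *.SF/*.DSA/*.RSA excludes stripping stale signature entries that would otherwise invalidate the merged jar. maven-assembly-plugin separately emits target/hdfs-writer-1.0-jar-with-dependencies.jar with Main-Class: kafka2hdfsApp in its manifest, making it the artifact that can be launched with java -jar. Either plugin alone would normally suffice.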
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java
new file mode 100644 (file)
index 0000000..2042a14
--- /dev/null
@@ -0,0 +1,80 @@
+import config.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.*;
+
+public class CreateKafkaConsumer {
+
+
+    private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);
+
+    private final String BOOTSTRAP_SERVERS = (String) Configuration.getSettings().get("kafka").get("bootStrapServers");
+    private final String GROUP_ID_CONFIG = (String) Configuration.getSettings().get("kafka").get("group_id");
+    private final String KEY_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("key_deserialize_class");
+    private final String VAL_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("value_deserialize_class");
+    private final String KAFKA_TOPIC = (String) Configuration.getSettings().get("kafka").get("topic");
+
+    private final String HDFS_URL= (String) Configuration.getSettings().get("hdfs").get("hdfsURL");
+    private final String HDFS_REMOTE_FILE = (String) Configuration.getSettings().get("hdfs").get("hdfs_remote_file");
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+    private Properties properties = new Properties();
+    private HdfsWriter hdfsWriter;
+    private FileSystem hdfsFileSystem;
+
+
+
+    public CreateKafkaConsumer() throws IOException{
+        setKafkaProperties();
+        kafkaConsumer = new KafkaConsumer<>(properties);
+        kafkaConsumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
+        hdfsWriter = new HdfsWriter();
+        hdfsFileSystem = hdfsWriter.createHdfsFileSystem(HDFS_URL);
+        log.info(":::Created kafkaConsumer:::");
+    }
+
+    private void setKafkaProperties(){
+
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VAL_DESERIALIZER);
+        log.info(":::Set kafka properties:::");
+    }
+
+
+    public void processKafkaMessage() throws IOException{
+        try{
+            while(true){
+                ConsumerRecords<String, String> recordsPerPartition = kafkaConsumer.poll(Duration.ofMillis(100000));
+                if(recordsPerPartition.isEmpty())
+                    log.info(":::recordsPerPartition is NULL:::");
+                else
+                    log.info(":::size of recordsPerPartition: "+recordsPerPartition.count()+" :::");
+
+                for(ConsumerRecord<String, String> record:recordsPerPartition){
+                    log.info("Topic: "+record.topic());
+                    log.info("partition: "+record.partition());
+                    log.info("ReceivedKey: "+record.key()+" ReceivedValue: "+record.value());
+                    FSDataOutputStream fsDataOutputStream = hdfsWriter.invokeHdfsWriter(hdfsFileSystem, HDFS_REMOTE_FILE);
+                    hdfsWriter.writeMessageToHdfs(fsDataOutputStream, record.value());
+                    fsDataOutputStream.close();
+                }
+
+            }
+        } finally {
+            log.info(":::Closing kafkaConsumer:::");
+            kafkaConsumer.close();
+        }
+    }
+}
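The consumer above is driven entirely by the kafka section of configs.yaml and appends each record value to one HDFS file. For local verification, a producer along these lines can feed the subscribed topic; a minimal sketch, where TestProducer, the localhost:9092 broker, and the metrics topic are all assumptions, not part of this commit:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Send one String record to the (assumed) topic the writer subscribes to.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("metrics", "key1", "hello hdfs-writer"));
        }
    }
}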
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
new file mode 100644 (file)
index 0000000..cd5b663
--- /dev/null
@@ -0,0 +1,40 @@
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class HdfsWriter {
+
+    private static Logger log = LoggerFactory.getLogger(HdfsWriter.class);
+
+
+    public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
+        Configuration hdfsConfiguration = new Configuration();
+        FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
+        log.info(":::Created hdfsFileSystem:::");
+        return hdfsFileSystem;
+    }
+
+
+    public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
+        fsDataOutputStream.writeBytes(bytesFromKafka);
+        log.info(":::Wrote to HDFS:::");
+    }
+
+
+    public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
+        FSDataOutputStream fsDataOutputStream;
+        if(!hdfsFileSystem.exists(new Path("/"+hdfsFile)))
+            fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile));
+        else
+            fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile));
+        log.info(":::HDFSWriter invoked:::");
+        return fsDataOutputStream;
+    }
+
+}
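HdfsWriter has no Kafka dependency, so it can be smoke-tested on its own. A minimal sketch, where HdfsWriterSmokeTest, the hdfs://localhost:9000 namenode address, and the test.txt file name are assumptions for a local setup:

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

public class HdfsWriterSmokeTest {
    public static void main(String[] args) throws Exception {
        HdfsWriter writer = new HdfsWriter();
        // Connect to the (assumed) namenode; invokeHdfsWriter prefixes "/",
        // so passing "test.txt" creates or appends to /test.txt.
        FileSystem fs = writer.createHdfsFileSystem("hdfs://localhost:9000");
        FSDataOutputStream out = writer.invokeHdfsWriter(fs, "test.txt");
        writer.writeMessageToHdfs(out, "hello from HdfsWriter\n");
        out.close();
        fs.close();
    }
}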
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java
new file mode 100644 (file)
index 0000000..b4daf2d
--- /dev/null
@@ -0,0 +1,51 @@
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import config.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class Orchestrator {
+
+    private static Logger logger = LoggerFactory.getLogger(Orchestrator.class);
+
+    public void init(String configYamlFile){
+
+        parseConfigYaml(configYamlFile);
+    }
+
+    private void parseConfigYaml(String configYaml) {
+
+        URL fileUrl = getClass().getResource(configYaml);
+        if(fileUrl==null)
+            logger.error("::: Config file missing! :::");
+
+        else{
+            Configuration conf = new Configuration();
+            ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+            String realConfigYaml = configYaml;
+
+            if (!realConfigYaml.startsWith("/")) {
+                realConfigYaml = "/" + configYaml;
+            }
+            Map<String, Object> configs;
+            try (InputStream is = getClass().getResourceAsStream(realConfigYaml)) {
+                TypeReference<HashMap<String, Object>> typeRef
+                        = new TypeReference<HashMap<String, Object>>() {
+                };
+                configs = mapper.readValue(is, typeRef);
+                conf.init(configs);
+
+            } catch (Exception e) {
+                logger.error("Failed to parse config yaml: " + configYaml, e);
+            }
+        }
+    }
+}
+
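Orchestrator only parses the YAML and hands the resulting map to Configuration; everything else reads settings statically. A minimal sketch of that wiring, where ConfigSmokeTest is an illustrative name and configs.yaml is assumed to be on the classpath (it is packaged from src/main/resources):

public class ConfigSmokeTest {
    public static void main(String[] args) {
        // Orchestrator prefixes "/" when needed, so the bare resource name is enough.
        new Orchestrator().init("configs.yaml");
        // After init, each top-level YAML section is available as a map.
        String topic = (String) config.Configuration.getSettings().get("kafka").get("topic");
        System.out.println("Configured topic: " + topic);
    }
}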
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java
new file mode 100644 (file)
index 0000000..c7de131
--- /dev/null
@@ -0,0 +1,34 @@
+package config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Configuration{
+
+    private static Logger log = LoggerFactory.getLogger(Configuration.class);
+    private static Map<String, Map<String, Object>> settings;
+
+    public void init(Map<String, Object> yamlConfigs){
+        settings = new HashMap<>();
+
+        if(yamlConfigs!=null){
+            for (Map.Entry<String, Object> entry : yamlConfigs.entrySet()) {
+                Object value = entry.getValue();
+                if (value instanceof Map) {
+                    @SuppressWarnings("unchecked")
+                    Map<String, Object> valueMap = (Map<String, Object>) value;
+                    settings.put(entry.getKey(), valueMap);
+                }
+            }
+        }
+    log.info(":::Settings initiated :::");
+    }
+
+    public static Map<String, Map<String, Object>> getSettings() {
+        return settings;
+    }
+}
\ No newline at end of file
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java
new file mode 100644 (file)
index 0000000..5c04113
--- /dev/null
@@ -0,0 +1,14 @@
+import java.io.IOException;
+
+public class kafka2hdfsApp {
+
+    public static void main(String[] args) throws IOException {
+        System.out.println("Begin::: kafka2hdfsApp");
+        Orchestrator orchestrator = new Orchestrator();
+        orchestrator.init(args[0]); // the config yaml file name is the first CLI argument
+
+        CreateKafkaConsumer createKafkaConsumer = new CreateKafkaConsumer();
+        createKafkaConsumer.processKafkaMessage();
+
+    }
+}
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml
new file mode 100644 (file)
index 0000000..8955c30
--- /dev/null
@@ -0,0 +1,10 @@
+kafka:
+  bootStrapServers:
+  group_id:
+  key_deserialize_class:
+  value_deserialize_class:
+  topic:
+
+hdfs:
+  hdfsURL:
+  hdfs_remote_file:
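The template above ships with empty values, and CreateKafkaConsumer reads every field as a String. A filled-in sample where each value is an assumption for a local setup (only the deserializer class names are real Kafka classes):

kafka:
  bootStrapServers: localhost:9092
  group_id: hdfs-writer-group
  key_deserialize_class: org.apache.kafka.common.serialization.StringDeserializer
  value_deserialize_class: org.apache.kafka.common.serialization.StringDeserializer
  topic: metrics

hdfs:
  hdfsURL: hdfs://localhost:9000
  hdfs_remote_file: kafka-dump.txt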