From b6555bc1f55e82a4436232ec28b2f5e50600d70d Mon Sep 17 00:00:00 2001 From: Rajamohan Raj Date: Tue, 7 May 2019 01:05:25 +0000 Subject: [PATCH] Source code initial workingDraft kafka2hdfs writer. Change-Id: I29576fb6d1e2700de5b1097e4ddfc261ca477ee9 Issue-ID: ONAPARC-451 Signed-off-by: Rajamohan Raj --- vnfs/DAaaS/.gitignore | 7 ++ .../hdfs-writer-source-code/hdfs-writer/README.md | 11 ++ .../hdfs-writer-source-code/hdfs-writer/pom.xml | 111 +++++++++++++++++++++ .../src/main/java/CreateKafkaConsumer.java | 81 +++++++++++++++ .../hdfs-writer/src/main/java/HdfsWriter.java | 40 ++++++++ .../hdfs-writer/src/main/java/Orchestrator.java | 51 ++++++++++ .../src/main/java/config/Configuration.java | 38 +++++++ .../hdfs-writer/src/main/java/kafka2hdfsApp.java | 14 +++ .../hdfs-writer/src/main/resources/configs.yaml | 10 ++ 9 files changed, 363 insertions(+) create mode 100644 vnfs/DAaaS/.gitignore create mode 100644 vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md create mode 100644 vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml create mode 100644 vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java create mode 100644 vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java create mode 100644 vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java create mode 100644 vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java create mode 100644 vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java create mode 100644 vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml diff --git a/vnfs/DAaaS/.gitignore b/vnfs/DAaaS/.gitignore new file mode 100644 index 00000000..b13f7a20 --- /dev/null +++ b/vnfs/DAaaS/.gitignore @@ -0,0 +1,7 @@ +*.iml +.idea +target/ +dependency-reduced-pom.xml +File2hdfsApp.java +copyScript.sh +sample_configs.yaml diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md new file mode 100644 index 00000000..4de7d0f9 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md @@ -0,0 +1,11 @@ +# HDFS-writer + +HDFS writer can read from a message from kafka topic and persist that in the +HDFS file system given. This is a work in progress and shall be moved +to separate source code repo later. + +## Usage + +## Config items + +## Troubleshooting \ No newline at end of file diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml new file mode 100644 index 00000000..20c11fea --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml @@ -0,0 +1,111 @@ + + + 4.0.0 + + com.intel.onap + hdfs-writer + 1.0 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-shade-plugin + 2.3 + + + package + + shade + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + maven-assembly-plugin + + + + kafka2hdfsApp + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + + + org.apache.hadoop + hadoop-core + 1.2.1 + + + org.apache.hadoop + hadoop-client + 3.2.0 + + + org.apache.hadoop + hadoop-hdfs + 2.7.1 + + + org.apache.kafka + kafka-clients + 2.2.0 + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + 2.9.8 + + + com.fasterxml.jackson.core + jackson-databind + 2.2.3 + + + + + \ No newline at end of file diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java new file mode 100644 index 00000000..2042a146 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java @@ -0,0 +1,81 @@ +import config.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +public class CreateKafkaConsumer { + + + private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class); + + private final String BOOTSTRAP_SERVERS = (String) Configuration.getSettings().get("kafka").get("bootStrapServers"); + private final String GROUP_ID_CONFIG = (String) Configuration.getSettings().get("kafka").get("group_id"); + private final String KEY_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("key_deserialize_class"); + private final String VAL_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("value_deserialize_class"); + private final String KAFKA_TOPIC = (String) Configuration.getSettings().get("kafka").get("topic"); + + private final String HDFS_URL= (String) Configuration.getSettings().get("hdfs").get("hdfsURL"); + private final String HDFS_REMOTE_FILE = (String) Configuration.getSettings().get("hdfs").get("hdfs_remote_file"); + + private KafkaConsumer kafkaConsumer; + private Properties properties = new Properties(); + private HdfsWriter hdfsWriter; + private FileSystem hdfsFileSystem; + + + + public CreateKafkaConsumer() throws IOException{ + setKafkaProperties(); + kafkaConsumer = new KafkaConsumer<>(properties); + kafkaConsumer.subscribe(Collections.singletonList(KAFKA_TOPIC)); + hdfsWriter = new HdfsWriter(); + hdfsFileSystem = hdfsWriter.createHdfsFileSystem(HDFS_URL); + log.info(":::Created kafkaConsumer:::"); + } + + private void setKafkaProperties(){ + + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VAL_DESERIALIZER); + log.info(":::Set kafka properties:::"); + } + + + public void processKafkaMessage() throws IOException{ + try{ + while(true){ + ConsumerRecords recordsPerPartition = kafkaConsumer.poll(100000); + if(recordsPerPartition.isEmpty()) + log.info(":::recordsPerPartition is NULL:::"); + else + log.info(":::size of recordsPerPartition: "+recordsPerPartition.count()+" :::"); + + for(ConsumerRecord record:recordsPerPartition){ + log.info("Topic: "+record.topic()); + log.info("partition: "+record.partition()); + log.info("ReceivedKey: "+record.key()+" ReceivedValue: "+record.value()); + FSDataOutputStream fsDataOutputStream = hdfsWriter.invokeHdfsWriter(hdfsFileSystem, HDFS_REMOTE_FILE); + hdfsWriter.writeMessageToHdfs(fsDataOutputStream, record.value()); + fsDataOutputStream.close(); + } + + } + } + + finally { + log.info(":::Closing kafkaConsumer:::"); + kafkaConsumer.close(); + } + } +} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java new file mode 100644 index 00000000..cd5b6635 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java @@ -0,0 +1,40 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +public class HdfsWriter { + + private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class); + + + public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException { + Configuration hdfsConfiguration = new Configuration(); + FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration); + log.info(":::Created hdfsFileSystem:::"); + return hdfsFileSystem; + } + + + public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException { + fsDataOutputStream.writeBytes(bytesFromKafka); + log.info(":::Wrote to HDFS:::"); + } + + + public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException { + FSDataOutputStream fsDataOutputStream; + if(!hdfsFileSystem.exists(new Path("/"+hdfsFile))) + fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile)); + else + fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile)); + log.info(":::HDFSWriter invoked:::"); + return fsDataOutputStream; + } + +} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java new file mode 100644 index 00000000..b4daf2d1 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java @@ -0,0 +1,51 @@ +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import config.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + + +public class Orchestrator { + + private static Logger logger = LoggerFactory.getLogger(Orchestrator.class); + + public void init(String configYamlFile){ + + parseConfigYaml(configYamlFile); + } + + private void parseConfigYaml(String configYaml) { + + URL fileUrl = getClass().getResource(configYaml); + if(fileUrl==null) + System.out.println("::: Config file missing!!! :::"); + + else{ + Configuration conf = new Configuration(); + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + String realConfigYaml = configYaml; + + if (!realConfigYaml.startsWith("/")) { + realConfigYaml = "/" + configYaml; + } + Map configs; + try (InputStream is = getClass().getResourceAsStream(realConfigYaml)) { + TypeReference> typeRef + = new TypeReference>() { + }; + configs = mapper.readValue(is, typeRef); + conf.init(configs); + + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + } +} + diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java new file mode 100644 index 00000000..c7de131b --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java @@ -0,0 +1,38 @@ +package config; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Configuration{ + + private static Logger log = LoggerFactory.getLogger(Configuration.class); + private static Map> settings; + + public void init(Map yamlConfigs){ + settings = new HashMap<>(); + + if(yamlConfigs!=null){ + Iterator keys = yamlConfigs.keySet().iterator(); + while(keys.hasNext()){ + String key = keys.next(); + + Object value = yamlConfigs.get(key); + + if(value instanceof Map){ + Map valueMap = (Map) value; + settings.put(key, valueMap); + } + } + } + log.info(":::Settings initiated :::"); + } + + public static Map> getSettings() { + return settings; + } +} \ No newline at end of file diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java new file mode 100644 index 00000000..5c041134 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java @@ -0,0 +1,14 @@ +import java.io.IOException; + +public class kafka2hdfsApp { + + public static void main(String[] args) throws IOException { + System.out.println("Begin::: kafka2hdfsApp"); + Orchestrator orchestrator = new Orchestrator(); + orchestrator.init(args[1]); + + CreateKafkaConsumer createKafkaConsumer = new CreateKafkaConsumer(); + createKafkaConsumer.processKafkaMessage(); + + } +} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml new file mode 100644 index 00000000..8955c304 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml @@ -0,0 +1,10 @@ +kafka: + bootStrapServers: + group_id: + key_deserialize_class: + value_deserialize_class: + topic: + +hdfs: + hdfsURL: + hdfs_remote_file: -- 2.16.6