2  * ============LICENSE_START=======================================================
\r 
   4  * ================================================================================
\r 
   5  * Copyright 2019 China Mobile
\r 
   6  *=================================================================================
\r 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
\r 
   8  * you may not use this file except in compliance with the License.
\r 
   9  * You may obtain a copy of the License at
\r 
  11  *     http://www.apache.org/licenses/LICENSE-2.0
\r 
  13  * Unless required by applicable law or agreed to in writing, software
\r 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
\r 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r 
  16  * See the License for the specific language governing permissions and
\r 
  17  * limitations under the License.
\r 
  18  * ============LICENSE_END=========================================================
\r 
  20 package org.onap.datalake.feeder.util;
\r 
  22 import com.fasterxml.jackson.databind.JsonNode;
\r 
  23 import com.fasterxml.jackson.databind.ObjectMapper;
\r 
  24 import com.fasterxml.jackson.databind.node.JsonNodeType;
\r 
  25 import lombok.Getter;
\r 
  26 import org.apache.velocity.Template;
\r 
  27 import org.apache.velocity.VelocityContext;
\r 
  28 import org.apache.velocity.app.Velocity;
\r 
  29 import org.apache.velocity.runtime.RuntimeConstants;
\r 
  30 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
\r 
  32 import java.io.BufferedWriter;
\r 
  33 import java.io.FileWriter;
\r 
  34 import java.io.IOException;
\r 
  35 import java.net.MalformedURLException;
\r 
  36 import java.util.ArrayList;
\r 
  37 import java.util.Iterator;
\r 
  38 import java.util.List;
\r 
  39 import java.util.Map.Entry;
\r 
  43  * read sample json and output supervisor to  resources\druid\generated
\r 
  44  * need manual edit to be production ready, final versions are in resources\druid
\r 
  46  * http://druid.io/docs/latest/tutorials/tutorial-ingestion-spec.html
\r 
  47  * http://druid.io/docs/latest/ingestion/flatten-json
\r 
  51  * reduce the manual editing
\r 
  54  * auto get sample, and for each topic, get multiple samples.
\r 
  55  * make supervisor file names consistent
\r 
  56  * dimension type default is string, in msgrtr.apinode.metrics.dmaap , many are long/double, so need to generate dimensionsSpec, this is done at the end of printFlattenSpec()
\r 
  60 public class DruidSupervisorGenerator {
\r 
  62     Template template = null;
\r 
  63     VelocityContext context;
\r 
  65     List<String[]> dimensions;
\r 
  67     public DruidSupervisorGenerator() {
\r 
  68         dimensions = new ArrayList<>();
\r 
  70         Velocity.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
\r 
  71         Velocity.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
\r 
  75         context = new VelocityContext();
\r 
  77         context.put("host", "message-router-kafka:9092");//TODO get from config
\r 
  79         template = Velocity.getTemplate("druid/kafka-supervisor-template.vm");
\r 
  82     private void printNode(String prefix, JsonNode node) {
\r 
  84         // lets see what type the node is
\r 
  85         //              System.out.println("NodeType=" + node.getNodeType() + ", isContainerNode=" + node.isContainerNode() + ", " + node); // prints OBJECT
\r 
  87         if (node.isContainerNode()) {
\r 
  89             Iterator<Entry<String, JsonNode>> fields = node.fields();
\r 
  91             while (fields.hasNext()) {
\r 
  92                 Entry<String, JsonNode> field = fields.next();
\r 
  93                 //                              System.out.println("--------"+field.getKey()+"--------");
\r 
  94                 printNode(prefix + "." + field.getKey(), field.getValue());
\r 
  97             if (node.isArray()) {
\r 
  98                 Iterator<JsonNode> elements = node.elements();
\r 
 100                 while (elements.hasNext()) {
\r 
 101                     JsonNode element = elements.next();
\r 
 102                     printNode(prefix + "[" + i + "]", element);
\r 
 108             printFlattenSpec(node.getNodeType(), prefix);
\r 
 113     private void printFlattenSpec(JsonNodeType type, String path) {
\r 
 114         String name = path.substring(2).replace('.', ':');
\r 
 115         // lets see what type the node is
\r 
 116         System.out.println("{");
\r 
 117         System.out.println("\"type\": \"path\",");
\r 
 118         System.out.println("\"name\": \"" + name + "\",");
\r 
 119         System.out.println("\"expr\": \"" + path + "\"");
\r 
 120         System.out.println("},");
\r 
 122         dimensions.add(new String[]{name, path});
\r 
 124                  //for  dimensionsSpec
\r 
 125                                 if (JsonNodeType.NUMBER.equals(type)) {
\r 
 126                                         System.out.println("{");
\r 
 127                                         System.out.println("\"type\": \"long\",");
\r 
 128                                         System.out.println("\"name\": \"" + name + "\","); 
\r 
 129                                         System.out.println("},");
\r 
 131                                         System.out.println("\"" + name + "\",");
\r 
 137     public void doTopic(String topic) throws IOException {
\r 
 138         dimensions.clear();
\r 
 140         String sampleFileName = "src/main/resources/druid/" + topic + "-sample-format.json";//FIXME hard coded path
\r 
 141         String outputFileName = "src/main/resources/druid/generated/" + topic + "-kafka-supervisor.json";
\r 
 143         // Get the contents of json as a string using commons IO IOUTils class.
\r 
 144         String sampleJson = Util.getTextFromFile(sampleFileName);
\r 
 146         // create an ObjectMapper instance.
\r 
 147         ObjectMapper mapper = new ObjectMapper();
\r 
 148         // use the ObjectMapper to read the json string and create a tree
\r 
 149         JsonNode root = mapper.readTree(sampleJson);
\r 
 150         printNode("$", root);
\r 
 152         context.put("topic", topic);
\r 
 153         context.put("timestamp", "event-header:timestamp");//FIXME hard coded, should be topic based
\r 
 154         context.put("timestampFormat", "yyyyMMdd-HH:mm:ss:SSS");//FIXME hard coded, should be topic based
\r 
 155         context.put("dimensions", dimensions);
\r 
 157         BufferedWriter out = new BufferedWriter(new FileWriter(outputFileName));
\r 
 159         template.merge(context, out);
\r 
 163     public static void main(String[] args) throws MalformedURLException, IOException {
\r 
 164         String[] topics = new String[]{"AAI-EVENT", "msgrtr.apinode.metrics.dmaap", "unauthenticated.DCAE_CL_OUTPUT", "unauthenticated.SEC_FAULT_OUTPUT"};//FIXME hard coded
\r 
 166         DruidSupervisorGenerator p = new DruidSupervisorGenerator();
\r 
 168         for (String topic : topics) {
\r