2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright 2019 China Mobile
\r
6 *=================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============LICENSE_END=========================================================
\r
20 package org.onap.datalake.feeder.util;
\r
22 import java.io.BufferedWriter;
\r
23 import java.io.FileWriter;
\r
24 import java.io.IOException;
\r
25 import java.net.MalformedURLException;
\r
26 import java.util.ArrayList;
\r
27 import java.util.Iterator;
\r
28 import java.util.List;
\r
29 import java.util.Map.Entry;
\r
31 import org.apache.velocity.Template;
\r
32 import org.apache.velocity.VelocityContext;
\r
33 import org.apache.velocity.app.Velocity;
\r
34 import org.apache.velocity.runtime.RuntimeConstants;
\r
35 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
\r
36 import org.onap.datalake.feeder.enumeration.DataFormat;
\r
38 import com.fasterxml.jackson.databind.JsonNode;
\r
39 import com.fasterxml.jackson.databind.ObjectMapper;
\r
40 import com.fasterxml.jackson.databind.node.JsonNodeType;
\r
42 import lombok.Getter;
\r
43 import lombok.Setter;
\r
 * Reads a sample JSON document and writes a Druid supervisor spec to resources\druid\generated.
\r
 * The generated files need manual editing to be production ready; the final versions are in resources\druid.
\r
50 * http://druid.io/docs/latest/tutorials/tutorial-ingestion-spec.html
\r
51 * http://druid.io/docs/latest/ingestion/flatten-json
\r
55 * reduce the manual editing
\r
58 * auto get sample, and for each topic, get multiple samples.
\r
59 * make supervisor file names consistent
\r
 * The dimension type defaults to string, but in msgrtr.apinode.metrics.dmaap many values are long/double, so a dimensionsSpec needs to be generated; this is done at the end of printFlattenSpec().
\r
64 public class DruidSupervisorGenerator {
\r
66 Template template = null;
\r
67 VelocityContext context;
\r
69 List<String[]> dimensions;
\r
71 public DruidSupervisorGenerator() {
\r
72 dimensions = new ArrayList<>();
\r
74 Velocity.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
\r
75 Velocity.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
\r
79 context = new VelocityContext();
\r
81 context.put("host", "message-router-kafka:9092");//TODO get from config
\r
83 template = Velocity.getTemplate("druid/kafka-supervisor-template.vm");
\r
86 private void printNode(String prefix, JsonNode node) {
\r
88 // lets see what type the node is
\r
89 // System.out.println("NodeType=" + node.getNodeType() + ", isContainerNode=" + node.isContainerNode() + ", " + node); // prints OBJECT
\r
91 if (node.isContainerNode()) {
\r
93 Iterator<Entry<String, JsonNode>> fields = node.fields();
\r
95 while (fields.hasNext()) {
\r
96 Entry<String, JsonNode> field = fields.next();
\r
97 // System.out.println("--------"+field.getKey()+"--------");
\r
98 printNode(prefix + "." + field.getKey(), field.getValue());
\r
101 if (node.isArray()) {
\r
102 Iterator<JsonNode> elements = node.elements();
\r
104 while (elements.hasNext()) {
\r
105 JsonNode element = elements.next();
\r
106 printNode(prefix + "[" + i + "]", element);
\r
112 printFlattenSpec(node.getNodeType(), prefix);
\r
117 private void printFlattenSpec(JsonNodeType type, String path) {
\r
118 String name = path.substring(2).replace('.', ':');
\r
119 // lets see what type the node is
\r
120 System.out.println("{");
\r
121 System.out.println("\"type\": \"path\",");
\r
122 System.out.println("\"name\": \"" + name + "\",");
\r
123 System.out.println("\"expr\": \"" + path + "\"");
\r
124 System.out.println("},");
\r
126 dimensions.add(new String[] { name, path });
\r
128 //for dimensionsSpec
\r
129 if (JsonNodeType.NUMBER.equals(type)) {
\r
130 System.out.println("{");
\r
131 System.out.println("\"type\": \"long\",");
\r
132 System.out.println("\"name\": \"" + name + "\",");
\r
133 System.out.println("},");
\r
135 System.out.println("\"" + name + "\",");
\r
141 public void doTopic(String topic) throws IOException {
\r
142 dimensions.clear();
\r
144 String sampleFileName = "C:\\git\\onap\\datalake\\olap\\src\\main\\resources\\druid\\" + topic + "-sample-format.json";//FIXME hard coded path
\r
145 String outputFileName = "C:\\git\\onap\\datalake\\olap\\src\\main\\resources\\druid\\generated\\" + topic + "-kafka-supervisor.json";
\r
147 // Get the contents of json as a string using commons IO IOUTils class.
\r
148 String sampleJson = Util.getTextFromFile(sampleFileName);
\r
150 // create an ObjectMapper instance.
\r
151 ObjectMapper mapper = new ObjectMapper();
\r
152 // use the ObjectMapper to read the json string and create a tree
\r
153 JsonNode root = mapper.readTree(sampleJson);
\r
154 printNode("$", root);
\r
156 context.put("topic", topic);
\r
157 context.put("timestamp", "event-header:timestamp");//FIXME hard coded, should be topic based
\r
158 context.put("timestampFormat", "yyyyMMdd-HH:mm:ss:SSS");//FIXME hard coded, should be topic based
\r
159 context.put("dimensions", dimensions);
\r
161 BufferedWriter out = new BufferedWriter(new FileWriter(outputFileName));
\r
163 template.merge(context, out);
\r
167 public static void main(String[] args) throws MalformedURLException, IOException {
\r
168 String[] topics = new String[] { "AAI-EVENT", "msgrtr.apinode.metrics.dmaap", "unauthenticated.DCAE_CL_OUTPUT", "unauthenticated.SEC_FAULT_OUTPUT" };//FIXME hard coded
\r
170 DruidSupervisorGenerator p = new DruidSupervisorGenerator();
\r
172 for (String topic : topics) {
\r