<dependencies>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<artifactId>jackson-databind</artifactId>
</dependency>
- <dependency>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- </dependency>
-
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
package org.onap.datalake.feeder.config;
-import java.util.Set;
-
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.FilterType;
+import org.springframework.context.annotation.ComponentScan;
import lombok.Getter;
import lombok.Setter;
*/
@Getter
@Setter
-@Configuration
+@SpringBootConfiguration
@ConfigurationProperties
+//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class))
+//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test
+@EnableAutoConfiguration
+//@Profile("test")
public class ApplicationConfiguration {
private String couchbaseHost;
// private int mongodbPort;
// private String mongodbDatabase;
- private boolean storeJson;
- private boolean storeYaml;
- private boolean storeXml;
-
private String dmaapZookeeperHostPort;
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
import javax.validation.constraints.NotNull;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import org.springframework.data.couchbase.core.mapping.Document;
+
+import lombok.Setter;
/**
* Domain class representing topic table in Couchbase
*
*/
@Document
+@Setter
public class Topic {
@NotNull
@Id
//if this flag is true, need to correlate alarm cleared message to previous alarm
private Boolean correlateClearedMessage;
+
+ //the value in the JSON with this path will be used as DB id
+ private String messageIdPath;
public Topic() {
}
//if 'this' Topic does not have the setting, use default Topic's
private boolean is(Boolean b, Predicate<Topic> pre) {
+ return is(b, pre, false);
+ }
+
+ private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) {
if (b != null) {
return b;
} else if (defaultTopic != null) {
return pre.test(defaultTopic);
} else {
- return false;
+ return defaultValue;
}
}
return is(supportDruid, Topic::isSupportDruid);
}
+ //extract DB id from a JSON attribute, TODO support multiple attributes
+ public String getMessageId(JSONObject json) {
+ String id = null;
+
+ if(StringUtils.isNotBlank(messageIdPath)) {
+ id = json.query(messageIdPath).toString();
+ }
+
+ return id;
+ }
+
@Override
public String toString() {
return id;
}
- // for testing
- public static void main(String[] args) {
- Topic defaultTopic=new Topic("def");
- Topic test = new Topic("test");
- test.setDefaultTopic(defaultTopic);
- defaultTopic.supportElasticsearch=true;
- boolean b = test.isSupportElasticsearch();
- System.out.println(b);
+ /**
+ * @return the messageIdPath
+ */
+ public String getMessageIdPath() {
+ return messageIdPath;
+ }
+
+ /**
+ * @param messageIdPath the messageIdPath to set
+ */
+ public void setMessageIdPath(String messageIdPath) {
+ this.messageIdPath = messageIdPath;
}
}
*/
public enum DataFormat {
JSON, XML, YAML, TEXT;
+
+ public static DataFormat fromString(String s) {
+ if ("JSON".equalsIgnoreCase(s)) {
+ return JSON;
+ }
+ if ("XML".equalsIgnoreCase(s)) {
+ return XML;
+ }
+ if ("YAML".equalsIgnoreCase(s)) {
+ return YAML;
+ }
+ if ("TEXT".equalsIgnoreCase(s)) {
+ return TEXT;
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
}
import org.apache.velocity.app.Velocity;\r
import org.apache.velocity.runtime.RuntimeConstants;\r
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;\r
+import org.onap.datalake.feeder.enumeration.DataFormat;\r
\r
import com.fasterxml.jackson.databind.JsonNode;\r
import com.fasterxml.jackson.databind.ObjectMapper;\r
import com.fasterxml.jackson.databind.node.JsonNodeType;\r
\r
+import lombok.Getter;\r
+import lombok.Setter;\r
+\r
\r
/*\r
* read sample json and output supervisor to resources\druid\generated\r
* dimension type default is string, in msgrtr.apinode.metrics.dmaap , many are long/double, so need to generate dimensionsSpec, this is done at the end of printFlattenSpec()\r
*/\r
\r
+@Getter\r
public class DruidSupervisorGenerator {\r
\r
Template template = null;\r
\r
context = new VelocityContext();\r
\r
- context.put("host", "dl_dmaap_kf");\r
+ context.put("host", "message-router-kafka:9092");//TODO get from config\r
\r
template = Velocity.getTemplate("druid/kafka-supervisor-template.vm");\r
}\r
\r
- public void printNode(String prefix, JsonNode node) {\r
+ private void printNode(String prefix, JsonNode node) {\r
\r
// lets see what type the node is\r
// System.out.println("NodeType=" + node.getNodeType() + ", isContainerNode=" + node.isContainerNode() + ", " + node); // prints OBJECT\r
\r
}\r
\r
- public void printFlattenSpec(JsonNodeType type, String path) {\r
+ private void printFlattenSpec(JsonNodeType type, String path) {\r
String name = path.substring(2).replace('.', ':');\r
// lets see what type the node is\r
System.out.println("{");\r
context.put("topic", topic);\r
context.put("timestamp", "event-header:timestamp");//FIXME hard coded, should be topic based\r
context.put("timestampFormat", "yyyyMMdd-HH:mm:ss:SSS");//FIXME hard coded, should be topic based\r
-\r
context.put("dimensions", dimensions);\r
\r
BufferedWriter out = new BufferedWriter(new FileWriter(outputFileName));\r
--- /dev/null
+{
+ "type": "kafka",
+ "dataSchema": {
+ "dataSource": "$topic",
+ "parser": {
+ "type": "string",
+ "parseSpec": {
+ "format": "json",
+ "flattenSpec": {
+ "useFieldDiscovery": false,
+ "fields": [
+ #foreach($flatten in $dimensions)
+
+ {
+"type": "path",
+"name": "$flatten[0]",
+"expr": "$flatten[1]"
+},
+ #end
+ ]
+ },
+ "timestampSpec": {
+ "column": "$timestamp",
+ "format": "$timestampFormat"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ ],
+ "dimensionsExclusions": [
+ ]
+ }
+ }
+ },
+ "metricsSpec": [],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "HOUR",
+ "queryGranularity": "MINUTE",
+ "rollup": false
+ }
+ },
+ "tuningConfig": {
+ "type": "kafka",
+ "reportParseExceptions": true
+ },
+ "ioConfig": {
+ "topic": "$topic",
+ "replicas": 1,
+ "startDelay": "PT1S",
+ "taskDuration": "PT1H",
+ "completionTimeout": "PT30M",
+ "consumerProperties": {
+ "bootstrap.servers": "$host"
+ },
+ "useEarliestOffset": true
+ }
+}
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.config;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.onap.datalake.feeder.Application;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.ConfigFileApplicationContextInitializer;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * test ApplicationConfiguration
+ *
+ * @author Guobiao Mo
+ *
+ */
+//@RunWith(SpringRunner.class)
+//@SpringBootTest
+/*
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = Application.class,
+ initializers = ConfigFileApplicationContextInitializer.class)
+*/
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = ApplicationConfiguration.class)
+//@ActiveProfiles("test")
+public class ApplicationConfigurationTest {
+
+ @Autowired
+ private ApplicationConfiguration config;
+
+ @Test
+ public void readConfig() {
+ assertNotNull(config.getCouchbaseHost());
+ assertNotNull(config.getCouchbaseUser());
+ assertNotNull(config.getCouchbasePass());
+ assertNotNull(config.getCouchbaseBucket());
+
+ assertNotNull(config.getDmaapZookeeperHostPort());
+ assertNotNull(config.getDmaapKafkaHostPort());
+ assertNotNull(config.getDmaapKafkaGroup());
+ assertTrue(config.getDmaapKafkaTimeout() > 0L);
+ assertTrue(config.getDmaapCheckNewTopicIntervalInSec() > 0);
+
+ assertTrue(config.getKafkaConsumerCount() > 0);
+ }
+
+}
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.json.JSONObject;
+import org.junit.Test;
+
+/**
+ * Test Topic
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public class TopicTest {
+
+ @Test
+ public void getMessageId() {
+ String text = "{ data: { data2 : { value : 'hello'}}}";
+
+ JSONObject json = new JSONObject(text);
+
+ Topic topic = new Topic("test getMessageId");
+ topic.setMessageIdPath("/data/data2/value");
+
+ String value = topic.getMessageId(json);
+
+ assertEquals(value, "hello");
+ }
+
+ @Test
+ public void testIs() {
+ Topic defaultTopic=new Topic("default");
+ Topic testTopic = new Topic("test");
+ testTopic.setDefaultTopic(defaultTopic);
+
+ defaultTopic.setSupportElasticsearch(true);
+ boolean b = testTopic.isSupportElasticsearch();
+ assertTrue(b);
+
+ defaultTopic.setSupportElasticsearch(false);
+ b = testTopic.isSupportElasticsearch();
+ assertFalse(b);
+ }
+}
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.enumeration;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Test Data format of DMaaP messages
+ *
+ * @author Guobiao Mo
+ *
+ */
+public class DataFormatTest {
+ @Test
+ public void fromString() {
+ assertEquals(DataFormat.JSON, DataFormat.fromString("json"));
+ assertEquals(DataFormat.XML, DataFormat.fromString("xml"));
+ assertEquals(DataFormat.YAML, DataFormat.fromString("YAML"));
+ assertEquals(DataFormat.TEXT, DataFormat.fromString("Text"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void fromStringWithException() {
+ DataFormat.fromString("test");
+ }
+}
--- /dev/null
+/*\r
+* ============LICENSE_START=======================================================\r
+* ONAP : DataLake\r
+* ================================================================================\r
+* Copyright 2019 China Mobile\r
+*=================================================================================\r
+* Licensed under the Apache License, Version 2.0 (the "License");\r
+* you may not use this file except in compliance with the License.\r
+* You may obtain a copy of the License at\r
+*\r
+* http://www.apache.org/licenses/LICENSE-2.0\r
+*\r
+* Unless required by applicable law or agreed to in writing, software\r
+* distributed under the License is distributed on an "AS IS" BASIS,\r
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+* See the License for the specific language governing permissions and\r
+* limitations under the License.\r
+* ============LICENSE_END=========================================================\r
+*/\r
+package org.onap.datalake.feeder.util;\r
+\r
+import static org.junit.Assert.assertEquals;\r
+import static org.junit.Assert.assertNotNull;\r
+\r
+import org.apache.velocity.VelocityContext;\r
+import org.junit.Test;\r
+import org.junit.runner.RunWith; \r
+import org.onap.datalake.feeder.config.ApplicationConfiguration;\r
+import org.springframework.beans.factory.annotation.Autowired; \r
+import org.springframework.boot.test.context.SpringBootTest; \r
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;\r
+\r
+/**\r
+ * Test DruidSupervisorGenerator\r
+ * @author Guobiao Mo\r
+ *\r
+ */\r
+\r
+@RunWith(SpringJUnit4ClassRunner.class)\r
+@SpringBootTest(classes = ApplicationConfiguration.class)\r
+\r
+public class DruidSupervisorGeneratorTest {\r
+\r
+ @Autowired\r
+ private ApplicationConfiguration config;\r
+\r
+ @Test\r
+ public void testConstructor() {\r
+ DruidSupervisorGenerator gen = new DruidSupervisorGenerator();\r
+ VelocityContext context= gen.getContext();\r
+\r
+ assertNotNull(context);\r
+ assertNotNull(gen.getDimensions() );\r
+ assertNotNull(gen.getTemplate() );\r
+ \r
+ String host = (String) context.get("host"); \r
+ assertEquals(host, config.getDmaapKafkaHostPort()); \r
+ }\r
+}\r
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * test utils
+ *
+ * @author Guobiao Mo
+ *
+ */
+public class UtilTest {
+
+ @Test
+ //only dot(.) in key got replaced
+ public void replaceDotInKey() {
+ String a = "\"u-y.t.y-t\":\"u.gfh\",\\\"jg.h\\\":\"j_9889\"";
+ String b = "\"u-y_t_y-t\":\"u.gfh\",\\\"jg_h\\\":\"j_9889\"";
+
+ assertEquals(Util.replaceDotInKey(a), b);
+ }
+}
--- /dev/null
+
+server.port = 1680
+
+
+
+#For Beijing lab
+#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
+#dmaapKafkaHostPort=kafka.mr01.onap.vip:80
+#spring.couchbase.bootstrap-hosts=172.30.1.74
+#couchbaseHost=172.30.1.74
+
+
+#DMaaP
+#dmaapZookeeperHostPort=127.0.0.1:2181
+#dmaapKafkaHostPort=127.0.0.1:9092
+dmaapZookeeperHostPort=message-router-zookeeper:2181
+dmaapKafkaHostPort=message-router-kafka:9092
+dmaapKafkaGroup=dlgroup10
+dmaapKafkaTimeout=60
+#check for new topics
+dmaapCheckNewTopicIntervalInSec=3000
+
+kafkaConsumerCount=1
+
+#tolerate inconsistency when system crash, see PullThread.run()
+async=true
+
+#Logging
+logging.level.org.springframework.web=ERROR
+logging.level.com.att.nsa.apiClient.http=ERROR
+logging.level.org.onap.datalake=DEBUG
+
+
+#DL Feeder DB: Couchbase
+couchbaseHost=dl_couchbase
+#couchbaseHost=172.30.1.74
+couchbaseUser=dmaap
+couchbasePass=dmaap1234
+couchbaseBucket=dmaap
+
+#DL Feeder DB: Elasticsearch
+elasticsearchHost=dl_es