) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
\r
\r
-insert into db (`name`,`host`,`login`,`pass`,`database`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake');\r
+insert into db (`name`,`host`,`login`,`pass`,`database_name`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake');\r
insert into db (`name`,`host`) values ('Elasticsearch','dl_es');\r
-insert into db (`name`,`host`,`port`,`database`) values ('MongoDB','dl_mongodb',27017,'datalake');\r
+insert into db (`name`,`host`,`port`,`database_name`) values ('MongoDB','dl_mongodb',27017,'datalake');\r
insert into db (`name`,`host`) values ('Druid','dl_druid');\r
\r
\r
private boolean async;
private boolean enableSSL;
+ private String timestampLabel;
+ private String rawDataLabel;
+
+ private String defaultTopicName;
}
Db delDb = dbRepository.findByName(dbName);
if (delDb == null) {
sendError(response, 404, "Db not found: " + dbName);
+ return;
}
Set<Topic> topicRelation = delDb.getTopics();
topicRelation.clear();
import io.swagger.annotations.ApiOperation;
/**
- * This controller manages topic settings.
+ * This controller manages topic settings.
*
- * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic.
- * All the settings are saved in database.
- * topic "_DL_DEFAULT_" is populated at setup by a DB script.
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's
+ * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings
+ * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB
+ * script.
*
* @author Guobiao Mo
*
*/
@RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) //, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private TopicRepository topicRepository;
-
+
@Autowired
private TopicService topicService;
@Autowired
private DbService dbService;
-
+
@GetMapping("/dmaap/")
@ResponseBody
- @ApiOperation(value="List all topic names in DMaaP.")
+ @ApiOperation(value = "List all topic names in DMaaP.")
public List<String> listDmaapTopics() throws IOException {
return dmaapService.getTopics();
}
@GetMapping("/")
@ResponseBody
- @ApiOperation(value="List all topics' settings.")
+ @ApiOperation(value = "List all topics' settings.")
public Iterable<Topic> list() throws IOException {
Iterable<Topic> ret = topicRepository.findAll();
return ret;
@GetMapping("/{topicName}")
@ResponseBody
- @ApiOperation(value="Get a topic's settings.")
+ @ApiOperation(value = "Get a topic's settings.")
public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
Topic topic = topicService.getTopic(topicName);
return topic;
@GetMapping("/{topicName}/dbs")
@ResponseBody
- @ApiOperation(value="Get all DBs in a topic.")
+ @ApiOperation(value = "Get all DBs in a topic.")
public Set<Db> getTopicDbs(@PathVariable("topicName") String topicName) throws IOException {
Topic topic = topicService.getTopic(topicName);
Set<Db> dbs = topic.getDbs();
//One exception is that old DBs are kept
@PutMapping("/")
@ResponseBody
- @ApiOperation(value="Update a topic.")
+ @ApiOperation(value = "Update a topic.")
public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
- sendError(response, 400, "Error parsing Topic: "+result.toString());
- return null;
+ sendError(response, 400, "Error parsing Topic: " + result.toString());
+ return null;
}
Topic oldTopic = getTopic(topic.getName());
if (oldTopic == null) {
- sendError(response, 404, "Topic not found "+topic.getName());
- return null;
+ sendError(response, 404, "Topic not found " + topic.getName());
+ return null;
} else {
- if(!topic.isDefault()) {
+ if (!topicService.istDefaultTopic(topic)) {
Topic defaultTopic = topicService.getDefaultTopic();
topic.setDefaultTopic(defaultTopic);
}
-
+
topic.setDbs(oldTopic.getDbs());
topicRepository.save(topic);
return topic;
}
}
-
+
@PostMapping("/")
@ResponseBody
- @ApiOperation(value="Create a new topic.")
+ @ApiOperation(value = "Create a new topic.")
public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
-
+
if (result.hasErrors()) {
- sendError(response, 400, "Error parsing Topic: "+result.toString());
+ sendError(response, 400, "Error parsing Topic: " + result.toString());
return null;
}
Topic oldTopic = getTopic(topic.getName());
if (oldTopic != null) {
- sendError(response, 400, "Topic already exists "+topic.getName());
+ sendError(response, 400, "Topic already exists " + topic.getName());
return null;
} else {
- if(!topic.isDefault()) {
+ if (!topicService.istDefaultTopic(topic)) {
Topic defaultTopic = topicService.getDefaultTopic();
topic.setDefaultTopic(defaultTopic);
}
-
+
topicRepository.save(topic);
return topic;
}
@DeleteMapping("/{topicName}/db/{dbName}")
@ResponseBody
- @ApiOperation(value="Delete a DB from a topic.")
+ @ApiOperation(value = "Delete a DB from a topic.")
public Set<Db> deleteDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
Topic topic = topicService.getTopic(topicName);
Set<Db> dbs = topic.getDbs();
dbs.remove(new Db(dbName));
-
+
topicRepository.save(topic);
- return topic.getDbs();
+ return topic.getDbs();
}
@PutMapping("/{topicName}/db/{dbName}")
@ResponseBody
- @ApiOperation(value="Add a DB to a topic.")
+ @ApiOperation(value = "Add a DB to a topic.")
public Set<Db> addDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
Topic topic = topicService.getTopic(topicName);
- Set<Db> dbs = topic.getDbs();
+ Set<Db> dbs = topic.getDbs();
- Db db = dbService.getDb(dbName);
+ Db db = dbService.getDb(dbName);
dbs.add(db);
-
+
topicRepository.save(topic);
- return topic.getDbs();
+ return topic.getDbs();
}
-
+
private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
log.info(msg);
- response.sendError(sc, msg);
+ response.sendError(sc, msg);
}
}
this.name = name;
}
- public boolean isDefault() {
- return "_DL_DEFAULT_".equals(name);
- }
-
public boolean isEnabled() {
return is(enabled, Topic::isEnabled);
}
import javax.annotation.PreDestroy;
import org.json.JSONObject;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
private final Logger log = LoggerFactory.getLogger(this.getClass());
+ @Autowired
+ ApplicationConfiguration config;
+
@Autowired
private DbService dbService;
log.info("Connect to Couchbase {}", couchbase.getHost());
// Create a N1QL Primary Index (but ignore if it exists)
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
+ isReady = true;
}
catch(Exception ex)
{
isReady = false;
}
- isReady = true;
}
@PreDestroy
//convert to Couchbase JsonObject from org.json JSONObject
JsonObject jsonObject = JsonObject.fromJson(json.toString());
- long timestamp = jsonObject.getLong("_ts");//this is Kafka time stamp, which is added in StoreService.messageToJson()
+ long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
//setup TTL
int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
documents.add(doc);
}
saveDocuments(documents);
+ log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
}
public String getId(Topic topic, JSONObject json) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
messages.add(Pair.of(record.timestamp(), record.value()));
- log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
}
storeService.saveMessages(partition.topic(), messages);
log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.XML;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-
+
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
/**
* Service to store messages to varieties of DBs
*
- * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML?
+ * comment out YAML support, since AML is for config and don't see this data
+ * type in DMaaP. Do we need to support XML?
*
* @author Guobiao Mo
*
public class StoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+ @Autowired
+ private ApplicationConfiguration config;
+
@Autowired
private TopicService topicService;
@Autowired
private ElasticsearchService elasticsearchService;
- private Map<String, Topic> topicMap = new HashMap<>();
+ private Map<String, Topic> topicMap = new HashMap<>();
- private ObjectMapper yamlReader;
+ private ObjectMapper yamlReader;
@PostConstruct
private void init() {
String text = pair.getRight();
//for debug, to be remove
-// String topicStr = topic.getId();
-// if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
- // log.debug("{} ={}", topicStr, text);
+ // String topicStr = topic.getId();
+ // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
+ // log.debug("{} ={}", topicStr, text);
//}
boolean storeRaw = topic.isSaveRaw();
- JSONObject json = null;
+ JSONObject json = null;
DataFormat dataFormat = topic.getDataFormat();
break;
case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON
json = XML.toJSONObject(text);
- break;
+ break;
case YAML:// Do we need to support YAML?
Object obj = yamlReader.readValue(text, Object.class);
ObjectMapper jsonWriter = new ObjectMapper();
//FIXME for debug, to be remove
json.remove("_id");
json.remove("_dl_text_");
+ json.remove("_dl_type_");
- json.put("_ts", timestamp);
+ json.put(config.getTimestampLabel(), timestamp);
if (storeRaw) {
- json.put("_text", text);
+ json.put(config.getRawDataLabel(), text);
}
return json;
import java.io.IOException;
import java.util.Optional;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.slf4j.Logger;
private final Logger log = LoggerFactory.getLogger(this.getClass());
+ @Autowired
+ private ApplicationConfiguration config;
+
@Autowired
private TopicRepository topicRepository;
}
public Topic getDefaultTopic() {
- return getTopic("_DL_DEFAULT_");
+ return getTopic(config.getDefaultTopicName());
}
+ public boolean istDefaultTopic(Topic topic) {
+ if (topic == null) {
+ return false;
+ }
+ return topic.getName().equals(config.getDefaultTopicName());
+ }
}
#SSL global flag, if enabled, still need to check each individual DB SSL flag
enableSSL=false
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
+
+
#Logging
logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR
assertTrue(config.getKafkaConsumerCount() > 0);
assertNotNull(config.isAsync());
assertNotNull(config.isEnableSSL());
+ assertNotNull(config.getDefaultTopicName());
+ assertNotNull(config.getRawDataLabel());
+ assertNotNull(config.getTimestampLabel());
}
}
Topic testTopic = new Topic("test");
testTopic.setDefaultTopic(defaultTopic);
- assertTrue(defaultTopic.isDefault());
- assertFalse(testTopic.isDefault());
-
assertTrue(testTopic.equals(new Topic("test")));
assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
import java.util.ArrayList;
@Test
public void testSaveJsonsWithTopicId() {
+ ApplicationConfiguration appConfig = new ApplicationConfiguration();
+ appConfig.setTimestampLabel("datalake_ts_");
String text = "{ data: { data2 : { value : 'hello'}}}";
Topic topic = new Topic("test getMessageId");
topic.setMessageIdPath("/data/data2/value");
List<JSONObject> jsons = new ArrayList<>();
- json.put("_ts", 1234);
+ json.put(appConfig.getTimestampLabel(), 1234);
jsons.add(json);
CouchbaseService couchbaseService = new CouchbaseService();
couchbaseService.bucket = bucket;
+ couchbaseService.config = appConfig;
couchbaseService.saveJsons(topic, jsons);
}
@Test
public void testSaveJsonsWithOutTopicId() {
+ ApplicationConfiguration appConfig = new ApplicationConfiguration();
+ appConfig.setTimestampLabel("datalake_ts_");
String text = "{ data: { data2 : { value : 'hello'}}}";
Topic topic = new Topic("test getMessageId");
List<JSONObject> jsons = new ArrayList<>();
- json.put("_ts", 1234);
+ json.put(appConfig.getTimestampLabel(), 1234);
jsons.add(json);
CouchbaseService couchbaseService = new CouchbaseService();
couchbaseService.bucket = bucket;
+ couchbaseService.config = appConfig;
couchbaseService.saveJsons(topic, jsons);
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
@RunWith(MockitoJUnitRunner.class)
public class TopicServiceTest {
+ static String DEFAULT_TOPIC_NAME = "_DL_DEFAULT_";
+
+ @Mock
+ private ApplicationConfiguration config;
+
@Mock
private TopicRepository topicRepository;
@Test
public void testGetDefaultTopic() {
- String name = "_DL_DEFAULT_";
- when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name)));
- assertEquals(topicService.getDefaultTopic(), new Topic(name));
+ when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(new Topic(DEFAULT_TOPIC_NAME)));
+ when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
+ assertEquals(topicService.getDefaultTopic(), new Topic(DEFAULT_TOPIC_NAME));
+ assertTrue(topicService.istDefaultTopic(new Topic(DEFAULT_TOPIC_NAME)));
}
@Test(expected = IOException.class)
#tolerate inconsistency when system crash, see PullThread.run()
async=true
+#SSL global flag, if enabled, still need to check each individual DB SSL flag
+enableSSL=false
+
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
+
+
#Logging
logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR