PRIMARY KEY (`id`),
KEY `FK3njadtw43ieph7ftt4kxdhcko` (`db_type_id`),
CONSTRAINT `FK3njadtw43ieph7ftt4kxdhcko` FOREIGN KEY (`db_type_id`) REFERENCES `db_type` (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `portal` (
`name` varchar(255) NOT NULL,
KEY `FKabb8e74230glxpaiai4aqsr34` (`topic_name_id`),
CONSTRAINT `FKabb8e74230glxpaiai4aqsr34` FOREIGN KEY (`topic_name_id`) REFERENCES `topic_name` (`id`),
CONSTRAINT `FKo43yi6aputq6kwqqu8eqbspm5` FOREIGN KEY (`design_type_id`) REFERENCES `design_type` (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `kafka` (
- `id` varchar(255) NOT NULL,
+ `id` int(11) NOT NULL AUTO_INCREMENT,
`broker_list` varchar(255) NOT NULL,
`consumer_count` int(11) DEFAULT 3,
`enabled` bit(1) NOT NULL,
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `map_kafka_topic` (
- `kafka_id` varchar(255) NOT NULL,
+ `kafka_id` int(11) NOT NULL,
`topic_id` int(11) NOT NULL,
PRIMARY KEY (`topic_id`,`kafka_id`),
KEY `FKtdrme4h7rxfh04u2i2wqu23g5` (`kafka_id`),
,timeout_sec
,zk
) VALUES (
- 'KAFKA_1'
+ 1
,'main Kafka cluster' -- name - IN varchar(255)
,3 -- consumer_count - IN int(11)
,1 -- enabled - IN bit(1)
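With `id` now AUTO_INCREMENT, new clusters can also be created through JPA and receive their id from the database. A minimal usage sketch (the setter names are assumptions inferred from the entity fields in this change; `setBrokerList` maps to `broker_list`):

    Kafka kafka = new Kafka();
    kafka.setName("main Kafka cluster");   // `name` is NOT NULL
    kafka.setBrokerList("localhost:9092"); // assumed setter for `broker_list`
    kafka.setEnabled(true);
    kafka = kafkaRepository.save(kafka);   // MySQL assigns the AUTO_INCREMENT id
    int newId = kafka.getId();             // an int now, no longer a String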
@PutMapping("/{id}")
@ResponseBody
@ApiOperation(value="Update a kafka.")
- public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable String id, HttpServletResponse response) throws IOException {
+ public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
@DeleteMapping("/{id}")
@ResponseBody
@ApiOperation(value="delete a kafka.")
- public void deleteKafka(@PathVariable("id") String id, HttpServletResponse response) throws IOException{
+ public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{
Kafka oldKafka = kafkaService.getKafkaById(id);
if (oldKafka == null) {
@GetMapping("/dmaap/{kafkaId}")
@ResponseBody
@ApiOperation(value = "List all topic names in DMaaP.")
- public List<String> listDmaapTopics(@PathVariable("kafkaId") String kafkaId ) {
+ public List<String> listDmaapTopics(@PathVariable("kafkaId") int kafkaId ) {
Kafka kafka = kafkaRepository.findById(kafkaId).get();
DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
return dmaapService.getTopics();
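One caveat with the new lookup: `findById(kafkaId).get()` throws `NoSuchElementException` for an unknown id, which surfaces to the client as a 500. A defensive variant (sketch only; the exception choice is an assumption):

    Kafka kafka = kafkaRepository.findById(kafkaId)
            .orElseThrow(() -> new IllegalArgumentException("unknown kafkaId: " + kafkaId));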
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
@Table(name = "kafka")
public class Kafka {
@Id
- @Column(name="`id`")
- private String id;
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ @Column(name = "`id`")
+ private int id;
@Column(name="`name`", nullable = false)
private String name;
if (this.getClass() != obj.getClass())
return false;
- return id.equals(((Kafka) obj).getId());
+ return id == ((Kafka) obj).getId();
}
@Override
public int hashCode() {
- return id.hashCode();
+ return id;
}
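With `equals` and `hashCode` both derived from the int id (returning the id directly is equivalent to `Integer.hashCode(id)`), identity in hash-based collections such as the `Set<Kafka>` relations follows the database key. A quick illustration:

    Kafka a = new Kafka();
    a.setId(7);
    Kafka b = new Kafka();
    b.setId(7);                 // different instance, same database id

    Set<Kafka> kafkas = new HashSet<>();
    kafkas.add(a);
    kafkas.add(b);              // deduplicated: equal ids mean equal objects
    // kafkas.size() == 1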
public KafkaConfig getKafkaConfig() {
tConfig.setEnabledSinkdbs(enabledDbList);
Set<Kafka> topicKafka = getKafkas();
- List<String> kafkaList = new ArrayList<>();
+ List<Integer> kafkaList = new ArrayList<>();
if (topicKafka != null) {
for (Kafka kafka : topicKafka) {
kafkaList.add(kafka.getId());
}
}
- tConfig.setSinkKafkas(kafkaList);
+ tConfig.setKafkas(kafkaList);
return tConfig;
}
import lombok.Getter;
import lombok.Setter;
-import java.util.List;
-
/**
* JSON request body for Kafka Config.
*
@Setter
public class KafkaConfig {
- private String id;
+ private int id;
private String name;
import java.util.List;
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONObject;
-import org.onap.datalake.feeder.enumeration.DataFormat;
-
/**
* JSON request body for Topic manipulation.
*
private String messageIdPath;
private String aggregateArrayPath;
private String flattenArrayPath;
- private List<String> sinkKafkas;
+ private List<Integer> kafkas;
@Override
public String toString() {
*/
package org.onap.datalake.feeder.enumeration;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+
/**
* Database type
*
*
*/
public enum DbTypeEnum {
- CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"), SUPERSET("Superset");
+ CB("Couchbase", CouchbaseService.class)
+ , DRUID("Druid", null)
+ , ES("Elasticsearch", ElasticsearchService.class)
+ , HDFS("HDFS", HdfsService.class)
+ , MONGO("MongoDB", MongodbService.class)
+ , KIBANA("Kibana", null)
+ , SUPERSET("Superset", null);
private final String name;
+ private final Class<? extends DbStoreService> serviceClass;
- DbTypeEnum(String name) {
+ DbTypeEnum(String name, Class<? extends DbStoreService> serviceClass) {
this.name = name;
+ this.serviceClass = serviceClass;
}
+ public Class<? extends DbStoreService> getServiceClass(){
+ return serviceClass;
+ }
}
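The enum now doubles as a registry from database type to its `DbStoreService` implementation; types with no backing store service (DRUID, KIBANA, SUPERSET) carry `null`, so callers must check for it, as the refactored factory below does. Resolution collapses to a single lookup (sketch):

    Class<? extends DbStoreService> serviceClass = DbTypeEnum.valueOf("ES").getServiceClass();
    if (serviceClass != null) {
        DbStoreService service = context.getBean(serviceClass, db); // Spring builds the prototype bean
    }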
*
*/

-public interface KafkaRepository extends CrudRepository<Kafka, String> {
+public interface KafkaRepository extends CrudRepository<Kafka, Integer> {

}
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.DbType;
import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>();
public DbStoreService findDbStoreService(Db db) {
- DbStoreService ret = dbStoreServiceMap.get(db.getId());
- if (ret != null) {
- return ret;
+ int dbId = db.getId();
+ if (dbStoreServiceMap.containsKey(dbId)) {
+ return dbStoreServiceMap.get(dbId);
}
DbType dbType = db.getDbType();
DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
- switch (dbTypeEnum) {
- case CB:
- ret = context.getBean(CouchbaseService.class, db);
- break;
- case ES:
- ret = context.getBean(ElasticsearchService.class, db);
- break;
- case HDFS:
- ret = context.getBean(HdfsService.class, db);
- break;
- case MONGO:
- ret = context.getBean(MongodbService.class, db);
- break;
- default:
+ Class<? extends DbStoreService> serviceClass = dbTypeEnum.getServiceClass();
+
+ if (serviceClass == null) {
log.error("Should not have come here {}", db);
- ret = null;
+ dbStoreServiceMap.put(dbId, null);
+ return null;
}
-
- dbStoreServiceMap.put(db.getId(), ret);
+
+ DbStoreService ret = context.getBean(serviceClass, db);
+ dbStoreServiceMap.put(dbId, ret);
return ret;
}
-
}
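If several puller threads can reach `findDbStoreService` concurrently, the plain `HashMap` cache is a potential race. A `ConcurrentHashMap` with `computeIfAbsent` would make the lookup atomic; since `ConcurrentHashMap` cannot hold `null` values, the unsupported-type result can be cached as `Optional.empty()`. A sketch under that assumption, not part of this change:

    private final Map<Integer, Optional<DbStoreService>> cache = new ConcurrentHashMap<>();

    public DbStoreService findDbStoreService(Db db) {
        return cache.computeIfAbsent(db.getId(), key -> {
            Class<? extends DbStoreService> cls = DbTypeEnum.valueOf(db.getDbType().getId()).getServiceClass();
            DbStoreService service = (cls == null) ? null : context.getBean(cls, db);
            if (service == null) {
                log.error("Should not have come here {}", db);
            }
            return Optional.ofNullable(service); // Optional.empty() caches the negative result
        }).orElse(null);
    }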
@Autowired
private KafkaRepository kafkaRepository;
- public Kafka getKafkaById(String id) {
+ public Kafka getKafkaById(int id) {
Optional<Kafka> ret = kafkaRepository.findById(id);
return ret.isPresent() ? ret.get() : null;
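The lookup could also be written with `orElse`, behavior unchanged:

    public Kafka getKafkaById(int id) {
        return kafkaRepository.findById(id).orElse(null);
    }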
private final Logger log = LoggerFactory.getLogger(this.getClass());
+ //KafkaConsumer is not thread-safe.
private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text
private boolean active = false;
storeService.saveMessages(kafka, partition.topic(), messages);
log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ if (!async) {//for reliability, synchronously commit the offset to Kafka right after saving the data to the data store; this slows things down a bit
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
private KafkaRepository kafkaRepository;
//effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
- private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
+ private Map<Integer, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
//private Map<String, TopicConfig> effectiveTopicConfigMap;
- //monitor Kafka topic list changes
- private Map<String, Set<String>> activeTopicMap;
+ //monitor Kafka topic list changes, key is kafka id, value is active Topics
+ private Map<Integer, Set<String>> activeTopicMap;
- private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//topic name:version
- private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();//topic name:version
- private Map<String, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
+ private ThreadLocal<Map<Integer, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//kafkaId:version - local 'old' version
+ private Map<Integer, Integer> currentActiveTopicsVersionMap = new HashMap<>();//kafkaId:version - current/latest version
+ private Map<Integer, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
private boolean active = false;
}
public boolean isActiveTopicsChanged(Kafka kafka) {//compares this thread's local topic-list version with the current one
- String kafkaId = kafka.getId();
+ int kafkaId = kafka.getId();
int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version
int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
}
try {
- Map<String, Set<String>> newTopicsMap = poll();
+ Map<Integer, Set<String>> newTopicsMap = poll();
- for(Map.Entry<String, Set<String>> entry:newTopicsMap.entrySet()) {
- String kafkaId = entry.getKey();
+ for(Map.Entry<Integer, Set<String>> entry:newTopicsMap.entrySet()) {
+ Integer kafkaId = entry.getKey();
Set<String> newTopics = entry.getValue();
Set<String> activeTopics = activeTopicMap.get(kafkaId);
active = false;
}
- private Map<String, Set<String>> poll() throws IOException {
- Map<String, Set<String>> ret = new HashMap<>();
+ private Map<Integer, Set<String>> poll() throws IOException {
+ Map<Integer, Set<String>> ret = new HashMap<>();
Iterable<Kafka> kafkas = kafkaRepository.findAll();
for (Kafka kafka : kafkas) {
if (kafka.isEnabled()) {
}
Set<Kafka> relateKafka = new HashSet<>();
- if (tConfig.getSinkKafkas() != null) {
- for (String item : tConfig.getSinkKafkas()) {
+ if (tConfig.getKafkas() != null) {
+ for (int item : tConfig.getKafkas()) {
Optional<Kafka> sinkKafka = kafkaRepository.findById(item);
if (sinkKafka.isPresent()) {
relateKafka.add(sinkKafka.get());
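The per-id lookups could optionally be batched with Spring Data's `findAllById`, which simply skips absent ids (a possible simplification, not part of this change):

    Set<Kafka> relateKafka = new HashSet<>();
    if (tConfig.getKafkas() != null) {
        kafkaRepository.findAllById(tConfig.getKafkas()).forEach(relateKafka::add);
    }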
ApplicationConfiguration config;
private Db couchbase;
+ //Bucket is thread-safe. https://docs.couchbase.com/java-sdk/current/managing-connections.html
Bucket bucket;
public CouchbaseService(Db db) {
@Autowired
private ApplicationConfiguration config;
- private RestHighLevelClient client;
+ private RestHighLevelClient client;//thread safe
ActionListener<BulkResponse> listener;
public ElasticsearchService(Db db) {
private ApplicationConfiguration config;
private boolean dbReady = false;
- //@Autowired
-// private DbService dbService;
-
private MongoDatabase database;
private MongoClient mongoClient;
+ //MongoCollection is ThreadSafe
private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
private InsertManyOptions insertManyOptions;
init.invoke(topicConfigPollingService);
Set<String> activeTopics = new HashSet<>(Arrays.asList("test"));
- Map<String, Set<String>> activeTopicMap = new HashMap<>();
- activeTopicMap.put(KAFKA_NAME, activeTopics);
+ Map<Integer, Set<String>> activeTopicMap = new HashMap<>();
+ activeTopicMap.put(1, activeTopics);
Field activeTopicsField = TopicConfigPollingService.class.getDeclaredField("activeTopicMap");
activeTopicsField.setAccessible(true);
@Test
public void testGet() {
Kafka kafka = TestUtil.newKafka(KAFKA_NAME);
+ kafka.setId(1);
//assertNull(topicConfigPollingService.getEffectiveTopic (kafka, "test"));
assertNotNull(topicConfigPollingService.getActiveTopics(kafka));
db.setDbType(dbType);
Kafka kafka = new Kafka();
- kafka.setId("1234");
+ kafka.setName("1234");
kafkas.add(kafka);
TopicName topicName = new TopicName();
public static Kafka newKafka(String name) {
Kafka kafka = new Kafka();
- kafka.setId(name);
+ kafka.setId(i++);
+ kafka.setName(name);
return kafka ;
}
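`newKafka` now draws ids from `i++`; presumably `i` is a static counter in `TestUtil` that gives each fixture a unique int id, e.g.:

    // assumed declaration in TestUtil, not shown in this diff
    private static int i = 0;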