* ================================================================================\r
* Copyright 2018 China Mobile\r
*=================================================================================\r
-* Licensed under the Apache License, Version 2.0 (the "License");\r
+* Licensed under the Apache License, Version 2.0 (the "License")\r
* you may not use this file except in compliance with the License.\r
* You may obtain a copy of the License at\r
*\r
@Column(name = "correlate_cleared_message")
private Boolean correlateClearedMessage;
- //the value in the JSON with this path will be used as DB id
+ //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
@Column(name = "message_id_path")
private String messageIdPath;
cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
bucket = cluster.openBucket(couchbase.getDatabase());
- log.info("Connect to Couchbase " + couchbase.getHost());
+ log.info("Connect to Couchbase {}", couchbase.getHost());
// Create a N1QL Primary Index (but ignore if it exists)
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.repository.DbRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DbService {
- private final Logger log = LoggerFactory.getLogger(this.getClass());
-
@Autowired
private DbRepository dbRepository;
import javax.annotation.PreDestroy;
import org.apache.http.HttpHost;
-import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
+
+//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
+//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
@PostConstruct
private void init() {
Db elasticsearch = dbService.getElasticsearch();
// Initialize the Connection
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
- log.info("Connect to Elasticsearch Host " + elasticsearchHost);
+ log.info("Connect to Elasticsearch Host {}", elasticsearchHost);
listener = new ActionListener<BulkResponse>() {
@Override
if(!exists){
CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
- log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
+ log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
}
}
private boolean isRunning = false;
private ExecutorService executorService;
private List<PullThread> consumers;
-
+
@Autowired
private ApplicationContext context;
-
+
@Autowired
private ApplicationConfiguration config;
public boolean isRunning() {
return isRunning;
}
-
+
/**
* start pulling.
*
* @throws IOException
*/
- public synchronized void start() throws IOException {
+ public synchronized void start() {
if (isRunning) {
return;
}
executorService = Executors.newFixedThreadPool(numConsumers);
consumers = new ArrayList<>(numConsumers);
- for (int i = 0; i < numConsumers; i++) {
+ for (int i = 0; i < numConsumers; i++) {
PullThread puller = context.getBean(PullThread.class, i);
consumers.add(puller);
executorService.submit(puller);
}
isRunning = true;
-
- Runtime.getRuntime().addShutdownHook(new Thread(()->shutdown())) ;
+
+ Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
/**
for (PullThread puller : consumers) {
puller.shutdown();
}
-
+
executorService.shutdown();
-
+
try {
executorService.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("executor.awaitTermination", e);
Thread.currentThread().interrupt();
}
-
+
isRunning = false;
}
import java.io.IOException;
import java.util.Optional;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.slf4j.Logger;
@Autowired
private ElasticsearchService elasticsearchService;
- @PostConstruct
- private void init() {
- }
-
- @PreDestroy
- public void cleanUp() {
- }
-
public Topic getEffectiveTopic(String topicStr) {
try {
return getEffectiveTopic(topicStr, false);
return replaceDotInKey(newJson);// there maybe more to replace
}
}
-
- public static void main(String[] args) {
- String a = "\"u-y.t.y-t\":\"u.gfh\",\\\"jg.h\\\":\"j_9889\"";
- String b = replaceDotInKey(a);
- System.out.println(a);
- System.out.println(b);
- }
}
--- /dev/null
+package com.mongodb.internal.validator;\r
+\r
+/*\r
+* ============LICENSE_START=======================================================\r
+* ONAP : DataLake\r
+* ================================================================================\r
+* Copyright 2018 China Mobile\r
+*=================================================================================\r
+* Licensed under the Apache License, Version 2.0 (the "License")\r
+* you may not use this file except in compliance with the License.\r
+* You may obtain a copy of the License at\r
+*\r
+* http://www.apache.org/licenses/LICENSE-2.0\r
+*\r
+* Unless required by applicable law or agreed to in writing, software\r
+* distributed under the License is distributed on an "AS IS" BASIS,\r
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+* See the License for the specific language governing permissions and\r
+* limitations under the License.\r
+* ============LICENSE_END=========================================================\r
+*/\r
+import org.junit.Test;\r
+\r
+import static org.junit.Assert.assertFalse;\r
+import static org.junit.Assert.assertTrue;\r
+\r
+/**\r
+ * test CollectibleDocumentFieldNameValidator\r
+ * \r
+ * @author Guobiao Mo\r
+ *\r
+ */\r
+public class CollectibleDocumentFieldNameValidatorTest {\r
+ @Test\r
+ public void validate() {\r
+ CollectibleDocumentFieldNameValidator validator = new CollectibleDocumentFieldNameValidator();\r
+\r
+ assertTrue(validator.validate("$id"));\r
+ assertFalse(validator.validate("$abc"));\r
+ assertTrue(validator.validate("abc.abc"));\r
+\r
+ assertTrue(validator == validator.getValidatorForField("any"));\r
+\r
+ }\r
+\r
+ @Test(expected = IllegalArgumentException.class)\r
+ public void validateNull() {\r
+ CollectibleDocumentFieldNameValidator validator = new CollectibleDocumentFieldNameValidator();\r
+\r
+ assertTrue(validator.validate(null));\r
+ }\r
+}\r
public class ApplicationTest {
@Test
- //only dot(.) in key got replaced
- public void replaceDotInKey() {
+ public void testRunner() {
Application application = new Application();
application.commandLineRunner(new PullService());
}
assertTrue(config.getKafkaConsumerCount() > 0);
assertNotNull(config.isAsync());
+ assertNotNull(config.isEnableSSL());
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
topic.setMessageIdPath("/data/data2/value,/data/data3");
String value = topic.getMessageId(json);
-
assertEquals(value, "hello^world");
+ topic.setMessageIdPath("");
+ assertNull(topic.getMessageId(json));
+
Topic defaultTopic = new Topic("_DL_DEFAULT_");
Topic testTopic = new Topic("test");
testTopic.setDefaultTopic(defaultTopic);
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+import java.util.Optional;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.repository.DbRepository;
+
+/**
+ * Test Service for Dbs
+ *
+ * @author Guobiao Mo
+ *
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DbServiceTest {
+
+ @Mock
+ private DbRepository dbRepository;
+
+ @InjectMocks
+ private DbService dbService;
+
+ @Test
+ public void testGetDb() {
+ String name = "a";
+ when(dbRepository.findById(name)).thenReturn(Optional.of(new Db(name)));
+ assertEquals(dbService.getDb(name), new Db(name));
+ }
+
+ @Test
+ public void testGetDbNull() {
+ String name = null;
+ when(dbRepository.findById(name)).thenReturn(Optional.empty());
+ assertNull(dbService.getDb(name));
+ }
+
+ @Test
+ public void testGetCouchbase() {
+ String name = "Couchbase";
+ when(dbRepository.findById(name)).thenReturn(Optional.of(new Db(name)));
+ assertEquals(dbService.getCouchbase(), new Db(name));
+ }
+
+ @Test
+ public void testGetElasticsearch() {
+ String name = "Elasticsearch";
+ when(dbRepository.findById(name)).thenReturn(Optional.of(new Db(name)));
+ assertEquals(dbService.getElasticsearch(), new Db(name));
+ }
+
+ @Test
+ public void testGetMongoDB() {
+ String name = "MongoDB";
+ when(dbRepository.findById(name)).thenReturn(Optional.of(new Db(name)));
+ assertEquals(dbService.getMongoDB(), new Db(name));
+ }
+
+ @Test
+ public void testGetDruid() {
+ String name = "Druid";
+ when(dbRepository.findById(name)).thenReturn(Optional.of(new Db(name)));
+ assertEquals(dbService.getDruid(), new Db(name));
+ }
+
+}
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.TopicRepository;
+
+/**
+ * Test Service for Topic
+ *
+ * @author Guobiao Mo
+ *
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TopicServiceTest {
+
+ @Mock
+ private TopicRepository topicRepository;
+
+ @Mock
+ private ElasticsearchService elasticsearchService;
+
+ @InjectMocks
+ private TopicService topicService;
+
+ @Test
+ public void testGetTopic() {
+ String name = "a";
+ when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name)));
+ assertEquals(topicService.getTopic(name), new Topic(name));
+ }
+
+ @Test
+ public void testGetTopicNull() {
+ String name = null;
+ when(topicRepository.findById(name)).thenReturn(Optional.empty());
+ assertNull(topicService.getTopic(name));
+ }
+
+ @Test
+ public void testGetDefaultTopic() {
+ String name = "_DL_DEFAULT_";
+ when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name)));
+ assertEquals(topicService.getDefaultTopic(), new Topic(name));
+ }
+
+ @Test(expected = IOException.class)
+ public void testGetEffectiveTopic() throws IOException {
+ String name = "a";
+ Topic topic = new Topic(name);
+ topic.setEnabled(true);
+ Set<Db> dbSet = new HashSet<>();
+ dbSet.add(new Db("Elasticsearch"));
+ topic.setDbs(dbSet);
+
+ when(topicRepository.findById(name)).thenReturn(Optional.of(topic));
+ when(topicRepository.findById(null)).thenReturn(Optional.empty());
+ doThrow(IOException.class).when(elasticsearchService).ensureTableExist(name);
+
+ assertEquals(topicService.getEffectiveTopic(name), topicService.getEffectiveTopic(name, false));
+
+ assertNotNull(topicService.getEffectiveTopic(null));
+
+ topicService.getEffectiveTopic(name, true);
+ }
+}
*/
public class UtilTest {
- @Test
- //only dot(.) in key got replaced
- public void replaceDotInKey() {
- String a = "\"u-y.t.y-t\":\"u.gfh\",\\\"jg.h\\\":\"j_9889\"";
- String b = "\"u-y_t_y-t\":\"u.gfh\",\\\"jg_h\\\":\"j_9889\"";
+ @Test
+ //only dot(.) in key got replaced
+ public void replaceDotInKey() {
+ new Util();
- assertEquals(Util.replaceDotInKey(a), b);
+ String a = "\"u-y.t.y-t\":\"u.gfh\",\\\"jg.h\\\":\"j_9889\"";
+ String b = "\"u-y_t_y-t\":\"u.gfh\",\\\"jg_h\\\":\"j_9889\"";
- String[] strArray2 = {"test1", "test2", "test3"};
- Util.main(strArray2);
- }
+ assertEquals(Util.replaceDotInKey(a), b);
+ }
+
+ @Test(expected = IOException.class)
+ public void validateNull() throws IOException {
+ Util.getTextFromFile("no_such_file");
+ }
}
<java.version>1.8</java.version>
<mongojava.version>3.10.1</mongojava.version>
- <springboot.version>2.1.0.RELEASE</springboot.version>
+ <springboot.version>2.1.4.RELEASE</springboot.version>
<springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
- <jackson.version>2.9.6</jackson.version>
+ <jackson.version>2.9.8</jackson.version>
<kafka.version>2.0.0</kafka.version>
<elasticsearchjava.version>7.0.0</elasticsearchjava.version>