2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright 2019 China Mobile
 
   6  *=================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *     http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.datalake.feeder.service;
 
  23 import static org.junit.Assert.assertTrue;
 
  24 import static org.mockito.Mockito.when;
 
  26 import java.lang.reflect.InvocationTargetException;
 
  27 import java.lang.reflect.Method;
 
  28 import java.util.ArrayList;
 
  29 import java.util.List;
 
  31 import org.apache.commons.lang3.tuple.Pair;
 
  32 import org.junit.Test;
 
  33 import org.junit.runner.RunWith;
 
  34 import org.mockito.InjectMocks;
 
  35 import org.mockito.Mock;
 
  36 import org.mockito.junit.MockitoJUnitRunner;
 
  37 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 
  38 import org.onap.datalake.feeder.domain.Kafka;
 
  39 import org.onap.datalake.feeder.dto.TopicConfig;
 
  40 import org.springframework.context.ApplicationContext;
 
  48 @RunWith(MockitoJUnitRunner.class)
 
  49 public class StoreServiceTest {
 
  52         private StoreService storeService = new StoreService();
 
  55         private ApplicationContext context;
 
  58         private ApplicationConfiguration config;
 
  61         private TopicConfigPollingService configPollingService;
 
  64         private MongodbService mongodbService;
 
  67         private CouchbaseService couchbaseService;
 
  70         private ElasticsearchService elasticsearchService;
 
  73         private HdfsService hdfsService;
 
  78         public void testInit() throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
 
  79                 Method init = storeService.getClass().getDeclaredMethod("init");
 
  80                 init.setAccessible(true);
 
  81                 init.invoke(storeService);
 
  84         private TopicConfig createTopicConfig(String topicStr, String type) {
 
  86                 TopicConfig topicConfig = new TopicConfig();
 
  87                 topicConfig.setName(topicStr);
 
  88                 topicConfig.setDataFormat(type);
 
  89                 topicConfig.setSaveRaw(true);
 
  91                 when(configPollingService.getEffectiveTopicConfig(topicStr)).thenReturn(topicConfig);
 
  97         public void saveMessages() throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
 
 100                 TopicConfig topicConfig = createTopicConfig("test1", "JSON");
 
 101                 topicConfig.setAggregateArrayPath("/test");
 
 102                 topicConfig.setFlattenArrayPath("/test");
 
 104                 topicConfig = createTopicConfig("test2", "XML");
 
 105                 topicConfig.setSaveRaw(false);
 
 107                 topicConfig = createTopicConfig("test3", "YAML");
 
 109                 topicConfig.setSinkdbs(new ArrayList<>());
 
 110                 topicConfig.getSinkdbs().add("Elasticsearch");
 
 111                 topicConfig.getSinkdbs().add("Couchbase");
 
 112                 topicConfig.getSinkdbs().add("Druid");
 
 113                 topicConfig.getSinkdbs().add("MongoDB");
 
 114                 topicConfig.getSinkdbs().add("HDFS");
 
 117                 topicConfig.setEnabledSinkdbs(new ArrayList<>());
 
 118                 topicConfig.getEnabledSinkdbs().add("Elasticsearch");
 
 119                 assertTrue(topicConfig.supportElasticsearch());
 
 122                 createTopicConfig("test4", "TEXT");
 
 124                 when(config.getTimestampLabel()).thenReturn("ts");
 
 125                 when(config.getRawDataLabel()).thenReturn("raw");
 
 128                 List<Pair<Long, String>> messages = new ArrayList<>();
 
 129                 messages.add(Pair.of(100L, "{test: 1}"));
 
 131                 storeService.saveMessages(kafka, "test1", messages);
 
 134                 List<Pair<Long, String>> messagesXml = new ArrayList<>();
 
 135                 messagesXml.add(Pair.of(100L, "<test></test>")); 
 
 136                 messagesXml.add(Pair.of(100L, "<test></test"));//bad xml to trigger exception
 
 138                 storeService.saveMessages(kafka, "test2", messagesXml);
 
 141                 List<Pair<Long, String>> messagesYaml = new ArrayList<>();
 
 142                 messagesYaml.add(Pair.of(100L, "test: yes"));
 
 144                 storeService.saveMessages(kafka, "test3", messagesYaml);
 
 147                 List<Pair<Long, String>> messagesText = new ArrayList<>();
 
 148                 messagesText.add(Pair.of(100L, "test message"));
 
 150                 storeService.saveMessages(kafka, "test4", messagesText);
 
 153                 storeService.saveMessages(kafka, "test", null);
 
 157         public void testFlush() {
 
 158                 storeService.flush();
 
 159                 storeService.flushStall();