Unit test code for datalake seed code 62/86362/1
authorRama-Huawei <rama.subba.reddy.s@huawei.com>
Fri, 26 Apr 2019 08:22:06 +0000 (13:52 +0530)
committerRama-Huawei <rama.subba.reddy.s@huawei.com>
Fri, 26 Apr 2019 08:26:53 +0000 (13:56 +0530)
Improved Feeder Controller code coverage to 100%
Added null check for if no consumer records avaliable
then no need to process further

Issue-ID: DCAEGEN2-1309

Change-Id: Ib3eac572f1a008f9e3c958a483ce3bc1e313a904
Signed-off-by: Rama-Huawei <rama.subba.reddy.s@huawei.com>
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java [new file with mode: 0644]

index e9f36b2..ce671a9 100644 (file)
@@ -48,7 +48,7 @@ import org.springframework.stereotype.Service;
 
 /**
  * Thread that pulls messages from DMaaP and save them to Big Data DBs
- * 
+ *
  * @author Guobiao Mo
  *
  */
@@ -65,11 +65,11 @@ public class PullThread implements Runnable {
 
        @Autowired
        private ApplicationConfiguration config;
-               
+
        private final Logger log = LoggerFactory.getLogger(this.getClass());
 
        private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text
-       private int id; 
+       private int id;
 
        private final AtomicBoolean active = new AtomicBoolean(false);
        private boolean async;
@@ -112,33 +112,34 @@ public class PullThread implements Runnable {
                        List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop
 
                        log.info("Thread {} going to subscribe to topics: {}", id, topics);
-                       
+
                        consumer.subscribe(topics, rebalanceListener);
 
                        while (active.get()) {
 
                                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
-
-                               List<Pair<Long, String>> messages = new ArrayList<>(records.count());
-                               for (TopicPartition partition : records.partitions()) {
-                                       messages.clear();
-                                       List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
-                                       for (ConsumerRecord<String, String> record : partitionRecords) {
-                                               messages.add(Pair.of(record.timestamp(), record.value()));
-                                               //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
-                                       }
-                                       storeService.saveMessages(partition.topic(), messages);
-                                       log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-                                       
-                                       if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
-                                               long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
-                                               consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
-                                       }
-                               }
-
-                               if (async) {//for high Throughput, async commit offset in batch to Kafka
-                                       consumer.commitAsync();
-                               }
+                if (records != null) {
+                    List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+                    for (TopicPartition partition : records.partitions()) {
+                        messages.clear();
+                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+                        for (ConsumerRecord<String, String> record : partitionRecords) {
+                            messages.add(Pair.of(record.timestamp(), record.value()));
+                            //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+                        }
+                        storeService.saveMessages(partition.topic(), messages);
+                        log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+                        if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+                            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+                            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+                        }
+                    }
+
+                    if (async) {//for high Throughput, async commit offset in batch to Kafka
+                        consumer.commitAsync();
+                    }
+                }
                        }
                } catch (Exception e) {
                        log.error("Puller {} run():   exception={}", id, e.getMessage());
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
new file mode 100644 (file)
index 0000000..713d8b1
--- /dev/null
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : DATALAKE
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.datalake.feeder.controller;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.service.DmaapService;
+import org.onap.datalake.feeder.service.PullService;
+import org.onap.datalake.feeder.service.PullThread;
+import org.springframework.context.ApplicationContext;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+
+public class FeederControllerTest {
+
+    @InjectMocks
+    private PullService pullService1;
+
+    @Mock
+    private ApplicationConfiguration config;
+
+    @Mock
+    private ApplicationContext context;
+
+    @Mock
+    private DmaapService dmaapService1;
+
+    @Mock
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    @Before
+    public void setupTest() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    private void setAccessPrivateFields(FeederController feederController) throws NoSuchFieldException,
+            IllegalAccessException {
+        Field pullService = feederController.getClass().getDeclaredField("pullService");
+        pullService.setAccessible(true);
+        pullService.set(feederController, pullService1);
+    }
+
+    @Test
+    public void testStart() throws IOException, NoSuchFieldException, IllegalAccessException {
+        FeederController feederController = new FeederController();
+        setAccessPrivateFields(feederController);
+        PullService pullService2 = new PullService();
+        Field applicationConfig = pullService2.getClass().getDeclaredField("config");
+        applicationConfig.setAccessible(true);
+        applicationConfig.set(pullService2, config);
+        Field applicationContext = pullService2.getClass().getDeclaredField("context");
+        applicationContext.setAccessible(true);
+        applicationContext.set(pullService2, context);
+        when(config.getKafkaConsumerCount()).thenReturn(1);
+        PullThread pullThread = new PullThread(1);
+        Field dmaapService = pullThread.getClass().getDeclaredField("dmaapService");
+        dmaapService.setAccessible(true);
+        dmaapService.set(pullThread, dmaapService1);
+        Field kafkaConsumer1 = pullThread.getClass().getDeclaredField("consumer");
+        kafkaConsumer1.setAccessible(true);
+        kafkaConsumer1.set(pullThread, kafkaConsumer);
+        applicationConfig = pullThread.getClass().getDeclaredField("config");
+        applicationConfig.setAccessible(true);
+        applicationConfig.set(pullThread, config);
+        when(context.getBean(PullThread.class, 0)).thenReturn(pullThread);
+        ConsumerRecords<String, String> records = ConsumerRecords.empty();
+        when(kafkaConsumer.poll(2)).thenReturn(records);
+        String start = feederController.start();
+        assertEquals("DataLake feeder is running.", start);
+    }
+
+    @Test
+    public void testStop() throws NoSuchFieldException, IllegalAccessException {
+        FeederController feederController = new FeederController();
+        setAccessPrivateFields(feederController);
+        String stop = feederController.stop();
+        assertEquals("DataLake feeder is stopped.", stop);
+    }
+
+    @Test
+    public void testStatus() throws NoSuchFieldException, IllegalAccessException {
+        FeederController feederController = new FeederController();
+        setAccessPrivateFields(feederController);
+        String status = feederController.status();
+        assertEquals("Feeder is running: false", status);
+    }
+}