import java.io.IOException;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.service.PullService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
import io.swagger.annotations.ApiOperation;
*/
@RestController
-@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE })
+@RequestMapping(value = "/feeder", produces = { MediaType.APPLICATION_JSON_VALUE })
public class FeederController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private PullService pullService;
+
+ @Autowired
+ ApplicationConfiguration config;
/**
* @return message that application is started
* @throws IOException
*/
- @GetMapping("/start")
+ @PostMapping("/start")
+ @ResponseBody
@ApiOperation(value="Start pulling data.")
public String start() throws IOException {
log.info("DataLake feeder starting to pull data from DMaaP...");
- pullService.start();
- return "DataLake feeder is running.";
+ if(pullService.isRunning() == false) {
+ pullService.start();
+ }
+ return "{\"running\": true}";
}
/**
* @return message that application stop process is triggered
*/
- @GetMapping("/stop")
+ @PostMapping("/stop")
+ @ResponseBody
@ApiOperation(value="Stop pulling data.")
- public String stop() {
- pullService.shutdown();
+ public String stop() {
+ if(pullService.isRunning() == true)
+ {
+ pullService.shutdown();
+ }
log.info("DataLake feeder is stopped.");
- return "DataLake feeder is stopped.";
+ return "{\"running\": false}";
}
/**
* @return feeder status
@ApiOperation(value="Retrieve feeder status.")
public String status() {
String status = "Feeder is running: "+pullService.isRunning();
- log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
- return status;
- }
+ log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
+
+ return "{\"version\": \""+config.getDatalakeVersion()+"\", \"running\": "+pullService.isRunning()+"}";
+ }
}
ConsumerRecords<String, String> records = ConsumerRecords.empty();
when(kafkaConsumer.poll(2)).thenReturn(records);
String start = feederController.start();
- assertEquals("DataLake feeder is running.", start);
+ assertEquals("{\"running\": true}", start);
}
@Test
FeederController feederController = new FeederController();
setAccessPrivateFields(feederController);
String stop = feederController.stop();
- assertEquals("DataLake feeder is stopped.", stop);
+ assertEquals("{\"running\": false}", stop);
}
@Test
public void testStatus() throws NoSuchFieldException, IllegalAccessException {
+ ApplicationConfiguration conf = new ApplicationConfiguration();
+ conf.setDatalakeVersion("0.0.1");
FeederController feederController = new FeederController();
+ feederController.config = conf;
setAccessPrivateFields(feederController);
String status = feederController.status();
- assertEquals("Feeder is running: false", status);
+ assertEquals("{\"version\": \"0.0.1\", \"running\": false}", status);
}
}