2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.datalake.feeder.controller;
23 import io.swagger.annotations.ApiOperation;
24 import org.onap.datalake.feeder.controller.domain.PostReturnBody;
25 import org.onap.datalake.feeder.domain.Kafka;
26 import org.onap.datalake.feeder.dto.KafkaConfig;
27 import org.onap.datalake.feeder.repository.KafkaRepository;
28 import org.onap.datalake.feeder.service.KafkaService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.http.MediaType;
33 import org.springframework.validation.BindingResult;
34 import org.springframework.web.bind.annotation.*;
36 import javax.servlet.http.HttpServletResponse;
37 import java.io.IOException;
38 import java.util.List;
 * This controller manages Kafka settings: it creates, updates, deletes and lists Kafka entries.
46 @RequestMapping(value = "/kafkas", produces = { MediaType.APPLICATION_JSON_VALUE })
47 public class KafkaController {
// Instance logger, named after the runtime class of this controller.
private final Logger log = LoggerFactory.getLogger(this.getClass());
// Service used to look up Kafka entities by id, list them, and fill them from a KafkaConfig.
// NOTE(review): no injection annotation is visible on this field here; presumably Spring-injected, confirm.
private KafkaService kafkaService;
// Repository used to save and delete Kafka entities.
// NOTE(review): no injection annotation is visible on this field here; presumably Spring-injected, confirm.
private KafkaRepository kafkaRepository;
59 @ApiOperation(value="Create a kafka.")
60 public PostReturnBody<KafkaConfig> createKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, HttpServletResponse response) throws IOException {
62 if (result.hasErrors()) {
63 sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
67 Kafka oldKafka = kafkaService.getKafkaById(kafkaConfig.getId());
69 if (oldKafka != null) {
70 sendError(response, 400, "kafka is exist "+kafkaConfig.getId());
75 kafka = kafkaService.fillKafkaConfiguration(kafkaConfig);
76 } catch (Exception e) {
77 log.debug("FillKafkaConfiguration failed", e.getMessage());
78 sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
81 kafkaRepository.save(kafka);
82 log.info("Kafka save successed");
83 return mkPostReturnBody(200, kafka);
89 @ApiOperation(value="Update a kafka.")
90 public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException {
92 if (result.hasErrors()) {
93 sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
97 Kafka oldKafka = kafkaService.getKafkaById(id);
99 if (oldKafka == null) {
100 sendError(response, 400, "Kafka not found: "+id);
104 kafkaService.fillKafkaConfiguration(kafkaConfig, oldKafka);
105 } catch (Exception e) {
106 log.debug("FillKafkaConfiguration failed", e.getMessage());
107 sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
110 kafkaRepository.save(oldKafka);
111 log.info("kafka update successed");
112 return mkPostReturnBody(200, oldKafka);
116 @DeleteMapping("/{id}")
118 @ApiOperation(value="delete a kafka.")
119 public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{
121 Kafka oldKafka = kafkaService.getKafkaById(id);
122 if (oldKafka == null) {
123 sendError(response, 400, "kafka not found "+id);
125 kafkaRepository.delete(oldKafka);
126 response.setStatus(204);
132 @ApiOperation(value="List all Kafkas")
133 public List<KafkaConfig> queryAllKafka(){
134 return kafkaService.getAllKafka();
137 private PostReturnBody<KafkaConfig> mkPostReturnBody(int statusCode, Kafka kafka) {
138 PostReturnBody<KafkaConfig> retBody = new PostReturnBody<>();
139 retBody.setStatusCode(statusCode);
140 retBody.setReturnBody(kafka.getKafkaConfig());
144 private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
146 response.sendError(sc, msg);