8d1bf31681c5c779710e5974f8d6de06b80c1b2f
[dcaegen2/services.git] / components / datalake-handler / feeder / src / main / java / org / onap / datalake / feeder / controller / KafkaController.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : DataLake
4  * ================================================================================
5  * Copyright 2019 China Mobile
6  *=================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.datalake.feeder.controller;
22
23 import io.swagger.annotations.ApiOperation;
24 import org.onap.datalake.feeder.controller.domain.PostReturnBody;
25 import org.onap.datalake.feeder.domain.Kafka;
26 import org.onap.datalake.feeder.dto.KafkaConfig;
27 import org.onap.datalake.feeder.repository.KafkaRepository;
28 import org.onap.datalake.feeder.service.KafkaService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.http.MediaType;
33 import org.springframework.validation.BindingResult;
34 import org.springframework.web.bind.annotation.*;
35
36 import javax.servlet.http.HttpServletResponse;
37 import java.io.IOException;
38 import java.util.List;
39
40 /**
41  * This controller manages kafka settings
42  *
43  * @author guochunmeng
44  */
45 @RestController
46 @RequestMapping(value = "/kafkas", produces = { MediaType.APPLICATION_JSON_VALUE })
47 public class KafkaController {
48
49     private final Logger log = LoggerFactory.getLogger(this.getClass());
50
51     @Autowired
52     private KafkaService kafkaService;
53
54     @Autowired
55     private KafkaRepository kafkaRepository;
56
57     @PostMapping("")
58     @ResponseBody
59     @ApiOperation(value="Create a kafka.")
60     public PostReturnBody<KafkaConfig> createKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, HttpServletResponse response) throws IOException {
61
62         if (result.hasErrors()) {
63             sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
64             return null;
65         }
66
67         Kafka oldKafka = kafkaService.getKafkaById(kafkaConfig.getId());
68
69         if (oldKafka != null) {
70             sendError(response, 400, "kafka is exist "+kafkaConfig.getId());
71             return null;
72         } else {
73             Kafka kafka = null;
74             try {
75                 kafka = kafkaService.fillKafkaConfiguration(kafkaConfig);
76             } catch (Exception e) {
77                 log.debug("FillKafkaConfiguration failed", e.getMessage());
78                 sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
79                 return null;
80             }
81             kafkaRepository.save(kafka);
82             log.info("Kafka save successed");
83             return mkPostReturnBody(200, kafka);
84         }
85     }
86
87     @PutMapping("/{id}")
88     @ResponseBody
89     @ApiOperation(value="Update a kafka.")
90     public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException {
91
92         if (result.hasErrors()) {
93             sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
94             return null;
95         }
96
97         Kafka oldKafka = kafkaService.getKafkaById(id);
98
99         if (oldKafka == null) {
100             sendError(response, 400, "Kafka not found: "+id);
101             return null;
102         } else {
103             try {
104                 kafkaService.fillKafkaConfiguration(kafkaConfig, oldKafka);
105             } catch (Exception e) {
106                 log.debug("FillKafkaConfiguration failed", e.getMessage());
107                 sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
108                 return null;
109             }
110             kafkaRepository.save(oldKafka);
111             log.info("kafka update successed");
112             return mkPostReturnBody(200, oldKafka);
113         }
114     }
115
116     @DeleteMapping("/{id}")
117     @ResponseBody
118     @ApiOperation(value="delete a kafka.")
119     public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{
120
121         Kafka oldKafka = kafkaService.getKafkaById(id);
122         if (oldKafka == null) {
123             sendError(response, 400, "kafka not found "+id);
124         } else {
125             kafkaRepository.delete(oldKafka);
126             response.setStatus(204);
127         }
128     }
129
130     @GetMapping("")
131     @ResponseBody
132     @ApiOperation(value="List all Kafkas")
133     public List<KafkaConfig> queryAllKafka(){
134         return kafkaService.getAllKafka();
135     }
136
137     private PostReturnBody<KafkaConfig> mkPostReturnBody(int statusCode, Kafka kafka) {
138         PostReturnBody<KafkaConfig> retBody = new PostReturnBody<>();
139         retBody.setStatusCode(statusCode);
140         retBody.setReturnBody(kafka.getKafkaConfig());
141         return retBody;
142     }
143
144     private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
145         log.info(msg);
146         response.sendError(sc, msg);
147     }
148
149 }