41995e0454b441241cc49de396c1b5babcb4d221
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : DataLake
4  * ================================================================================
5  * Copyright 2019 China Mobile
6  *=================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.datalake.feeder.controller;
22
23 import io.swagger.annotations.ApiOperation;
24 import org.onap.datalake.feeder.controller.domain.PostReturnBody;
25 import org.onap.datalake.feeder.domain.Kafka;
26 import org.onap.datalake.feeder.dto.KafkaConfig;
27 import org.onap.datalake.feeder.repository.KafkaRepository;
28 import org.onap.datalake.feeder.service.KafkaService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.http.MediaType;
33 import org.springframework.validation.BindingResult;
34 import org.springframework.web.bind.annotation.*;
35
36 import javax.servlet.http.HttpServletResponse;
37 import java.io.IOException;
38 import java.util.List;
39 import java.util.ArrayList;
40
41 /**
42  * This controller manages kafka settings
43  *
44  * @author guochunmeng
45  */
46 @RestController
47 @RequestMapping(value = "/kafkas", produces = { MediaType.APPLICATION_JSON_VALUE })
48 public class KafkaController {
49
50     private final Logger log = LoggerFactory.getLogger(this.getClass());
51
52     @Autowired
53     private KafkaService kafkaService;
54
55     @Autowired
56     private KafkaRepository kafkaRepository;
57
58     @PostMapping("")
59     @ResponseBody
60     @ApiOperation(value="Create a kafka.")
61     public PostReturnBody<KafkaConfig> createKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, HttpServletResponse response) throws IOException {
62
63         if (result.hasErrors()) {
64             sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
65             return null;
66         }
67
68         Kafka oldKafka = kafkaService.getKafkaById(kafkaConfig.getId());
69
70         if (oldKafka != null) {
71             sendError(response, 400, "kafka is exist "+kafkaConfig.getId());
72             return null;
73         } else {
74             Kafka kafka = null;
75             try {
76                 kafka = kafkaService.fillKafkaConfiguration(kafkaConfig);
77             } catch (Exception e) {
78                 log.debug("FillKafkaConfiguration failed", e.getMessage());
79                 sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
80                 return null;
81             }
82             kafkaRepository.save(kafka);
83             log.info("Kafka save successed");
84             return mkPostReturnBody(200, kafka);
85         }
86     }
87
88     @PutMapping("/{id}")
89     @ResponseBody
90     @ApiOperation(value="Update a kafka.")
91     public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException {
92
93         if (result.hasErrors()) {
94             sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
95             return null;
96         }
97
98         Kafka oldKafka = kafkaService.getKafkaById(id);
99
100         if (oldKafka == null) {
101             sendError(response, 400, "Kafka not found: "+id);
102             return null;
103         } else {
104             try {
105                 kafkaService.fillKafkaConfiguration(kafkaConfig, oldKafka);
106             } catch (Exception e) {
107                 log.debug("FillKafkaConfiguration failed", e.getMessage());
108                 sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
109                 return null;
110             }
111             kafkaRepository.save(oldKafka);
112             log.info("kafka update successed");
113             return mkPostReturnBody(200, oldKafka);
114         }
115     }
116
117     @DeleteMapping("/{id}")
118     @ResponseBody
119     @ApiOperation(value="delete a kafka.")
120     public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{
121
122         Kafka oldKafka = kafkaService.getKafkaById(id);
123         if (oldKafka == null) {
124             sendError(response, 400, "kafka not found "+id);
125         } else {
126             kafkaRepository.delete(oldKafka);
127             response.setStatus(204);
128         }
129     }
130
131     @GetMapping("")
132     @ResponseBody
133     @ApiOperation(value="List all Kafka id")
134     public List<Integer> list() {
135         Iterable<Kafka> ret = kafkaRepository.findAll();
136         List<Integer> retString = new ArrayList<>();
137         for (Kafka k : ret)
138         {
139             retString.add(k.getId());
140         }
141         return retString;
142     }
143
144     public List<KafkaConfig> queryAllKafka(){
145         return kafkaService.getAllKafka();
146     }
147
148     private PostReturnBody<KafkaConfig> mkPostReturnBody(int statusCode, Kafka kafka) {
149         PostReturnBody<KafkaConfig> retBody = new PostReturnBody<>();
150         retBody.setStatusCode(statusCode);
151         retBody.setReturnBody(kafka.getKafkaConfig());
152         return retBody;
153     }
154
155     private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
156         log.info(msg);
157         response.sendError(sc, msg);
158     }
159
160 }