b59b2a7bb73a4b185cbcae649b9d851d815a3fd9
[dcaegen2/services.git] / components / datalake-handler / feeder / src / main / java / org / onap / datalake / feeder / controller / TopicController.java
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DataLake
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20 package org.onap.datalake.feeder.controller;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Set;
26
27 import javax.servlet.http.HttpServletResponse;
28
29 import org.onap.datalake.feeder.domain.Db;
30 import org.onap.datalake.feeder.domain.Kafka;
31 import org.onap.datalake.feeder.domain.Topic;
32 import org.onap.datalake.feeder.controller.domain.PostReturnBody;
33 import org.onap.datalake.feeder.dto.TopicConfig;
34 import org.onap.datalake.feeder.repository.KafkaRepository;
35 import org.onap.datalake.feeder.repository.TopicRepository;
36 import org.onap.datalake.feeder.service.DmaapService;
37 import org.onap.datalake.feeder.service.TopicService;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.context.ApplicationContext;
42 import org.springframework.http.MediaType;
43 import org.springframework.validation.BindingResult;
44 import org.springframework.web.bind.annotation.DeleteMapping;
45 import org.springframework.web.bind.annotation.GetMapping;
46 import org.springframework.web.bind.annotation.PathVariable;
47 import org.springframework.web.bind.annotation.PostMapping;
48 import org.springframework.web.bind.annotation.PutMapping;
49 import org.springframework.web.bind.annotation.RequestBody;
50 import org.springframework.web.bind.annotation.RequestMapping;
51 import org.springframework.web.bind.annotation.ResponseBody;
52 import org.springframework.web.bind.annotation.RestController;
53
54
55 import io.swagger.annotations.ApiOperation;
56
57 /**
58  * This controller manages topic settings.
59  * 
60  * Topic "_DL_DEFAULT_" acts as the default. 
61  * If a topic is not present in the database, "_DL_DEFAULT_" is used for it.
62  * If a topic is present in the database, its own settings must be complete; no setting from "_DL_DEFAULT_" is used.
63  * Topic "_DL_DEFAULT_" is populated at setup by a DB script.
64  * 
65  * @author Guobiao Mo
66  * @contributor Kate Hsuan @ QCT
67  */
68
69 @RestController
70 @RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
71 public class TopicController {
72
73         private final Logger log = LoggerFactory.getLogger(this.getClass());
74
75         @Autowired
76         private ApplicationContext context;
77
78         @Autowired
79         private KafkaRepository kafkaRepository;
80         
81         @Autowired
82         private TopicRepository topicRepository;
83
84         @Autowired
85         private TopicService topicService;
86
87         @GetMapping("/dmaap/{kafkaId}")
88         @ResponseBody
89         @ApiOperation(value = "List all topic names in DMaaP.")
90         public List<String> listDmaapTopics(@PathVariable("kafkaId") int kafkaId ) {
91                 Kafka kafka = kafkaRepository.findById(kafkaId).get();
92                 DmaapService dmaapService = context.getBean(DmaapService.class, kafka); 
93                 return dmaapService.getTopics();
94         }
95
96         @GetMapping("")
97         @ResponseBody
98         @ApiOperation(value="List all topic names in database")
99         public List<String> list() {
100                 Iterable<Topic> ret = topicRepository.findAll();
101                 List<String> retString = new ArrayList<>();
102                 for(Topic item : ret)
103                 {
104                         if(!topicService.isDefaultTopic(item))
105                                 retString.add(item.getName());
106                 }
107                 return retString;
108         }
109
110         @PostMapping("")
111         @ResponseBody
112         @ApiOperation(value="Create a new topic.")
113         public PostReturnBody<TopicConfig> createTopic(@RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
114
115                 if (result.hasErrors()) {
116                         sendError(response, 400, "Error parsing Topic: "+result.toString());
117                         return null;
118                 }
119                 /*Topic oldTopic = topicService.getTopic(topicConfig.getName());
120                 if (oldTopic != null) {
121                         sendError(response, 400, "Topic already exists "+topicConfig.getName());
122                         return null;
123                 } else {*/
124                         Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
125                         if(wTopic.getTtl() == 0)
126                                 wTopic.setTtl(3650);
127                         topicRepository.save(wTopic); 
128                         return mkPostReturnBody(200, wTopic);
129                 //}
130                         //FIXME need to connect to Kafka
131         }
132
133         @GetMapping("/{topicId}")
134         @ResponseBody
135         @ApiOperation(value="Get a topic's settings.")
136         public TopicConfig getTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException {
137                 Topic topic = topicService.getTopic(topicId);
138                 if(topic == null) {
139                         sendError(response, 404, "Topic not found");
140                         return null;
141                 }
142                 return topic.getTopicConfig();
143         }
144
145         //This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
146         //One exception is that old DBs are kept
147         @PutMapping("/{topicId}")
148         @ResponseBody
149         @ApiOperation(value="Update a topic.")
150         public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicId") int topicId, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
151
152                 if (result.hasErrors()) {
153                         sendError(response, 400, "Error parsing Topic: "+result.toString());
154                         return null;
155                 }
156
157                 if(topicId!=topicConfig.getId())
158                 {
159                         sendError(response, 400, "Topic name mismatch" + topicId + topicConfig);
160                         return null;
161                 }
162
163                 Topic oldTopic = topicService.getTopic(topicId);
164                 if (oldTopic == null) {
165                         sendError(response, 404, "Topic not found "+topicConfig.getName());
166                         return null;
167                 } else {
168                         topicService.fillTopicConfiguration(topicConfig, oldTopic);
169                         topicRepository.save(oldTopic);
170                         return mkPostReturnBody(200, oldTopic);
171                 }
172         }
173
174         @DeleteMapping("/{topicId}")
175         @ResponseBody
176         @ApiOperation(value="Delete a topic.")
177         public void deleteTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException
178         {
179                 Topic oldTopic = topicService.getTopic(topicId);
180                 if (oldTopic == null) {
181                         sendError(response, 404, "Topic not found "+topicId);
182                 } else {
183                         Set<Db> dbRelation = oldTopic.getDbs();
184                         dbRelation.clear();
185                         topicRepository.save(oldTopic);
186                         topicRepository.delete(oldTopic);
187                         response.setStatus(204);
188                 }
189         }
190
191         private PostReturnBody<TopicConfig> mkPostReturnBody(int statusCode, Topic topic)
192         {
193                 PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
194         retBody.setStatusCode(statusCode);
195         retBody.setReturnBody(topic.getTopicConfig());
196         
197         return retBody;
198         }
199         
200         private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
201                 log.info(msg);
202                 response.sendError(sc, msg);            
203         }
204 }