1162aedd10260484030cc7de4b162cf121e3f37a
[dcaegen2/services.git] /
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DataLake
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20 package org.onap.datalake.feeder.controller;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Set;
26
27 import javax.servlet.http.HttpServletResponse;
28
29 import org.onap.datalake.feeder.domain.Db;
30 import org.onap.datalake.feeder.domain.Kafka;
31 import org.onap.datalake.feeder.domain.Topic;
32 import org.onap.datalake.feeder.controller.domain.PostReturnBody;
33 import org.onap.datalake.feeder.dto.TopicConfig;
34 import org.onap.datalake.feeder.repository.KafkaRepository;
35 import org.onap.datalake.feeder.repository.TopicRepository;
36 import org.onap.datalake.feeder.service.DmaapService;
37 import org.onap.datalake.feeder.service.TopicService;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.context.ApplicationContext;
42 import org.springframework.http.MediaType;
43 import org.springframework.validation.BindingResult;
44 import org.springframework.web.bind.annotation.DeleteMapping;
45 import org.springframework.web.bind.annotation.GetMapping;
46 import org.springframework.web.bind.annotation.PathVariable;
47 import org.springframework.web.bind.annotation.PostMapping;
48 import org.springframework.web.bind.annotation.PutMapping;
49 import org.springframework.web.bind.annotation.RequestBody;
50 import org.springframework.web.bind.annotation.RequestMapping;
51 import org.springframework.web.bind.annotation.ResponseBody;
52 import org.springframework.web.bind.annotation.RestController;
53
54
55 import io.swagger.annotations.ApiOperation;
56
57 /**
58  * This controller manages topic settings.
59  * 
60  * Topic "_DL_DEFAULT_" acts as the default. 
61  * If a topic is not present in database, "_DL_DEFAULT_" is used for it.
62  * If a topic is present in database, itself should be complete, and no setting from "_DL_DEFAULT_" is used.
63  * Topic "_DL_DEFAULT_" is populated at setup by a DB script.
64  * 
65  * @author Guobiao Mo
66  * @contributor Kate Hsuan @ QCT
67  */
68
69 @RestController
70 @RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
71 public class TopicController {
72
73         private final Logger log = LoggerFactory.getLogger(this.getClass());
74
75         //@Autowired
76         //private DmaapService dmaapService;
77
78         @Autowired
79         private ApplicationContext context;
80
81         @Autowired
82         private KafkaRepository kafkaRepository;
83         
84         @Autowired
85         private TopicRepository topicRepository;
86
87         @Autowired
88         private TopicService topicService;
89
90         @GetMapping("/dmaap/{kafkaId}")
91         @ResponseBody
92         @ApiOperation(value = "List all topic names in DMaaP.")
93         public List<String> listDmaapTopics(@PathVariable("kafkaId") String kafkaId ) {
94                 Kafka kafka = kafkaRepository.findById(kafkaId).get();
95                 DmaapService dmaapService = context.getBean(DmaapService.class, kafka); 
96                 return dmaapService.getTopics();
97         }
98
99         @GetMapping("")
100         @ResponseBody
101         @ApiOperation(value="List all topic names in database")
102         public List<String> list() {
103                 Iterable<Topic> ret = topicRepository.findAll();
104                 List<String> retString = new ArrayList<>();
105                 for(Topic item : ret)
106                 {
107                         if(!topicService.isDefaultTopic(item))
108                                 retString.add(item.getName());
109                 }
110                 return retString;
111         }
112
113         @PostMapping("")
114         @ResponseBody
115         @ApiOperation(value="Create a new topic.")
116         public PostReturnBody<TopicConfig> createTopic(@RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
117
118                 if (result.hasErrors()) {
119                         sendError(response, 400, "Error parsing Topic: "+result.toString());
120                         return null;
121                 }
122                 /*Topic oldTopic = topicService.getTopic(topicConfig.getName());
123                 if (oldTopic != null) {
124                         sendError(response, 400, "Topic already exists "+topicConfig.getName());
125                         return null;
126                 } else {*/
127                         Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
128                         if(wTopic.getTtl() == 0)
129                                 wTopic.setTtl(3650);
130                         topicRepository.save(wTopic); 
131                         return mkPostReturnBody(200, wTopic);
132                 //}
133                         //FIXME need to connect to Kafka
134         }
135
136         @GetMapping("/{topicId}")
137         @ResponseBody
138         @ApiOperation(value="Get a topic's settings.")
139         public TopicConfig getTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException {
140                 Topic topic = topicService.getTopic(topicId);
141                 if(topic == null) {
142                         sendError(response, 404, "Topic not found");
143                         return null;
144                 }
145                 return topic.getTopicConfig();
146         }
147
148         //This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
149         //One exception is that old DBs are kept
150         @PutMapping("/{topicId}")
151         @ResponseBody
152         @ApiOperation(value="Update a topic.")
153         public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicId") int topicId, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
154
155                 if (result.hasErrors()) {
156                         sendError(response, 400, "Error parsing Topic: "+result.toString());
157                         return null;
158                 }
159
160                 if(topicId!=topicConfig.getId())
161                 {
162                         sendError(response, 400, "Topic name mismatch" + topicId + topicConfig);
163                         return null;
164                 }
165
166                 Topic oldTopic = topicService.getTopic(topicId);
167                 if (oldTopic == null) {
168                         sendError(response, 404, "Topic not found "+topicConfig.getName());
169                         return null;
170                 } else {
171                         topicService.fillTopicConfiguration(topicConfig, oldTopic);
172                         topicRepository.save(oldTopic);
173                         return mkPostReturnBody(200, oldTopic);
174                 }
175         }
176
177         @DeleteMapping("/{topicId}")
178         @ResponseBody
179         @ApiOperation(value="Delete a topic.")
180         public void deleteTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException
181         {
182                 Topic oldTopic = topicService.getTopic(topicId);
183                 if (oldTopic == null) {
184                         sendError(response, 404, "Topic not found "+topicId);
185                 } else {
186                         Set<Db> dbRelation = oldTopic.getDbs();
187                         dbRelation.clear();
188                         topicRepository.save(oldTopic);
189                         topicRepository.delete(oldTopic);
190                         response.setStatus(204);
191                 }
192         }
193
194         private PostReturnBody<TopicConfig> mkPostReturnBody(int statusCode, Topic topic)
195         {
196                 PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
197         retBody.setStatusCode(statusCode);
198         retBody.setReturnBody(topic.getTopicConfig());
199         
200         return retBody;
201         }
202         
203         private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
204                 log.info(msg);
205                 response.sendError(sc, msg);            
206         }
207 }