2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.datalake.feeder.controller;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
27 import javax.servlet.http.HttpServletResponse;
29 import org.onap.datalake.feeder.domain.Db;
30 import org.onap.datalake.feeder.domain.Topic;
31 import org.onap.datalake.feeder.controller.domain.PostReturnBody;
32 import org.onap.datalake.feeder.controller.domain.TopicConfig;
33 import org.onap.datalake.feeder.repository.DbRepository;
34 import org.onap.datalake.feeder.repository.TopicRepository;
35 import org.onap.datalake.feeder.service.DbService;
36 import org.onap.datalake.feeder.service.DmaapService;
37 import org.onap.datalake.feeder.service.TopicService;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.http.MediaType;
42 import org.springframework.validation.BindingResult;
43 import org.springframework.web.bind.annotation.DeleteMapping;
44 import org.springframework.web.bind.annotation.GetMapping;
45 import org.springframework.web.bind.annotation.PathVariable;
46 import org.springframework.web.bind.annotation.PostMapping;
47 import org.springframework.web.bind.annotation.PutMapping;
48 import org.springframework.web.bind.annotation.RequestBody;
49 import org.springframework.web.bind.annotation.RequestMapping;
50 import org.springframework.web.bind.annotation.ResponseBody;
51 import org.springframework.web.bind.annotation.RestController;
53 import io.swagger.annotations.ApiOperation;
56 * This controller manages topic settings.
58 * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's
59 * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings
60 * are saved in the database. Topic "_DL_DEFAULT_" is populated at setup by a DB
64 * @contributor Kate Hsuan @ QCT
68 @RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
69 public class TopicController {
// Collaborators of this controller.
// NOTE(review): none of these service/repository fields is assigned in the visible code;
// presumably they are injected by Spring (Autowired is imported) - confirm against the full file.

// Logger obtained for the runtime class of this controller instance.
71 private final Logger log = LoggerFactory.getLogger(this.getClass());
// Supplies the topic names returned by listDmaapTopics().
74 private DmaapService dmaapService;
// Used to enumerate (findAll) and persist (save) Topic entities.
77 private TopicRepository topicRepository;
// Not referenced by the code visible in this section.
80 private DbRepository dbRepository;
// Used to look up topics by name, to test for the default topic, and to
// build or update Topic entities from a TopicConfig.
83 private TopicService topicService;
// Not referenced by the code visible in this section.
86 private DbService dbService;
88 @GetMapping("/dmaap/")
90 @ApiOperation(value = "List all topic names in DMaaP.")
91 public List<String> listDmaapTopics() throws IOException {
92 return dmaapService.getTopics();
// Endpoint that collects the names of the stored topics, leaving out every topic
// for which topicService.istDefaultTopic(...) returns true.
// NOTE(review): the request-mapping annotation and the return statement of this method
// are not visible in this view (the embedded line numbering has gaps) - confirm against the full file.
97 @ApiOperation(value="List all topics")
98 public List<String> list() throws IOException {
// Load every Topic known to the repository.
99 Iterable<Topic> ret = topicRepository.findAll();
// Accumulates the names that pass the default-topic filter below.
100 List<String> retString = new ArrayList<>();
101 for(Topic item : ret)
// Only non-default topics contribute their name.
103 if(!topicService.istDefaultTopic(item))
104 retString.add(item.getName());
// Creates a topic from the posted TopicConfig.
// Visible flow: report HTTP 400 when request binding produced errors; report HTTP 400 when a
// topic with the requested name already exists; otherwise build a Topic from the config,
// save it through topicRepository and fill a PostReturnBody with status code 200.
// NOTE(review): the embedded line numbering skips values inside this method, so the return
// statements, closing braces and the request-mapping annotation are not visible here -
// confirm against the full file.
111 @ApiOperation(value="Create a new topic.")
112 public PostReturnBody<TopicConfig> createTopic(@RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
// Binding/validation problems are reported to the client as 400.
114 if (result.hasErrors()) {
115 sendError(response, 400, "Error parsing Topic: "+result.toString());
// A topic that can already be found under this name makes the create request invalid.
118 Topic oldTopic = topicService.getTopic(topicConfig.getName());
119 if (oldTopic != null) {
120 sendError(response, 400, "Topic already exists "+topicConfig.getName());
123 PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
// Build the entity to persist from the posted configuration.
124 Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
// NOTE(review): the numbering jumps from 125 to 127, so the statement guarded by this
// ttl == 0 check is presumably missing from this view; as shown, the check would guard
// the save on the next line - confirm against the full file.
125 if(wTopic.getTtl() == 0)
127 topicRepository.save(wTopic);
// Populate the response body with status 200 and the topic's settings.
128 mkPostReturnBody(retBody, 200, wTopic);
// Looks up the topic named by the {topicName} path variable and converts it to a TopicConfig.
// NOTE(review): the condition guarding the 404 below and the return statement are not
// visible in this view (the embedded line numbering has gaps) - confirm against the full file.
133 @GetMapping("/{topicName}")
135 @ApiOperation(value="Get a topic's settings.")
136 public TopicConfig getTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException {
137 Topic topic = topicService.getTopic(topicName);
// Reports HTTP 404 with the message "Topic not found" on the servlet response.
139 sendError(response, 404, "Topic not found");
// Copy the entity's settings into a fresh TopicConfig.
141 TopicConfig tConfig = new TopicConfig();
142 mkReturnMessage(topic, tConfig);
// Updates the topic addressed by the {topicName} path variable from the posted TopicConfig.
// Visible flow: report HTTP 400 on binding errors; report HTTP 400 when the path name differs
// from the name in the body; report HTTP 404 when no topic with that name is found; otherwise
// apply the config to the existing entity, save it and fill a PostReturnBody with status 200.
// NOTE(review): the embedded line numbering skips values inside this method, so the return
// statements and braces are not visible here - confirm against the full file.
146 //This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
147 //One exception is that old DBs are kept
148 @PutMapping("/{topicName}")
150 @ApiOperation(value="Update a topic.")
151 public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicName") String topicName, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
// Binding/validation problems are reported to the client as 400.
153 if (result.hasErrors()) {
154 sendError(response, 400, "Error parsing Topic: "+result.toString());
// The name in the URL must equal the name carried in the request body.
158 if(!topicName.equals(topicConfig.getName()))
// NOTE(review): the message concatenates the literal and both names without any separator.
160 sendError(response, 400, "Topic name mismatch" + topicName + topicConfig.getName());
// The topic must already exist; otherwise the client gets a 404.
164 Topic oldTopic = topicService.getTopic(topicConfig.getName());
165 if (oldTopic == null) {
166 sendError(response, 404, "Topic not found "+topicConfig.getName());
169 PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
// Apply the posted configuration to the existing entity, then persist that same entity.
170 topicService.fillTopicConfiguration(topicConfig, oldTopic);
171 topicRepository.save(oldTopic);
// Populate the response body with status 200 and the updated settings.
172 mkPostReturnBody(retBody, 200, oldTopic);
177 private void mkReturnMessage(Topic topic, TopicConfig tConfig)
179 tConfig.setName(topic.getName());
180 tConfig.setEnable(topic.getEnabled());
181 if(topic.getDataFormat() != null)
182 tConfig.setData_format(topic.getDataFormat().toString());
183 tConfig.setSave_raw(topic.getSaveRaw());
184 tConfig.setCorrelated_clearred_message((topic.getCorrelateClearedMessage() == null) ? topic.getCorrelateClearedMessage() : false);
185 tConfig.setMessage_id_path(topic.getMessageIdPath());
186 tConfig.setTtl(topic.getTtl());
187 Set<Db> topicDb = topic.getDbs();
188 List<String> dbList = new ArrayList<>();
189 for(Db item: topicDb)
191 dbList.add(item.getName());
193 tConfig.setSinkdbs(dbList);
196 private void mkPostReturnBody(PostReturnBody<TopicConfig> retBody, int statusCode, Topic topic)
198 TopicConfig retTopic = new TopicConfig();
199 retBody.setStatusCode(statusCode);
200 mkReturnMessage(topic, retTopic);
201 retBody.setReturnBody(retTopic);
// Reports an error to the client by calling sendError on the servlet response
// with status code sc and message msg.
// Declares IOException; presumably this is what response.sendError may raise - confirm.
// NOTE(review): the embedded line numbering skips 205 between the signature and the call
// below, so a statement may be missing from this view - confirm against the full file.
204 private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
206 response.sendError(sc, msg);