6a09be9fb57db07605cdd03dc06bbd8d371c1192
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
21
22 import arrow.core.Option
23 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
24 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
25 import org.onap.dcae.collectors.veshv.utils.ServerHandle
26 import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
27 import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
28 import org.onap.dcae.collectors.veshv.utils.http.Response
29 import org.onap.dcae.collectors.veshv.utils.http.Responses
30 import org.onap.dcae.collectors.veshv.utils.http.Responses.stringResponse
31 import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
32 import org.onap.dcae.collectors.veshv.utils.http.sendOrError
33 import org.onap.dcae.collectors.veshv.utils.logging.Logger
34 import reactor.core.publisher.Mono
35 import reactor.netty.http.server.HttpServer
36 import reactor.netty.http.server.HttpServerRequest
37 import reactor.netty.http.server.HttpServerRoutes
38 import java.net.InetSocketAddress
39
/**
 * HTTP API exposed by the DCAE application simulator.
 *
 * Routes registered by this server:
 *  - `PUT /configuration/topics` - passes the request body to [DcaeAppSimulator.listenToTopics],
 *  - `DELETE /messages/{topic}` - calls [DcaeAppSimulator.resetState] for the given topic,
 *  - `GET /messages/{topic}/count` - responds with the `messagesCount` of the topic state,
 *  - `POST /messages/{topic}/validate` - passes the request body to [DcaeAppSimulator.validate],
 *  - `GET /healthcheck` - always responds with [HttpConstants.STATUS_OK].
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {

    /**
     * Creates a [Mono] which, once subscribed, passes [kafkaTopics] to the simulator and binds
     * the HTTP server to [socketAddress].
     *
     * @param socketAddress host and port the server should be bound to
     * @param kafkaTopics topics handed over to [DcaeAppSimulator.listenToTopics] before binding
     * @return handle of the bound server
     */
    fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> =
            Mono.defer {
                simulator.listenToTopics(kafkaTopics)
                HttpServer.create()
                        // hostString returns the configured host (name or IP literal) without
                        // attempting a reverse DNS lookup, unlike hostName.
                        .host(socketAddress.hostString)
                        .port(socketAddress.port)
                        .route(::setRoutes)
                        .bind()
                        .map { NettyServerHandle(it) }
            }

    /**
     * Registers all HTTP endpoints of the simulator API on [route].
     */
    private fun setRoutes(route: HttpServerRoutes) {
        route
                .put("/configuration/topics") { req, res ->
                    req
                            .receive().aggregate().asString()
                            .flatMap { requestBody ->
                                res.sendOrError { simulator.listenToTopics(requestBody) }
                            }
                }
                .delete("/messages/{$TOPIC_PARAM_KEY}") { req, res ->
                    doWithTopicOrReturnInternalErrorResponse(req, ::resetMessages)
                            .let(res::sendAndHandleErrors)
                }
                .get("/messages/{$TOPIC_PARAM_KEY}/count") { req, res ->
                    doWithTopicOrReturnInternalErrorResponse(req, ::countMessages)
                            .let(res::sendAndHandleErrors)
                }
                .post("/messages/{$TOPIC_PARAM_KEY}/validate") { req, res ->
                    req
                            .receive().aggregate().asInputStream()
                            .map { inputStream ->
                                doWithTopicOrReturnInternalErrorResponse(req) { topic ->
                                    logger.info { "Processing request for message validation" }
                                    simulator.validate(inputStream, topic)
                                            .map(::resolveValidationResponse)
                                }
                            }
                            .flatMap(res::sendAndHandleErrors)
                }
                .get("/healthcheck") { _, res ->
                    val status = HttpConstants.STATUS_OK
                    logger.info { "Healthcheck OK, returning status: $status" }
                    res.status(status).send()
                }
    }

    /**
     * Resets the simulator state of [topic] and answers with [Responses.Success].
     */
    private fun resetMessages(topic: String): Mono<Response> {
        logger.info { "Resetting simulator state for topic $topic" }
        simulator.resetState(topic)
        return Mono.just(Responses.Success)
    }

    /**
     * Builds the response for the message-count request of [topic].
     *
     * When the simulator state cannot be obtained a `NOT_FOUND` status response carrying the
     * failure reason is returned; otherwise the count is returned as a plain string with `OK`.
     */
    private fun countMessages(topic: String): Mono<Response> {
        logger.info { "Processing request for count of received messages for topic $topic" }
        return simulator.state(topic)
                .fold({ cause ->
                    // Built once and reused for both the log entry and the response body.
                    val errorMessage = "$COUNT_NOT_RESOLVED_MESSAGE. Reason: ${cause.message}"
                    logger.warn { errorMessage }
                    Mono.just(
                            Responses.statusResponse(
                                    name = "Count not found",
                                    message = errorMessage,
                                    httpStatus = HttpStatus.NOT_FOUND
                            )
                    )
                }, { state ->
                    logger.info { "Returned number of received messages: ${state.messagesCount}" }
                    Mono.just(
                            Responses.stringResponse(
                                    message = state.messagesCount.toString(),
                                    httpStatus = HttpStatus.OK
                            )
                    )
                })
    }

    /**
     * Reads the topic path parameter from [req] and passes it to [topicConsumer].
     *
     * If the parameter is absent, [topicConsumer] is not invoked and an
     * `INTERNAL_SERVER_ERROR` string response is returned instead.
     */
    private fun doWithTopicOrReturnInternalErrorResponse(req: HttpServerRequest,
                                                         topicConsumer: (String) -> Mono<Response>): Mono<Response> =
            Option.fromNullable(req.param(TOPIC_PARAM_KEY))
                    .fold({
                        Mono.just(
                                stringResponse("Failed to retrieve parameter from url",
                                        HttpStatus.INTERNAL_SERVER_ERROR))
                    }, topicConsumer)

    /**
     * Maps the validation outcome to one of the two predefined responses and logs the result.
     */
    private fun resolveValidationResponse(isValid: Boolean): Response =
            if (isValid) {
                logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
                responseValid
            } else {
                logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" }
                responseInvalid
            }

    companion object {
        private val logger = Logger(DcaeAppApiServer::class)
        private const val VALID_RESPONSE_MESSAGE = "validation passed"
        private const val INVALID_RESPONSE_MESSAGE = "consumed messages don't match data from validation request"
        private const val COUNT_NOT_RESOLVED_MESSAGE = "Error - number of messages could not be specified"
        private const val TOPIC_PARAM_KEY = "topic"

        // Both responses are immutable from this class' point of view, so they are created
        // lazily once and shared between requests.
        private val responseValid by lazy {
            Responses.statusResponse(
                    name = "valid",
                    message = VALID_RESPONSE_MESSAGE
            )
        }

        private val responseInvalid by lazy {
            Responses.statusResponse(
                    name = "invalid",
                    message = INVALID_RESPONSE_MESSAGE,
                    httpStatus = HttpStatus.BAD_REQUEST
            )
        }
    }
}
159