f3fd56bbecad69a56883a2d7d8560686abd6f885
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
23 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
24 import org.onap.dcae.collectors.veshv.utils.ServerHandle
25 import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
26 import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
27 import org.onap.dcae.collectors.veshv.utils.http.Response
28 import org.onap.dcae.collectors.veshv.utils.http.Responses
29 import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
30 import org.onap.dcae.collectors.veshv.utils.http.sendOrError
31 import org.onap.dcae.collectors.veshv.utils.logging.Logger
32 import reactor.core.publisher.Mono
33 import reactor.netty.http.server.HttpServer
34 import reactor.netty.http.server.HttpServerRoutes
35 import java.net.InetSocketAddress
36
/**
 * HTTP API of the DCAE application simulator.
 *
 * Exposes routes for reconfiguring the topics the simulator listens to, resetting the simulator
 * state, reading the count of received messages, validating received messages and a health check.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
    // Both canned responses are built lazily, on first use by resolveValidationResponse.
    private val responseValid by lazy {
        Responses.statusResponse(
                name = "valid",
                message = VALID_RESPONSE_MESSAGE
        )
    }

    private val responseInvalid by lazy {
        Responses.statusResponse(
                name = "invalid",
                message = INVALID_RESPONSE_MESSAGE,
                httpStatus = HttpStatus.BAD_REQUEST
        )
    }

    /**
     * Makes the simulator listen to [kafkaTopics] and binds the HTTP server to [socketAddress].
     *
     * Everything is wrapped in [Mono.defer], so neither the topic subscription nor the bind
     * happens until the returned [Mono] is subscribed.
     *
     * @param socketAddress host and port the HTTP server is bound to
     * @param kafkaTopics topics passed to the simulator before the server is bound
     * @return a [Mono] emitting the handle of the bound server
     */
    fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> =
            Mono.defer {
                simulator.listenToTopics(kafkaTopics)
                HttpServer.create()
                        // hostString never triggers a reverse DNS lookup (hostName may), so an
                        // address given as an IP literal is bound exactly as it was provided.
                        .host(socketAddress.hostString)
                        .port(socketAddress.port)
                        .route(::setRoutes)
                        .bind()
                        .map { NettyServerHandle(it) }
            }

    /**
     * Registers all HTTP routes of the simulator API on [route].
     */
    private fun setRoutes(route: HttpServerRoutes) {
        route
                // Reconfigure topics: the request body is aggregated into a single string
                // and handed to the simulator.
                .put("/configuration/topics") { req, res ->
                    req
                            .receive().aggregate().asString()
                            .flatMap {
                                res.sendOrError { simulator.listenToTopics(it) }
                            }
                }
                // Reset the simulator state.
                .delete("/messages") { _, res ->
                    logger.info { "Resetting simulator state" }

                    res
                            .header("Content-type", CONTENT_TEXT)
                            .sendOrError { simulator.resetState() }
                }
                // Report the number of received messages, or NOT_FOUND when the simulator
                // state is unavailable (the first branch of fold).
                .get("/messages/all/count") { _, res ->
                    logger.info { "Processing request for count of received messages" }
                    simulator.state().fold(
                            {
                                logger.warn { "Error - number of messages could not be specified" }
                                res.status(HttpConstants.STATUS_NOT_FOUND)
                            },
                            {
                                logger.info { "Returned number of received messages: ${it.messagesCount}" }
                                res.sendString(Mono.just(it.messagesCount.toString()))
                            }
                    )
                }
                // Validate: the request body is passed to the simulator as an input stream and
                // the boolean outcome is translated into one of the canned responses.
                .post("/messages/all/validate") { req, res ->
                    req
                            .receive().aggregate().asInputStream()
                            .map {
                                logger.info { "Processing request for message validation" }
                                simulator.validate(it)
                                        .map(::resolveValidationResponse)
                            }
                            .flatMap {
                                res.sendAndHandleErrors(it)
                            }
                }
                // Health check: always answers with STATUS_OK and an empty body.
                .get("/healthcheck") { _, res ->
                    val status = HttpConstants.STATUS_OK
                    logger.info { "Healthcheck OK, returning status: $status" }
                    res.status(status).send()
                }
    }

    /**
     * Maps the validation outcome to a response and logs the result.
     *
     * @param isValid outcome reported by the simulator's validation
     * @return [responseValid] when [isValid] is true, [responseInvalid] otherwise
     */
    private fun resolveValidationResponse(isValid: Boolean): Response =
            if (isValid) {
                logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
                responseValid
            } else {
                logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" }
                responseInvalid
            }


    companion object {
        private const val CONTENT_TEXT = "text/plain"
        private const val VALID_RESPONSE_MESSAGE = "validation passed"
        private const val INVALID_RESPONSE_MESSAGE = "consumed messages don't match data from validation request"
        private val logger = Logger(DcaeAppApiServer::class)
    }
}
133