Allow retrieving multiple kafka topics status 31/84331/4
authorFilip Krzywka <filip.krzywka@nokia.com>
Fri, 5 Apr 2019 06:44:52 +0000 (08:44 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Wed, 10 Apr 2019 07:05:59 +0000 (09:05 +0200)
Change-Id: I5e8433873e5d594e6df9da8c4893b0f54614efae
Issue-ID: DCAEGEN2-1399
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/responses.kt [new file with mode: 0644]

index 93c12d2..33e9a37 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
-import arrow.core.getOrElse
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
+import arrow.core.Option
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.io.InputStream
-import java.util.concurrent.atomic.AtomicReference
+import java.util.Collections.synchronizedMap
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -31,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference
  */
 class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
                        private val messageStreamValidation: MessageStreamValidation) {
-    private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
+    private val consumerState: MutableMap<String, ConsumerStateProvider> = synchronizedMap(mutableMapOf())
 
     fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
 
@@ -42,24 +41,42 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
             throw IllegalArgumentException(message)
         }
 
-        logger.info { "Received new configuration. Creating consumer for topics: $topics" }
-        consumerState.set(consumerFactory.createConsumerForTopics(topics))
+        logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" }
+        synchronized(consumerState) {
+            consumerState.clear()
+            consumerState.putAll(consumerFactory.createConsumersForTopics(topics))
+        }
     }
 
-    fun state() = consumerState.getOption().map { it.currentState() }
+    fun state(topic: String) =
+            consumerState(topic)
+                    .map(ConsumerStateProvider::currentState)
+                    .toEither {
+                        val message = "Failed to return consumer state. No consumer found for topic: $topic"
+                        logger.warn { message }
+                        MissingConsumerException(message)
+                    }
+
+    fun resetState(topic: String) =
+            consumerState(topic)
+                    .map { it.reset() }
+                    .toEither {
+                        val message = "Failed to reset consumer state. No consumer found for topic: $topic"
+                        logger.warn { message }
+                        MissingConsumerException(message)
+                    }
 
-    fun resetState() = consumerState.getOption().fold({ }, { it.reset() })
+    fun validate(jsonDescription: InputStream, topic: String) =
+            messageStreamValidation.validate(jsonDescription, currentMessages(topic))
 
+    private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic])
 
-    fun validate(jsonDescription: InputStream)= messageStreamValidation.validate(jsonDescription, currentMessages())
 
-    private fun currentMessages(): List<ByteArray> =
-            consumerState.getOption()
-                    .map { it.currentState().consumedMessages }
-                    .getOrElse(::emptyList)
+    private fun currentMessages(topic: String): List<ByteArray> =
+            state(topic).fold({ emptyList() }, { it.consumedMessages })
 
     private fun extractTopics(topicsString: String): Set<String> =
-            topicsString.substringAfter("=")
+            topicsString.removeSurrounding("\"")
                     .split(",")
                     .toSet()
 
@@ -67,3 +84,5 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
         private val logger = Logger(DcaeAppSimulator::class)
     }
 }
+
+class MissingConsumerException(message: String) : Throwable(message)
index f3fd56b..6a09be9 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
+import arrow.core.Option
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -26,11 +27,13 @@ import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
 import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
 import org.onap.dcae.collectors.veshv.utils.http.Response
 import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.Responses.stringResponse
 import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
 import org.onap.dcae.collectors.veshv.utils.http.sendOrError
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Mono
 import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
 import reactor.netty.http.server.HttpServerRoutes
 import java.net.InetSocketAddress
 
@@ -39,20 +42,6 @@ import java.net.InetSocketAddress
  * @since May 2018
  */
 class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
-    private val responseValid by lazy {
-        Responses.statusResponse(
-                name = "valid",
-                message = VALID_RESPONSE_MESSAGE
-        )
-    }
-
-    private val responseInvalid by lazy {
-        Responses.statusResponse(
-                name = "invalid",
-                message = INVALID_RESPONSE_MESSAGE,
-                httpStatus = HttpStatus.BAD_REQUEST
-        )
-    }
 
     fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> =
             Mono.defer {
@@ -74,37 +63,49 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                                 res.sendOrError { simulator.listenToTopics(it) }
                             }
                 }
-                .delete("/messages") { _, res ->
-                    logger.info { "Resetting simulator state" }
+                .delete("/messages/{$TOPIC_PARAM_KEY}") { req, res ->
+                    doWithTopicOrReturnInternalErrorResponse(req) {
+                        logger.info { "Resetting simulator state for topic $it" }
+                        simulator.resetState(it)
+                        Mono.just(Responses.Success)
+                    }.let(res::sendAndHandleErrors)
 
-                    res
-                            .header("Content-type", CONTENT_TEXT)
-                            .sendOrError { simulator.resetState() }
                 }
-                .get("/messages/all/count") { _, res ->
-                    logger.info { "Processing request for count of received messages" }
-                    simulator.state().fold(
-                            {
-                                logger.warn { "Error - number of messages could not be specified" }
-                                res.status(HttpConstants.STATUS_NOT_FOUND)
-                            },
-                            {
-                                logger.info { "Returned number of received messages: ${it.messagesCount}" }
-                                res.sendString(Mono.just(it.messagesCount.toString()))
-                            }
-                    )
+                .get("/messages/{$TOPIC_PARAM_KEY}/count") { req, res ->
+                    doWithTopicOrReturnInternalErrorResponse(req) {
+                        logger.info { "Processing request for count of received messages for topic $it" }
+                        simulator.state(it)
+                                .fold({
+                                    val errorMessage = { COUNT_NOT_RESOLVED_MESSAGE + ". Reason: ${it.message}" }
+                                    logger.warn(errorMessage)
+                                    Mono.just(Responses.statusResponse(
+                                            name = "Count not found",
+                                            message = errorMessage(),
+                                            httpStatus = HttpStatus.NOT_FOUND
+                                    )
+                                    )
+                                }, {
+                                    logger.info { "Returned number of received messages: ${it.messagesCount}" }
+                                    Mono.just(
+                                            Responses.stringResponse(
+                                                    message = it.messagesCount.toString(),
+                                                    httpStatus = HttpStatus.OK
+                                            )
+                                    )
+                                })
+                    }.let(res::sendAndHandleErrors)
                 }
-                .post("/messages/all/validate") { req, res ->
+                .post("/messages/{$TOPIC_PARAM_KEY}/validate") { req, res ->
                     req
                             .receive().aggregate().asInputStream()
-                            .map {
-                                logger.info { "Processing request for message validation" }
-                                simulator.validate(it)
-                                        .map(::resolveValidationResponse)
-                            }
-                            .flatMap {
-                                res.sendAndHandleErrors(it)
+                            .map { inputStream ->
+                                doWithTopicOrReturnInternalErrorResponse(req) {
+                                    logger.info { "Processing request for message validation" }
+                                    simulator.validate(inputStream, it)
+                                            .map(::resolveValidationResponse)
+                                }
                             }
+                            .flatMap(res::sendAndHandleErrors)
                 }
                 .get("/healthcheck") { _, res ->
                     val status = HttpConstants.STATUS_OK
@@ -113,6 +114,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                 }
     }
 
+    private fun doWithTopicOrReturnInternalErrorResponse(req: HttpServerRequest,
+                                                         topicConsumer: (String) -> Mono<Response>) =
+            Option.fromNullable(req.param(TOPIC_PARAM_KEY))
+                    .fold({
+                        Mono.just(
+                                stringResponse("Failed to retrieve parameter from url",
+                                        HttpStatus.INTERNAL_SERVER_ERROR))
+                    }, topicConsumer)
+
     private fun resolveValidationResponse(isValid: Boolean): Response =
             if (isValid) {
                 logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
@@ -124,10 +134,26 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
 
 
     companion object {
-        private const val CONTENT_TEXT = "text/plain"
+        private val logger = Logger(DcaeAppApiServer::class)
         private const val VALID_RESPONSE_MESSAGE = "validation passed"
         private const val INVALID_RESPONSE_MESSAGE = "consumed messages don't match data from validation request"
-        private val logger = Logger(DcaeAppApiServer::class)
+        private const val COUNT_NOT_RESOLVED_MESSAGE = "Error - number of messages could not be specified"
+        private const val TOPIC_PARAM_KEY = "topic"
+
+        private val responseValid by lazy {
+            Responses.statusResponse(
+                    name = "valid",
+                    message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE
+            )
+        }
+
+        private val responseInvalid by lazy {
+            Responses.statusResponse(
+                    name = "invalid",
+                    message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE,
+                    httpStatus = HttpStatus.BAD_REQUEST
+            )
+        }
     }
 }
 
index 3314805..0fd3bb1 100644 (file)
@@ -21,10 +21,11 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
 import reactor.kafka.receiver.KafkaReceiver
 import reactor.kafka.receiver.ReceiverOptions
+import reactor.kafka.receiver.ReceiverRecord
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -32,10 +33,9 @@ import reactor.kafka.receiver.ReceiverOptions
  */
 class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
 
-    fun start() = Consumer()
-            .also { consumer ->
-                receiver.receive().map(consumer::update).subscribe()
-            }
+    fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> =
+            receiver.receive()
+                    .also { logger.info { "Started Kafka source" } }
 
     companion object {
         private val logger = Logger(KafkaSource::class)
index 725248c..a6d1edd 100644 (file)
@@ -62,6 +62,19 @@ class Consumer : ConsumerStateProvider {
 }
 
 class ConsumerFactory(private val kafkaBootstrapServers: String) {
-    fun createConsumerForTopics(kafkaTopics: Set<String>): Consumer =
-            KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
+    fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> =
+            KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource ->
+                val topicToConsumer = kafkaTopics.associate { it to Consumer() }
+                kafkaSource.start()
+                        .map {
+                            val topic = it.topic()
+                            topicToConsumer.get(topic)?.update(it)
+                                    ?: logger.warn { "No consumer configured for topic $topic" }
+                        }.subscribe()
+                topicToConsumer
+            }
+
+    companion object {
+        private val logger = Logger(ConsumerFactory::class)
+    }
 }
index e8ac6cd..a594215 100644 (file)
@@ -67,16 +67,16 @@ internal class ConsumerTest : Spek({
     }
 })
 
-fun assertEmptyState(cut: Consumer) {
+private fun assertEmptyState(cut: Consumer) {
     assertState(cut)
 }
 
-fun assertState(cut: Consumer, vararg values: ByteArray) {
+private fun assertState(cut: Consumer, vararg values: ByteArray) {
     assertThat(cut.currentState().consumedMessages)
             .containsOnly(*values)
     assertThat(cut.currentState().messagesCount)
             .isEqualTo(values.size)
 }
 
-fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
+private fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
         ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
index 493100f..e3e61c8 100644 (file)
@@ -19,8 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
-import arrow.core.None
-import arrow.core.Some
+import arrow.core.Right
 import com.google.protobuf.ByteString
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.eq
@@ -41,6 +40,7 @@ import java.lang.IllegalArgumentException
 import java.util.concurrent.ConcurrentLinkedQueue
 import kotlin.test.assertFailsWith
 
+
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
@@ -48,82 +48,87 @@ import kotlin.test.assertFailsWith
 internal class DcaeAppSimulatorTest : Spek({
     lateinit var consumerFactory: ConsumerFactory
     lateinit var messageStreamValidation: MessageStreamValidation
-    lateinit var consumer: Consumer
+    lateinit var perf3gpp_consumer: Consumer
+    lateinit var faults_consumer: Consumer
     lateinit var cut: DcaeAppSimulator
 
     beforeEachTest {
         consumerFactory = mock()
         messageStreamValidation = mock()
-        consumer = mock()
+        perf3gpp_consumer = mock()
+        faults_consumer = mock()
         cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
 
-        whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(consumer)
+        whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf(
+                PERF3GPP_TOPIC to perf3gpp_consumer,
+                FAULTS_TOPICS to faults_consumer))
     }
 
     fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
 
     describe("listenToTopics") {
-        val topics = setOf("perf3gpp", "faults")
-
         it("should fail when topic list is empty") {
-            assertFailsWith(IllegalArgumentException::class){
+            assertFailsWith(IllegalArgumentException::class) {
                 cut.listenToTopics(setOf())
             }
         }
 
         it("should fail when topic list contains empty strings") {
-            assertFailsWith(IllegalArgumentException::class){
-                cut.listenToTopics(setOf("perf3gpp", " ", "faults"))
+            assertFailsWith(IllegalArgumentException::class) {
+                cut.listenToTopics(setOf(PERF3GPP_TOPIC, " ", FAULTS_TOPICS))
             }
         }
 
         it("should subscribe to given topics") {
-            cut.listenToTopics(topics)
-            verify(consumerFactory).createConsumerForTopics(topics)
+            cut.listenToTopics(TWO_TOPICS)
+            verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
         }
 
         it("should subscribe to given topics when called with comma separated list") {
-            cut.listenToTopics("perf3gpp,faults")
-            verify(consumerFactory).createConsumerForTopics(topics)
+            cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS")
+            verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
         }
     }
 
     describe("state") {
-        it("should return None when topics hasn't been initialized") {
-            assertThat(cut.state()).isEqualTo(None)
+        it("should return Left when topics hasn't been initialized") {
+            assertThat(cut.state(PERF3GPP_TOPIC).isLeft()).isTrue()
         }
 
         describe("when topics are initialized") {
             beforeEachTest {
-                cut.listenToTopics("perf3gpp")
+                cut.listenToTopics(TWO_TOPICS)
             }
 
-            it("should return some state when it has been set") {
+            it("should return state when it has been set") {
                 val state = consumerState()
-                whenever(consumer.currentState()).thenReturn(state)
+                whenever(perf3gpp_consumer.currentState()).thenReturn(state)
+                whenever(faults_consumer.currentState()).thenReturn(state)
 
-                assertThat(cut.state()).isEqualTo(Some(state))
+                assertThat(cut.state(PERF3GPP_TOPIC)).isEqualTo(Right(state))
+                assertThat(cut.state(FAULTS_TOPICS)).isEqualTo(Right(state))
             }
         }
     }
 
     describe("resetState") {
         it("should do nothing when topics hasn't been initialized") {
-            cut.resetState()
-            verify(consumer, never()).reset()
+            cut.resetState(PERF3GPP_TOPIC)
+            cut.resetState(FAULTS_TOPICS)
+            verify(perf3gpp_consumer, never()).reset()
+            verify(faults_consumer, never()).reset()
         }
 
         describe("when topics are initialized") {
             beforeEachTest {
-                cut.listenToTopics("perf3gpp")
+                cut.listenToTopics(TWO_TOPICS)
             }
 
-            it("should reset the state") {
-                // when
-                cut.resetState()
+            it("should reset the state of given topic consumer") {
+                cut.resetState(PERF3GPP_TOPIC)
 
-                // then
-                verify(consumer).reset()
+                verify(perf3gpp_consumer).reset()
+                verify(faults_consumer, never()).reset()
             }
         }
     }
@@ -135,7 +140,7 @@ internal class DcaeAppSimulatorTest : Spek({
 
         it("should use empty list when consumer is unavailable") {
             StepVerifier
-                    .create(cut.validate("['The JSON']".byteInputStream()))
+                    .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC))
                     .expectNext(true)
                     .verifyComplete()
 
@@ -143,22 +148,24 @@ internal class DcaeAppSimulatorTest : Spek({
         }
 
         it("should delegate to MessageStreamValidation") {
-            // given
-            cut.listenToTopics("perf3gpp")
-            whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
+            cut.listenToTopics(PERF3GPP_TOPIC)
+            whenever(perf3gpp_consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
 
-           StepVerifier
-                    .create(cut.validate("['The JSON']".byteInputStream()))
-                   .expectNext(true)
+            StepVerifier
+                    .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC))
+                    .expectNext(true)
                     .verifyComplete()
 
-            // then
             verify(messageStreamValidation).validate(any(), any())
         }
     }
 })
 
 
+private const val PERF3GPP_TOPIC = "perf3gpp"
+private const val FAULTS_TOPICS = "faults"
+private val TWO_TOPICS = setOf(PERF3GPP_TOPIC, FAULTS_TOPICS)
+
 private const val DUMMY_EVENT_ID = "aaa"
 private const val DUMMY_PAYLOAD = "payload"
 
index c5c4639..02baff3 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,8 +20,6 @@
 package org.onap.dcae.collectors.veshv.utils.http
 
 import arrow.typeclasses.Show
-import java.util.*
-import javax.json.Json
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -56,26 +54,3 @@ enum class ContentType(val value: String) {
 
 data class Response(val status: HttpStatus, val content: Content<Any>)
 data class Content<T>(val type: ContentType, val value: T, val serializer: Show<T> = Show.any())
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since September 2018
- */
-object Responses {
-
-    fun acceptedResponse(id: UUID): Response {
-        return Response(
-                HttpStatus.ACCEPTED,
-                Content(ContentType.TEXT, id)
-        )
-    }
-
-    fun statusResponse(name: String, message: String, httpStatus: HttpStatus = HttpStatus.OK): Response {
-        return Response(httpStatus,
-                Content(ContentType.JSON,
-                        Json.createObjectBuilder()
-                                .add("status", name)
-                                .add("message", message)
-                                .build()))
-    }
-}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/responses.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/responses.kt
new file mode 100644 (file)
index 0000000..0c64232
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import java.util.*
+import javax.json.Json
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+object Responses {
+
+    fun acceptedResponse(id: UUID) = Response(
+            HttpStatus.ACCEPTED,
+            Content(ContentType.TEXT, id)
+    )
+
+    fun statusResponse(name: String, message: String, httpStatus: HttpStatus = HttpStatus.OK) =
+            Response(httpStatus,
+                    Content(ContentType.JSON,
+                            Json.createObjectBuilder()
+                                    .add("status", name)
+                                    .add("message", message)
+                                    .build()))
+
+    fun stringResponse(message: String, httpStatus: HttpStatus = HttpStatus.OK): Response {
+        return Response(httpStatus, Content(ContentType.JSON, message))
+    }
+
+    val Success by lazy {
+        statusResponse(
+                name = "Success",
+                message = "Request processed successfuly",
+                httpStatus = HttpStatus.OK
+        )
+    }
+}