Migrate "ms/controllerblueprints" from ccsdk/apps
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / dmaap-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / dmaap / BluePrintDmaapClientService.kt
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CDS
4  * ================================================================================
5  * Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.cds.blueprintsprocessor.dmaap
22
23 import com.att.nsa.mr.client.MRBatchingPublisher
24 import com.att.nsa.mr.client.MRPublisher
25 import org.slf4j.LoggerFactory
26 import java.io.IOException
27 import java.util.concurrent.TimeUnit
28
29
30 /**
31  * Abstraction of DMAAP client services that could form DMAAP client from the
32  * properties provided. This abstraction also provides a mechanism to send
33  * messages with the given partition in a session and closing the same.
34  */
35 interface BluePrintDmaapClientService {
36
37     /**
38      * Static variable for logging.
39      */
40     companion object {
41         var log = LoggerFactory.getLogger(
42             BluePrintDmaapClientService::class.java)!!
43     }
44
45     /**
46      * Returns the properly constructed DMAAP client with the type.
47      */
48     fun  getDmaapClient(): MutableList<MRBatchingPublisher>
49
50     /**
51      * Sends messages to the sessions created by the information provided from
52      * application.properties and event.properties file
53      */
54     fun sendMessage(msgs: Collection<String>): Boolean {
55         var success = true
56         val clients = getDmaapClient()
57         val dmaapMsgs = mutableListOf<MRPublisher.message>()
58         for (m in msgs) {
59             dmaapMsgs.add(MRPublisher.message("1", m))
60         }
61         log.info("Sending messages to the DMAAP Server")
62         for (client in clients) {
63             try {
64                 client.send(dmaapMsgs)
65             } catch (e: IOException) {
66                 success = false
67                 log.error(e.message, e)
68             }
69         }
70         return success
71     }
72
73     /**
74      * Sends message to the sessions created by the information provided from
75      * application.properties and event.properties file
76      */
77     fun sendMessage(msg: String): Boolean {
78         val msgs = mutableListOf<String>()
79         msgs.add(msg)
80         return sendMessage(msgs)
81     }
82
83     /**
84      * Closes the opened session that was used for sending messages.
85      */
86     fun close(timeout: Long): MutableList<MutableList<MRPublisher.message>>? {
87         log.debug("Closing the DMAAP producer clients")
88         var msgs: MutableList<MutableList<MRPublisher.message>> =
89             mutableListOf()
90         val clients = getDmaapClient()
91         for (client in clients) {
92             try {
93                 var ms = client.close(timeout, TimeUnit.SECONDS)
94                 msgs.add(ms)
95             } catch (e: IOException) {
96                 log.warn("Unable to cleanly close the connection from the " +
97                         "client $client", e)
98             }
99         }
100         return msgs
101     }
102 }