7c686f089f8bc7b4995e216aea41ef9bfc8f95d5
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / dmaap-lib / src / main / kotlin / org / onap / ccsdk / apps / blueprintsprocessor / dmaap / DmaapEventPublisher.kt
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CDS
4  * ================================================================================
5  * Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.apps.blueprintsprocessor.dmaap
22
23 import com.att.nsa.mr.client.MRBatchingPublisher
24 import com.att.nsa.mr.client.MRClientFactory
25 import com.att.nsa.mr.client.MRPublisher
26 import org.slf4j.LoggerFactory
27 import org.springframework.boot.context.properties.bind.Binder
28 import org.springframework.boot.context.properties.source.ConfigurationPropertySources
29 import org.springframework.context.annotation.Configuration
30 import org.springframework.context.annotation.PropertySource
31 import org.springframework.context.annotation.PropertySources
32 import org.springframework.core.env.ConfigurableEnvironment
33 import org.springframework.core.env.Environment
34 import org.springframework.core.io.support.ResourcePropertySource
35 import java.io.IOException
36 import java.util.Properties
37 import java.util.concurrent.TimeUnit
38
39 /**
40  * Representation of DMaap event publisher, to create a session with the
41  * message router and send messages when asked for. The producer.properties
42  * is used for creating a session. In order to overwrite the parameters such
43  * as host, topic, username and password, the event.properties can be used.
44  *
45  * compName : Name of the component appended in the event.properties file
46  * to overwrite.
47  * (E.g., so.topic=cds_so : In this "so" is the component name)
48  */
49 @Configuration
50 @PropertySources(PropertySource("classpath:event.properties",
51         "classpath:producer.properties"))
52 open class DmaapEventPublisher(compName: String = ""): EventPublisher {
53
54     /**
55      * Static variable for logging.
56      */
57     companion object {
58         var log = LoggerFactory.getLogger(DmaapEventPublisher::class.java)!!
59     }
60
61     /**
62      * The component name used in defining the event.properties file.
63      */
64     private var cName:String? = null
65
66     /**
67      * List of topics for a given message to be sent.
68      */
69     var topics = mutableListOf<String>()
70
71     /**
72      * List of clients formed for the list of topics where the messages has to
73      * be sent.
74      */
75     var clients = mutableListOf<MRBatchingPublisher>()
76
77     /**
78      * The populated values from producer.properties which are overwritten
79      * by the event.properties values according to the component information.
80      */
81     var prodProps: Properties = Properties()
82
83
84     init {
85         cName = compName
86     }
87
88     /**
89      * Loads the producer.properties file and populates all the parameters
90      * and then loads the event.properties file and populates the finalized
91      * parameters such as host, topic, username and password if available for
92      * the specified component. With this updated producer.properties, for
93      * each topic a client will be created.
94      */
95     private fun loadPropertiesInfo() {
96         if (prodProps.isEmpty) {
97             parseEventProps(cName!!)
98             addClients()
99         }
100     }
101
102     /**
103      * Adds clients for each topic into a client list.
104      */
105     private fun addClients() {
106         for (topic in topics) {
107             prodProps.setProperty("topic", topic)
108             val client = MRClientFactory.createBatchingPublisher(prodProps)
109             clients.add(client)
110         }
111     }
112
113     /**
114      * Parses the event.properties file and update it into the producer
115      * .properties, where both the files are loaded and stored.
116      */
117     private fun parseEventProps(cName: String) {
118         val env = EnvironmentContext.env as Environment
119         val propSrc = ConfigurationPropertySources.get(env)
120         val proProps = (env as ConfigurableEnvironment).propertySources.get(
121                 "class path resource [producer.properties]")
122
123         if (proProps != null) {
124             val entries = (proProps as ResourcePropertySource).source.entries
125             for (e in entries) {
126                 prodProps.put(e.key, e.value)
127             }
128         } else {
129             log.info("Unable to load the producer.properties file")
130         }
131
132         val eProps = Binder(propSrc).bind(cName, Properties::class.java).get()
133         val top = eProps.get("topic").toString()
134         if (top != "") {
135             topics.addAll(top.split(","))
136         }
137         prodProps.putAll(eProps)
138     }
139
140     /**
141      * Sends message to the sessions created by the information provided in
142      * the producer.properties file.
143      */
144     override fun sendMessage(partition: String , messages: Collection<String>):
145             Boolean {
146         loadPropertiesInfo()
147         var success = true
148         val dmaapMsgs = mutableListOf<MRPublisher.message>()
149         for (m in messages) {
150             dmaapMsgs.add(MRPublisher.message(partition, m))
151         }
152         for (client in clients) {
153             log.info("Sending messages to the DMaap Server")
154             try {
155                 client.send(dmaapMsgs)
156             } catch (e: IOException) {
157                 log.error(e.message, e)
158                 success = false
159             }
160         }
161         return success
162     }
163
164     /**
165      * Closes the opened session that was used for sending messages.
166      */
167     override fun close(timeout: Long) {
168         log.debug("Closing the DMaap producer clients")
169         if (!clients.isEmpty()) {
170             for (client in clients) {
171                 try {
172                     client.close(timeout, TimeUnit.SECONDS)
173                 } catch (e : IOException) {
174                     log.warn("Unable to cleanly close the connection from " +
175                             "the client $client", e)
176                 }
177             }
178         }
179     }
180     
181 }