2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.ccsdk.apps.blueprintsprocessor.dmaap
23 import com.att.nsa.mr.client.MRBatchingPublisher
24 import com.att.nsa.mr.client.MRClientFactory
25 import com.att.nsa.mr.client.MRPublisher
26 import org.slf4j.LoggerFactory
27 import org.springframework.boot.context.properties.bind.Binder
28 import org.springframework.boot.context.properties.source.ConfigurationPropertySources
29 import org.springframework.context.annotation.Configuration
30 import org.springframework.context.annotation.PropertySource
31 import org.springframework.context.annotation.PropertySources
32 import org.springframework.core.env.ConfigurableEnvironment
33 import org.springframework.core.env.Environment
34 import org.springframework.core.io.support.ResourcePropertySource
35 import java.io.IOException
36 import java.util.Properties
37 import java.util.concurrent.TimeUnit
40 * Representation of DMaap event publisher, to create a session with the
41 * message router and send messages when asked for. The producer.properties
42 * is used for creating a session. In order to overwrite the parameters such
43 * as host, topic, username and password, the event.properties can be used.
45 * compName : Name of the component used as the key prefix in the event.properties file
47 * (E.g., so.topic=cds_so : In this "so" is the component name)
50 @PropertySources(PropertySource("classpath:event.properties",
51 "classpath:producer.properties"))
52 open class DmaapEventPublisher(compName: String = ""): EventPublisher {
55 * Static variable for logging.
58 var log = LoggerFactory.getLogger(DmaapEventPublisher::class.java)!!
62 * The component name used in defining the event.properties file.
64 private var cName:String? = null
67 * List of topics for a given message to be sent.
69 var topics = mutableListOf<String>()
72 * List of clients formed for the list of topics where the messages have to
75 var clients = mutableListOf<MRBatchingPublisher>()
78 * The populated values from producer.properties which are overwritten
79 * by the event.properties values according to the component information.
81 var prodProps: Properties = Properties()
89 * Loads the producer.properties file and populates all the parameters
90 * and then loads the event.properties file and populates the finalized
91 * parameters such as host, topic, username and password if available for
92 * the specified component. With this updated producer.properties, for
93 * each topic a client will be created.
// Loads the properties only while prodProps is still empty; once it has been
// populated the guard below skips the parse on later calls.
95 private fun loadPropertiesInfo() {
96 if (prodProps.isEmpty) {
// cName is declared as a nullable String initialised to null, so `!!` throws
// if it has not been assigned by the time this runs.
// NOTE(review): the assignment of cName is not visible in this excerpt -
// confirm it always happens before the first call.
97 parseEventProps(cName!!)
// NOTE(review): the remainder of this body is not visible in this excerpt.
103 * Adds clients for each topic into a client list.
// Creates one batching publisher per entry of `topics`.
105 private fun addClients() {
106 for (topic in topics) {
// The shared prodProps instance is mutated on each iteration: its "topic"
// entry is overwritten with the current topic before the factory call, so
// after the loop it holds the last topic processed.
107 prodProps.setProperty("topic", topic)
108 val client = MRClientFactory.createBatchingPublisher(prodProps)
// NOTE(review): the statement that stores `client` is not visible in this
// excerpt; presumably it is added to `clients` - confirm.
114 * Parses the event.properties file and merges its values into the
115 * producer.properties values, where both the files are loaded and stored.
// Fills prodProps from the "producer.properties" property source, then
// overlays the properties bound under the `cName` prefix and records the
// configured topics in `topics`.
117 private fun parseEventProps(cName: String) {
// EnvironmentContext is declared outside this excerpt.
118 val env = EnvironmentContext.env as Environment
119 val propSrc = ConfigurationPropertySources.get(env)
// Unconditional cast: fails with ClassCastException if env is not a
// ConfigurableEnvironment. The property source is looked up by the literal
// name string below.
120 val proProps = (env as ConfigurableEnvironment).propertySources.get(
121 "class path resource [producer.properties]")
123 if (proProps != null) {
// Unconditional cast to ResourcePropertySource; its entries are copied into
// prodProps one by one.
124 val entries = (proProps as ResourcePropertySource).source.entries
126 prodProps.put(e.key, e.value)
// NOTE(review): reported at info level only; from the visible lines the
// binding below still runs afterwards - confirm this is intended.
129 log.info("Unable to load the producer.properties file")
// NOTE(review): BindResult.get() is expected to throw when nothing is bound
// under `cName` - confirm every component name has entries.
132 val eProps = Binder(propSrc).bind(cName, Properties::class.java).get()
// NOTE(review): confirm the behaviour when the "topic" key is absent; the
// lines around the split below are not visible in this excerpt.
133 val top = eProps.get("topic").toString()
// Split on "," without trimming, so whitespace around a comma stays part of
// the topic name.
135 topics.addAll(top.split(","))
// Bound values are written after the producer.properties entries, so they
// win for any key present in both (including the raw "topic" value).
137 prodProps.putAll(eProps)
141 * Sends messages to the sessions created by the information provided in
142 * the producer.properties file.
// Wraps every entry of `messages` in an MRPublisher.message carrying the same
// `partition`, then hands the whole batch to each client in `clients`.
// NOTE(review): the return type and the lines between the signature and the
// list creation are not visible in this excerpt.
144 override fun sendMessage(partition: String , messages: Collection<String>):
148 val dmaapMsgs = mutableListOf<MRPublisher.message>()
149 for (m in messages) {
150 dmaapMsgs.add(MRPublisher.message(partition, m))
// The same batch is sent to every client; the log line is emitted once per
// client.
152 for (client in clients) {
153 log.info("Sending messages to the DMaap Server")
155 client.send(dmaapMsgs)
// An IOException from send is logged at error level together with the
// exception.
// NOTE(review): what happens after the log call (return value, whether the
// remaining clients are still tried) is not visible in this excerpt.
156 } catch (e: IOException) {
157 log.error(e.message, e)
165 * Closes the opened session that was used for sending messages.
167 override fun close(timeout: Long) {
168 log.debug("Closing the DMaap producer clients")
169 if (!clients.isEmpty()) {
170 for (client in clients) {
172 client.close(timeout, TimeUnit.SECONDS)
173 } catch (e : IOException) {
174 log.warn("Unable to cleanly close the connection from " +
175 "the client $client", e)