972c251e60b780075e4bfd39a476b67203de61b8
[ccsdk/features.git] /
1 package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
2 /*******************************************************************************
3  * ============LICENSE_START========================================================================
4  * ONAP : ccsdk feature sdnr wt
5  * =================================================================================================
6  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
7  * =================================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9  * in compliance with the License. You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the License
14  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15  * or implied. See the License for the specific language governing permissions and limitations under
16  * the License.
17  * ============LICENSE_END==========================================================================
18  ******************************************************************************/
19
20 import java.io.IOException;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Properties;
24 import java.util.concurrent.TimeUnit;
25
26 import org.json.JSONObject;
27 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
28 import org.onap.dmaap.mr.client.MRBatchingPublisher;
29 import org.onap.dmaap.mr.client.MRClientFactory;
30 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34
35 public class MountpointStatePublisher implements Runnable {
36
37         private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class);
38         public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>();
39         static MRBatchingPublisher pub;
40         Properties publisherProperties = new Properties();
41         static boolean closePublisher = false;  //Set this to true in the "Close" method of MountpointStateProviderImpl
42         private int fetchPause = 5000; // Default pause between fetch - 5 seconds
43
44
45         public MountpointStatePublisher(Configuration config) {
46                 initialize(config);
47         }
48
49         public void initialize(Configuration config) {
50                 LOG.info("In initializePublisher method of MountpointStatePublisher");
51                 GeneralConfig generalCfg = (GeneralConfig)config;
52
53                 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType());
54                 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort());
55                 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype());
56                 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic());
57                 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize());
58                 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs());
59                 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, generalCfg.getMessageSentThreadOccurrence());
60
61                 createPublisher(publisherProperties);
62         }
63
64         public MRBatchingPublisher createPublisher(Properties publisherProperties) {
65
66                 try {
67                         pub = MRClientFactory.createBatchingPublisher(publisherProperties, false);
68                         return pub;
69                 } catch (IOException e) {
70                         LOG.info("Exception while creating a publisher", e);
71
72                 }
73                 return null;
74         }
75
76         public void publishMessage(MRBatchingPublisher pub, String msg) {
77                 LOG.info("Publishing message {} - ", msg);
78                 try {
79                         pub.send(msg);
80                 } catch (IOException e) {
81                         LOG.info("Exception while publishing a mesage ", e);
82                 }
83         }
84
85         public MRBatchingPublisher getPublisher() {
86                 return pub;
87         }
88
89         public void run() {
90
91                 while (!closePublisher) {
92                         try {
93                                 if (stateObjects.size() > 0) {
94                                         JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst();
95                                         publishMessage(getPublisher(), obj.toString());
96                                 } else {
97                                         pauseThread();
98                                 }
99                         } catch(Exception ex) {
100                                 LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
101                         }
102
103                         MRPublisherResponse res= pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called
104                         LOG.debug("Response message = {} ",res.toString());
105                 }
106         }
107
108         private void pauseThread() throws InterruptedException {
109                 if (fetchPause > 0) {
110                         LOG.debug("No data yet to publish.  Pausing {} ms before retry ", fetchPause);
111                         Thread.sleep(fetchPause);
112                 } else {
113                         LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately");
114                 }
115         }
116
117         public static void stopPublisher() throws IOException, InterruptedException {
118                 closePublisher = true;
119                 pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close
120         }
121 }