1 package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
2 /*******************************************************************************
3 * ============LICENSE_START========================================================================
4 * ONAP : ccsdk feature sdnr wt
5 * =================================================================================================
6 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
7 * =================================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9 * in compliance with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software distributed under the License
14 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15 * or implied. See the License for the specific language governing permissions and limitations under
17 * ============LICENSE_END==========================================================================
18 ******************************************************************************/
20 import java.io.IOException;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Properties;
24 import java.util.concurrent.TimeUnit;
26 import org.json.JSONObject;
27 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
28 import org.onap.dmaap.mr.client.MRBatchingPublisher;
29 import org.onap.dmaap.mr.client.MRClientFactory;
30 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 public class MountpointStatePublisher implements Runnable {
37 private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class);
38 public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>();
39 static MRBatchingPublisher pub;
40 Properties publisherProperties = new Properties();
41 static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl
42 private int fetchPause = 5000; // Default pause between fetch - 5 seconds
45 public MountpointStatePublisher(Configuration config) {
49 public void initialize(Configuration config) {
50 LOG.info("In initializePublisher method of MountpointStatePublisher");
51 GeneralConfig generalCfg = (GeneralConfig)config;
53 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType());
54 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort());
55 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype());
56 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic());
57 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize());
58 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs());
59 publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, generalCfg.getMessageSentThreadOccurrence());
61 createPublisher(publisherProperties);
64 public MRBatchingPublisher createPublisher(Properties publisherProperties) {
67 pub = MRClientFactory.createBatchingPublisher(publisherProperties, false);
69 } catch (IOException e) {
70 LOG.info("Exception while creating a publisher", e);
76 public void publishMessage(MRBatchingPublisher pub, String msg) {
77 LOG.info("Publishing message {} - ", msg);
80 } catch (IOException e) {
81 LOG.info("Exception while publishing a mesage ", e);
85 public MRBatchingPublisher getPublisher() {
91 while (!closePublisher) {
93 if (stateObjects.size() > 0) {
94 JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst();
95 publishMessage(getPublisher(), obj.toString());
99 } catch(Exception ex) {
100 LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
103 MRPublisherResponse res= pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called
104 LOG.debug("Response message = {} ",res.toString());
108 private void pauseThread() throws InterruptedException {
109 if (fetchPause > 0) {
110 LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause);
111 Thread.sleep(fetchPause);
113 LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately");
117 public static void stopPublisher() throws IOException, InterruptedException {
118 closePublisher = true;
119 pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close