e430a5e247ad747a52b6784c8875870b94e3dbd3
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : ccsdk features
4  * ================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property.
6  * All rights reserved.
7  * ================================================================================
8  * Update Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=======================================================
22  *
23  */
24 package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
25
26 import java.util.LinkedList;
27 import java.util.List;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.json.JSONObject;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService;
32 import org.onap.ccsdk.features.sdnr.wt.devicemanager.types.VESMessage;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 public class MountpointStatePublisher implements Runnable {
37
38     private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class);
39     private List<JSONObject> stateObjects = new LinkedList<>();
40     private boolean publish = true;
41     private int publishPause = 5000; // Default pause between fetch - 5 seconds
42     private VESCollectorService vesCollectorService;
43
44     public MountpointStatePublisher(@NonNull VESCollectorService vesCollectorService) {
45         this.vesCollectorService = vesCollectorService;
46     }
47
48     public void addToPublish(JSONObject publishObj) {
49         getStateObjects().add(publishObj);
50     }
51
52     public synchronized List<JSONObject> getStateObjects() {
53         return stateObjects;
54     }
55
56     public void stop() {
57         publish = false;
58     }
59
60     private void pauseThread() throws InterruptedException {
61         if (publishPause > 0) {
62             LOG.debug("No data yet to publish.  Pausing {} ms before retry ", publishPause);
63             Thread.sleep(publishPause);
64         } else {
65             LOG.debug("No data yet to publish. No publish pause specified - retrying immediately");
66         }
67     }
68
69
70     public VESMessage createVESMessage(JSONObject msg, VESCollectorCfgService vesCfg) {
71         MountpointStateVESMessageFormatter vesFormatter = new MountpointStateVESMessageFormatter(vesCfg, vesCollectorService);
72         return vesFormatter.createVESMessage(msg);
73     }
74
75     @Override
76     public void run() {
77         while (publish) {
78             try {
79                 if (!getStateObjects().isEmpty()) {
80                     JSONObject obj = ((LinkedList<JSONObject>) getStateObjects()).removeFirst();
81                     VESMessage vesMsg = createVESMessage(obj, vesCollectorService.getConfig());
82                     this.vesCollectorService.publishVESMessage(vesMsg);
83                 } else {
84                     pauseThread();
85                 }
86             } catch (Exception ex) {
87                 LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
88             }
89         }
90     }
91 }