Remove dead code from VESCollector
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / commonFunction / EventPublisherHash.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dcaegen2.collectors.ves
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2018 Nokia. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.dcae.commonFunction;
23
24 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
25 import com.att.nsa.clock.SaClock;
26 import com.att.nsa.logging.LoggingContext;
27 import com.att.nsa.logging.log4j.EcompFields;
28 import com.google.common.annotations.VisibleForTesting;
29 import java.io.IOException;
30 import org.json.JSONObject;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 public class EventPublisherHash {
35
36     private static final String VES_UNIQUE_ID = "VESuniqueId";
37     private static final Logger log = LoggerFactory.getLogger(EventPublisherHash.class);
38     private static volatile EventPublisherHash instance = new EventPublisherHash(DmaapPublishers.create());
39     private final DmaapPublishers dmaapPublishers;
40
41     public static EventPublisherHash getInstance() {
42         return instance;
43     }
44
45     @VisibleForTesting
46     EventPublisherHash(DmaapPublishers dmaapPublishers) {
47         this.dmaapPublishers = dmaapPublishers;
48     }
49
50     void sendEvent(JSONObject event, String streamid) {
51         log.debug("EventPublisher.sendEvent: instance for publish is ready");
52         clearVesUniqueId(event);
53
54         try {
55             sendEventUsingCachedPublisher(streamid, event);
56         } catch (IOException | IllegalArgumentException e) {
57             log.error("Unable to publish event: {} streamID: {}. Exception: {}", event, streamid, e);
58             dmaapPublishers.closeByStreamId(streamid);
59         }
60     }
61
62     private void clearVesUniqueId(JSONObject event) {
63         if (event.has(VES_UNIQUE_ID)) {
64             String uuid = event.get(VES_UNIQUE_ID).toString();
65             LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
66             localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
67             log.debug("Removing VESuniqueid object from event");
68             event.remove(VES_UNIQUE_ID);
69         }
70     }
71
72     private void sendEventUsingCachedPublisher(String streamid, JSONObject event) throws IOException {
73         int pendingMsgs = dmaapPublishers.getByStreamId(streamid).send("MyPartitionKey", event.toString());
74         if (pendingMsgs > 100) {
75             log.info("Pending Message Count=" + pendingMsgs);
76         }
77         log.info("pub.send invoked - no error");
78         CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event));
79     }
80
81     @VisibleForTesting
82     CambriaBatchingPublisher getDmaapPublisher(String streamId) {
83         return dmaapPublishers.getByStreamId(streamId);
84     }
85 }