/*
 * ============LICENSE_START=======================================================
 * org.onap.dcaegen2.collectors.ves
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * Copyright (C) 2018 Nokia. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
22 package org.onap.dcae.commonFunction;
24 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
25 import com.att.nsa.clock.SaClock;
26 import com.att.nsa.logging.LoggingContext;
27 import com.att.nsa.logging.log4j.EcompFields;
28 import com.google.common.annotations.VisibleForTesting;
29 import java.io.IOException;
30 import org.json.JSONObject;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 public class EventPublisherHash {
36 private static final String VES_UNIQUE_ID = "VESuniqueId";
37 private static final Logger log = LoggerFactory.getLogger(EventPublisherHash.class);
38 private static volatile EventPublisherHash instance = new EventPublisherHash(DmaapPublishers.create());
39 private final DmaapPublishers dmaapPublishers;
41 public static EventPublisherHash getInstance() {
46 EventPublisherHash(DmaapPublishers dmaapPublishers) {
47 this.dmaapPublishers = dmaapPublishers;
50 void sendEvent(JSONObject event, String streamid) {
51 log.debug("EventPublisher.sendEvent: instance for publish is ready");
52 clearVesUniqueId(event);
55 sendEventUsingCachedPublisher(streamid, event);
56 } catch (IOException | IllegalArgumentException e) {
57 log.error("Unable to publish event: {} streamID: {}. Exception: {}", event, streamid, e);
58 dmaapPublishers.closeByStreamId(streamid);
62 private void clearVesUniqueId(JSONObject event) {
63 if (event.has(VES_UNIQUE_ID)) {
64 String uuid = event.get(VES_UNIQUE_ID).toString();
65 LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
66 localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
67 log.debug("Removing VESuniqueid object from event");
68 event.remove(VES_UNIQUE_ID);
72 private void sendEventUsingCachedPublisher(String streamid, JSONObject event) throws IOException {
73 int pendingMsgs = dmaapPublishers.getByStreamId(streamid).send("MyPartitionKey", event.toString());
74 if (pendingMsgs > 100) {
75 log.info("Pending Message Count=" + pendingMsgs);
77 log.info("pub.send invoked - no error");
78 CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event));
82 CambriaBatchingPublisher getDmaapPublisher(String streamId) {
83 return dmaapPublishers.getByStreamId(streamId);