2b4cfc15a89924099a5222a9c387de69a334b465
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / common / publishing / DMaaPEventPublisher.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dcaegen2.collectors.ves
4  * ================================================================================
5  * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2018,2020 Nokia. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.dcae.common.publishing;
23
24 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
25 import com.att.nsa.clock.SaClock;
26 import com.att.nsa.logging.LoggingContext;
27 import com.att.nsa.logging.log4j.EcompFields;
28 import io.vavr.collection.Map;
29 import io.vavr.control.Try;
30 import org.onap.dcae.common.VESLogger;
31 import org.onap.dcae.common.model.VesEvent;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import java.io.IOException;
36
37 import static org.onap.dcae.common.publishing.VavrUtils.f;
38
39 /**
40  * @author Pawel Szalapski (pawel.szalapski@nokia.com)
41  */
42 public class DMaaPEventPublisher {
43     private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
44     private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
45     private DMaaPPublishersCache publishersCache;
46     private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");
47
48     DMaaPEventPublisher(DMaaPPublishersCache publishersCache) {
49         this.publishersCache = publishersCache;
50     }
51
52     public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) {
53         this(new DMaaPPublishersCache(dMaaPConfig));
54     }
55
56     /**
57      * Reload Dmaap configuration
58      * @param dmaapConfiguration Dmaap configuration
59      */
60     public void reload(Map<String, PublisherConfig> dmaapConfiguration){
61         this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration);
62     }
63
64     public void sendEvent(VesEvent vesEvent, String dmaapId){
65         clearVesUniqueIdFromEvent(vesEvent);
66         publishersCache.getPublisher(dmaapId)
67                 .onEmpty(() ->
68                         log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent)))
69                 .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher));
70     }
71
72     private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) {
73         Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher))
74                 .onFailure(exc -> closePublisher(event, dmaapId, exc));
75     }
76
77     private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher)
78             throws IOException {
79
80         String pk = event.getPK();
81         int pendingMsgs = publisher.send(pk, event.asJsonObject().toString());
82         if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
83             log.info("Pending messages count: " + pendingMsgs);
84         }
85         String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId);
86         log.info(infoMsg);
87         outputLogger.info(infoMsg);
88     }
89
90     private void closePublisher(VesEvent event, String dmaapId, Throwable e) {
91         log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
92                 event, dmaapId), e);
93         publishersCache.closePublisherFor(dmaapId);
94     }
95
96     private void clearVesUniqueIdFromEvent(VesEvent event) {
97         if (event.hasType(VesEvent.VES_UNIQUE_ID)) {
98             String uuid = event.getUniqueId().toString();
99             LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
100             localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
101             log.debug("Removing VESuniqueid object from event");
102             event.removeElement(VesEvent.VES_UNIQUE_ID);
103         }
104     }
105 }