Fix locally failing TCs in catalog-be
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / catalog / impl / DmaapProducer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.catalog.impl;
22
23 import com.att.nsa.mr.client.MRBatchingPublisher;
24 import com.fasterxml.jackson.databind.ObjectMapper;
25 import javax.annotation.PostConstruct;
26 import javax.annotation.PreDestroy;
27 import org.openecomp.sdc.be.catalog.api.IMessageQueueHandlerProducer;
28 import org.openecomp.sdc.be.catalog.api.IStatus;
29 import org.openecomp.sdc.be.catalog.api.ITypeMessage;
30 import org.openecomp.sdc.be.catalog.enums.ResultStatusEnum;
31 import org.openecomp.sdc.be.components.distribution.engine.DmaapClientFactory;
32 import org.openecomp.sdc.be.config.ConfigurationManager;
33 import org.openecomp.sdc.be.config.DmaapProducerConfiguration;
34 import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
35 import org.openecomp.sdc.common.log.enums.StatusCode;
36 import org.openecomp.sdc.common.log.wrappers.Logger;
37 import org.springframework.stereotype.Component;
38
39 @Component
40 public class DmaapProducer implements IMessageQueueHandlerProducer {
41
42     private static final Logger LOG = Logger.getLogger(DmaapProducer.class.getName());
43     private final DmaapClientFactory dmaapClientFactory;
44     private final DmaapProducerHealth dmaapHealth;
45     private final ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
46     private MRBatchingPublisher publisher;
47
48     public DmaapProducer(final DmaapClientFactory dmaapClientFactory,
49                          final DmaapProducerHealth dmaapHealth) {
50         this.dmaapClientFactory = dmaapClientFactory;
51         this.dmaapHealth = dmaapHealth;
52     }
53
54     @Override
55     public IStatus pushMessage(ITypeMessage message) {
56         try {
57             DmaapProducerConfiguration producerConfiguration = configurationManager.getConfiguration()
58                 .getDmaapProducerConfiguration();
59             if (!producerConfiguration.getActive()) {
60                 LOG.info(
61                     "[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent.");
62                 dmaapHealth.report(false);
63                 return IStatus.getServiceDisabled();
64             }
65             if (publisher == null) {
66                 IStatus initStatus = init();
67                 if (initStatus.getResultStatus() != ResultStatusEnum.SUCCESS) {
68
69                     return initStatus;
70                 }
71             }
72             ObjectMapper mapper = new ObjectMapper();
73             String jsonInString = mapper.writeValueAsString(message);
74             if (publisher != null) {
75                 LOG.info("before send message . response {}", jsonInString);
76
77                 LOG.invoke("Dmaap Producer", "DmaapProducer-pushMessage", DmaapProducer.class.getName(),
78                     message.toString());
79
80                 int pendingMsg = publisher.send(jsonInString);
81                 LOG.info("sent message . response {}", pendingMsg);
82                 LOG.invokeReturn(producerConfiguration.getConsumerId(), "Dmaap Producer",
83                     StatusCode.COMPLETE.getStatusCode(), "DmaapProducer-pushMessage", message.toString(), pendingMsg);
84
85             }
86
87             dmaapHealth.report(true);
88         } catch (Exception e) {
89             LOG.error(EcompLoggerErrorCode.BUSINESS_PROCESS_ERROR, "Failed to send message . Exception {}", e.getMessage());
90             return IStatus.getFailStatus();
91         }
92
93         return IStatus.getSuccessStatus();
94     }
95
96     @PostConstruct
97     @Override
98     public IStatus init() {
99         LOG.debug("MessageQueueHandlerProducer:: Start initializing");
100         DmaapProducerConfiguration configuration = configurationManager.getConfiguration()
101             .getDmaapProducerConfiguration();
102         if (configuration.getActive()) {
103             try {
104                 publisher = dmaapClientFactory.createProducer(configuration);
105                 if (publisher == null) {
106                     LOG.error("Failed to connect to topic ");
107                     dmaapHealth.report(false);
108                     return IStatus.getFailStatus();
109                 }
110
111             } catch (Exception e) {
112                 LOG.error("Failed to connect to topic . Exeption {}", e.getMessage());
113                 dmaapHealth.report(false);
114                 return IStatus.getFailStatus();
115             }
116             dmaapHealth.report(true);
117             return IStatus.getSuccessStatus();
118         }
119         LOG.info("[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent.");
120         dmaapHealth.report(false);
121         return IStatus.getServiceDisabled();
122     }
123
124     @PreDestroy
125     public void shutdown() {
126         LOG.debug("DmaapProducer::shutdown...");
127         try {
128             if (publisher != null) {
129                 publisher.close();
130             }
131         } catch (Exception e) {
132             LOG.error("Failed to close  messageQ . Exeption {}", e.getMessage());
133
134         }
135
136     }
137
138 }