Catalog alignment
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / catalog / impl / DmaapProducer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.catalog.impl;
22
23 import com.att.nsa.mr.client.MRBatchingPublisher;
24 import com.fasterxml.jackson.databind.ObjectMapper;
25 import org.openecomp.sdc.be.catalog.api.IMessageQueueHandlerProducer;
26 import org.openecomp.sdc.be.catalog.api.IStatus;
27 import org.openecomp.sdc.be.catalog.api.ITypeMessage;
28 import org.openecomp.sdc.be.catalog.enums.ResultStatusEnum;
29 import org.openecomp.sdc.be.components.distribution.engine.DmaapClientFactory;
30 import org.openecomp.sdc.be.config.ConfigurationManager;
31 import org.openecomp.sdc.be.config.DmaapProducerConfiguration;
32 import org.openecomp.sdc.common.log.enums.StatusCode;
33 import org.openecomp.sdc.common.log.wrappers.Logger;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.stereotype.Component;
36
37 import javax.annotation.PostConstruct;
38 import javax.annotation.PreDestroy;
39
40 @Component
41 public class DmaapProducer implements IMessageQueueHandlerProducer {
42         private static final Logger LOG = Logger.getLogger(DmaapProducer.class.getName());
43         private static final Logger metricLog = Logger.getLogger(DmaapProducer.class.getName());
44
45         @Autowired
46         private DmaapClientFactory dmaapClientFactory;
47         private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
48         private MRBatchingPublisher publisher;
49         @Autowired
50         private DmaapProducerHealth dmaapHealth;
51
52         public MRBatchingPublisher getPublisher() {
53                 return publisher;
54         }
55
56         @Override
57         public IStatus pushMessage(ITypeMessage message) {
58                 try {
59                         DmaapProducerConfiguration producerConfiguration = configurationManager.getConfiguration()
60                                         .getDmaapProducerConfiguration();
61                         if (!producerConfiguration.getActive()) {
62                                 LOG.info(
63                                                 "[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent.");
64                                 dmaapHealth.report(false);
65                                 return IStatus.getServiceDisabled();
66                         }
67                         if (publisher == null) {
68                                 IStatus initStatus = init();
69                                 if (initStatus.getResultStatus() != ResultStatusEnum.SUCCESS) {
70
71                                         return initStatus;
72                                 }
73                         }
74                         ObjectMapper mapper = new ObjectMapper();
75                         String jsonInString = mapper.writeValueAsString(message);
76                         if (publisher != null) {
77                             LOG.info("before send message . response {}", jsonInString);
78
79                                 LOG.invoke("Dmaap Producer", "DmaapProducer-pushMessage", DmaapProducer.class.getName(), message.toString());
80
81                                 int pendingMsg = publisher.send(jsonInString);
82                                 LOG.info("sent message . response {}", pendingMsg);
83                                 LOG.invokeReturn(producerConfiguration.getConsumerId(), "Dmaap Producer", StatusCode.COMPLETE.getStatusCodeEnum(), "DmaapProducer-pushMessage",message.toString(), pendingMsg );
84
85                         }
86
87                         
88                         
89                         dmaapHealth.report(true);
90                 } catch (Exception e) {
91                         LOG.error("Failed to send message . Exception {}", e.getMessage());
92                         return IStatus.getFailStatus();
93                 }
94
95                 return IStatus.getSuccessStatus();
96         }
97
98         @PostConstruct
99         @Override
100         public IStatus init() {
101                 LOG.debug("MessageQueueHandlerProducer:: Start initializing");
102                 DmaapProducerConfiguration configuration = configurationManager.getConfiguration()
103                                 .getDmaapProducerConfiguration();
104                 if (configuration.getActive()) {
105                         try {
106                                 publisher = dmaapClientFactory.createProducer(configuration);
107                                 if (publisher == null) {
108                                         LOG.error("Failed to connect to topic ");
109                                         dmaapHealth.report(false);
110                                         return IStatus.getFailStatus();
111                                 }
112
113                         } catch (Exception e) {
114                                 LOG.error("Failed to connect to topic . Exeption {}", e.getMessage());
115                                 dmaapHealth.report(false);
116                                 return IStatus.getFailStatus();
117                         }
118                         dmaapHealth.report(true);
119                         return IStatus.getSuccessStatus();
120                 }
121                 LOG.info("[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent.");
122                 dmaapHealth.report(false);
123                 return IStatus.getServiceDisabled();
124         }
125
126         @PreDestroy
127         public void shutdown() {
128                 LOG.debug("DmaapProducer::shutdown...");
129                 try {
130                         if (publisher != null) {
131                                 publisher.close();
132                         }
133                 } catch (Exception e) {
134                         LOG.error("Failed to close  messageQ . Exeption {}", e.getMessage());
135
136                 }
137
138         }
139
140 }