2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.catalog.impl;
23 import com.att.nsa.mr.client.MRBatchingPublisher;
24 import com.fasterxml.jackson.databind.ObjectMapper;
25 import javax.annotation.PostConstruct;
26 import javax.annotation.PreDestroy;
27 import org.openecomp.sdc.be.catalog.api.IMessageQueueHandlerProducer;
28 import org.openecomp.sdc.be.catalog.api.IStatus;
29 import org.openecomp.sdc.be.catalog.api.ITypeMessage;
30 import org.openecomp.sdc.be.catalog.enums.ResultStatusEnum;
31 import org.openecomp.sdc.be.components.distribution.engine.DmaapClientFactory;
32 import org.openecomp.sdc.be.config.ConfigurationManager;
33 import org.openecomp.sdc.be.config.DmaapProducerConfiguration;
34 import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
35 import org.openecomp.sdc.common.log.enums.StatusCode;
36 import org.openecomp.sdc.common.log.wrappers.Logger;
37 import org.springframework.stereotype.Component;
40 public class DmaapProducer implements IMessageQueueHandlerProducer {
// Class-wide logger, named after this class.
42 private static final Logger LOG = Logger.getLogger(DmaapProducer.class.getName());
// Factory that init() asks to create the publisher from the producer configuration.
43 private final DmaapClientFactory dmaapClientFactory;
// Health reporter; pushMessage() and init() call report(true/false) on it after each outcome.
44 private final DmaapProducerHealth dmaapHealth;
// Obtained once from the static accessor; the producer configuration is re-read from it on every call.
45 private final ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
// Not set at construction: assigned in init(), and pushMessage() calls init() while it is still null.
46 private MRBatchingPublisher publisher;
/**
 * Creates the producer and stores its two collaborators. No publisher is created here;
 * that happens in init().
 *
 * @param dmaapClientFactory factory used by init() to create the publisher
 * @param dmaapHealth        reporter that is told whether the producer is currently healthy
 */
48 public DmaapProducer(final DmaapClientFactory dmaapClientFactory,
49 final DmaapProducerHealth dmaapHealth) {
50 this.dmaapClientFactory = dmaapClientFactory;
51 this.dmaapHealth = dmaapHealth;
/**
 * Serializes a message to JSON and sends it through the publisher.
 *
 * <p>Outcomes visible in this method:
 * <ul>
 *   <li>producer not active in the configuration: health is reported as false and the
 *       service-disabled status is returned;</li>
 *   <li>no publisher yet: init() is called and its result status is compared with SUCCESS;</li>
 *   <li>an exception is caught: it is logged (message text only) and the fail status is
 *       returned;</li>
 *   <li>otherwise: health is reported as true and the success status is returned.</li>
 * </ul>
 *
 * @param message the message to publish; it is written as JSON with a Jackson ObjectMapper
 * @return the service-disabled, fail or success status as described above
 */
55 public IStatus pushMessage(ITypeMessage message) {
// The configuration is read on every call rather than cached in a field.
57 DmaapProducerConfiguration producerConfiguration = configurationManager.getConfiguration()
58 .getDmaapProducerConfiguration();
// Disabled producer: nothing is sent, and the health reporter is told the producer is down.
59 if (!producerConfiguration.getActive()) {
61 "[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent.");
62 dmaapHealth.report(false);
63 return IStatus.getServiceDisabled();
// Lazy creation: the publisher is only built by init() when it is still null.
65 if (publisher == null) {
66 IStatus initStatus = init();
// NOTE(review): the body of this branch is not visible here; presumably the init status
// is returned to the caller when initialization did not succeed -- confirm.
67 if (initStatus.getResultStatus() != ResultStatusEnum.SUCCESS) {
// A new ObjectMapper is created on each call to serialize the message.
72 ObjectMapper mapper = new ObjectMapper();
73 String jsonInString = mapper.writeValueAsString(message);
// The publisher is null-checked again before sending. The full JSON payload is logged at
// info level before the send, and the int returned by send() is logged afterwards.
// NOTE(review): the name pendingMsg suggests send() returns a pending-message count --
// confirm against the MRBatchingPublisher API.
74 if (publisher != null) {
75 LOG.info("before send message . response {}", jsonInString);
77 LOG.invoke("Dmaap Producer", "DmaapProducer-pushMessage", DmaapProducer.class.getName(),
80 int pendingMsg = publisher.send(jsonInString);
81 LOG.info("sent message . response {}", pendingMsg);
82 LOG.invokeReturn(producerConfiguration.getConsumerId(), "Dmaap Producer",
83 StatusCode.COMPLETE.getStatusCode(), "DmaapProducer-pushMessage", message.toString(), pendingMsg);
87 dmaapHealth.report(true);
// Only e.getMessage() is logged, so the stack trace is not recorded. Unlike the failure
// paths in init(), this handler does not call dmaapHealth.report(false).
88 } catch (Exception e) {
89 LOG.error(EcompLoggerErrorCode.BUSINESS_PROCESS_ERROR, "Failed to send message . Exception {}", e.getMessage());
90 return IStatus.getFailStatus();
93 return IStatus.getSuccessStatus();
/**
 * Creates the publisher from the current producer configuration and reports the resulting
 * health.
 *
 * <p>Outcomes visible in this method:
 * <ul>
 *   <li>producer active and the factory returns a publisher: health is reported as true and
 *       the success status is returned;</li>
 *   <li>producer active but the factory returns null, or an exception is caught: the error
 *       is logged, health is reported as false and the fail status is returned;</li>
 *   <li>producer not active: an info message is logged, health is reported as false and the
 *       service-disabled status is returned.</li>
 * </ul>
 *
 * @return the success, fail or service-disabled status as described above
 */
98 public IStatus init() {
99 LOG.debug("MessageQueueHandlerProducer:: Start initializing");
// The configuration is read at call time, so the active flag is evaluated on every call.
100 DmaapProducerConfiguration configuration = configurationManager.getConfiguration()
101 .getDmaapProducerConfiguration();
102 if (configuration.getActive()) {
// The factory result is stored in the field before the null check, so a null result
// leaves the publisher field null.
104 publisher = dmaapClientFactory.createProducer(configuration);
105 if (publisher == null) {
106 LOG.error("Failed to connect to topic ");
107 dmaapHealth.report(false);
108 return IStatus.getFailStatus();
// Only e.getMessage() is logged here, so the stack trace is not recorded.
111 } catch (Exception e) {
112 LOG.error("Failed to connect to topic . Exeption {}", e.getMessage());
113 dmaapHealth.report(false);
114 return IStatus.getFailStatus();
// Publisher created without error: mark the producer healthy.
116 dmaapHealth.report(true);
117 return IStatus.getSuccessStatus();
// Disabled path: no publisher is created.
119 LOG.info("[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent.");
120 dmaapHealth.report(false);
121 return IStatus.getServiceDisabled();
125 public void shutdown() {
126 LOG.debug("DmaapProducer::shutdown...");
128 if (publisher != null) {
131 } catch (Exception e) {
132 LOG.error("Failed to close messageQ . Exeption {}", e.getMessage());