2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.catalog.impl;
23 import com.att.nsa.mr.client.MRBatchingPublisher;
24 import com.fasterxml.jackson.databind.ObjectMapper;
25 import org.openecomp.sdc.be.catalog.api.IMessageQueueHandlerProducer;
26 import org.openecomp.sdc.be.catalog.api.IStatus;
27 import org.openecomp.sdc.be.catalog.api.ITypeMessage;
28 import org.openecomp.sdc.be.catalog.enums.ResultStatusEnum;
29 import org.openecomp.sdc.be.components.distribution.engine.DmaapClientFactory;
30 import org.openecomp.sdc.be.config.ConfigurationManager;
31 import org.openecomp.sdc.be.config.DmaapProducerConfiguration;
32 import org.openecomp.sdc.common.log.enums.StatusCode;
33 import org.openecomp.sdc.common.log.wrappers.Logger;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.stereotype.Component;
37 import javax.annotation.PostConstruct;
38 import javax.annotation.PreDestroy;
41 public class DmaapProducer implements IMessageQueueHandlerProducer {
42 private static final Logger LOG = Logger.getLogger(DmaapProducer.class.getName());
43 private static final Logger metricLog = Logger.getLogger(DmaapProducer.class.getName());
46 private DmaapClientFactory dmaapClientFactory;
47 private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
48 private MRBatchingPublisher publisher;
50 private DmaapProducerHealth dmaapHealth;
52 public MRBatchingPublisher getPublisher() {
57 public IStatus pushMessage(ITypeMessage message) {
59 DmaapProducerConfiguration producerConfiguration = configurationManager.getConfiguration()
60 .getDmaapProducerConfiguration();
61 if (!producerConfiguration.getActive()) {
63 "[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent.");
64 dmaapHealth.report(false);
65 return IStatus.getServiceDisabled();
67 if (publisher == null) {
68 IStatus initStatus = init();
69 if (initStatus.getResultStatus() != ResultStatusEnum.SUCCESS) {
74 ObjectMapper mapper = new ObjectMapper();
75 String jsonInString = mapper.writeValueAsString(message);
76 if (publisher != null) {
77 LOG.info("before send message . response {}", jsonInString);
79 LOG.invoke("Dmaap Producer", "DmaapProducer-pushMessage", DmaapProducer.class.getName(), message.toString());
81 int pendingMsg = publisher.send(jsonInString);
82 LOG.info("sent message . response {}", pendingMsg);
83 LOG.invokeReturn(producerConfiguration.getConsumerId(), "Dmaap Producer", StatusCode.COMPLETE.getStatusCodeEnum(), "DmaapProducer-pushMessage",message.toString(), pendingMsg );
89 dmaapHealth.report(true);
90 } catch (Exception e) {
91 LOG.error("Failed to send message . Exception {}", e.getMessage());
92 return IStatus.getFailStatus();
95 return IStatus.getSuccessStatus();
100 public IStatus init() {
101 LOG.debug("MessageQueueHandlerProducer:: Start initializing");
102 DmaapProducerConfiguration configuration = configurationManager.getConfiguration()
103 .getDmaapProducerConfiguration();
104 if (configuration.getActive()) {
106 publisher = dmaapClientFactory.createProducer(configuration);
107 if (publisher == null) {
108 LOG.error("Failed to connect to topic ");
109 dmaapHealth.report(false);
110 return IStatus.getFailStatus();
113 } catch (Exception e) {
114 LOG.error("Failed to connect to topic . Exeption {}", e.getMessage());
115 dmaapHealth.report(false);
116 return IStatus.getFailStatus();
118 dmaapHealth.report(true);
119 return IStatus.getSuccessStatus();
121 LOG.info("[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent.");
122 dmaapHealth.report(false);
123 return IStatus.getServiceDisabled();
127 public void shutdown() {
128 LOG.debug("DmaapProducer::shutdown...");
130 if (publisher != null) {
133 } catch (Exception e) {
134 LOG.error("Failed to close messageQ . Exeption {}", e.getMessage());