2 * Copyright 2017 ZTE Corporation.
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
16 package org.onap.holmes.common.producer;
\r
18 import java.io.Serializable;
\r
19 import javax.inject.Inject;
\r
20 import javax.jms.Connection;
\r
21 import javax.jms.ConnectionFactory;
\r
22 import javax.jms.Destination;
\r
23 import javax.jms.JMSException;
\r
24 import javax.jms.MessageProducer;
\r
25 import javax.jms.ObjectMessage;
\r
26 import javax.jms.Session;
\r
27 import lombok.NoArgsConstructor;
\r
28 import lombok.extern.slf4j.Slf4j;
\r
29 import org.glassfish.hk2.api.IterableProvider;
\r
30 import org.jvnet.hk2.annotations.Service;
\r
31 import org.onap.holmes.common.api.stat.Alarm;
\r
32 import org.onap.holmes.common.api.stat.VesAlarm;
\r
33 import org.onap.holmes.common.constant.AlarmConst;
\r
34 import org.onap.holmes.common.api.entity.CorrelationResult;
\r
35 import org.onap.holmes.common.api.stat.AplusResult;
\r
36 import org.onap.holmes.common.config.MQConfig;
\r
37 import org.apache.activemq.ActiveMQConnectionFactory;
\r
42 public class MQProducer {
\r
45 private IterableProvider<MQConfig> mqConfigProvider;
\r
46 private ConnectionFactory connectionFactory;
\r
48 public void init() {
\r
51 "tcp://" + mqConfigProvider.get().getBrokerIp() + ":" + mqConfigProvider.get().getBrokerPort();
\r
52 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().getBrokerUsername(),
\r
53 mqConfigProvider.get().getBrokerPassword(), brokerURL);
\r
56 public void sendAlarmMQTopicMsg(VesAlarm alarm) {
\r
57 sendMQTopicMsg(alarm);
\r
60 public void sendCorrelationMQTopicMsg(String ruleId, long createTimeL, Alarm parentAlarm,
\r
62 CorrelationResult correlationResult = getCorrelationResult(ruleId, createTimeL, parentAlarm, childAlarm);
\r
63 sendMQTopicMsg(correlationResult);
\r
66 private <T> void sendMQTopicMsg(T t) {
\r
67 Serializable msgEntity = (Serializable) t;
\r
68 Connection connection = null;
\r
70 Destination destination = null;
\r
71 MessageProducer messageProducer;
\r
74 connection = connectionFactory.createConnection();
\r
76 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
\r
77 if (t instanceof CorrelationResult) {
\r
78 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARMS_CORRELATION);
\r
79 } else if (t instanceof VesAlarm) {
\r
80 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
\r
82 messageProducer = session.createProducer(destination);
\r
83 ObjectMessage message = session.createObjectMessage(msgEntity);
\r
84 messageProducer.send(message);
\r
86 } catch (Exception e) {
\r
87 log.error("Failed send correlation. " + e.getMessage(), e);
\r
89 if (connection != null) {
\r
92 } catch (JMSException e) {
\r
93 log.error("Failed close connection. " + e.getMessage(), e);
\r
99 private CorrelationResult getCorrelationResult(String ruleId, long createTimeL, Alarm parentAlarm,
\r
100 Alarm childAlarm) {
\r
101 CorrelationResult correlationResult = new CorrelationResult();
\r
102 correlationResult.setRuleId(ruleId);
\r
103 correlationResult.setCreateTimeL(createTimeL);
\r
104 correlationResult.setResultType(AplusResult.APLUS_CORRELATION);
\r
105 correlationResult.setAffectedAlarms(new Alarm[]{parentAlarm, childAlarm});
\r
106 return correlationResult;
\r