2 * Copyright 2017 ZTE Corporation.
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
16 package org.openo.holmes.common.producer;
\r
18 import java.io.Serializable;
\r
19 import javax.inject.Inject;
\r
20 import javax.jms.Connection;
\r
21 import javax.jms.ConnectionFactory;
\r
22 import javax.jms.Destination;
\r
23 import javax.jms.JMSException;
\r
24 import javax.jms.MessageProducer;
\r
25 import javax.jms.ObjectMessage;
\r
26 import javax.jms.Session;
\r
27 import lombok.extern.slf4j.Slf4j;
\r
28 import org.glassfish.hk2.api.IterableProvider;
\r
29 import org.jvnet.hk2.annotations.Service;
\r
30 import org.openo.holmes.common.api.entity.CorrelationResult;
\r
31 import org.openo.holmes.common.api.stat.Alarm;
\r
32 import org.openo.holmes.common.api.stat.AplusResult;
\r
33 import org.openo.holmes.common.config.MQConfig;
\r
34 import org.openo.holmes.common.constant.AlarmConst;
\r
35 import org.apache.activemq.ActiveMQConnectionFactory;
\r
39 public class MQProducer {
\r
42 private IterableProvider<MQConfig> mqConfigProvider;
\r
43 private ConnectionFactory connectionFactory;
\r
45 public MQProducer() {
\r
49 public void init() {
\r
52 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
\r
53 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
\r
54 mqConfigProvider.get().brokerPassword, brokerURL);
\r
57 public void sendAlarmMQTopicMsg(Alarm alarm) {
\r
58 sendMQTopicMsg(alarm);
\r
61 public void sendCorrelationMQTopicMsg(String ruleId, long createTimeL, Alarm parentAlarm,
\r
63 CorrelationResult correlationResult = getCorrelationResult(ruleId, createTimeL, parentAlarm, childAlarm);
\r
64 sendMQTopicMsg(correlationResult);
\r
67 private <T> void sendMQTopicMsg(T t) {
\r
68 Serializable msgEntity = (Serializable) t;
\r
69 Connection connection = null;
\r
71 Destination destination = null;
\r
72 MessageProducer messageProducer;
\r
75 connection = connectionFactory.createConnection();
\r
77 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
\r
78 if (t instanceof CorrelationResult) {
\r
79 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARMS_CORRELATION);
\r
80 } else if (t instanceof Alarm) {
\r
81 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
\r
83 messageProducer = session.createProducer(destination);
\r
84 ObjectMessage message = session.createObjectMessage(msgEntity);
\r
85 messageProducer.send(message);
\r
87 } catch (Exception e) {
\r
88 log.error("Failed send correlation." + e.getMessage(), e);
\r
90 if (connection != null) {
\r
93 } catch (JMSException e) {
\r
94 log.error("Failed close connection." + e.getMessage(), e);
\r
100 private CorrelationResult getCorrelationResult(String ruleId, long createTimeL, Alarm parentAlarm,
\r
101 Alarm childAlarm) {
\r
102 CorrelationResult correlationResult = new CorrelationResult();
\r
103 correlationResult.setRuleId(ruleId);
\r
104 correlationResult.setCreateTimeL(createTimeL);
\r
105 correlationResult.setResultType(AplusResult.APLUS_CORRELATION);
\r
106 correlationResult.setAffectedAlarms(new Alarm[]{parentAlarm, childAlarm});
\r
107 return correlationResult;
\r