2 * Copyright 2017 ZTE Corporation.
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
16 package org.openo.holmes.common.producer;
\r
18 import java.io.Serializable;
\r
19 import javax.inject.Inject;
\r
20 import javax.jms.Connection;
\r
21 import javax.jms.ConnectionFactory;
\r
22 import javax.jms.Destination;
\r
23 import javax.jms.JMSException;
\r
24 import javax.jms.MessageProducer;
\r
25 import javax.jms.ObjectMessage;
\r
26 import javax.jms.Session;
\r
27 import lombok.NoArgsConstructor;
\r
28 import lombok.extern.slf4j.Slf4j;
\r
29 import org.glassfish.hk2.api.IterableProvider;
\r
30 import org.jvnet.hk2.annotations.Service;
\r
31 import org.openo.holmes.common.api.entity.CorrelationResult;
\r
32 import org.openo.holmes.common.api.stat.Alarm;
\r
33 import org.openo.holmes.common.api.stat.AplusResult;
\r
34 import org.openo.holmes.common.config.MQConfig;
\r
35 import org.openo.holmes.common.constant.AlarmConst;
\r
36 import org.apache.activemq.ActiveMQConnectionFactory;
\r
41 public class MQProducer {
\r
44 private IterableProvider<MQConfig> mqConfigProvider;
\r
45 private ConnectionFactory connectionFactory;
\r
47 public void init() {
\r
50 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
\r
51 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
\r
52 mqConfigProvider.get().brokerPassword, brokerURL);
\r
55 public void sendAlarmMQTopicMsg(Alarm alarm) {
\r
56 sendMQTopicMsg(alarm);
\r
59 public void sendCorrelationMQTopicMsg(String ruleId, long createTimeL, Alarm parentAlarm,
\r
61 CorrelationResult correlationResult = getCorrelationResult(ruleId, createTimeL, parentAlarm, childAlarm);
\r
62 sendMQTopicMsg(correlationResult);
\r
65 private <T> void sendMQTopicMsg(T t) {
\r
66 Serializable msgEntity = (Serializable) t;
\r
67 Connection connection = null;
\r
69 Destination destination = null;
\r
70 MessageProducer messageProducer;
\r
73 connection = connectionFactory.createConnection();
\r
75 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
\r
76 if (t instanceof CorrelationResult) {
\r
77 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARMS_CORRELATION);
\r
78 } else if (t instanceof Alarm) {
\r
79 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
\r
81 messageProducer = session.createProducer(destination);
\r
82 ObjectMessage message = session.createObjectMessage(msgEntity);
\r
83 messageProducer.send(message);
\r
85 } catch (Exception e) {
\r
86 log.error("Failed send correlation." + e.getMessage(), e);
\r
88 if (connection != null) {
\r
91 } catch (JMSException e) {
\r
92 log.error("Failed close connection." + e.getMessage(), e);
\r
98 private CorrelationResult getCorrelationResult(String ruleId, long createTimeL, Alarm parentAlarm,
\r
100 CorrelationResult correlationResult = new CorrelationResult();
\r
101 correlationResult.setRuleId(ruleId);
\r
102 correlationResult.setCreateTimeL(createTimeL);
\r
103 correlationResult.setResultType(AplusResult.APLUS_CORRELATION);
\r
104 correlationResult.setAffectedAlarms(new Alarm[]{parentAlarm, childAlarm});
\r
105 return correlationResult;
\r