-/**
- * Copyright 2017 ZTE Corporation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.openo.holmes.common.producer;
-
-import javax.annotation.PostConstruct;
-import javax.inject.Inject;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.glassfish.hk2.api.IterableProvider;
-import org.jvnet.hk2.annotations.Service;
-import org.openo.holmes.common.api.entity.CorrelationResult;
-import org.openo.holmes.common.api.stat.Alarm;
-import org.openo.holmes.common.api.stat.AplusResult;
-import org.openo.holmes.common.config.MQConfig;
-import org.openo.holmes.common.constant.AlarmConst;
-
-@Service
-@Slf4j
-public class MQProducer {
-
- @Inject
- private static IterableProvider<MQConfig> mqConfigProvider;
- private ConnectionFactory connectionFactory;
-
- @PostConstruct
- public void init() {
-
- String brokerURL =
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
- mqConfigProvider.get().brokerPassword, brokerURL);
- }
-
- public void sendAlarmMQTopicMsg(Alarm alarm) {
-
- Connection connection = null;
- Session session;
- Destination destination;
- MessageProducer messageProducer;
-
- try {
-
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
- messageProducer = session.createProducer(destination);
- ObjectMessage message = session.createObjectMessage(alarm);
- messageProducer.send(message);
- session.commit();
-
- } catch (Exception e) {
- log.error("Failed send alarm." + e.getMessage(), e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException e) {
- log.error("Failed close connection." + e.getMessage(), e);
- }
- }
- }
- }
-
- public void sendCorrelationMQTopicMsg(String ruleId, long createTimeL, Alarm parentAlarm,
- Alarm childAlarm) {
-
- CorrelationResult correlationResult = new CorrelationResult();
- correlationResult.setRuleId(ruleId);
- correlationResult.setCreateTimeL(createTimeL);
- correlationResult.setResultType(AplusResult.APLUS_CORRELATION);
- correlationResult.setAffectedAlarms(new Alarm[]{parentAlarm, childAlarm});
-
- Connection connection = null;
- Session session;
- Destination destination;
- MessageProducer messageProducer;
-
- try {
-
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARMS_CORRELATION);
- messageProducer = session.createProducer(destination);
- ObjectMessage message = session.createObjectMessage(correlationResult);
- messageProducer.send(message);
- session.commit();
-
- } catch (Exception e) {
- log.error("Failed send correlation." + e.getMessage(), e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException e) {
- log.error("Failed close connection." + e.getMessage(), e);
- }
- }
- }
- }
-}
+/**\r
+ * Copyright 2017 ZTE Corporation.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package org.openo.holmes.common.producer;\r
+\r
+import java.io.Serializable;\r
+import javax.inject.Inject;\r
+import javax.jms.Connection;\r
+import javax.jms.ConnectionFactory;\r
+import javax.jms.Destination;\r
+import javax.jms.JMSException;\r
+import javax.jms.MessageProducer;\r
+import javax.jms.ObjectMessage;\r
+import javax.jms.Session;\r
+import lombok.NoArgsConstructor;\r
+import lombok.extern.slf4j.Slf4j;\r
+import org.glassfish.hk2.api.IterableProvider;\r
+import org.jvnet.hk2.annotations.Service;\r
+import org.openo.holmes.common.api.entity.CorrelationResult;\r
+import org.openo.holmes.common.api.stat.Alarm;\r
+import org.openo.holmes.common.api.stat.AplusResult;\r
+import org.openo.holmes.common.config.MQConfig;\r
+import org.openo.holmes.common.constant.AlarmConst;\r
+import org.apache.activemq.ActiveMQConnectionFactory;\r
+\r
+@Service\r
+@Slf4j\r
+@NoArgsConstructor\r
+public class MQProducer {\r
+\r
+ @Inject\r
+ private IterableProvider<MQConfig> mqConfigProvider;\r
+ private ConnectionFactory connectionFactory;\r
+\r
+ public void init() {\r
+\r
+ String brokerURL =\r
+ "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
+ connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
+ mqConfigProvider.get().brokerPassword, brokerURL);\r
+ }\r
+\r
+ public void sendAlarmMQTopicMsg(Alarm alarm) {\r
+ sendMQTopicMsg(alarm);\r
+ }\r
+\r
+ public void sendCorrelationMQTopicMsg(String ruleId, long createTimeL, Alarm parentAlarm,\r
+ Alarm childAlarm) {\r
+ CorrelationResult correlationResult = getCorrelationResult(ruleId, createTimeL, parentAlarm, childAlarm);\r
+ sendMQTopicMsg(correlationResult);\r
+ }\r
+\r
+ private <T> void sendMQTopicMsg(T t) {\r
+ Serializable msgEntity = (Serializable) t;\r
+ Connection connection = null;\r
+ Session session;\r
+ Destination destination = null;\r
+ MessageProducer messageProducer;\r
+\r
+ try {\r
+ connection = connectionFactory.createConnection();\r
+ connection.start();\r
+ session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);\r
+ if (t instanceof CorrelationResult) {\r
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARMS_CORRELATION);\r
+ } else if (t instanceof Alarm) {\r
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
+ }\r
+ messageProducer = session.createProducer(destination);\r
+ ObjectMessage message = session.createObjectMessage(msgEntity);\r
+ messageProducer.send(message);\r
+ session.commit();\r
+ } catch (Exception e) {\r
+ log.error("Failed send correlation." + e.getMessage(), e);\r
+ } finally {\r
+ if (connection != null) {\r
+ try {\r
+ connection.close();\r
+ } catch (JMSException e) {\r
+ log.error("Failed close connection." + e.getMessage(), e);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ private CorrelationResult getCorrelationResult(String ruleId, long createTimeL, Alarm parentAlarm,\r
+ Alarm childAlarm) {\r
+ CorrelationResult correlationResult = new CorrelationResult();\r
+ correlationResult.setRuleId(ruleId);\r
+ correlationResult.setCreateTimeL(createTimeL);\r
+ correlationResult.setResultType(AplusResult.APLUS_CORRELATION);\r
+ correlationResult.setAffectedAlarms(new Alarm[]{parentAlarm, childAlarm});\r
+ return correlationResult;\r
+ }\r
+}\r