2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.holmes.engine.mqconsumer;
18 import java.io.Serializable;
19 import javax.inject.Inject;
20 import javax.jms.Connection;
21 import javax.jms.ConnectionFactory;
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageListener;
27 import javax.jms.Session;
28 import lombok.NoArgsConstructor;
29 import lombok.extern.slf4j.Slf4j;
30 import org.apache.activemq.ActiveMQConnectionFactory;
31 import org.apache.activemq.command.ActiveMQObjectMessage;
32 import org.glassfish.hk2.api.IterableProvider;
33 import org.jvnet.hk2.annotations.Service;
34 import org.onap.holmes.common.api.stat.VesAlarm;
35 import org.onap.holmes.common.config.MQConfig;
36 import org.onap.holmes.common.constant.AlarmConst;
37 import org.onap.holmes.engine.manager.DroolsEngine;
/**
 * Consumes alarm messages from a message-queue topic and hands them to the
 * Drools engine.
 *
 * NOTE(review): how the fields below are populated is not visible in this
 * view; javax.inject.Inject is imported, so they are presumably injected —
 * confirm against the full file.
 */
42 public class MQConsumer {
// Source of the MQ settings (brokerIp, brokerPort, brokerUsername, brokerPassword)
// read by registerAlarmTopicListener().
45 private IterableProvider<MQConfig> mqConfigProvider;
// Factory assigned in registerAlarmTopicListener() and used by the inner
// listener to open its JMS connection.
46 private ConnectionFactory connectionFactory;
// NOTE(review): not referenced anywhere in the visible code — confirm whether
// it is still needed.
47 private ConnectionFactory connectionFactory1;
// Engine that receives every VesAlarm taken off the topic.
49 private DroolsEngine engine;
/**
 * Builds the ActiveMQ connection factory from the MQ configuration and
 * creates the alarm topic listener.
 *
 * The broker address has the form tcp://&lt;brokerIp&gt;:&lt;brokerPort&gt;, and the
 * factory is created with the configured broker username and password.
 */
51 public void registerAlarmTopicListener() {
// NOTE(review): the line below is presumably the tail of the brokerURL
// declaration used further down; its first line is not visible here.
53 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
// Stored in the field (not a local) because the inner listener reads it in
// initialize().
54 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
55 mqConfigProvider.get().brokerPassword, brokerURL);
// NOTE(review): what is done with the listener after construction is not
// visible in this view.
57 AlarmMqMessageListener listener = new AlarmMqMessageListener();
/**
 * JMS message listener for the alarm topic.
 *
 * Declared as a non-static inner class: it reads the enclosing instance's
 * connectionFactory and engine fields.
 */
60 class AlarmMqMessageListener implements MessageListener {
// JMS resources; all start as null and are assigned in initialize().
62 private Connection connection = null;
63 private Session session = null;
64 private Destination destination = null;
65 private MessageConsumer consumer = null;
/**
 * Opens the JMS connection, session, topic and consumer used by this listener.
 *
 * @throws JMSException if any of the JMS calls below fails
 */
67 private void initialize() throws JMSException {
// Uses the factory built by the enclosing class's registerAlarmTopicListener().
68 connection = connectionFactory.createConnection();
// Non-transacted session (first argument false) with automatic acknowledgement.
69 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// The topic name comes from AlarmConst.MQ_TOPIC_NAME_ALARM.
70 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
71 consumer = session.createConsumer(destination);
// NOTE(review): the remainder of this method is not visible in this view.
/**
 * Registers this object as the message listener of the consumer, so that
 * incoming messages are delivered to onMessage(Message).
 *
 * JMS failures are logged rather than propagated to the caller.
 *
 * NOTE(review): the try openers and the calls they guard are not visible in
 * this view; judging by the log texts, the first handler covers connecting to
 * the MQ service and the second covers closing the connection — confirm
 * against the full file.
 */
75 public void receive() {
78 consumer.setMessageListener(this);
79 } catch (JMSException e) {
80 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
// Second handler, for a JMSException raised while closing the connection
// (per the message logged below).
83 } catch (JMSException e1) {
84 log.error("Failed close connection " + e1.getMessage(), e1);
/**
 * Handles one message from the alarm topic: extracts the payload object and,
 * when it is a VesAlarm, passes it to the Drools engine.
 *
 * Payloads of any other type are not forwarded. A JMSException is logged and
 * not rethrown.
 *
 * @param arg0 the delivered JMS message
 */
89 public void onMessage(Message arg0) {
// NOTE(review): unchecked cast — a message that is not an
// ActiveMQObjectMessage would raise ClassCastException here.
90 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
92 Serializable object = objectMessage.getObject();
// Only VesAlarm payloads are forwarded to the engine.
93 if (object instanceof VesAlarm) {
94 VesAlarm vesAlarm = (VesAlarm) object;
95 engine.putRaisedIntoStream(vesAlarm);
97 } catch (JMSException e) {
98 log.error("Failed get object : " + e.getMessage(), e);
102 private void close() throws JMSException {
103 if (consumer != null) {
106 if (session != null) {
109 if (connection != null) {