SonarCloud Migration
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / mqconsumer / MQConsumer.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.holmes.engine.mqconsumer;
17
18 import java.io.Serializable;
19 import javax.inject.Inject;
20 import javax.jms.Connection;
21 import javax.jms.ConnectionFactory;
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageListener;
27 import javax.jms.Session;
28 import lombok.NoArgsConstructor;
29 import lombok.extern.slf4j.Slf4j;
30 import org.apache.activemq.ActiveMQConnectionFactory;
31 import org.apache.activemq.command.ActiveMQObjectMessage;
32 import org.glassfish.hk2.api.IterableProvider;
33 import org.jvnet.hk2.annotations.Service;
34 import org.onap.holmes.common.api.stat.VesAlarm;
35 import org.onap.holmes.common.config.MQConfig;
36 import org.onap.holmes.common.constant.AlarmConst;
37 import org.onap.holmes.engine.manager.DroolsEngine;
38
39 @Service
40 @Slf4j
41 @NoArgsConstructor
42 public class MQConsumer {
43
44     @Inject
45     private IterableProvider<MQConfig> mqConfigProvider;
46     private ConnectionFactory connectionFactory;
47     private ConnectionFactory connectionFactory1;
48     @Inject
49     private DroolsEngine engine;
50
51     public void registerAlarmTopicListener() {
52         String brokerURL =
53                 "tcp://" + mqConfigProvider.get().getBrokerIp() + ":" + mqConfigProvider.get().getBrokerPort();
54         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().getBrokerUsername(),
55                 mqConfigProvider.get().getBrokerPassword(), brokerURL);
56
57         AlarmMqMessageListener listener = new AlarmMqMessageListener();
58         listener.receive();
59     }
60     class AlarmMqMessageListener implements MessageListener {
61
62         private Connection connection = null;
63         private Session session = null;
64         private Destination destination = null;
65         private MessageConsumer consumer = null;
66
67         private void initialize() throws JMSException {
68             connection = connectionFactory.createConnection();
69             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
70             destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
71             consumer = session.createConsumer(destination);
72             connection.start();
73         }
74
75         public void receive() {
76             try {
77                 initialize();
78                 consumer.setMessageListener(this);
79             } catch (JMSException e) {
80                 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
81                 try {
82                     close();
83                 } catch (JMSException e1) {
84                     log.error("Failed close connection  " + e1.getMessage(), e1);
85                 }
86             }
87         }
88
89         public void onMessage(Message arg0) {
90             ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
91             try {
92                 Serializable object = objectMessage.getObject();
93                 if (object instanceof VesAlarm) {
94                     VesAlarm vesAlarm = (VesAlarm) object;
95                     engine.putRaisedIntoStream(vesAlarm);
96                 }
97             } catch (JMSException e) {
98                 log.error("Failed get object : " + e.getMessage(), e);
99             }
100         }
101
102         private void close() throws JMSException {
103             if (consumer != null) {
104                 consumer.close();
105             }
106             if (session != null) {
107                 session.close();
108             }
109             if (connection != null) {
110                 connection.close();
111             }
112         }
113     }
114 }