Add engine
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / enginemgt / listener / AlarmMqMessageListener.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.openo.holmes.enginemgt.listener;
17
18
19 import javax.annotation.PostConstruct;
20 import javax.inject.Inject;
21 import javax.jms.Connection;
22 import javax.jms.ConnectionFactory;
23 import javax.jms.Destination;
24 import javax.jms.JMSException;
25 import javax.jms.MessageConsumer;
26 import javax.jms.ObjectMessage;
27 import javax.jms.Session;
28 import lombok.extern.slf4j.Slf4j;
29 import org.apache.activemq.ActiveMQConnectionFactory;
30 import org.glassfish.hk2.api.IterableProvider;
31 import org.jvnet.hk2.annotations.Service;
32 import org.openo.holmes.common.api.stat.Alarm;
33 import org.openo.holmes.common.config.MQConfig;
34 import org.openo.holmes.common.constant.AlarmConst;
35 import org.openo.holmes.enginemgt.manager.DroolsEngine;
36
37 @Service
38 @Slf4j
39 public class AlarmMqMessageListener implements Runnable {
40
41     @Inject
42     private static IterableProvider<MQConfig> mqConfigProvider;
43     @Inject
44     DroolsEngine droolsEngine;
45     private ConnectionFactory connectionFactory;
46
47     @PostConstruct
48     public void init() {
49
50         String brokerURL =
51             "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
52         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
53             mqConfigProvider.get().brokerPassword, brokerURL);
54     }
55
56
57     public void run() {
58         Connection connection;
59         Session session;
60         Destination destination;
61         MessageConsumer messageConsumer;
62
63         try {
64             connection = connectionFactory.createConnection();
65             connection.start();
66             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
67             destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
68             messageConsumer = session.createConsumer(destination);
69
70             while (true) {
71                 ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
72                 if (objMessage != null) {
73                     droolsEngine.putRaisedIntoStream((Alarm) objMessage.getObject());
74                 } else {
75                     break;
76                 }
77             }
78         } catch (JMSException e) {
79             log.debug("Receive alarm failure" + e.getMessage());
80         }
81
82     }
83 }