eb0357220013a9609b45b0774cf0279945221ed3
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.jms;
23
24 import java.util.EnumMap;
25 import java.util.Map;
26
27 import javax.jms.Connection;
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Message;
30 import javax.jms.MessageConsumer;
31 import javax.jms.MessageListener;
32 import javax.jms.Session;
33 import javax.jms.Topic;
34 import javax.naming.InitialContext;
35
36 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
39 import org.onap.policy.apex.service.engine.event.ApexEventException;
40 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
41 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
42 import org.onap.policy.apex.service.engine.event.PeeredReference;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
44 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * This class implements an Apex event consumer that receives events using JMS.
50  *
51  * @author Liam Fallon (liam.fallon@ericsson.com)
52  */
53 public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runnable {
54     // Get a reference to the logger
55     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsConsumer.class);
56
57     // The Apex and JMS parameters read from the parameter service
58     private JmsCarrierTechnologyParameters jmsConsumerProperties;
59
60     // The event receiver that will receive events from this consumer
61     private ApexEventReceiver eventReceiver;
62
63     // The consumer thread and stopping flag
64     private Thread consumerThread;
65     private boolean stopOrderedFlag = false;
66
67     // The connection to the JMS server
68     private Connection connection;
69
70     // The topic on which we receive events from JMS
71     private Topic jmsIncomingTopic;
72
73     // The name for this consumer
74     private String name = null;
75
76     // The peer references for this event handler
77     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
78
79     @Override
80     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
81             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
82         this.eventReceiver = incomingEventReceiver;
83
84         this.name = consumerName;
85
86         // Check and get the JMS Properties
87         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
88             final String errorMessage = "specified consumer properties of type \""
89                     + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
90                     + "\" are not applicable to a JMS consumer";
91             LOGGER.warn(errorMessage);
92             throw new ApexEventException(errorMessage);
93         }
94         jmsConsumerProperties = (JmsCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
95
96         // Look up the JMS connection factory
97         InitialContext jmsContext = null;
98         ConnectionFactory connectionFactory = null;
99         try {
100             jmsContext = new InitialContext(jmsConsumerProperties.getJmsConsumerProperties());
101             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsConsumerProperties.getConnectionFactory());
102
103             // Check if we actually got a connection factory
104             if (connectionFactory == null) {
105                 throw new IllegalArgumentException(
106                         "JMS context lookup of \"" + jmsConsumerProperties.getConnectionFactory() + "\" returned null");
107             }
108         } catch (final Exception e) {
109             final String errorMessage = "lookup of JMS connection factory  \""
110                     + jmsConsumerProperties.getConnectionFactory() + "\" failed for JMS consumer properties \""
111                     + jmsConsumerProperties.getJmsConsumerProperties() + "\"";
112             LOGGER.warn(errorMessage, e);
113             throw new ApexEventException(errorMessage, e);
114         }
115
116         // Lookup the topic on which we will receive events
117         try {
118             jmsIncomingTopic = (Topic) jmsContext.lookup(jmsConsumerProperties.getConsumerTopic());
119
120             // Check if we actually got a topic
121             if (jmsIncomingTopic == null) {
122                 throw new IllegalArgumentException(
123                         "JMS context lookup of \"" + jmsConsumerProperties.getConsumerTopic() + "\" returned null");
124             }
125         } catch (final Exception e) {
126             final String errorMessage = "lookup of JMS topic  \"" + jmsConsumerProperties.getConsumerTopic()
127                     + "\" failed for JMS consumer properties \"" + jmsConsumerProperties.getJmsConsumerProperties()
128                     + "\"";
129             LOGGER.warn(errorMessage, e);
130             throw new ApexEventException(errorMessage, e);
131         }
132
133         // Create and start a connection to the JMS server
134         try {
135             connection = connectionFactory.createConnection(jmsConsumerProperties.getSecurityPrincipal(),
136                     jmsConsumerProperties.getSecurityCredentials());
137             connection.start();
138         } catch (final Exception e) {
139             final String errorMessage = "connection to the JMS server failed for JMS properties \""
140                     + jmsConsumerProperties.getJmsConsumerProperties() + "\"";
141             LOGGER.warn(errorMessage, e);
142             throw new ApexEventException(errorMessage, e);
143         }
144     }
145
146     /**
147      * {@inheritDoc}.
148      */
149     @Override
150     public void start() {
151         // Configure and start the event reception thread
152         final String threadName = this.getClass().getName() + ":" + this.name;
153         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
154         consumerThread.setDaemon(true);
155         consumerThread.start();
156     }
157
158     /**
159      * {@inheritDoc}.
160      */
161     @Override
162     public String getName() {
163         return name;
164     }
165
166     /**
167      * {@inheritDoc}.
168      */
169     @Override
170     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
171         return peerReferenceMap.get(peeredMode);
172     }
173
174     /**
175      * {@inheritDoc}.
176      */
177     @Override
178     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
179         peerReferenceMap.put(peeredMode, peeredReference);
180     }
181
182     /**
183      * {@inheritDoc}.
184      */
185     @Override
186     public void run() {
187         // JMS session and message consumer for receiving messages
188         try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
189             // Create a message consumer for reception of messages and set this class as a message listener
190             createMessageConsumer(jmsSession);
191         } catch (final Exception e) {
192             final String errorMessage = "failed to create a JMS session towards the JMS server for receiving messages";
193             LOGGER.warn(errorMessage, e);
194             throw new ApexEventRuntimeException(errorMessage, e);
195         }
196         // Everything is now set up
197         if (LOGGER.isDebugEnabled()) {
198             LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: "
199                     + jmsConsumerProperties.getConsumerTopic());
200         }
201     }
202
203     /**
204      * The helper function to create a message consumer from a given JMS session.
205      *
206      * @param jmsSession a JMS session
207      */
208     private void createMessageConsumer(final Session jmsSession) {
209         try (final MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) {
210             messageConsumer.setMessageListener(this);
211
212             // The endless loop that receives events over JMS
213             while (consumerThread.isAlive() && !stopOrderedFlag) {
214                 ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
215             }
216         } catch (final Exception e) {
217             final String errorMessage = "failed to create a JMS message consumer for receiving messages";
218             LOGGER.warn(errorMessage, e);
219             throw new ApexEventRuntimeException(errorMessage, e);
220         }
221     }
222
223     /**
224      * {@inheritDoc}.
225      */
226     @Override
227     public void onMessage(final Message jmsMessage) {
228         try {
229             if (LOGGER.isTraceEnabled()) {
230                 LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
231                         this.getClass().getName() + ":" + this.name, jmsMessage.getJMSMessageID(),
232                         jmsMessage.getJMSType());
233             }
234
235             eventReceiver.receiveEvent(null, jmsMessage);
236         } catch (final Exception e) {
237             final String errorMessage = "failed to receive message from JMS";
238             LOGGER.warn(errorMessage, e);
239             throw new ApexEventRuntimeException(errorMessage, e);
240         }
241     }
242
243     /**
244      * {@inheritDoc}.
245      */
246     @Override
247     public void stop() {
248         stopOrderedFlag = true;
249
250         while (consumerThread.isAlive()) {
251             ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
252         }
253
254         // Close the connection to the JMS server
255         try {
256             if (connection != null) {
257                 connection.close();
258             }
259         } catch (final Exception e) {
260             final String errorMessage = "close of connection to the JMS server failed";
261             LOGGER.warn(errorMessage, e);
262         }
263     }
264
265 }