b857d8e142aab10e33296e1268fcc3267082c170
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.jms;
23
24 import java.util.EnumMap;
25 import java.util.Map;
26
27 import javax.jms.Connection;
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Message;
30 import javax.jms.MessageConsumer;
31 import javax.jms.MessageListener;
32 import javax.jms.Session;
33 import javax.jms.Topic;
34 import javax.naming.InitialContext;
35
36 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
39 import org.onap.policy.apex.service.engine.event.ApexEventException;
40 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
41 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
42 import org.onap.policy.apex.service.engine.event.PeeredReference;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
44 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * This class implements an Apex event consumer that receives events using JMS.
50  *
51  * @author Liam Fallon (liam.fallon@ericsson.com)
52  */
53 public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runnable {
54     // Get a reference to the logger
55     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsConsumer.class);
56
57     // The Apex and JMS parameters read from the parameter service
58     private JmsCarrierTechnologyParameters jmsConsumerProperties;
59
60     // The event receiver that will receive events from this consumer
61     private ApexEventReceiver eventReceiver;
62
63     // The consumer thread and stopping flag
64     private Thread consumerThread;
65     private boolean stopOrderedFlag = false;
66
67     // The connection to the JMS server
68     private Connection connection;
69
70     // The topic on which we receive events from JMS
71     private Topic jmsIncomingTopic;
72
73     // The name for this consumer
74     private String name = null;
75
76     // The peer references for this event handler
77     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
78
79     @Override
80     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
81             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
82         this.eventReceiver = incomingEventReceiver;
83
84         this.name = consumerName;
85
86         // Check and get the JMS Properties
87         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
88             final String errorMessage = "specified consumer properties of type \""
89                     + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
90                     + "\" are not applicable to a JMS consumer";
91             LOGGER.warn(errorMessage);
92             throw new ApexEventException(errorMessage);
93         }
94         jmsConsumerProperties = (JmsCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
95
96         // Look up the JMS connection factory
97         InitialContext jmsContext = null;
98         ConnectionFactory connectionFactory = null;
99         try {
100             jmsContext = new InitialContext(jmsConsumerProperties.getJmsConsumerProperties());
101             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsConsumerProperties.getConnectionFactory());
102
103             // Check if we actually got a connection factory
104             if (connectionFactory == null) {
105                 throw new IllegalArgumentException(
106                         "JMS context lookup of \"" + jmsConsumerProperties.getConnectionFactory() + "\" returned null");
107             }
108         } catch (final Exception e) {
109             final String errorMessage = "lookup of JMS connection factory  \""
110                     + jmsConsumerProperties.getConnectionFactory() + "\" failed for JMS consumer properties \""
111                     + jmsConsumerProperties.getJmsConsumerProperties() + "\"";
112             LOGGER.warn(errorMessage, e);
113             throw new ApexEventException(errorMessage, e);
114         }
115
116         // Lookup the topic on which we will receive events
117         try {
118             jmsIncomingTopic = (Topic) jmsContext.lookup(jmsConsumerProperties.getConsumerTopic());
119
120             // Check if we actually got a topic
121             if (jmsIncomingTopic == null) {
122                 throw new IllegalArgumentException(
123                         "JMS context lookup of \"" + jmsConsumerProperties.getConsumerTopic() + "\" returned null");
124             }
125         } catch (final Exception e) {
126             final String errorMessage = "lookup of JMS topic  \"" + jmsConsumerProperties.getConsumerTopic()
127                     + "\" failed for JMS consumer properties \"" + jmsConsumerProperties.getJmsConsumerProperties()
128                     + "\"";
129             LOGGER.warn(errorMessage, e);
130             throw new ApexEventException(errorMessage, e);
131         }
132
133         // Create and start a connection to the JMS server
134         try {
135             connection = connectionFactory.createConnection(jmsConsumerProperties.getSecurityPrincipal(),
136                     jmsConsumerProperties.getSecurityCredentials());
137             connection.start();
138         } catch (final Exception e) {
139             final String errorMessage = "connection to the JMS server failed for JMS properties \""
140                     + jmsConsumerProperties.getJmsConsumerProperties() + "\"";
141             LOGGER.warn(errorMessage, e);
142             throw new ApexEventException(errorMessage, e);
143         }
144     }
145
146     /**
147      * {@inheritDoc}.
148      */
149     @Override
150     public void start() {
151         // Configure and start the event reception thread
152         final String threadName = this.getClass().getName() + ":" + this.name;
153         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
154         consumerThread.setDaemon(true);
155         consumerThread.start();
156     }
157
158     /**
159      * {@inheritDoc}.
160      */
161     @Override
162     public String getName() {
163         return name;
164     }
165
166     /**
167      * {@inheritDoc}.
168      */
169     @Override
170     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
171         return peerReferenceMap.get(peeredMode);
172     }
173
174     /*
175      * (non-Javadoc)
176      *
177      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service.
178      * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
179      */
180     @Override
181     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
182         peerReferenceMap.put(peeredMode, peeredReference);
183     }
184
185     /*
186      * (non-Javadoc)
187      *
188      * @see java.lang.Runnable#run()
189      */
190     @Override
191     public void run() {
192         // JMS session and message consumer for receiving messages
193         try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
194             // Create a message consumer for reception of messages and set this class as a message listener
195             createMessageConsumer(jmsSession);
196         } catch (final Exception e) {
197             final String errorMessage = "failed to create a JMS session towards the JMS server for receiving messages";
198             LOGGER.warn(errorMessage, e);
199             throw new ApexEventRuntimeException(errorMessage, e);
200         }
201         // Everything is now set up
202         if (LOGGER.isDebugEnabled()) {
203             LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: "
204                     + jmsConsumerProperties.getConsumerTopic());
205         }
206     }
207
208     /**
209      * The helper function to create a message consumer from a given JMS session.
210      *
211      * @param jmsSession a JMS session
212      */
213     private void createMessageConsumer(final Session jmsSession) {
214         try (final MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) {
215             messageConsumer.setMessageListener(this);
216
217             // The endless loop that receives events over JMS
218             while (consumerThread.isAlive() && !stopOrderedFlag) {
219                 ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
220             }
221         } catch (final Exception e) {
222             final String errorMessage = "failed to create a JMS message consumer for receiving messages";
223             LOGGER.warn(errorMessage, e);
224             throw new ApexEventRuntimeException(errorMessage, e);
225         }
226     }
227
228     /**
229      * {@inheritDoc}.
230      */
231     @Override
232     public void onMessage(final Message jmsMessage) {
233         try {
234             if (LOGGER.isTraceEnabled()) {
235                 LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
236                         this.getClass().getName() + ":" + this.name, jmsMessage.getJMSMessageID(),
237                         jmsMessage.getJMSType());
238             }
239
240             eventReceiver.receiveEvent(null, jmsMessage);
241         } catch (final Exception e) {
242             final String errorMessage = "failed to receive message from JMS";
243             LOGGER.warn(errorMessage, e);
244             throw new ApexEventRuntimeException(errorMessage, e);
245         }
246     }
247
248     /**
249      * {@inheritDoc}.
250      */
251     @Override
252     public void stop() {
253         stopOrderedFlag = true;
254
255         while (consumerThread.isAlive()) {
256             ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
257         }
258
259         // Close the connection to the JMS server
260         try {
261             if (connection != null) {
262                 connection.close();
263             }
264         } catch (final Exception e) {
265             final String errorMessage = "close of connection to the JMS server failed";
266             LOGGER.warn(errorMessage, e);
267         }
268     }
269
270 }