2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2021, 2023 Nordix Foundation.
5 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.plugins.event.carrier.jms;
25 import jakarta.jms.Connection;
26 import jakarta.jms.ConnectionFactory;
27 import jakarta.jms.Message;
28 import jakarta.jms.MessageListener;
29 import jakarta.jms.Session;
30 import jakarta.jms.Topic;
31 import java.util.Properties;
32 import javax.naming.InitialContext;
33 import javax.naming.NamingException;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.service.engine.event.ApexEventException;
36 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
37 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
38 import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
39 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
/**
 * This class implements an Apex event consumer that receives events using JMS.
 *
 * @author Liam Fallon (liam.fallon@ericsson.com)
 */
48 public class ApexJmsConsumer extends ApexPluginsEventConsumer implements MessageListener {
49 // Get a reference to the logger
50 private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsConsumer.class);
52 // The Apex and JMS parameters read from the parameter service
53 private JmsCarrierTechnologyParameters jmsConsumerProperties;
55 // The event receiver that will receive events from this consumer
56 private ApexEventReceiver eventReceiver;
58 // The connection to the JMS server
59 private Connection connection;
61 // The topic on which we receive events from JMS
62 private Topic jmsIncomingTopic;
65 public void init(final String consumerName, final EventHandlerParameters consumerParameters,
66 final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
67 this.eventReceiver = incomingEventReceiver;
69 this.name = consumerName;
71 // Check and get the JMS Properties
72 if (!(consumerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
73 final String errorMessage = "specified consumer properties of type \""
74 + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
75 + "\" are not applicable to a JMS consumer";
76 throw new ApexEventException(errorMessage);
78 jmsConsumerProperties = (JmsCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
80 // Look up the JMS connection factory
81 InitialContext jmsContext;
82 ConnectionFactory connectionFactory;
84 jmsContext = getInitialContext();
85 connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsConsumerProperties.getConnectionFactory());
87 // Check if we actually got a connection factory
88 if (connectionFactory == null) {
89 throw new IllegalArgumentException(
90 "JMS context lookup of \"" + jmsConsumerProperties.getConnectionFactory() + "\" returned null");
92 } catch (final Exception e) {
93 final String errorMessage = "lookup of JMS connection factory \""
94 + jmsConsumerProperties.getConnectionFactory() + "\" failed for JMS consumer properties \""
95 + jmsConsumerProperties.getJmsConsumerProperties() + "\"";
96 throw new ApexEventException(errorMessage, e);
99 // Lookup the topic on which we will receive events
101 jmsIncomingTopic = (Topic) jmsContext.lookup(jmsConsumerProperties.getConsumerTopic());
103 // Check if we actually got a topic
104 if (jmsIncomingTopic == null) {
105 throw new IllegalArgumentException(
106 "JMS context lookup of \"" + jmsConsumerProperties.getConsumerTopic() + "\" returned null");
108 } catch (final Exception e) {
109 final String errorMessage = "lookup of JMS topic \"" + jmsConsumerProperties.getConsumerTopic()
110 + "\" failed for JMS consumer properties \"" + jmsConsumerProperties.getJmsConsumerProperties()
112 throw new ApexEventException(errorMessage, e);
115 // Create and start a connection to the JMS server
117 connection = connectionFactory.createConnection(jmsConsumerProperties.getSecurityPrincipal(),
118 jmsConsumerProperties.getSecurityCredentials());
120 } catch (final Exception e) {
121 final String errorMessage = "connection to the JMS server failed for JMS properties \""
122 + jmsConsumerProperties.getJmsConsumerProperties() + "\"";
123 throw new ApexEventException(errorMessage, e);
128 * Construct InitialContext. This function should not be run directly.
129 * Package-private access is set for testing purposes only.
131 * @return InitialContext
132 * @throws NamingException if a naming exception is encountered
134 InitialContext getInitialContext() throws NamingException {
135 return new InitialContext(jmsConsumerProperties.getJmsConsumerProperties());
// NOTE(review): the signature line of the enclosing method and its closing braces are not present in this
// text; the body looks like the consumer thread's entry point — confirm against the original file.
143 // JMS session and message consumer for receiving messages
// The session is created non-transacted with automatic acknowledgement; try-with-resources closes it.
144 try (final var jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
145 // Create a message consumer for reception of messages and set this class as a message listener
// createMessageConsumer only returns once its wait loop has ended, so the session stays open until then.
146 createMessageConsumer(jmsSession);
// This catch also receives failures raised by createMessageConsumer, not only session-creation failures.
147 } catch (final Exception exc) {
148 final var errorMessage = "failed to create a JMS session towards the JMS server for receiving messages";
149 throw new ApexEventRuntimeException(errorMessage, exc);
// NOTE(review): the statements below execute after createMessageConsumer has returned, i.e. after its wait
// loop has ended, so the "subscribed" message is emitted late — consider logging before the loop starts.
151 // Everything is now set up
152 if (LOGGER.isDebugEnabled()) {
153 LOGGER.debug("event receiver {}:{} subscribed to JMS topic: {}", this.getClass().getName(),
154 this.name, jmsConsumerProperties.getConsumerTopic());
159 * The helper function to create a message consumer from a given JMS session.
161 * @param jmsSession a JMS session
163 private void createMessageConsumer(final Session jmsSession) {
164 try (final var messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) {
165 messageConsumer.setMessageListener(this);
167 // The endless loop that receives events over JMS
168 while (consumerThread.isAlive() && !stopOrderedFlag) {
169 ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
171 } catch (final Exception exc) {
172 throw new ApexEventRuntimeException("failed to create a JMS message consumer for receiving messages", exc);
180 public void onMessage(final Message jmsMessage) {
182 if (LOGGER.isTraceEnabled()) {
183 LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
184 this.getClass().getName() + ":" + this.name, jmsMessage.getJMSMessageID(),
185 jmsMessage.getJMSType());
188 eventReceiver.receiveEvent(new Properties(), jmsMessage);
189 } catch (final Exception e) {
190 throw new ApexEventRuntimeException("failed to receive message from JMS", e);
// NOTE(review): the signature line of the enclosing method and several short lines (braces, the statement
// guarded by the null check) are not present in this text — confirm against the original file.
// Raise the stop flag; the wait loop in createMessageConsumer tests it on every iteration.
199 stopOrderedFlag = true;
// Wait for the consumer thread to finish, sleeping for the consumer wait time between checks.
201 while (consumerThread.isAlive()) {
202 ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
205 // Close the connection to the JMS server
// The null check covers the case where no connection was ever assigned.
207 if (connection != null) {
// A failure while closing is logged as a warning.
210 } catch (final Exception e) {
211 LOGGER.warn("close of connection to the JMS server failed", e);