2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2021 Nordix Foundation.
5 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.plugins.event.carrier.jms;
25 import java.io.Serializable;
26 import java.util.EnumMap;
28 import java.util.Properties;
29 import javax.jms.Connection;
30 import javax.jms.ConnectionFactory;
31 import javax.jms.Message;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.jms.Topic;
35 import javax.naming.InitialContext;
36 import javax.naming.NamingException;
38 import org.onap.policy.apex.service.engine.event.ApexEventException;
39 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
40 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
41 import org.onap.policy.apex.service.engine.event.PeeredReference;
42 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
44 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * Concrete implementation of an Apex event producer that sends events using JMS.
51 * @author Liam Fallon (liam.fallon@ericsson.com)
53 public class ApexJmsProducer implements ApexEventProducer {
54 // Get a reference to the logger
55 private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
57 // Recurring string constants
58 private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
59 private static final String FOR_PRODUCER_TAG = "\" for producer (";
60 private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
62 // The JMS parameters read from the parameter service
63 private JmsCarrierTechnologyParameters jmsProducerProperties;
65 // The connection to the JMS server
66 private Connection connection;
68 // The JMS session on which we will send events
69 private Session jmsSession;
71 // The producer on which we will send events
72 private MessageProducer messageProducer;
74 // The name for this producer
76 private String name = null;
78 // The peer references for this event handler
79 private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
80 new EnumMap<>(EventHandlerPeeredMode.class);
86 public void init(final String producerName, final EventHandlerParameters producerParameters)
87 throws ApexEventException {
88 this.name = producerName;
90 // Check and get the JMS Properties
91 if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
92 final String errorMessage = "specified producer properties are not applicable to a JMS producer ("
94 throw new ApexEventException(errorMessage);
96 jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
98 // Look up the JMS connection factory
99 InitialContext jmsContext;
100 ConnectionFactory connectionFactory;
102 jmsContext = getInitialContext();
103 connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
105 // Check if we actually got a connection factory
106 if (connectionFactory == null) {
107 throw new IllegalArgumentException(
108 "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
109 + "\" returned null for producer (" + this.name + ")");
111 } catch (final Exception e) {
112 final String errorMessage = "lookup of JMS connection factory \""
113 + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
114 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
115 throw new ApexEventException(errorMessage, e);
118 // Lookup the topic on which we will send events
119 Topic jmsOutgoingTopic;
121 jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
123 // Check if we actually got a topic
124 if (jmsOutgoingTopic == null) {
125 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
126 + "\" returned null for producer (" + this.name + ")");
128 } catch (final Exception e) {
129 final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic()
130 + "\" failed for JMS producer properties \""
131 + jmsProducerProperties.getJmsProducerProperties() + FOR_PRODUCER_TAG + this.name + ")";
132 throw new ApexEventException(errorMessage, e);
135 // Create and start a connection to the JMS server
137 connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
138 jmsProducerProperties.getSecurityCredentials());
140 } catch (final Exception e) {
141 final String errorMessage = "connection to JMS server failed for JMS properties \""
142 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
143 throw new ApexEventException(errorMessage, e);
146 // Create a JMS session for sending events
148 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
149 } catch (final Exception e) {
150 final String errorMessage = "creation of session to JMS server failed for JMS properties \""
151 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
152 throw new ApexEventException(errorMessage, e);
155 // Create a JMS message producer for sending events
157 messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
158 } catch (final Exception e) {
159 final String errorMessage = "creation of producer for sending events "
160 + "to JMS server failed for JMS properties \""
161 + jmsProducerProperties.getJmsConsumerProperties() + "\"";
162 throw new ApexEventException(errorMessage, e);
167 * Construct InitialContext. This function should not be run directly.
168 * Package-private access is set for testing purposes only.
170 * @return InitialContext
171 * @throws NamingException if a naming exception is encountered
173 public InitialContext getInitialContext() throws NamingException {
174 return new InitialContext(jmsProducerProperties.getJmsProducerProperties());
181 public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
182 return peerReferenceMap.get(peeredMode);
189 public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
190 peerReferenceMap.put(peeredMode, peeredReference);
197 public void sendEvent(final long executionId, final Properties executionProperties, final String eventname,
198 final Object eventObject) {
199 // Check if this is a synchronized event, if so we have received a reply
200 final var synchronousEventCache = (SynchronousEventCache) peerReferenceMap
201 .get(EventHandlerPeeredMode.SYNCHRONOUS);
202 if (synchronousEventCache != null) {
203 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
206 // Check if the object to be sent is serializable
207 if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
208 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
209 + ", object of type \"" + eventObject.getClass().getName() + "\" is not serializable";
210 LOGGER.warn(errorMessage);
211 throw new ApexEventRuntimeException(errorMessage);
214 // The JMS message to send is constructed using the JMS session
217 // Check the type of JMS message to send
218 if (jmsProducerProperties.isObjectMessageSending()) {
219 // We should send a JMS Object Message
221 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
222 } catch (final Exception e) {
223 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
224 + ", could not create JMS Object Message for object \"" + eventObject;
225 LOGGER.warn(errorMessage, e);
226 throw new ApexEventRuntimeException(errorMessage);
229 // We should send a JMS Text Message
231 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
232 } catch (final Exception e) {
233 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
234 + ", could not create JMS Text Message for object \"" + eventObject;
235 LOGGER.warn(errorMessage, e);
236 throw new ApexEventRuntimeException(errorMessage);
241 messageProducer.send(jmsMessage);
242 } catch (final Exception e) {
243 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
244 + ", send failed for object \"" + eventObject;
245 LOGGER.warn(errorMessage, e);
246 throw new ApexEventRuntimeException(errorMessage);
255 // Close the message producer
257 messageProducer.close();
258 } catch (final Exception e) {
259 final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
260 LOGGER.warn(errorMessage, e);
266 } catch (final Exception e) {
267 final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages";
268 LOGGER.warn(errorMessage, e);
271 // Close the connection to the JMS server
274 } catch (final Exception e) {
275 final String errorMessage = "close of connection to the JMS server for " + this.name + " failed";
276 LOGGER.warn(errorMessage, e);