2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.plugins.event.carrier.jms;
24 import java.io.Serializable;
25 import java.util.EnumMap;
27 import java.util.Properties;
29 import javax.jms.Connection;
30 import javax.jms.ConnectionFactory;
31 import javax.jms.Message;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.jms.Topic;
35 import javax.naming.InitialContext;
37 import org.onap.policy.apex.service.engine.event.ApexEventException;
38 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
39 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
40 import org.onap.policy.apex.service.engine.event.PeeredReference;
41 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * Concrete implementation of an Apex event producer that sends events using JMS.
50 * @author Liam Fallon (liam.fallon@ericsson.com)
52 public class ApexJmsProducer implements ApexEventProducer {
53 // Get a reference to the logger
54 private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
56 // Recurring string constants
57 private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
58 private static final String FOR_PRODUCER_TAG = "\" for producer (";
59 private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
61 // The JMS parameters read from the parameter service
62 private JmsCarrierTechnologyParameters jmsProducerProperties;
64 // The connection to the JMS server
65 private Connection connection;
67 // The JMS session on which we will send events
68 private Session jmsSession;
70 // The producer on which we will send events
71 private MessageProducer messageProducer;
73 // The name for this producer
74 private String name = null;
76 // The peer references for this event handler
77 private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
83 public void init(final String producerName, final EventHandlerParameters producerParameters)
84 throws ApexEventException {
85 this.name = producerName;
87 // Check and get the JMS Properties
88 if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
89 final String errorMessage = "specified producer properties are not applicable to a JMS producer ("
91 throw new ApexEventException(errorMessage);
93 jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
95 // Look up the JMS connection factory
96 InitialContext jmsContext = null;
97 ConnectionFactory connectionFactory = null;
99 jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties());
100 connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
102 // Check if we actually got a connection factory
103 if (connectionFactory == null) {
104 throw new IllegalArgumentException(
105 "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
106 + "\" returned null for producer (" + this.name + ")");
108 } catch (final Exception e) {
109 final String errorMessage = "lookup of JMS connection factory \""
110 + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
111 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
112 throw new ApexEventException(errorMessage, e);
115 // Lookup the topic on which we will send events
116 Topic jmsOutgoingTopic;
118 jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
120 // Check if we actually got a topic
121 if (jmsOutgoingTopic == null) {
122 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
123 + "\" returned null for producer (" + this.name + ")");
125 } catch (final Exception e) {
126 final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic()
127 + "\" failed for JMS producer properties \""
128 + jmsProducerProperties.getJmsProducerProperties() + FOR_PRODUCER_TAG + this.name + ")";
129 throw new ApexEventException(errorMessage, e);
132 // Create and start a connection to the JMS server
134 connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
135 jmsProducerProperties.getSecurityCredentials());
137 } catch (final Exception e) {
138 final String errorMessage = "connection to JMS server failed for JMS properties \""
139 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
140 throw new ApexEventException(errorMessage, e);
143 // Create a JMS session for sending events
145 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
146 } catch (final Exception e) {
147 final String errorMessage = "creation of session to JMS server failed for JMS properties \""
148 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
149 throw new ApexEventException(errorMessage, e);
152 // Create a JMS message producer for sending events
154 messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
155 } catch (final Exception e) {
156 final String errorMessage = "creation of producer for sending events "
157 + "to JMS server failed for JMS properties \""
158 + jmsProducerProperties.getJmsConsumerProperties() + "\"";
159 throw new ApexEventException(errorMessage, e);
167 public String getName() {
175 public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
176 return peerReferenceMap.get(peeredMode);
183 public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
184 peerReferenceMap.put(peeredMode, peeredReference);
191 public void sendEvent(final long executionId, final Properties executionProperties, final String eventname,
192 final Object eventObject) {
193 // Check if this is a synchronized event, if so we have received a reply
194 final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap
195 .get(EventHandlerPeeredMode.SYNCHRONOUS);
196 if (synchronousEventCache != null) {
197 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
200 // Check if the object to be sent is serializable
201 if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
202 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
203 + ", object of type \"" + eventObject.getClass().getName() + "\" is not serializable";
204 LOGGER.warn(errorMessage);
205 throw new ApexEventRuntimeException(errorMessage);
208 // The JMS message to send is constructed using the JMS session
209 Message jmsMessage = null;
211 // Check the type of JMS message to send
212 if (jmsProducerProperties.isObjectMessageSending()) {
213 // We should send a JMS Object Message
215 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
216 } catch (final Exception e) {
217 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
218 + ", could not create JMS Object Message for object \"" + eventObject;
219 LOGGER.warn(errorMessage, e);
220 throw new ApexEventRuntimeException(errorMessage);
223 // We should send a JMS Text Message
225 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
226 } catch (final Exception e) {
227 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
228 + ", could not create JMS Text Message for object \"" + eventObject;
229 LOGGER.warn(errorMessage, e);
230 throw new ApexEventRuntimeException(errorMessage);
235 messageProducer.send(jmsMessage);
236 } catch (final Exception e) {
237 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
238 + ", send failed for object \"" + eventObject;
239 LOGGER.warn(errorMessage, e);
240 throw new ApexEventRuntimeException(errorMessage);
249 // Close the message producer
251 messageProducer.close();
252 } catch (final Exception e) {
253 final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
254 LOGGER.warn(errorMessage, e);
260 } catch (final Exception e) {
261 final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages";
262 LOGGER.warn(errorMessage, e);
265 // Close the connection to the JMS server
268 } catch (final Exception e) {
269 final String errorMessage = "close of connection to the JMS server for " + this.name + " failed";
270 LOGGER.warn(errorMessage, e);