2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.plugins.event.carrier.jms;
23 import java.io.Serializable;
24 import java.util.EnumMap;
26 import java.util.Properties;
28 import javax.jms.Connection;
29 import javax.jms.ConnectionFactory;
30 import javax.jms.Message;
31 import javax.jms.MessageProducer;
32 import javax.jms.Session;
33 import javax.jms.Topic;
34 import javax.naming.InitialContext;
36 import org.onap.policy.apex.service.engine.event.ApexEventException;
37 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
38 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
39 import org.onap.policy.apex.service.engine.event.PeeredReference;
40 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Concrete implementation of an Apex event producer that sends events using JMS.
49 * @author Liam Fallon (liam.fallon@ericsson.com)
51 public class ApexJmsProducer implements ApexEventProducer {
52 // Get a reference to the logger
53 private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
55 // Recurring string constants
56 private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
57 private static final String FOR_PRODUCER_TAG = "\" for producer (";
58 private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
60 // The JMS parameters read from the parameter service
61 private JmsCarrierTechnologyParameters jmsProducerProperties;
63 // The connection to the JMS server
64 private Connection connection;
66 // The JMS session on which we will send events
67 private Session jmsSession;
69 // The producer on which we will send events
70 private MessageProducer messageProducer;
72 // The name for this producer
73 private String name = null;
75 // The peer references for this event handler
76 private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
81 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
82 * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
85 public void init(final String producerName, final EventHandlerParameters producerParameters)
86 throws ApexEventException {
87 this.name = producerName;
89 // Check and get the JMS Properties
90 if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
91 final String errorMessage =
92 "specified producer properties are not applicable to a JMS producer (" + this.name + ")";
93 LOGGER.warn(errorMessage);
94 throw new ApexEventException(errorMessage);
96 jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
98 // Look up the JMS connection factory
99 InitialContext jmsContext = null;
100 ConnectionFactory connectionFactory = null;
102 jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties());
103 connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
105 // Check if we actually got a connection factory
106 if (connectionFactory == null) {
107 throw new IllegalArgumentException(
108 "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
109 + "\" returned null for producer (" + this.name + ")");
111 } catch (final Exception e) {
112 final String errorMessage = "lookup of JMS connection factory \""
113 + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
114 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
115 LOGGER.warn(errorMessage, e);
116 throw new ApexEventException(errorMessage, e);
119 // Lookup the topic on which we will send events
120 Topic jmsOutgoingTopic;
122 jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
124 // Check if we actually got a topic
125 if (jmsOutgoingTopic == null) {
126 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
127 + "\" returned null for producer (" + this.name + ")");
129 } catch (final Exception e) {
130 final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic()
131 + "\" failed for JMS producer properties \"" + jmsProducerProperties.getJmsProducerProperties()
132 + FOR_PRODUCER_TAG + this.name + ")";
133 LOGGER.warn(errorMessage, e);
134 throw new ApexEventException(errorMessage, e);
137 // Create and start a connection to the JMS server
139 connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
140 jmsProducerProperties.getSecurityCredentials());
142 } catch (final Exception e) {
143 final String errorMessage = "connection to JMS server failed for JMS properties \""
144 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
145 LOGGER.warn(errorMessage, e);
146 throw new ApexEventException(errorMessage, e);
149 // Create a JMS session for sending events
151 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
152 } catch (final Exception e) {
153 final String errorMessage = "creation of session to JMS server failed for JMS properties \""
154 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
155 LOGGER.warn(errorMessage, e);
156 throw new ApexEventException(errorMessage, e);
159 // Create a JMS message producer for sending events
161 messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
162 } catch (final Exception e) {
163 final String errorMessage =
164 "creation of producer for sending events " + "to JMS server failed for JMS properties \""
165 + jmsProducerProperties.getJmsConsumerProperties() + "\"";
166 LOGGER.warn(errorMessage, e);
167 throw new ApexEventException(errorMessage, e);
174 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
177 public String getName() {
184 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service.
185 * parameters. eventhandler.EventHandlerPeeredMode)
188 public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
189 return peerReferenceMap.get(peeredMode);
195 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service.
196 * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
199 public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
200 peerReferenceMap.put(peeredMode, peeredReference);
206 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String,
210 public void sendEvent(final long executionId, final Properties executionProperties, final String eventname,
211 final Object eventObject) {
212 // Check if this is a synchronized event, if so we have received a reply
213 final SynchronousEventCache synchronousEventCache =
214 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
215 if (synchronousEventCache != null) {
216 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
219 // Check if the object to be sent is serializable
220 if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
221 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
222 + ", object of type \"" + eventObject.getClass().getCanonicalName() + "\" is not serializable";
223 LOGGER.warn(errorMessage);
224 throw new ApexEventRuntimeException(errorMessage);
227 // The JMS message to send is constructed using the JMS session
228 Message jmsMessage = null;
230 // Check the type of JMS message to send
231 if (jmsProducerProperties.isObjectMessageSending()) {
232 // We should send a JMS Object Message
234 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
235 } catch (final Exception e) {
236 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
237 + ", could not create JMS Object Message for object \"" + eventObject;
238 LOGGER.warn(errorMessage, e);
239 throw new ApexEventRuntimeException(errorMessage);
242 // We should send a JMS Text Message
244 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
245 } catch (final Exception e) {
246 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
247 + ", could not create JMS Text Message for object \"" + eventObject;
248 LOGGER.warn(errorMessage, e);
249 throw new ApexEventRuntimeException(errorMessage);
254 messageProducer.send(jmsMessage);
255 } catch (final Exception e) {
256 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
257 + ", send failed for object \"" + eventObject;
258 LOGGER.warn(errorMessage, e);
259 throw new ApexEventRuntimeException(errorMessage);
266 * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
270 // Close the message producer
272 messageProducer.close();
273 } catch (final Exception e) {
274 final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
275 LOGGER.warn(errorMessage, e);
281 } catch (final Exception e) {
282 final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages";
283 LOGGER.warn(errorMessage, e);
286 // Close the connection to the JMS server
289 } catch (final Exception e) {
290 final String errorMessage = "close of connection to the JMS server for " + this.name + " failed";
291 LOGGER.warn(errorMessage, e);