/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

21 package org.onap.policy.apex.plugins.event.carrier.jms;
import java.io.Serializable;
import java.util.EnumMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;

import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventProducer;
import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

46 * Concrete implementation of an Apex event producer that sends events using JMS.
48 * @author Liam Fallon (liam.fallon@ericsson.com)
50 public class ApexJmsProducer implements ApexEventProducer {
51 // Get a reference to the logger
52 private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
54 // Recurring string constants
55 private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
56 private static final String FOR_PRODUCER_TAG = "\" for producer (";
57 private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
59 // The JMS parameters read from the parameter service
60 private JmsCarrierTechnologyParameters jmsProducerProperties;
62 // The connection to the JMS server
63 private Connection connection;
65 // The JMS session on which we will send events
66 private Session jmsSession;
68 // The producer on which we will send events
69 private MessageProducer messageProducer;
71 // The name for this producer
72 private String name = null;
74 // The peer references for this event handler
75 private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
80 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
81 * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
84 public void init(final String producerName, final EventHandlerParameters producerParameters)
85 throws ApexEventException {
86 this.name = producerName;
88 // Check and get the JMS Properties
89 if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
90 final String errorMessage = "specified producer properties are not applicable to a JMS producer ("
92 LOGGER.warn(errorMessage);
93 throw new ApexEventException(errorMessage);
95 jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
97 // Look up the JMS connection factory
98 InitialContext jmsContext = null;
99 ConnectionFactory connectionFactory = null;
101 jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties());
102 connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
104 // Check if we actually got a connection factory
105 if (connectionFactory == null) {
106 throw new IllegalArgumentException(
107 "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
108 + "\" returned null for producer (" + this.name + ")");
110 } catch (final Exception e) {
111 final String errorMessage = "lookup of JMS connection factory \""
112 + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
113 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
114 LOGGER.warn(errorMessage, e);
115 throw new ApexEventException(errorMessage, e);
118 // Lookup the topic on which we will send events
119 Topic jmsOutgoingTopic;
121 jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
123 // Check if we actually got a topic
124 if (jmsOutgoingTopic == null) {
125 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
126 + "\" returned null for producer (" + this.name + ")");
128 } catch (final Exception e) {
129 final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic()
130 + "\" failed for JMS producer properties \""
131 + jmsProducerProperties.getJmsProducerProperties() + FOR_PRODUCER_TAG + this.name + ")";
132 LOGGER.warn(errorMessage, e);
133 throw new ApexEventException(errorMessage, e);
136 // Create and start a connection to the JMS server
138 connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
139 jmsProducerProperties.getSecurityCredentials());
141 } catch (final Exception e) {
142 final String errorMessage = "connection to JMS server failed for JMS properties \""
143 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
144 LOGGER.warn(errorMessage, e);
145 throw new ApexEventException(errorMessage, e);
148 // Create a JMS session for sending events
150 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
151 } catch (final Exception e) {
152 final String errorMessage = "creation of session to JMS server failed for JMS properties \""
153 + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
154 LOGGER.warn(errorMessage, e);
155 throw new ApexEventException(errorMessage, e);
158 // Create a JMS message producer for sending events
160 messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
161 } catch (final Exception e) {
162 final String errorMessage = "creation of producer for sending events "
163 + "to JMS server failed for JMS properties \""
164 + jmsProducerProperties.getJmsConsumerProperties() + "\"";
165 LOGGER.warn(errorMessage, e);
166 throw new ApexEventException(errorMessage, e);
173 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
176 public String getName() {
183 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service.
184 * parameters. eventhandler.EventHandlerPeeredMode)
187 public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
188 return peerReferenceMap.get(peeredMode);
194 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service.
195 * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
198 public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
199 peerReferenceMap.put(peeredMode, peeredReference);
205 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String,
209 public void sendEvent(final long executionId, final String eventname, final Object eventObject) {
210 // Check if this is a synchronized event, if so we have received a reply
211 final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap
212 .get(EventHandlerPeeredMode.SYNCHRONOUS);
213 if (synchronousEventCache != null) {
214 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
217 // Check if the object to be sent is serializable
218 if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
219 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
220 + ", object of type \"" + eventObject.getClass().getCanonicalName()
221 + "\" is not serializable";
222 LOGGER.warn(errorMessage);
223 throw new ApexEventRuntimeException(errorMessage);
226 // The JMS message to send is constructed using the JMS session
227 Message jmsMessage = null;
229 // Check the type of JMS message to send
230 if (jmsProducerProperties.isObjectMessageSending()) {
231 // We should send a JMS Object Message
233 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
234 } catch (final Exception e) {
235 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
236 + ", could not create JMS Object Message for object \"" + eventObject;
237 LOGGER.warn(errorMessage, e);
238 throw new ApexEventRuntimeException(errorMessage);
241 // We should send a JMS Text Message
243 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
244 } catch (final Exception e) {
245 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
246 + ", could not create JMS Text Message for object \"" + eventObject;
247 LOGGER.warn(errorMessage, e);
248 throw new ApexEventRuntimeException(errorMessage);
253 messageProducer.send(jmsMessage);
254 } catch (final Exception e) {
255 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
256 + ", send failed for object \"" + eventObject;
257 LOGGER.warn(errorMessage, e);
258 throw new ApexEventRuntimeException(errorMessage);
265 * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
269 // Close the message producer
271 messageProducer.close();
272 } catch (final Exception e) {
273 final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
274 LOGGER.warn(errorMessage, e);
280 } catch (final Exception e) {
281 final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages";
282 LOGGER.warn(errorMessage, e);
285 // Close the connection to the JMS server
288 } catch (final Exception e) {
289 final String errorMessage = "close of connection to the JMS server for " + this.name + " failed";
290 LOGGER.warn(errorMessage, e);