2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.plugins.event.carrier.jms;
23 import java.io.Serializable;
24 import java.util.EnumMap;
27 import javax.jms.Connection;
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Message;
30 import javax.jms.MessageProducer;
31 import javax.jms.Session;
32 import javax.jms.Topic;
33 import javax.naming.InitialContext;
35 import org.onap.policy.apex.service.engine.event.ApexEventException;
36 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
37 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
38 import org.onap.policy.apex.service.engine.event.PeeredReference;
39 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
40 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Concrete implementation of an Apex event producer that sends events using JMS.
48 * @author Liam Fallon (liam.fallon@ericsson.com)
50 public class ApexJMSProducer implements ApexEventProducer {
52 // Get a reference to the logger
53 private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSProducer.class);
55 // The JMS parameters read from the parameter service
56 private JMSCarrierTechnologyParameters jmsProducerProperties;
58 // The connection to the JMS server
59 private Connection connection;
61 // The topic on which we send events to JMS
62 private Topic jmsOutgoingTopic;
64 // The JMS session on which we will send events
65 private Session jmsSession;
67 // The producer on which we will send events
68 private MessageProducer messageProducer;
70 // The name for this producer
71 private String name = null;
73 // The peer references for this event handler
74 private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
79 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
80 * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
83 public void init(final String producerName, final EventHandlerParameters producerParameters)
84 throws ApexEventException {
85 this.name = producerName;
87 // Check and get the JMS Properties
88 if (!(producerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) {
89 LOGGER.warn("specified producer properties are not applicable to a JMS producer (" + this.name + ")");
90 throw new ApexEventException(
91 "specified producer properties are not applicable to a JMS producer (" + this.name + ")");
93 jmsProducerProperties = (JMSCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
95 // Look up the JMS connection factory
96 InitialContext jmsContext = null;
97 ConnectionFactory connectionFactory = null;
99 jmsContext = new InitialContext(jmsProducerProperties.getJMSProducerProperties());
100 connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
102 // Check if we actually got a connection factory
103 if (connectionFactory == null) {
104 throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
105 + "\" returned null for producer (" + this.name + ")");
107 } catch (final Exception e) {
108 final String errorMessage = "lookup of JMS connection factory \""
109 + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
110 + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
111 LOGGER.warn(errorMessage, e);
112 throw new ApexEventException(errorMessage, e);
115 // Lookup the topic on which we will send events
117 jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
119 // Check if we actually got a topic
120 if (jmsOutgoingTopic == null) {
121 throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
122 + "\" returned null for producer (" + this.name + ")");
124 } catch (final Exception e) {
125 final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic()
126 + "\" failed for JMS producer properties \"" + jmsProducerProperties.getJMSProducerProperties()
127 + "\" for producer (" + this.name + ")";
128 LOGGER.warn(errorMessage, e);
129 throw new ApexEventException(errorMessage, e);
132 // Create and start a connection to the JMS server
134 connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
135 jmsProducerProperties.getSecurityCredentials());
137 } catch (final Exception e) {
138 final String errorMessage = "connection to JMS server failed for JMS properties \""
139 + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
140 LOGGER.warn(errorMessage, e);
141 throw new ApexEventException(errorMessage, e);
144 // Create a JMS session for sending events
146 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
147 } catch (final Exception e) {
148 final String errorMessage = "creation of session to JMS server failed for JMS properties \""
149 + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
150 LOGGER.warn(errorMessage, e);
151 throw new ApexEventException(errorMessage, e);
154 // Create a JMS message producer for sending events
156 messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
157 } catch (final Exception e) {
158 final String errorMessage =
159 "creation of producer for sending events to JMS server failed for JMS properties \""
160 + jmsProducerProperties.getJMSConsumerProperties() + "\"";
161 LOGGER.warn(errorMessage, e);
162 throw new ApexEventException(errorMessage, e);
169 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
172 public String getName() {
 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(
 * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
183 public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
184 return peerReferenceMap.get(peeredMode);
 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(
 * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
 * org.onap.policy.apex.service.engine.event.PeeredReference)
194 public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
195 peerReferenceMap.put(peeredMode, peeredReference);
 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String,
 * java.lang.Object)
205 public void sendEvent(final long executionId, final String eventname, final Object eventObject) {
206 // Check if this is a synchronized event, if so we have received a reply
207 final SynchronousEventCache synchronousEventCache =
208 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
209 if (synchronousEventCache != null) {
210 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
213 // Check if the object to be sent is serializable
214 if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
215 final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
216 + this.name + ", object of type \"" + eventObject.getClass().getCanonicalName()
217 + "\" is not serializable";
218 LOGGER.warn(errorMessage);
219 throw new ApexEventRuntimeException(errorMessage);
222 // The JMS message to send is constructed using the JMS session
223 Message jmsMessage = null;
225 // Check the type of JMS message to send
226 if (jmsProducerProperties.isObjectMessageSending()) {
227 // We should send a JMS Object Message
229 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
230 } catch (final Exception e) {
231 final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
232 + this.name + ", could not create JMS Object Message for object \"" + eventObject;
233 LOGGER.warn(errorMessage);
234 throw new ApexEventRuntimeException(errorMessage);
237 // We should send a JMS Text Message
239 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
240 } catch (final Exception e) {
241 final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
242 + this.name + ", could not create JMS Text Message for object \"" + eventObject;
243 LOGGER.warn(errorMessage);
244 throw new ApexEventRuntimeException(errorMessage);
249 messageProducer.send(jmsMessage);
250 } catch (final Exception e) {
251 final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
252 + this.name + ", send failed for object \"" + eventObject;
253 LOGGER.warn(errorMessage);
254 throw new ApexEventRuntimeException(errorMessage);
 * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#stop()
265 // Close the message producer
267 messageProducer.close();
268 } catch (final Exception e) {
269 final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
270 LOGGER.warn(errorMessage, e);
276 } catch (final Exception e) {
277 final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages";
278 LOGGER.warn(errorMessage, e);
281 // Close the connection to the JMS server
284 } catch (final Exception e) {
285 final String errorMessage = "close of connection to the JMS server for " + this.name + " failed";
286 LOGGER.warn(errorMessage, e);