e98ee8ff8ef008a7f76a5ac75af26392ce586832
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.jms;
22
23 import java.io.Serializable;
24 import java.util.EnumMap;
25 import java.util.Map;
26 import java.util.Properties;
27
28 import javax.jms.Connection;
29 import javax.jms.ConnectionFactory;
30 import javax.jms.Message;
31 import javax.jms.MessageProducer;
32 import javax.jms.Session;
33 import javax.jms.Topic;
34 import javax.naming.InitialContext;
35
36 import org.onap.policy.apex.service.engine.event.ApexEventException;
37 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
38 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
39 import org.onap.policy.apex.service.engine.event.PeeredReference;
40 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Concrete implementation of an Apex event producer that sends events using JMS.
48  *
49  * @author Liam Fallon (liam.fallon@ericsson.com)
50  */
51 public class ApexJmsProducer implements ApexEventProducer {
52     // Get a reference to the logger
53     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
54
55     // Recurring string constants
56     private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
57     private static final String FOR_PRODUCER_TAG = "\" for producer (";
58     private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
59
60     // The JMS parameters read from the parameter service
61     private JmsCarrierTechnologyParameters jmsProducerProperties;
62
63     // The connection to the JMS server
64     private Connection connection;
65
66     // The JMS session on which we will send events
67     private Session jmsSession;
68
69     // The producer on which we will send events
70     private MessageProducer messageProducer;
71
72     // The name for this producer
73     private String name = null;
74
75     // The peer references for this event handler
76     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
77
78     /*
79      * (non-Javadoc)
80      *
81      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
82      * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
83      */
84     @Override
85     public void init(final String producerName, final EventHandlerParameters producerParameters)
86             throws ApexEventException {
87         this.name = producerName;
88
89         // Check and get the JMS Properties
90         if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
91             final String errorMessage =
92                     "specified producer properties are not applicable to a JMS producer (" + this.name + ")";
93             LOGGER.warn(errorMessage);
94             throw new ApexEventException(errorMessage);
95         }
96         jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
97
98         // Look up the JMS connection factory
99         InitialContext jmsContext = null;
100         ConnectionFactory connectionFactory = null;
101         try {
102             jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties());
103             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
104
105             // Check if we actually got a connection factory
106             if (connectionFactory == null) {
107                 throw new IllegalArgumentException(
108                         "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
109                                 + "\" returned null for producer (" + this.name + ")");
110             }
111         } catch (final Exception e) {
112             final String errorMessage = "lookup of JMS connection factory  \""
113                     + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
114                     + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
115             LOGGER.warn(errorMessage, e);
116             throw new ApexEventException(errorMessage, e);
117         }
118
119         // Lookup the topic on which we will send events
120         Topic jmsOutgoingTopic;
121         try {
122             jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
123
124             // Check if we actually got a topic
125             if (jmsOutgoingTopic == null) {
126                 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
127                         + "\" returned null for producer (" + this.name + ")");
128             }
129         } catch (final Exception e) {
130             final String errorMessage = "lookup of JMS topic  \"" + jmsProducerProperties.getProducerTopic()
131                     + "\" failed for JMS producer properties \"" + jmsProducerProperties.getJmsProducerProperties()
132                     + FOR_PRODUCER_TAG + this.name + ")";
133             LOGGER.warn(errorMessage, e);
134             throw new ApexEventException(errorMessage, e);
135         }
136
137         // Create and start a connection to the JMS server
138         try {
139             connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
140                     jmsProducerProperties.getSecurityCredentials());
141             connection.start();
142         } catch (final Exception e) {
143             final String errorMessage = "connection to JMS server failed for JMS properties \""
144                     + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
145             LOGGER.warn(errorMessage, e);
146             throw new ApexEventException(errorMessage, e);
147         }
148
149         // Create a JMS session for sending events
150         try {
151             jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
152         } catch (final Exception e) {
153             final String errorMessage = "creation of session to JMS server failed for JMS properties \""
154                     + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
155             LOGGER.warn(errorMessage, e);
156             throw new ApexEventException(errorMessage, e);
157         }
158
159         // Create a JMS message producer for sending events
160         try {
161             messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
162         } catch (final Exception e) {
163             final String errorMessage =
164                     "creation of producer for sending events " + "to JMS server failed for JMS properties \""
165                             + jmsProducerProperties.getJmsConsumerProperties() + "\"";
166             LOGGER.warn(errorMessage, e);
167             throw new ApexEventException(errorMessage, e);
168         }
169     }
170
171     /*
172      * (non-Javadoc)
173      *
174      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
175      */
176     @Override
177     public String getName() {
178         return name;
179     }
180
181     /*
182      * (non-Javadoc)
183      *
184      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service.
185      * parameters. eventhandler.EventHandlerPeeredMode)
186      */
187     @Override
188     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
189         return peerReferenceMap.get(peeredMode);
190     }
191
192     /*
193      * (non-Javadoc)
194      *
195      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service.
196      * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
197      */
198     @Override
199     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
200         peerReferenceMap.put(peeredMode, peeredReference);
201     }
202
203     /*
204      * (non-Javadoc)
205      *
206      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String,
207      * java.lang.Object)
208      */
209     @Override
210     public void sendEvent(final long executionId, final Properties executionProperties, final String eventname,
211             final Object eventObject) {
212         // Check if this is a synchronized event, if so we have received a reply
213         final SynchronousEventCache synchronousEventCache =
214                 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
215         if (synchronousEventCache != null) {
216             synchronousEventCache.removeCachedEventToApexIfExists(executionId);
217         }
218
219         // Check if the object to be sent is serializable
220         if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
221             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
222                     + ", object of type \"" + eventObject.getClass().getCanonicalName() + "\" is not serializable";
223             LOGGER.warn(errorMessage);
224             throw new ApexEventRuntimeException(errorMessage);
225         }
226
227         // The JMS message to send is constructed using the JMS session
228         Message jmsMessage = null;
229
230         // Check the type of JMS message to send
231         if (jmsProducerProperties.isObjectMessageSending()) {
232             // We should send a JMS Object Message
233             try {
234                 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
235             } catch (final Exception e) {
236                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
237                         + ", could not create JMS Object Message for object \"" + eventObject;
238                 LOGGER.warn(errorMessage, e);
239                 throw new ApexEventRuntimeException(errorMessage);
240             }
241         } else {
242             // We should send a JMS Text Message
243             try {
244                 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
245             } catch (final Exception e) {
246                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
247                         + ", could not create JMS Text Message for object \"" + eventObject;
248                 LOGGER.warn(errorMessage, e);
249                 throw new ApexEventRuntimeException(errorMessage);
250             }
251         }
252
253         try {
254             messageProducer.send(jmsMessage);
255         } catch (final Exception e) {
256             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
257                     + ", send failed for object \"" + eventObject;
258             LOGGER.warn(errorMessage, e);
259             throw new ApexEventRuntimeException(errorMessage);
260         }
261     }
262
263     /*
264      * (non-Javadoc)
265      *
266      * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
267      */
268     @Override
269     public void stop() {
270         // Close the message producer
271         try {
272             messageProducer.close();
273         } catch (final Exception e) {
274             final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
275             LOGGER.warn(errorMessage, e);
276         }
277
278         // Close the session
279         try {
280             jmsSession.close();
281         } catch (final Exception e) {
282             final String errorMessage = "failed to close the JMS session for  " + this.name + " for sending messages";
283             LOGGER.warn(errorMessage, e);
284         }
285
286         // Close the connection to the JMS server
287         try {
288             connection.close();
289         } catch (final Exception e) {
290             final String errorMessage = "close of connection to the JMS server for  " + this.name + " failed";
291             LOGGER.warn(errorMessage, e);
292         }
293     }
294 }