86e9555f9f89bb14c8848cc95f1c5e15ecdaae6b
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.jms;
22
23 import java.io.Serializable;
24 import java.util.EnumMap;
25 import java.util.Map;
26
27 import javax.jms.Connection;
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Message;
30 import javax.jms.MessageProducer;
31 import javax.jms.Session;
32 import javax.jms.Topic;
33 import javax.naming.InitialContext;
34
35 import org.onap.policy.apex.service.engine.event.ApexEventException;
36 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
37 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
38 import org.onap.policy.apex.service.engine.event.PeeredReference;
39 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
40 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Concrete implementation of an Apex event producer that sends events using JMS.
47  *
48  * @author Liam Fallon (liam.fallon@ericsson.com)
49  */
50 public class ApexJMSProducer implements ApexEventProducer {
51
52     // Get a reference to the logger
53     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSProducer.class);
54
55     // The JMS parameters read from the parameter service
56     private JMSCarrierTechnologyParameters jmsProducerProperties;
57
58     // The connection to the JMS server
59     private Connection connection;
60
61     // The JMS session on which we will send events
62     private Session jmsSession;
63
64     // The producer on which we will send events
65     private MessageProducer messageProducer;
66
67     // The name for this producer
68     private String name = null;
69
70     // The peer references for this event handler
71     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
72
73     /*
74      * (non-Javadoc)
75      *
76      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
77      * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
78      */
79     @Override
80     public void init(final String producerName, final EventHandlerParameters producerParameters)
81             throws ApexEventException {
82         this.name = producerName;
83
84         // Check and get the JMS Properties
85         if (!(producerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) {
86             final String errorMessage = "specified producer properties are not applicable to a JMS producer (" + this.name + ")";
87             LOGGER.warn(errorMessage);
88             throw new ApexEventException(errorMessage);
89         }
90         jmsProducerProperties = (JMSCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
91
92         // Look up the JMS connection factory
93         InitialContext jmsContext = null;
94         ConnectionFactory connectionFactory = null;
95         try {
96             jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties());
97             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
98
99             // Check if we actually got a connection factory
100             if (connectionFactory == null) {
101                 throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
102                         + "\" returned null for producer (" + this.name + ")");
103             }
104         } catch (final Exception e) {
105             final String errorMessage = "lookup of JMS connection factory  \""
106                     + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
107                     + jmsProducerProperties.getJmsConsumerProperties() + "\" for producer (" + this.name + ")";
108             LOGGER.warn(errorMessage, e);
109             throw new ApexEventException(errorMessage, e);
110         }
111
112         // Lookup the topic on which we will send events
113         Topic jmsOutgoingTopic;
114         try {
115             jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
116
117             // Check if we actually got a topic
118             if (jmsOutgoingTopic == null) {
119                 throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
120                         + "\" returned null for producer (" + this.name + ")");
121             }
122         } catch (final Exception e) {
123             final String errorMessage = "lookup of JMS topic  \"" + jmsProducerProperties.getProducerTopic()
124                     + "\" failed for JMS producer properties \"" + jmsProducerProperties.getJmsProducerProperties()
125                     + "\" for producer (" + this.name + ")";
126             LOGGER.warn(errorMessage, e);
127             throw new ApexEventException(errorMessage, e);
128         }
129
130         // Create and start a connection to the JMS server
131         try {
132             connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
133                     jmsProducerProperties.getSecurityCredentials());
134             connection.start();
135         } catch (final Exception e) {
136             final String errorMessage = "connection to JMS server failed for JMS properties \""
137                     + jmsProducerProperties.getJmsConsumerProperties() + "\" for producer (" + this.name + ")";
138             LOGGER.warn(errorMessage, e);
139             throw new ApexEventException(errorMessage, e);
140         }
141
142         // Create a JMS session for sending events
143         try {
144             jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
145         } catch (final Exception e) {
146             final String errorMessage = "creation of session to JMS server failed for JMS properties \""
147                     + jmsProducerProperties.getJmsConsumerProperties() + "\" for producer (" + this.name + ")";
148             LOGGER.warn(errorMessage, e);
149             throw new ApexEventException(errorMessage, e);
150         }
151
152         // Create a JMS message producer for sending events
153         try {
154             messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
155         } catch (final Exception e) {
156             final String errorMessage =
157                     "creation of producer for sending events to JMS server failed for JMS properties \""
158                             + jmsProducerProperties.getJmsConsumerProperties() + "\"";
159             LOGGER.warn(errorMessage, e);
160             throw new ApexEventException(errorMessage, e);
161         }
162     }
163
164     /*
165      * (non-Javadoc)
166      *
167      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
168      */
169     @Override
170     public String getName() {
171         return name;
172     }
173
174     /*
175      * (non-Javadoc)
176      *
177      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service.
178      * parameters. eventhandler.EventHandlerPeeredMode)
179      */
180     @Override
181     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
182         return peerReferenceMap.get(peeredMode);
183     }
184
185     /*
186      * (non-Javadoc)
187      *
188      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service.
189      * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
190      */
191     @Override
192     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
193         peerReferenceMap.put(peeredMode, peeredReference);
194     }
195
196     /*
197      * (non-Javadoc)
198      *
199      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String,
200      * java.lang.Object)
201      */
202     @Override
203     public void sendEvent(final long executionId, final String eventname, final Object eventObject) {
204         // Check if this is a synchronized event, if so we have received a reply
205         final SynchronousEventCache synchronousEventCache =
206                 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
207         if (synchronousEventCache != null) {
208             synchronousEventCache.removeCachedEventToApexIfExists(executionId);
209         }
210
211         // Check if the object to be sent is serializable
212         if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
213             final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
214                     + this.name + ", object of type \"" + eventObject.getClass().getCanonicalName()
215                     + "\" is not serializable";
216             LOGGER.warn(errorMessage);
217             throw new ApexEventRuntimeException(errorMessage);
218         }
219
220         // The JMS message to send is constructed using the JMS session
221         Message jmsMessage = null;
222
223         // Check the type of JMS message to send
224         if (jmsProducerProperties.isObjectMessageSending()) {
225             // We should send a JMS Object Message
226             try {
227                 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
228             } catch (final Exception e) {
229                 final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
230                         + this.name + ", could not create JMS Object Message for object \"" + eventObject;
231                 LOGGER.warn(errorMessage);
232                 throw new ApexEventRuntimeException(errorMessage);
233             }
234         } else {
235             // We should send a JMS Text Message
236             try {
237                 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
238             } catch (final Exception e) {
239                 final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
240                         + this.name + ", could not create JMS Text Message for object \"" + eventObject;
241                 LOGGER.warn(errorMessage);
242                 throw new ApexEventRuntimeException(errorMessage);
243             }
244         }
245
246         try {
247             messageProducer.send(jmsMessage);
248         } catch (final Exception e) {
249             final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
250                     + this.name + ", send failed for object \"" + eventObject;
251             LOGGER.warn(errorMessage);
252             throw new ApexEventRuntimeException(errorMessage);
253         }
254     }
255
256     /*
257      * (non-Javadoc)
258      *
259      * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
260      */
261     @Override
262     public void stop() {
263         // Close the message producer
264         try {
265             messageProducer.close();
266         } catch (final Exception e) {
267             final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
268             LOGGER.warn(errorMessage, e);
269         }
270
271         // Close the session
272         try {
273             jmsSession.close();
274         } catch (final Exception e) {
275             final String errorMessage = "failed to close the JMS session for  " + this.name + " for sending messages";
276             LOGGER.warn(errorMessage, e);
277         }
278
279         // Close the connection to the JMS server
280         try {
281             connection.close();
282         } catch (final Exception e) {
283             final String errorMessage = "close of connection to the JMS server for  " + this.name + " failed";
284             LOGGER.warn(errorMessage, e);
285         }
286     }
287 }