017f07f6f6898f88cb9828c9fb40ed0703d9fe6a
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.jms;
22
23 import java.io.Serializable;
24 import java.util.EnumMap;
25 import java.util.Map;
26
27 import javax.jms.Connection;
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Message;
30 import javax.jms.MessageProducer;
31 import javax.jms.Session;
32 import javax.jms.Topic;
33 import javax.naming.InitialContext;
34
35 import org.onap.policy.apex.service.engine.event.ApexEventException;
36 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
37 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
38 import org.onap.policy.apex.service.engine.event.PeeredReference;
39 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
40 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Concrete implementation of an Apex event producer that sends events using JMS.
47  *
48  * @author Liam Fallon (liam.fallon@ericsson.com)
49  */
50 public class ApexJMSProducer implements ApexEventProducer {
51
52     // Get a reference to the logger
53     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSProducer.class);
54
55     // The JMS parameters read from the parameter service
56     private JMSCarrierTechnologyParameters jmsProducerProperties;
57
58     // The connection to the JMS server
59     private Connection connection;
60
61     // The topic on which we send events to JMS
62     private Topic jmsOutgoingTopic;
63
64     // The JMS session on which we will send events
65     private Session jmsSession;
66
67     // The producer on which we will send events
68     private MessageProducer messageProducer;
69
70     // The name for this producer
71     private String name = null;
72
73     // The peer references for this event handler
74     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
75
76     /*
77      * (non-Javadoc)
78      *
79      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
80      * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
81      */
82     @Override
83     public void init(final String producerName, final EventHandlerParameters producerParameters)
84             throws ApexEventException {
85         this.name = producerName;
86
87         // Check and get the JMS Properties
88         if (!(producerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) {
89             LOGGER.warn("specified producer properties are not applicable to a JMS producer (" + this.name + ")");
90             throw new ApexEventException(
91                     "specified producer properties are not applicable to a JMS producer (" + this.name + ")");
92         }
93         jmsProducerProperties = (JMSCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
94
95         // Look up the JMS connection factory
96         InitialContext jmsContext = null;
97         ConnectionFactory connectionFactory = null;
98         try {
99             jmsContext = new InitialContext(jmsProducerProperties.getJMSProducerProperties());
100             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
101
102             // Check if we actually got a connection factory
103             if (connectionFactory == null) {
104                 throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
105                         + "\" returned null for producer (" + this.name + ")");
106             }
107         } catch (final Exception e) {
108             final String errorMessage = "lookup of JMS connection factory  \""
109                     + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
110                     + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
111             LOGGER.warn(errorMessage, e);
112             throw new ApexEventException(errorMessage, e);
113         }
114
115         // Lookup the topic on which we will send events
116         try {
117             jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
118
119             // Check if we actually got a topic
120             if (jmsOutgoingTopic == null) {
121                 throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
122                         + "\" returned null for producer (" + this.name + ")");
123             }
124         } catch (final Exception e) {
125             final String errorMessage = "lookup of JMS topic  \"" + jmsProducerProperties.getProducerTopic()
126                     + "\" failed for JMS producer properties \"" + jmsProducerProperties.getJMSProducerProperties()
127                     + "\" for producer (" + this.name + ")";
128             LOGGER.warn(errorMessage, e);
129             throw new ApexEventException(errorMessage, e);
130         }
131
132         // Create and start a connection to the JMS server
133         try {
134             connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
135                     jmsProducerProperties.getSecurityCredentials());
136             connection.start();
137         } catch (final Exception e) {
138             final String errorMessage = "connection to JMS server failed for JMS properties \""
139                     + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
140             LOGGER.warn(errorMessage, e);
141             throw new ApexEventException(errorMessage, e);
142         }
143
144         // Create a JMS session for sending events
145         try {
146             jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
147         } catch (final Exception e) {
148             final String errorMessage = "creation of session to JMS server failed for JMS properties \""
149                     + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
150             LOGGER.warn(errorMessage, e);
151             throw new ApexEventException(errorMessage, e);
152         }
153
154         // Create a JMS message producer for sending events
155         try {
156             messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
157         } catch (final Exception e) {
158             final String errorMessage =
159                     "creation of producer for sending events to JMS server failed for JMS properties \""
160                             + jmsProducerProperties.getJMSConsumerProperties() + "\"";
161             LOGGER.warn(errorMessage, e);
162             throw new ApexEventException(errorMessage, e);
163         }
164     }
165
166     /*
167      * (non-Javadoc)
168      *
169      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
170      */
171     @Override
172     public String getName() {
173         return name;
174     }
175
176     /*
177      * (non-Javadoc)
178      *
179      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service.
180      * parameters. eventhandler.EventHandlerPeeredMode)
181      */
182     @Override
183     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
184         return peerReferenceMap.get(peeredMode);
185     }
186
187     /*
188      * (non-Javadoc)
189      *
190      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service.
191      * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
192      */
193     @Override
194     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
195         peerReferenceMap.put(peeredMode, peeredReference);
196     }
197
198     /*
199      * (non-Javadoc)
200      *
201      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String,
202      * java.lang.Object)
203      */
204     @Override
205     public void sendEvent(final long executionId, final String eventname, final Object eventObject) {
206         // Check if this is a synchronized event, if so we have received a reply
207         final SynchronousEventCache synchronousEventCache =
208                 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
209         if (synchronousEventCache != null) {
210             synchronousEventCache.removeCachedEventToApexIfExists(executionId);
211         }
212
213         // Check if the object to be sent is serializable
214         if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
215             final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
216                     + this.name + ", object of type \"" + eventObject.getClass().getCanonicalName()
217                     + "\" is not serializable";
218             LOGGER.warn(errorMessage);
219             throw new ApexEventRuntimeException(errorMessage);
220         }
221
222         // The JMS message to send is constructed using the JMS session
223         Message jmsMessage = null;
224
225         // Check the type of JMS message to send
226         if (jmsProducerProperties.isObjectMessageSending()) {
227             // We should send a JMS Object Message
228             try {
229                 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
230             } catch (final Exception e) {
231                 final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
232                         + this.name + ", could not create JMS Object Message for object \"" + eventObject;
233                 LOGGER.warn(errorMessage);
234                 throw new ApexEventRuntimeException(errorMessage);
235             }
236         } else {
237             // We should send a JMS Text Message
238             try {
239                 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
240             } catch (final Exception e) {
241                 final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
242                         + this.name + ", could not create JMS Text Message for object \"" + eventObject;
243                 LOGGER.warn(errorMessage);
244                 throw new ApexEventRuntimeException(errorMessage);
245             }
246         }
247
248         try {
249             messageProducer.send(jmsMessage);
250         } catch (final Exception e) {
251             final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
252                     + this.name + ", send failed for object \"" + eventObject;
253             LOGGER.warn(errorMessage);
254             throw new ApexEventRuntimeException(errorMessage);
255         }
256     }
257
258     /*
259      * (non-Javadoc)
260      *
261      * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
262      */
263     @Override
264     public void stop() {
265         // Close the message producer
266         try {
267             messageProducer.close();
268         } catch (final Exception e) {
269             final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
270             LOGGER.warn(errorMessage, e);
271         }
272
273         // Close the session
274         try {
275             jmsSession.close();
276         } catch (final Exception e) {
277             final String errorMessage = "failed to close the JMS session for  " + this.name + " for sending messages";
278             LOGGER.warn(errorMessage, e);
279         }
280
281         // Close the connection to the JMS server
282         try {
283             connection.close();
284         } catch (final Exception e) {
285             final String errorMessage = "close of connection to the JMS server for  " + this.name + " failed";
286             LOGGER.warn(errorMessage, e);
287         }
288     }
289 }