44ed810fdf42d84cff2df02a83bfb70d863f9813
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2021 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.jms;
23
24 import java.io.Serializable;
25 import java.util.EnumMap;
26 import java.util.Map;
27 import java.util.Properties;
28 import javax.jms.Connection;
29 import javax.jms.ConnectionFactory;
30 import javax.jms.Message;
31 import javax.jms.MessageProducer;
32 import javax.jms.Session;
33 import javax.jms.Topic;
34 import javax.naming.InitialContext;
35 import javax.naming.NamingException;
36 import org.onap.policy.apex.service.engine.event.ApexEventException;
37 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
38 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
39 import org.onap.policy.apex.service.engine.event.PeeredReference;
40 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Concrete implementation of an Apex event producer that sends events using JMS.
48  *
49  * @author Liam Fallon (liam.fallon@ericsson.com)
50  */
51 public class ApexJmsProducer implements ApexEventProducer {
52     // Get a reference to the logger
53     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
54
55     // Recurring string constants
56     private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
57     private static final String FOR_PRODUCER_TAG = "\" for producer (";
58     private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
59
60     // The JMS parameters read from the parameter service
61     private JmsCarrierTechnologyParameters jmsProducerProperties;
62
63     // The connection to the JMS server
64     private Connection connection;
65
66     // The JMS session on which we will send events
67     private Session jmsSession;
68
69     // The producer on which we will send events
70     private MessageProducer messageProducer;
71
72     // The name for this producer
73     private String name = null;
74
75     // The peer references for this event handler
76     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
77         new EnumMap<>(EventHandlerPeeredMode.class);
78
79     /**
80      * {@inheritDoc}.
81      */
82     @Override
83     public void init(final String producerName, final EventHandlerParameters producerParameters)
84                     throws ApexEventException {
85         this.name = producerName;
86
87         // Check and get the JMS Properties
88         if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
89             final String errorMessage = "specified producer properties are not applicable to a JMS producer ("
90                             + this.name + ")";
91             throw new ApexEventException(errorMessage);
92         }
93         jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
94
95         // Look up the JMS connection factory
96         InitialContext jmsContext;
97         ConnectionFactory connectionFactory;
98         try {
99             jmsContext = getInitialContext();
100             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
101
102             // Check if we actually got a connection factory
103             if (connectionFactory == null) {
104                 throw new IllegalArgumentException(
105                                 "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
106                                                 + "\" returned null for producer (" + this.name + ")");
107             }
108         } catch (final Exception e) {
109             final String errorMessage = "lookup of JMS connection factory  \""
110                             + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
111                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
112             throw new ApexEventException(errorMessage, e);
113         }
114
115         // Lookup the topic on which we will send events
116         Topic jmsOutgoingTopic;
117         try {
118             jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
119
120             // Check if we actually got a topic
121             if (jmsOutgoingTopic == null) {
122                 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
123                                 + "\" returned null for producer (" + this.name + ")");
124             }
125         } catch (final Exception e) {
126             final String errorMessage = "lookup of JMS topic  \"" + jmsProducerProperties.getProducerTopic()
127                             + "\" failed for JMS producer properties \""
128                             + jmsProducerProperties.getJmsProducerProperties() + FOR_PRODUCER_TAG + this.name + ")";
129             throw new ApexEventException(errorMessage, e);
130         }
131
132         // Create and start a connection to the JMS server
133         try {
134             connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
135                             jmsProducerProperties.getSecurityCredentials());
136             connection.start();
137         } catch (final Exception e) {
138             final String errorMessage = "connection to JMS server failed for JMS properties \""
139                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
140             throw new ApexEventException(errorMessage, e);
141         }
142
143         // Create a JMS session for sending events
144         try {
145             jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
146         } catch (final Exception e) {
147             final String errorMessage = "creation of session to JMS server failed for JMS properties \""
148                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
149             throw new ApexEventException(errorMessage, e);
150         }
151
152         // Create a JMS message producer for sending events
153         try {
154             messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
155         } catch (final Exception e) {
156             final String errorMessage = "creation of producer for sending events "
157                             + "to JMS server failed for JMS properties \""
158                             + jmsProducerProperties.getJmsConsumerProperties() + "\"";
159             throw new ApexEventException(errorMessage, e);
160         }
161     }
162
163     /**
164      * Construct InitialContext. This function should not be run directly.
165      * Package-private access is set for testing purposes only.
166      *
167      * @return InitialContext
168      * @throws NamingException if a naming exception is encountered
169      */
170     public InitialContext getInitialContext() throws NamingException {
171         return new InitialContext(jmsProducerProperties.getJmsProducerProperties());
172     }
173
174     /**
175      * {@inheritDoc}.
176      */
177     @Override
178     public String getName() {
179         return name;
180     }
181
182     /**
183      * {@inheritDoc}.
184      */
185     @Override
186     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
187         return peerReferenceMap.get(peeredMode);
188     }
189
190     /**
191      * {@inheritDoc}.
192      */
193     @Override
194     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
195         peerReferenceMap.put(peeredMode, peeredReference);
196     }
197
198     /**
199      * {@inheritDoc}.
200      */
201     @Override
202     public void sendEvent(final long executionId, final Properties executionProperties, final String eventname,
203                     final Object eventObject) {
204         // Check if this is a synchronized event, if so we have received a reply
205         final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap
206                         .get(EventHandlerPeeredMode.SYNCHRONOUS);
207         if (synchronousEventCache != null) {
208             synchronousEventCache.removeCachedEventToApexIfExists(executionId);
209         }
210
211         // Check if the object to be sent is serializable
212         if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
213             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
214                             + ", object of type \"" + eventObject.getClass().getName() + "\" is not serializable";
215             LOGGER.warn(errorMessage);
216             throw new ApexEventRuntimeException(errorMessage);
217         }
218
219         // The JMS message to send is constructed using the JMS session
220         Message jmsMessage;
221
222         // Check the type of JMS message to send
223         if (jmsProducerProperties.isObjectMessageSending()) {
224             // We should send a JMS Object Message
225             try {
226                 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
227             } catch (final Exception e) {
228                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
229                                 + ", could not create JMS Object Message for object \"" + eventObject;
230                 LOGGER.warn(errorMessage, e);
231                 throw new ApexEventRuntimeException(errorMessage);
232             }
233         } else {
234             // We should send a JMS Text Message
235             try {
236                 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
237             } catch (final Exception e) {
238                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
239                                 + ", could not create JMS Text Message for object \"" + eventObject;
240                 LOGGER.warn(errorMessage, e);
241                 throw new ApexEventRuntimeException(errorMessage);
242             }
243         }
244
245         try {
246             messageProducer.send(jmsMessage);
247         } catch (final Exception e) {
248             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
249                             + ", send failed for object \"" + eventObject;
250             LOGGER.warn(errorMessage, e);
251             throw new ApexEventRuntimeException(errorMessage);
252         }
253     }
254
255     /**
256      * {@inheritDoc}.
257      */
258     @Override
259     public void stop() {
260         // Close the message producer
261         try {
262             messageProducer.close();
263         } catch (final Exception e) {
264             final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
265             LOGGER.warn(errorMessage, e);
266         }
267
268         // Close the session
269         try {
270             jmsSession.close();
271         } catch (final Exception e) {
272             final String errorMessage = "failed to close the JMS session for  " + this.name + " for sending messages";
273             LOGGER.warn(errorMessage, e);
274         }
275
276         // Close the connection to the JMS server
277         try {
278             connection.close();
279         } catch (final Exception e) {
280             final String errorMessage = "close of connection to the JMS server for  " + this.name + " failed";
281             LOGGER.warn(errorMessage, e);
282         }
283     }
284 }