393ea7310b5b81860d5f7ac895db2bb0539b6953
[policy/apex-pdp.git] / plugins / plugins-event / plugins-event-carrier / plugins-event-carrier-jms / src / main / java / org / onap / policy / apex / plugins / event / carrier / jms / ApexJmsProducer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.jms;
23
24 import java.io.Serializable;
25 import java.util.EnumMap;
26 import java.util.Map;
27 import java.util.Properties;
28
29 import javax.jms.Connection;
30 import javax.jms.ConnectionFactory;
31 import javax.jms.Message;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.jms.Topic;
35 import javax.naming.InitialContext;
36
37 import org.onap.policy.apex.service.engine.event.ApexEventException;
38 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
39 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
40 import org.onap.policy.apex.service.engine.event.PeeredReference;
41 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Concrete implementation of an Apex event producer that sends events using JMS.
49  *
50  * @author Liam Fallon (liam.fallon@ericsson.com)
51  */
52 public class ApexJmsProducer implements ApexEventProducer {
53     // Get a reference to the logger
54     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
55
56     // Recurring string constants
57     private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
58     private static final String FOR_PRODUCER_TAG = "\" for producer (";
59     private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
60
61     // The JMS parameters read from the parameter service
62     private JmsCarrierTechnologyParameters jmsProducerProperties;
63
64     // The connection to the JMS server
65     private Connection connection;
66
67     // The JMS session on which we will send events
68     private Session jmsSession;
69
70     // The producer on which we will send events
71     private MessageProducer messageProducer;
72
73     // The name for this producer
74     private String name = null;
75
76     // The peer references for this event handler
77     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
78
79     /**
80      * {@inheritDoc}.
81      */
82     @Override
83     public void init(final String producerName, final EventHandlerParameters producerParameters)
84                     throws ApexEventException {
85         this.name = producerName;
86
87         // Check and get the JMS Properties
88         if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
89             final String errorMessage = "specified producer properties are not applicable to a JMS producer ("
90                             + this.name + ")";
91             throw new ApexEventException(errorMessage);
92         }
93         jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
94
95         // Look up the JMS connection factory
96         InitialContext jmsContext = null;
97         ConnectionFactory connectionFactory = null;
98         try {
99             jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties());
100             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
101
102             // Check if we actually got a connection factory
103             if (connectionFactory == null) {
104                 throw new IllegalArgumentException(
105                                 "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
106                                                 + "\" returned null for producer (" + this.name + ")");
107             }
108         } catch (final Exception e) {
109             final String errorMessage = "lookup of JMS connection factory  \""
110                             + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
111                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
112             throw new ApexEventException(errorMessage, e);
113         }
114
115         // Lookup the topic on which we will send events
116         Topic jmsOutgoingTopic;
117         try {
118             jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
119
120             // Check if we actually got a topic
121             if (jmsOutgoingTopic == null) {
122                 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
123                                 + "\" returned null for producer (" + this.name + ")");
124             }
125         } catch (final Exception e) {
126             final String errorMessage = "lookup of JMS topic  \"" + jmsProducerProperties.getProducerTopic()
127                             + "\" failed for JMS producer properties \""
128                             + jmsProducerProperties.getJmsProducerProperties() + FOR_PRODUCER_TAG + this.name + ")";
129             throw new ApexEventException(errorMessage, e);
130         }
131
132         // Create and start a connection to the JMS server
133         try {
134             connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
135                             jmsProducerProperties.getSecurityCredentials());
136             connection.start();
137         } catch (final Exception e) {
138             final String errorMessage = "connection to JMS server failed for JMS properties \""
139                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
140             throw new ApexEventException(errorMessage, e);
141         }
142
143         // Create a JMS session for sending events
144         try {
145             jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
146         } catch (final Exception e) {
147             final String errorMessage = "creation of session to JMS server failed for JMS properties \""
148                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
149             throw new ApexEventException(errorMessage, e);
150         }
151
152         // Create a JMS message producer for sending events
153         try {
154             messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
155         } catch (final Exception e) {
156             final String errorMessage = "creation of producer for sending events "
157                             + "to JMS server failed for JMS properties \""
158                             + jmsProducerProperties.getJmsConsumerProperties() + "\"";
159             throw new ApexEventException(errorMessage, e);
160         }
161     }
162
163     /**
164      * {@inheritDoc}.
165      */
166     @Override
167     public String getName() {
168         return name;
169     }
170
171     /**
172      * {@inheritDoc}.
173      */
174     @Override
175     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
176         return peerReferenceMap.get(peeredMode);
177     }
178
179     /**
180      * {@inheritDoc}.
181      */
182     @Override
183     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
184         peerReferenceMap.put(peeredMode, peeredReference);
185     }
186
187     /**
188      * {@inheritDoc}.
189      */
190     @Override
191     public void sendEvent(final long executionId, final Properties executionProperties, final String eventname,
192                     final Object eventObject) {
193         // Check if this is a synchronized event, if so we have received a reply
194         final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap
195                         .get(EventHandlerPeeredMode.SYNCHRONOUS);
196         if (synchronousEventCache != null) {
197             synchronousEventCache.removeCachedEventToApexIfExists(executionId);
198         }
199
200         // Check if the object to be sent is serializable
201         if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
202             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
203                             + ", object of type \"" + eventObject.getClass().getName() + "\" is not serializable";
204             LOGGER.warn(errorMessage);
205             throw new ApexEventRuntimeException(errorMessage);
206         }
207
208         // The JMS message to send is constructed using the JMS session
209         Message jmsMessage = null;
210
211         // Check the type of JMS message to send
212         if (jmsProducerProperties.isObjectMessageSending()) {
213             // We should send a JMS Object Message
214             try {
215                 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
216             } catch (final Exception e) {
217                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
218                                 + ", could not create JMS Object Message for object \"" + eventObject;
219                 LOGGER.warn(errorMessage, e);
220                 throw new ApexEventRuntimeException(errorMessage);
221             }
222         } else {
223             // We should send a JMS Text Message
224             try {
225                 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
226             } catch (final Exception e) {
227                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
228                                 + ", could not create JMS Text Message for object \"" + eventObject;
229                 LOGGER.warn(errorMessage, e);
230                 throw new ApexEventRuntimeException(errorMessage);
231             }
232         }
233
234         try {
235             messageProducer.send(jmsMessage);
236         } catch (final Exception e) {
237             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
238                             + ", send failed for object \"" + eventObject;
239             LOGGER.warn(errorMessage, e);
240             throw new ApexEventRuntimeException(errorMessage);
241         }
242     }
243
244     /**
245      * {@inheritDoc}.
246      */
247     @Override
248     public void stop() {
249         // Close the message producer
250         try {
251             messageProducer.close();
252         } catch (final Exception e) {
253             final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
254             LOGGER.warn(errorMessage, e);
255         }
256
257         // Close the session
258         try {
259             jmsSession.close();
260         } catch (final Exception e) {
261             final String errorMessage = "failed to close the JMS session for  " + this.name + " for sending messages";
262             LOGGER.warn(errorMessage, e);
263         }
264
265         // Close the connection to the JMS server
266         try {
267             connection.close();
268         } catch (final Exception e) {
269             final String errorMessage = "close of connection to the JMS server for  " + this.name + " failed";
270             LOGGER.warn(errorMessage, e);
271         }
272     }
273 }