21309396a2579d987c870ef4098778b7d22561be
[policy/apex-pdp.git] / plugins / plugins-event / plugins-event-carrier / plugins-event-carrier-jms / src / main / java / org / onap / policy / apex / plugins / event / carrier / jms / ApexJmsProducer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2021 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.plugins.event.carrier.jms;
24
25 import java.io.Serializable;
26 import java.util.EnumMap;
27 import java.util.Map;
28 import java.util.Properties;
29 import javax.jms.Connection;
30 import javax.jms.ConnectionFactory;
31 import javax.jms.Message;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.jms.Topic;
35 import javax.naming.InitialContext;
36 import javax.naming.NamingException;
37 import lombok.Getter;
38 import org.onap.policy.apex.service.engine.event.ApexEventException;
39 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
40 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
41 import org.onap.policy.apex.service.engine.event.PeeredReference;
42 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
43 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
44 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Concrete implementation of an Apex event producer that sends events using JMS.
50  *
51  * @author Liam Fallon (liam.fallon@ericsson.com)
52  */
53 public class ApexJmsProducer implements ApexEventProducer {
54     // Get a reference to the logger
55     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
56
57     // Recurring string constants
58     private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
59     private static final String FOR_PRODUCER_TAG = "\" for producer (";
60     private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
61
62     // The JMS parameters read from the parameter service
63     private JmsCarrierTechnologyParameters jmsProducerProperties;
64
65     // The connection to the JMS server
66     private Connection connection;
67
68     // The JMS session on which we will send events
69     private Session jmsSession;
70
71     // The producer on which we will send events
72     private MessageProducer messageProducer;
73
74     // The name for this producer
75     @Getter
76     private String name = null;
77
78     // The peer references for this event handler
79     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
80         new EnumMap<>(EventHandlerPeeredMode.class);
81
82     /**
83      * {@inheritDoc}.
84      */
85     @Override
86     public void init(final String producerName, final EventHandlerParameters producerParameters)
87                     throws ApexEventException {
88         this.name = producerName;
89
90         // Check and get the JMS Properties
91         if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
92             final String errorMessage = "specified producer properties are not applicable to a JMS producer ("
93                             + this.name + ")";
94             throw new ApexEventException(errorMessage);
95         }
96         jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
97
98         // Look up the JMS connection factory
99         InitialContext jmsContext;
100         ConnectionFactory connectionFactory;
101         try {
102             jmsContext = getInitialContext();
103             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
104
105             // Check if we actually got a connection factory
106             if (connectionFactory == null) {
107                 throw new IllegalArgumentException(
108                                 "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
109                                                 + "\" returned null for producer (" + this.name + ")");
110             }
111         } catch (final Exception e) {
112             final String errorMessage = "lookup of JMS connection factory  \""
113                             + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
114                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
115             throw new ApexEventException(errorMessage, e);
116         }
117
118         // Lookup the topic on which we will send events
119         Topic jmsOutgoingTopic;
120         try {
121             jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
122
123             // Check if we actually got a topic
124             if (jmsOutgoingTopic == null) {
125                 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
126                                 + "\" returned null for producer (" + this.name + ")");
127             }
128         } catch (final Exception e) {
129             final String errorMessage = "lookup of JMS topic  \"" + jmsProducerProperties.getProducerTopic()
130                             + "\" failed for JMS producer properties \""
131                             + jmsProducerProperties.getJmsProducerProperties() + FOR_PRODUCER_TAG + this.name + ")";
132             throw new ApexEventException(errorMessage, e);
133         }
134
135         // Create and start a connection to the JMS server
136         try {
137             connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
138                             jmsProducerProperties.getSecurityCredentials());
139             connection.start();
140         } catch (final Exception e) {
141             final String errorMessage = "connection to JMS server failed for JMS properties \""
142                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
143             throw new ApexEventException(errorMessage, e);
144         }
145
146         // Create a JMS session for sending events
147         try {
148             jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
149         } catch (final Exception e) {
150             final String errorMessage = "creation of session to JMS server failed for JMS properties \""
151                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
152             throw new ApexEventException(errorMessage, e);
153         }
154
155         // Create a JMS message producer for sending events
156         try {
157             messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
158         } catch (final Exception e) {
159             final String errorMessage = "creation of producer for sending events "
160                             + "to JMS server failed for JMS properties \""
161                             + jmsProducerProperties.getJmsConsumerProperties() + "\"";
162             throw new ApexEventException(errorMessage, e);
163         }
164     }
165
166     /**
167      * Construct InitialContext. This function should not be run directly.
168      * Package-private access is set for testing purposes only.
169      *
170      * @return InitialContext
171      * @throws NamingException if a naming exception is encountered
172      */
173     public InitialContext getInitialContext() throws NamingException {
174         return new InitialContext(jmsProducerProperties.getJmsProducerProperties());
175     }
176
177     /**
178      * {@inheritDoc}.
179      */
180     @Override
181     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
182         return peerReferenceMap.get(peeredMode);
183     }
184
185     /**
186      * {@inheritDoc}.
187      */
188     @Override
189     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
190         peerReferenceMap.put(peeredMode, peeredReference);
191     }
192
193     /**
194      * {@inheritDoc}.
195      */
196     @Override
197     public void sendEvent(final long executionId, final Properties executionProperties, final String eventname,
198                     final Object eventObject) {
199         // Check if this is a synchronized event, if so we have received a reply
200         final var synchronousEventCache = (SynchronousEventCache) peerReferenceMap
201                         .get(EventHandlerPeeredMode.SYNCHRONOUS);
202         if (synchronousEventCache != null) {
203             synchronousEventCache.removeCachedEventToApexIfExists(executionId);
204         }
205
206         // Check if the object to be sent is serializable
207         if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
208             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
209                             + ", object of type \"" + eventObject.getClass().getName() + "\" is not serializable";
210             LOGGER.warn(errorMessage);
211             throw new ApexEventRuntimeException(errorMessage);
212         }
213
214         // The JMS message to send is constructed using the JMS session
215         Message jmsMessage;
216
217         // Check the type of JMS message to send
218         if (jmsProducerProperties.isObjectMessageSending()) {
219             // We should send a JMS Object Message
220             try {
221                 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
222             } catch (final Exception e) {
223                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
224                                 + ", could not create JMS Object Message for object \"" + eventObject;
225                 LOGGER.warn(errorMessage, e);
226                 throw new ApexEventRuntimeException(errorMessage);
227             }
228         } else {
229             // We should send a JMS Text Message
230             try {
231                 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
232             } catch (final Exception e) {
233                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
234                                 + ", could not create JMS Text Message for object \"" + eventObject;
235                 LOGGER.warn(errorMessage, e);
236                 throw new ApexEventRuntimeException(errorMessage);
237             }
238         }
239
240         try {
241             messageProducer.send(jmsMessage);
242         } catch (final Exception e) {
243             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
244                             + ", send failed for object \"" + eventObject;
245             LOGGER.warn(errorMessage, e);
246             throw new ApexEventRuntimeException(errorMessage);
247         }
248     }
249
250     /**
251      * {@inheritDoc}.
252      */
253     @Override
254     public void stop() {
255         // Close the message producer
256         try {
257             messageProducer.close();
258         } catch (final Exception e) {
259             final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
260             LOGGER.warn(errorMessage, e);
261         }
262
263         // Close the session
264         try {
265             jmsSession.close();
266         } catch (final Exception e) {
267             final String errorMessage = "failed to close the JMS session for  " + this.name + " for sending messages";
268             LOGGER.warn(errorMessage, e);
269         }
270
271         // Close the connection to the JMS server
272         try {
273             connection.close();
274         } catch (final Exception e) {
275             final String errorMessage = "close of connection to the JMS server for  " + this.name + " failed";
276             LOGGER.warn(errorMessage, e);
277         }
278     }
279 }