4dcbf80c81a4f4c5abaeec61af079c8cec3e2076
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.jms;
22
23 import java.io.Serializable;
24 import java.util.EnumMap;
25 import java.util.Map;
26
27 import javax.jms.Connection;
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Message;
30 import javax.jms.MessageProducer;
31 import javax.jms.Session;
32 import javax.jms.Topic;
33 import javax.naming.InitialContext;
34
35 import org.onap.policy.apex.service.engine.event.ApexEventException;
36 import org.onap.policy.apex.service.engine.event.ApexEventProducer;
37 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
38 import org.onap.policy.apex.service.engine.event.PeeredReference;
39 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
40 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Concrete implementation of an Apex event producer that sends events using JMS.
47  *
48  * @author Liam Fallon (liam.fallon@ericsson.com)
49  */
50 public class ApexJmsProducer implements ApexEventProducer {
51     // Get a reference to the logger
52     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class);
53
54     // Recurring string constants
55     private static final String COULD_NOT_SEND_PREFIX = "could not send event \"";
56     private static final String FOR_PRODUCER_TAG = "\" for producer (";
57     private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer ";
58
59     // The JMS parameters read from the parameter service
60     private JmsCarrierTechnologyParameters jmsProducerProperties;
61
62     // The connection to the JMS server
63     private Connection connection;
64
65     // The JMS session on which we will send events
66     private Session jmsSession;
67
68     // The producer on which we will send events
69     private MessageProducer messageProducer;
70
71     // The name for this producer
72     private String name = null;
73
74     // The peer references for this event handler
75     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
76
77     /*
78      * (non-Javadoc)
79      *
80      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
81      * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
82      */
83     @Override
84     public void init(final String producerName, final EventHandlerParameters producerParameters)
85                     throws ApexEventException {
86         this.name = producerName;
87
88         // Check and get the JMS Properties
89         if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) {
90             final String errorMessage = "specified producer properties are not applicable to a JMS producer ("
91                             + this.name + ")";
92             LOGGER.warn(errorMessage);
93             throw new ApexEventException(errorMessage);
94         }
95         jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
96
97         // Look up the JMS connection factory
98         InitialContext jmsContext = null;
99         ConnectionFactory connectionFactory = null;
100         try {
101             jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties());
102             connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
103
104             // Check if we actually got a connection factory
105             if (connectionFactory == null) {
106                 throw new IllegalArgumentException(
107                                 "JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
108                                                 + "\" returned null for producer (" + this.name + ")");
109             }
110         } catch (final Exception e) {
111             final String errorMessage = "lookup of JMS connection factory  \""
112                             + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
113                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
114             LOGGER.warn(errorMessage, e);
115             throw new ApexEventException(errorMessage, e);
116         }
117
118         // Lookup the topic on which we will send events
119         Topic jmsOutgoingTopic;
120         try {
121             jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
122
123             // Check if we actually got a topic
124             if (jmsOutgoingTopic == null) {
125                 throw new IllegalArgumentException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
126                                 + "\" returned null for producer (" + this.name + ")");
127             }
128         } catch (final Exception e) {
129             final String errorMessage = "lookup of JMS topic  \"" + jmsProducerProperties.getProducerTopic()
130                             + "\" failed for JMS producer properties \""
131                             + jmsProducerProperties.getJmsProducerProperties() + FOR_PRODUCER_TAG + this.name + ")";
132             LOGGER.warn(errorMessage, e);
133             throw new ApexEventException(errorMessage, e);
134         }
135
136         // Create and start a connection to the JMS server
137         try {
138             connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
139                             jmsProducerProperties.getSecurityCredentials());
140             connection.start();
141         } catch (final Exception e) {
142             final String errorMessage = "connection to JMS server failed for JMS properties \""
143                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
144             LOGGER.warn(errorMessage, e);
145             throw new ApexEventException(errorMessage, e);
146         }
147
148         // Create a JMS session for sending events
149         try {
150             jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
151         } catch (final Exception e) {
152             final String errorMessage = "creation of session to JMS server failed for JMS properties \""
153                             + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")";
154             LOGGER.warn(errorMessage, e);
155             throw new ApexEventException(errorMessage, e);
156         }
157
158         // Create a JMS message producer for sending events
159         try {
160             messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
161         } catch (final Exception e) {
162             final String errorMessage = "creation of producer for sending events "
163                             + "to JMS server failed for JMS properties \""
164                             + jmsProducerProperties.getJmsConsumerProperties() + "\"";
165             LOGGER.warn(errorMessage, e);
166             throw new ApexEventException(errorMessage, e);
167         }
168     }
169
170     /*
171      * (non-Javadoc)
172      *
173      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
174      */
175     @Override
176     public String getName() {
177         return name;
178     }
179
180     /*
181      * (non-Javadoc)
182      *
183      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service.
184      * parameters. eventhandler.EventHandlerPeeredMode)
185      */
186     @Override
187     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
188         return peerReferenceMap.get(peeredMode);
189     }
190
191     /*
192      * (non-Javadoc)
193      *
194      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service.
195      * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
196      */
197     @Override
198     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
199         peerReferenceMap.put(peeredMode, peeredReference);
200     }
201
202     /*
203      * (non-Javadoc)
204      *
205      * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String,
206      * java.lang.Object)
207      */
208     @Override
209     public void sendEvent(final long executionId, final String eventname, final Object eventObject) {
210         // Check if this is a synchronized event, if so we have received a reply
211         final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap
212                         .get(EventHandlerPeeredMode.SYNCHRONOUS);
213         if (synchronousEventCache != null) {
214             synchronousEventCache.removeCachedEventToApexIfExists(executionId);
215         }
216
217         // Check if the object to be sent is serializable
218         if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
219             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
220                             + ", object of type \"" + eventObject.getClass().getCanonicalName()
221                             + "\" is not serializable";
222             LOGGER.warn(errorMessage);
223             throw new ApexEventRuntimeException(errorMessage);
224         }
225
226         // The JMS message to send is constructed using the JMS session
227         Message jmsMessage = null;
228
229         // Check the type of JMS message to send
230         if (jmsProducerProperties.isObjectMessageSending()) {
231             // We should send a JMS Object Message
232             try {
233                 jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
234             } catch (final Exception e) {
235                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
236                                 + ", could not create JMS Object Message for object \"" + eventObject;
237                 LOGGER.warn(errorMessage, e);
238                 throw new ApexEventRuntimeException(errorMessage);
239             }
240         } else {
241             // We should send a JMS Text Message
242             try {
243                 jmsMessage = jmsSession.createTextMessage(eventObject.toString());
244             } catch (final Exception e) {
245                 final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
246                                 + ", could not create JMS Text Message for object \"" + eventObject;
247                 LOGGER.warn(errorMessage, e);
248                 throw new ApexEventRuntimeException(errorMessage);
249             }
250         }
251
252         try {
253             messageProducer.send(jmsMessage);
254         } catch (final Exception e) {
255             final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name
256                             + ", send failed for object \"" + eventObject;
257             LOGGER.warn(errorMessage, e);
258             throw new ApexEventRuntimeException(errorMessage);
259         }
260     }
261
262     /*
263      * (non-Javadoc)
264      *
265      * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
266      */
267     @Override
268     public void stop() {
269         // Close the message producer
270         try {
271             messageProducer.close();
272         } catch (final Exception e) {
273             final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
274             LOGGER.warn(errorMessage, e);
275         }
276
277         // Close the session
278         try {
279             jmsSession.close();
280         } catch (final Exception e) {
281             final String errorMessage = "failed to close the JMS session for  " + this.name + " for sending messages";
282             LOGGER.warn(errorMessage, e);
283         }
284
285         // Close the connection to the JMS server
286         try {
287             connection.close();
288         } catch (final Exception e) {
289             final String errorMessage = "close of connection to the JMS server for  " + this.name + " failed";
290             LOGGER.warn(errorMessage, e);
291         }
292     }
293 }