Use lombok in apex-pdp #4
[policy/apex-pdp.git] / testsuites / integration / integration-uservice-test / src / test / java / org / onap / policy / apex / testsuites / integration / uservice / adapt / jms / JmsEventSubscriber.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.testsuites.integration.uservice.adapt.jms;
24
25 import javax.jms.Connection;
26 import javax.jms.ConnectionFactory;
27 import javax.jms.JMSException;
28 import javax.jms.Message;
29 import javax.jms.MessageConsumer;
30 import javax.jms.ObjectMessage;
31 import javax.jms.Session;
32 import javax.jms.TextMessage;
33 import javax.jms.Topic;
34 import lombok.Getter;
35 import org.apache.activemq.command.ActiveMQTopic;
36 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
37 import org.onap.policy.apex.service.engine.event.ApexEventException;
38 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
39 import org.onap.policy.apex.testsuites.integration.common.testclasses.PingTestClass;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * The Class JmsEventSubscriber.
45  *
46  * @author Liam Fallon (liam.fallon@ericsson.com)
47  */
48 public class JmsEventSubscriber implements Runnable {
49     private static final Logger LOGGER = LoggerFactory.getLogger(JmsEventSubscriber.class);
50
51     private final String topic;
52     @Getter
53     private long eventsReceivedCount = 0;
54
55     private final Thread subscriberThread;
56     private final Connection connection;
57
58     /**
59      * Instantiates a new jms event subscriber.
60      *
61      * @param topic the topic
62      * @param connectionFactory the connection factory
63      * @param username the username
64      * @param password the password
65      * @throws JMSException the JMS exception
66      */
67     public JmsEventSubscriber(final String topic, final ConnectionFactory connectionFactory, final String username,
68             final String password) throws JMSException {
69         this.topic = topic;
70         connection = connectionFactory.createConnection(username, password);
71         connection.start();
72
73         subscriberThread = new Thread(this);
74         subscriberThread.start();
75     }
76
77     /**
78      * {@inheritDoc}.
79      */
80     @Override
81     public void run() {
82         final Topic jmsTopic = new ActiveMQTopic(topic);
83         try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
84                 final MessageConsumer jmsConsumer = jmsSession.createConsumer(jmsTopic)) {
85
86             while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
87                 try {
88                     final Message message = jmsConsumer.receive(100);
89                     if (message == null) {
90                         continue;
91                     }
92
93                     if (message instanceof ObjectMessage) {
94                         final PingTestClass testPing = (PingTestClass) ((ObjectMessage) message).getObject();
95                         testPing.verify();
96                     } else if (message instanceof TextMessage) {
97                         ((TextMessage) message).getText();
98                     } else {
99                         throw new ApexEventException("unknowm message \"" + message + "\" of type \""
100                                 + message.getClass().getName() + "\" received");
101                     }
102                     eventsReceivedCount++;
103                 } catch (final Exception e) {
104                     if (!(e.getCause() instanceof InterruptedException)) {
105                         throw new ApexEventRuntimeException("JMS message reception failed", e);
106                     }
107                 }
108             }
109
110         } catch (final Exception e) {
111             throw new ApexEventRuntimeException("JMS event consumption failed", e);
112         }
113
114         LOGGER.info("{} : event reception completed, {} events received", this.getClass().getName(),
115                 eventsReceivedCount);
116     }
117
118     /**
119      * Shutdown.
120      *
121      * @throws JMSException the JMS exception
122      */
123     public void shutdown() throws JMSException {
124         LOGGER.info("{} : stopping...", this.getClass().getName());
125
126         subscriberThread.interrupt();
127
128         while (subscriberThread.isAlive()) {
129             ThreadUtilities.sleep(10);
130         }
131
132         connection.close();
133         LOGGER.info("{} : stopped", this.getClass().getName());
134     }
135
136 }