f2c59453cdcf8d574ff46e3dfb537df18364ae9a
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.websocket;
22
23 import java.util.EnumMap;
24 import java.util.Map;
25
26 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
27 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageClient;
28 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageListener;
29 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageServer;
30 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessager;
31 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
34 import org.onap.policy.apex.service.engine.event.ApexEventException;
35 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
36 import org.onap.policy.apex.service.engine.event.PeeredReference;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
38 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Concrete implementation an Apex event consumer that receives events using Kafka.
44  *
45  * @author Liam Fallon (liam.fallon@ericsson.com)
46  */
47 public class ApexWebSocketConsumer implements ApexEventConsumer, WSStringMessageListener, Runnable {
48     private static final int WEB_SOCKET_WAIT_SLEEP_TIME = 100;
49
50     // Get a reference to the logger
51     private static final Logger LOGGER = LoggerFactory.getLogger(ApexWebSocketConsumer.class);
52
53     // The Web Socket properties
54     private WEBSOCKETCarrierTechnologyParameters webSocketConsumerProperties;
55
56     // The web socket messager, may be WS a server or a client
57     private WSStringMessager wsStringMessager;
58
59     // The event receiver that will receive events from this consumer
60     private ApexEventReceiver eventReceiver;
61
62     // The name for this consumer
63     private String name = null;
64
65     // The peer references for this event handler
66     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
67
68     // The consumer thread and stopping flag
69     private Thread consumerThread;
70     private boolean stopOrderedFlag = false;
71
72     // The number of events read to date
73     private int eventsRead = 0;
74
75     @Override
76     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
77             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
78         this.eventReceiver = incomingEventReceiver;
79         this.name = consumerName;
80
81         // Check and get the Kafka Properties
82         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof WEBSOCKETCarrierTechnologyParameters)) {
83             LOGGER.warn("specified consumer properties are not applicable to a web socket consumer");
84             throw new ApexEventException("specified consumer properties are not applicable to a web socket consumer");
85         }
86         webSocketConsumerProperties =
87                 (WEBSOCKETCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
88
89         // Check if this is a server or a client Web Socket
90         if (webSocketConsumerProperties.isWsClient()) {
91             // Create a WS client
92             wsStringMessager = new WSStringMessageClient(webSocketConsumerProperties.getHost(),
93                     webSocketConsumerProperties.getPort());
94         } else {
95             wsStringMessager = new WSStringMessageServer(webSocketConsumerProperties.getPort());
96         }
97
98         // Start reception of event strings on the web socket
99         try {
100             wsStringMessager.start(this);
101         } catch (final MessagingException e) {
102             LOGGER.warn("could not start web socket consumer");
103         }
104     }
105
106     /*
107      * (non-Javadoc)
108      *
109      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
110      */
111     @Override
112     public void start() {
113         // Configure and start the event reception thread
114         final String threadName = this.getClass().getName() + ":" + this.name;
115         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
116         consumerThread.setDaemon(true);
117         consumerThread.start();
118     }
119
120     /*
121      * (non-Javadoc)
122      *
123      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
124      */
125     @Override
126     public String getName() {
127         return name;
128     }
129
130     /*
131      * (non-Javadoc)
132      *
133      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.policy.apex.service.
134      * parameters. eventhandler.EventHandlerPeeredMode)
135      */
136     @Override
137     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
138         return peerReferenceMap.get(peeredMode);
139     }
140
141     /*
142      * (non-Javadoc)
143      *
144      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service.
145      * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
146      */
147     @Override
148     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
149         peerReferenceMap.put(peeredMode, peeredReference);
150     }
151
152     /*
153      * (non-Javadoc)
154      *
155      * @see java.lang.Runnable#run()
156      */
157     @Override
158     public void run() {
159         while (consumerThread.isAlive() && !stopOrderedFlag) {
160             ThreadUtilities.sleep(WEB_SOCKET_WAIT_SLEEP_TIME);
161         }
162     }
163
164     /*
165      * (non-Javadoc)
166      *
167      * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
168      */
169     @Override
170     public void stop() {
171         if (wsStringMessager != null) {
172             wsStringMessager.stop();
173         }
174         stopOrderedFlag = true;
175     }
176
177     /*
178      * (non-Javadoc)
179      *
180      * @see
181      * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageListener#receiveString(java.
182      * lang. String)
183      */
184     @Override
185     public void receiveString(final String eventString) {
186         try {
187             eventReceiver.receiveEvent(eventString);
188             eventsRead++;
189         } catch (final Exception e) {
190             final String errorMessage = "Error sending event " + name + '_' + eventsRead + ", " + e.getMessage()
191                     + ", event:\n" + eventString;
192             LOGGER.warn(errorMessage);
193         }
194     }
195 }