Replace non-Javadoc comments with inheritDocs
[policy/apex-pdp.git] / plugins / plugins-event / plugins-event-carrier / plugins-event-carrier-websocket / src / main / java / org / onap / policy / apex / plugins / event / carrier / websocket / ApexWebSocketConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.websocket;
22
23 import java.util.EnumMap;
24 import java.util.Map;
25
26 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
27 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageClient;
28 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageListener;
29 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageServer;
30 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessager;
31 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
34 import org.onap.policy.apex.service.engine.event.ApexEventException;
35 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
36 import org.onap.policy.apex.service.engine.event.PeeredReference;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
38 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Concrete implementation an Apex event consumer that receives events using Kafka.
44  *
45  * @author Liam Fallon (liam.fallon@ericsson.com)
46  */
47 public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessageListener, Runnable {
48     private static final int WEB_SOCKET_WAIT_SLEEP_TIME = 100;
49
50     // Get a reference to the logger
51     private static final Logger LOGGER = LoggerFactory.getLogger(ApexWebSocketConsumer.class);
52
53     // The web socket messager, may be WS a server or a client
54     private WsStringMessager wsStringMessager;
55
56     // The event receiver that will receive events from this consumer
57     private ApexEventReceiver eventReceiver;
58
59     // The name for this consumer
60     private String name = null;
61
62     // The peer references for this event handler
63     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
64
65     // The consumer thread and stopping flag
66     private Thread consumerThread;
67     private boolean stopOrderedFlag = false;
68
69     // The number of events read to date
70     private int eventsRead = 0;
71
72     @Override
73     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
74             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
75         this.eventReceiver = incomingEventReceiver;
76         this.name = consumerName;
77
78         // Check and get the Kafka Properties
79         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof WebSocketCarrierTechnologyParameters)) {
80             LOGGER.warn("specified consumer properties are not applicable to a web socket consumer");
81             throw new ApexEventException("specified consumer properties are not applicable to a web socket consumer");
82         }
83
84         // The Web Socket properties
85         WebSocketCarrierTechnologyParameters webSocketConsumerProperties =
86                 (WebSocketCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
87
88         // Check if this is a server or a client Web Socket
89         if (webSocketConsumerProperties.isWsClient()) {
90             // Create a WS client
91             wsStringMessager = new WsStringMessageClient(webSocketConsumerProperties.getHost(),
92                     webSocketConsumerProperties.getPort());
93         } else {
94             wsStringMessager = new WsStringMessageServer(webSocketConsumerProperties.getPort());
95         }
96
97         // Start reception of event strings on the web socket
98         try {
99             wsStringMessager.start(this);
100         } catch (final MessagingException e) {
101             LOGGER.warn("could not start web socket consumer", e);
102         }
103     }
104
105     /**
106      * {@inheritDoc}.
107      */
108     @Override
109     public void start() {
110         // Configure and start the event reception thread
111         final String threadName = this.getClass().getName() + ":" + this.name;
112         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
113         consumerThread.setDaemon(true);
114         consumerThread.start();
115     }
116
117     /**
118      * {@inheritDoc}.
119      */
120     @Override
121     public String getName() {
122         return name;
123     }
124
125     /**
126      * {@inheritDoc}.
127      */
128     @Override
129     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
130         return peerReferenceMap.get(peeredMode);
131     }
132
133     /**
134      * {@inheritDoc}.
135      */
136     @Override
137     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
138         peerReferenceMap.put(peeredMode, peeredReference);
139     }
140
141     /**
142      * {@inheritDoc}.
143      */
144     @Override
145     public void run() {
146         while (consumerThread.isAlive() && !stopOrderedFlag) {
147             ThreadUtilities.sleep(WEB_SOCKET_WAIT_SLEEP_TIME);
148         }
149     }
150
151     /**
152      * {@inheritDoc}.
153      */
154     @Override
155     public void stop() {
156         if (wsStringMessager != null) {
157             wsStringMessager.stop();
158         }
159         stopOrderedFlag = true;
160     }
161
162     /**
163      * {@inheritDoc}.
164      */
165     @Override
166     public void receiveString(final String eventString) {
167         try {
168             eventReceiver.receiveEvent(null, eventString);
169             eventsRead++;
170         } catch (final Exception e) {
171             final String errorMessage = "Error sending event " + name + '_' + eventsRead + ", " + e.getMessage()
172                     + ", event:\n" + eventString;
173             LOGGER.warn(errorMessage, e);
174         }
175     }
176 }