3ff420ffa99bef10c79380b0430bd3785e248e31
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2021 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
22
23 import java.io.Closeable;
24 import java.time.Instant;
25 import java.util.LinkedHashMap;
26 import java.util.Map;
27 import java.util.TimerTask;
28 import java.util.UUID;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.TimeUnit;
32 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
33 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
34 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops;
35 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
36 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck;
37 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
38 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
39 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus;
40 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
41 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
42 import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener;
43 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
44 import org.onap.policy.models.base.PfModelException;
45 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * This class sends messages from participants to CLAMP.
51  */
52 public class MessageSender extends TimerTask implements Closeable {
53     private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class);
54
55     private final ParticipantHandler participantHandler;
56     private final ParticipantMessagePublisher publisher;
57     private ScheduledExecutorService timerPool;
58
59     /**
60      * Constructor, set the publisher.
61      *
62      * @param participantHandler the participant handler to use for gathering information
63      * @param publisher the publisher to use for sending messages
64      * @param interval time interval to send Participant Status periodic messages
65      */
66     public MessageSender(ParticipantHandler participantHandler, ParticipantMessagePublisher publisher,
67             long interval) {
68         this.participantHandler = participantHandler;
69         this.publisher = publisher;
70
71         // Kick off the timer
72         timerPool = makeTimerPool();
73         timerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.MILLISECONDS);
74     }
75
76     @Override
77     public void run() {
78         LOGGER.debug("Sent heartbeat to CLAMP");
79         this.sendHeartbeat();
80     }
81
82     @Override
83     public void close() {
84         timerPool.shutdown();
85     }
86
87     /**
88      * Send a response message for this participant.
89      *
90      * @param ackMessage the details to include in the response message
91      */
92     public void sendAckResponse(ControlLoopAck ackMessage) {
93         sendAckResponse(null, ackMessage);
94     }
95
96     /**
97      * Dispatch a response message for this participant.
98      *
99      * @param controlLoopId the control loop to which this message is a response
100      * @param ackMessage the details to include in the response message
101      */
102     public void sendAckResponse(ToscaConceptIdentifier controlLoopId, ControlLoopAck ackMessage) {
103         // Participant related fields
104         ackMessage.setParticipantType(participantHandler.getParticipantType());
105         ackMessage.setParticipantId(participantHandler.getParticipantId());
106         publisher.sendControlLoopAck(ackMessage);
107     }
108
109     /**
110      * Send a ParticipantRegister message for this participant.
111      *
112      * @param message the participantRegister message
113      */
114     public void sendParticipantRegister(ParticipantRegister message) {
115         publisher.sendParticipantRegister(message);
116     }
117
118     /**
119      * Send a ParticipantDeregister message for this participant.
120      *
121      * @param message the participantDeRegister message
122      */
123     public void sendParticipantDeregister(ParticipantDeregister message) {
124         publisher.sendParticipantDeregister(message);
125     }
126
127     /**
128      * Send a ParticipantUpdateAck message for this participant update.
129      *
130      * @param message the participantUpdateAck message
131      */
132     public void sendParticipantUpdateAck(ParticipantUpdateAck message) {
133         publisher.sendParticipantUpdateAck(message);
134     }
135
136     /**
137      * Send a ParticipantStatus message for this participant.
138      *
139      * @param participantStatus the ParticipantStatus message
140      */
141     public void sendParticipantStatus(ParticipantStatus participantStatus) {
142         var controlLoops = participantHandler.getControlLoopHandler().getControlLoops();
143         for (ControlLoopElementListener clElementListener :
144             participantHandler.getControlLoopHandler().getListeners()) {
145             updateClElementStatistics(controlLoops, clElementListener);
146         }
147
148         publisher.sendParticipantStatus(participantStatus);
149     }
150
151     /**
152      * Dispatch a heartbeat for this participant.
153      */
154     public void sendHeartbeat() {
155         publisher.sendHeartbeat(participantHandler.makeHeartbeat());
156     }
157
158     /**
159      * Update ControlLoopElement statistics. The control loop elements listening will be
160      * notified to retrieve statistics from respective controlloop elements, and controlloopelements
161      * data on the handler will be updated.
162      *
163      * @param controlLoops the control loops
164      * @param clElementListener control loop element listener
165      */
166     public void updateClElementStatistics(ControlLoops controlLoops, ControlLoopElementListener clElementListener) {
167         for (ControlLoop controlLoop : controlLoops.getControlLoopList()) {
168             for (ControlLoopElement element : controlLoop.getElements().values()) {
169                 try {
170                     clElementListener.handleStatistics(element.getId());
171                 } catch (PfModelException e) {
172                     LOGGER.debug("Getting statistics for Control loop element failed");
173                 }
174             }
175         }
176     }
177
178     /**
179      * Makes a new timer pool.
180      *
181      * @return a new timer pool
182      */
183     protected ScheduledExecutorService makeTimerPool() {
184         return Executors.newScheduledThreadPool(1);
185     }
186 }