Initial OpenECOMP Demo commit
[demo.git] / vnfs / vLB / DNSManager / src / main / java / FDServer.java
1
2 /*************************************************************************//**
3  *
4  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  ****************************************************************************/
18
19 package main.java;
20
21 import java.io.*;
22 import java.net.*;
23 import java.util.*;
24 import java.util.concurrent.ConcurrentHashMap;
25
/**
 * Server-side Failure Detector (FD) implementation.
 *
 * <p>Constructing an instance starts a background thread that binds a UDP socket on
 * {@code port} and answers every {@code PING:<process id>} datagram with a
 * {@code PONG:<ip_addr>} datagram sent to the sender's address on the same port number.
 * A timer task runs every {@code timeout} milliseconds and updates the set of processes
 * considered alive: a process that sent no PING during a round has its miss counter
 * incremented, and it is dropped once the counter reaches {@code threshold}.
 *
 * <p>Constructor parameters:
 * <ul>
 *   <li>{@code ip_addr}: the address advertised in the body of PONG replies</li>
 *   <li>{@code port}: the port on which the FD service listens for incoming UDP packets</li>
 *   <li>{@code timeout}: how often, in milliseconds, the FD checks the status of client processes</li>
 *   <li>{@code threshold}: number of consecutive missing ping messages before declaring a client dead</li>
 *   <li>{@code debug}: debug mode on/off (progress messages are printed to standard output)</li>
 * </ul>
 */
public class FDServer implements Runnable {
    // Size of the receive buffer in bytes.
    private static final int BUFFER_SIZE = 256;

    // Input parameters
    private final String IP_ADDR;
    private final int PORT;
    private final long TIMEOUT;
    private final int THRESHOLD;
    private final boolean DEBUG;

    // Key: process id taken from the PING message; Value: number of consecutive missed pings.
    // Read by callers of getAliveProcesses() and written by the timer thread.
    private final Map<String, Integer> alive = new ConcurrentHashMap<String, Integer>();
    // Process ids that sent a PING since the last status check.
    // Written by the receive thread and drained by the timer thread, hence a concurrent set.
    private final Set<String> alive_this_round = ConcurrentHashMap.newKeySet();

    /**
     * Stores the configuration and starts the failure detector on a new thread.
     *
     * @param ip_addr   address advertised in PONG replies
     * @param port      UDP port to listen on; replies are sent to the same port number
     * @param timeout   period between status checks, in milliseconds
     * @param threshold consecutive missed rounds after which a process is declared dead
     * @param debug     whether to print progress messages to standard output
     * @throws IOException declared for compatibility with existing callers
     */
    public FDServer(String ip_addr, int port, long timeout, int threshold, boolean debug) throws IOException {
        IP_ADDR = ip_addr;
        PORT = port;
        TIMEOUT = timeout;
        THRESHOLD = threshold;
        DEBUG = debug;
        (new Thread(this)).start();
    }

    /** Runs the receive loop; an I/O failure that ends the loop is printed to standard error. */
    @Override
    public void run() {
        try {
            runFD();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Binds the UDP socket, schedules the periodic status check and then processes
     * incoming datagrams until the socket fails.
     *
     * @throws IOException if the socket cannot be bound or receiving fails
     */
    private void runFD() throws IOException {
        // Bind before scheduling the timer so that a bind failure does not leave a
        // timer thread running; the socket is closed when the loop exits.
        try (DatagramSocket sock = new DatagramSocket(PORT)) {
            // Check the status of client processes periodically
            TimerTask statusCheck = new TimerTask() {
                @Override
                public void run() {
                    checkClientStatus();
                }
            };
            new Timer().scheduleAtFixedRate(statusCheck, 0, TIMEOUT);

            byte[] buffer = new byte[BUFFER_SIZE];

            // Wait for incoming PING messages from remote clients
            while (true) {
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                sock.receive(packet);
                handlePacket(packet);
            }
        }
    }

    // Decodes one received datagram, records the sender as alive and replies with a PONG.
    private void handlePacket(DatagramPacket packet) {
        // Decode only the bytes of this datagram: the buffer is reused between packets,
        // so anything past getLength() is left over from an earlier, longer message.
        String text = new String(packet.getData(), packet.getOffset(), packet.getLength(),
                java.nio.charset.StandardCharsets.UTF_8).trim();
        String[] content = text.split(":");

        // Process only well-formed PING packets ("PING:<process id>")
        if (content.length < 2 || !content[0].equals("PING")) {
            if (DEBUG) {
                System.out.println("The received message is not a PING. Received content: " + text);
            }
            return;
        }

        String pid = content[1];
        String ip = packet.getAddress().getHostAddress();
        alive_this_round.add(pid);
        if (DEBUG) {
            System.out.println("Keep-alive message received from process " + pid + " (sender IP Address: " + ip + ")");
        }
        try {
            sendReplyMessage(packet.getAddress());
        } catch (IOException e) {
            // A reply that cannot be sent to one client must not stop the detector for all others.
            System.err.println("Failed to send PONG to " + ip + ": " + e);
        }
    }

    /**
     * Sends a {@code PONG:<ip_addr>} datagram to the given address on this server's port number.
     *
     * @param address destination of the reply
     * @throws IOException if the socket cannot be created or the datagram cannot be sent
     */
    private void sendReplyMessage(InetAddress address) throws IOException {
        // Allocate buffer for the PONG message
        byte[] buffer = ("PONG:" + IP_ADDR).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // try-with-resources closes the socket even when send() fails
        try (DatagramSocket sock = new DatagramSocket()) {
            sock.send(new DatagramPacket(buffer, buffer.length, address, PORT));
        }
    }

    // Update the list of processes that are alive
    private void checkClientStatus() {
        if (DEBUG) {
            System.out.println("/================================/");
            System.out.println("Update status of remote processes");
        }

        // Take a snapshot of this round's pings and remove exactly those entries.
        // A PING that arrives while this method runs stays in the set for the next
        // round instead of being discarded by a blanket clear().
        Set<String> seen = new HashSet<String>(alive_this_round);
        alive_this_round.removeAll(seen);

        // Check if a process alive the previous round is still alive
        // Otherwise increment its counter
        List<String> dead = new ArrayList<String>();
        for (String process : alive.keySet()) {
            if (seen.contains(process)) {
                continue;
            }
            int counter = alive.get(process) + 1;
            alive.put(process, counter);
            if (DEBUG) {
                System.out.println("Process " + process + " hasn't sent a message " + counter + " time(s) in a row");
            }
            // If the number of consecutive missed ping messages reached the threshold,
            // then assume the process to be dead (>= so a non-positive threshold still expires clients)
            if (counter >= THRESHOLD) {
                dead.add(process);
                if (DEBUG) {
                    System.out.println("Process " + process + " is dead");
                }
            }
        }

        // Processes alive this round
        for (String process : seen) {
            alive.put(process, 0);
            if (DEBUG) {
                System.out.println("Process " + process + " is alive this round");
            }
        }

        // Remove dead processes
        for (String process : dead) {
            alive.remove(process);
            if (DEBUG) {
                System.out.println("Process " + process + " is removed from the list of alive processes");
            }
        }

        if (DEBUG) {
            System.out.println();
        }
    }

    /**
     * Returns the set of alive processes to up-stream applications.
     *
     * @return the key set of the internal map, i.e. a live view that changes as the
     *         detector adds and removes processes
     */
    public Set<String> getAliveProcesses() {
        return alive.keySet();
    }
}