2 /*************************************************************************//**
4 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 ****************************************************************************/
24 import java.util.concurrent.ConcurrentHashMap;
27 * Server-side Failure Detector (FD) implementation.
28 * @param port: the port on which the FD service listens for incoming UDP packets
29 * @param timeout: how often the FD checks the status of client processes
30 * @param threshold: number of missing ping messages before declaring a client dead
31 * @param debug: debug mode on/off
34 public class FDServer implements Runnable {
36 private String IP_ADDR;
39 private int THRESHOLD;
40 private boolean DEBUG;
42 // Data structures that store information about alive processes, processes alive/dead this round
43 private Map<String, Integer> alive = new ConcurrentHashMap<String, Integer>(); // Key: process IP address; Value: # consecutive missed pings
44 private Set<String> alive_this_round = ConcurrentHashMap.newKeySet(); // Needs to be synchronized because it is accessed by multiple threads
45 private Set<String> dead = new HashSet<String>();
47 public FDServer(String ip_addr, int port, long timeout, int threshold, boolean debug) throws IOException {
51 THRESHOLD = threshold;
53 (new Thread(this)).start();
60 } catch (IOException e) {
65 private void runFD() throws IOException {
66 // Check the status of client processes periodically
67 TimerTask timer = new TimerTask() {
72 new Timer().scheduleAtFixedRate(timer, 0, TIMEOUT);
74 // Define a DatagramSocket object for receiving incoming UDP packets
75 @SuppressWarnings("resource")
76 DatagramSocket sock = new DatagramSocket(PORT);
77 byte[] buffer = new byte[256];
79 // Wait for incoming PING messages from remote clients
81 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
83 String[] content = new String(packet.getData()).trim().split(":"); // Remove leading and trailing spaces
84 String msg_type = content[0];
85 String pid = content[1];
86 // Process only PING UDP packets
87 if(msg_type.equals("PING")) {
88 String ip = packet.getAddress().getHostAddress();
89 alive_this_round.add(pid);
90 if(DEBUG) System.out.println("Keep-alive message received from process " + pid + " (sender IP Address: " + ip +")");
91 sendReplyMessage(packet.getAddress());
94 if(DEBUG) System.out.println("The received message is not a PING. Received content: " + content);
99 private void sendReplyMessage(InetAddress address) throws IOException {
100 DatagramSocket sock = new DatagramSocket();
101 // Allocate buffer for the PONG reply message
102 String content = "PONG:" + IP_ADDR;
103 byte[] buffer = content.getBytes();
104 // Send a PONG message
105 DatagramPacket packet = new DatagramPacket(buffer, buffer.length, address, PORT);
110 // Update the list of processes that are alive
111 private void checkClientStatus() {
112 if(DEBUG) System.out.println("/================================/");
113 if(DEBUG) System.out.println("Update status of remote processes");
114 // Check whether each process that was alive in the previous round pinged this round;
115 // if not, increment its count of consecutive missed pings
116 Set<String> alive_processes = alive.keySet();
117 Iterator<String> iter = alive_processes.iterator();
118 while(iter.hasNext()) {
119 String process = iter.next();
120 if(!alive_this_round.contains(process)) {
121 int counter = alive.get(process) + 1;
122 alive.put(process, counter);
123 if(DEBUG) System.out.println("Process " + process + " hasn't sent a message " + counter + " time(s) in a row");
124 // If the number of consecutive missed ping messages reached the threshold,
125 // then assume the process to be dead
126 if(counter == THRESHOLD) {
128 if(DEBUG) System.out.println("Process " + process + " is dead");
133 // Processes alive this round
134 iter = alive_this_round.iterator();
135 while(iter.hasNext()) {
136 String process = iter.next();
137 alive.put(process, 0);
138 if(DEBUG) System.out.println("Process " + process + " is alive this round");
142 // Remove dead processes
143 iter = dead.iterator();
144 while(iter.hasNext()) {
145 String process = iter.next();
146 if(alive.containsKey(process))
147 alive.remove(process);
148 if(DEBUG) System.out.println("Process " + process + " is removed from the list of alive processes");
152 alive_this_round.clear();
154 if(DEBUG) System.out.println();
157 // Return the set of alive processes to up-stream applications
158 public Set<String> getAliveProcesses() {
159 return alive.keySet();