reduce sonar issue - drools-pdp
[policy/drools-pdp.git] / feature-server-pool / src / main / java / org / onap / policy / drools / serverpool / Server.java
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-server-pool
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.serverpool;
22
23 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_HTTPS;
24 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SELF_SIGNED_CERTIFICATES;
25 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST;
26 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_CONNECT_TIMEOUT;
27 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_INITIAL_ALLOWED_GAP;
28 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_IP_ADDRESS;
29 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_PORT;
30 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_READ_TIMEOUT;
31 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_CORE_POOL_SIZE;
32 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME;
33 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE;
34 import static org.onap.policy.drools.serverpool.ServerPoolProperties.HOST_LIST;
35 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_ADAPTIVE_GAP_ADJUST;
36 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_CONNECT_TIMEOUT;
37 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_HTTPS;
38 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_INITIAL_ALLOWED_GAP;
39 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_IP_ADDRESS;
40 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_PORT;
41 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_READ_TIMEOUT;
42 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_SELF_SIGNED_CERTIFICATES;
43 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_CORE_POOL_SIZE;
44 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_KEEP_ALIVE_TIME;
45 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_MAXIMUM_POOL_SIZE;
46 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_IP_ADDRESS;
47 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_PORT;
48 import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
49
50 import java.io.ByteArrayInputStream;
51 import java.io.ByteArrayOutputStream;
52 import java.io.DataInputStream;
53 import java.io.DataOutputStream;
54 import java.io.IOException;
55 import java.io.PrintStream;
56 import java.io.StringReader;
57 import java.lang.reflect.Field;
58 import java.net.InetAddress;
59 import java.net.InetSocketAddress;
60 import java.net.UnknownHostException;
61 import java.nio.charset.StandardCharsets;
62 import java.security.KeyManagementException;
63 import java.security.NoSuchAlgorithmException;
64 import java.text.SimpleDateFormat;
65 import java.util.Base64;
66 import java.util.Collection;
67 import java.util.Date;
68 import java.util.HashSet;
69 import java.util.LinkedList;
70 import java.util.Objects;
71 import java.util.Properties;
72 import java.util.TreeMap;
73 import java.util.TreeSet;
74 import java.util.UUID;
75 import java.util.concurrent.ExecutionException;
76 import java.util.concurrent.FutureTask;
77 import java.util.concurrent.LinkedTransferQueue;
78 import java.util.concurrent.ThreadPoolExecutor;
79 import java.util.concurrent.TimeUnit;
80 import java.util.concurrent.TimeoutException;
81
82 import javax.ws.rs.client.Client;
83 import javax.ws.rs.client.Entity;
84 import javax.ws.rs.client.WebTarget;
85 import javax.ws.rs.core.MediaType;
86 import javax.ws.rs.core.Response;
87
88 import org.glassfish.jersey.client.ClientProperties;
89 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
90 import org.onap.policy.common.endpoints.http.client.HttpClient;
91 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
92 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
93 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
94 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
95 import org.onap.policy.drools.utils.PropertyUtil;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
98
99 public class Server implements Comparable<Server> {
100     private static Logger logger = LoggerFactory.getLogger(Server.class);
101
102     // maps UUID to Server object for all known servers
103     private static TreeMap<UUID, Server> servers =
104         new TreeMap<>(Util.uuidComparator);
105
106     // maps UUID to Server object for all failed servers
107     // (so they aren't accidentally restored, due to updates from other hosts)
108     private static TreeMap<UUID, Server> failedServers =
109         new TreeMap<>(Util.uuidComparator);
110
111     // subset of servers to be notified (null means it needs to be rebuilt)
112     private static LinkedList<Server> notifyList = null;
113
114     // data to be sent out to notify list
115     private static TreeSet<Server> updatedList = new TreeSet<>();
116
117     // the server associated with the current host
118     private static Server thisServer = null;
119
120     // the current REST server
121     private static HttpServletServer restServer;
122
123     /*==================================================*/
124     /* Some properties extracted at initialization time */
125     /*==================================================*/
126
127     // initial value of gap to allow between pings
128     private static long initialAllowedGap;
129
130     // used in adaptive calculation of allowed gap between pings
131     private static long adaptiveGapAdjust;
132
133     // time to allow for TCP connect (long)
134     private static String connectTimeout;
135
136     // time to allow before TCP read timeout (long)
137     private static String readTimeout;
138
139     // outgoing per-server thread pool parameters
140     private static int corePoolSize;
141     private static int maximumPoolSize;
142     private static long keepAliveTime;
143
144     // https-related parameters
145     private static boolean useHttps;
146     private static boolean useSelfSignedCertificates;
147
148     // list of remote host names
149     private static String[] hostList = new String[0];
150
151     /*=========================================================*/
152     /* Fields included in every 'ping' message between servers */
153     /*=========================================================*/
154
155     // unique id for this server
156     private UUID uuid;
157
158     // counter periodically incremented to indicate the server is "alive"
159     private int count;
160
161     // 16 byte MD5 checksum over additional data that is NOT included in
162     // every 'ping' message -- used to determine whether the data is up-to-date
163     private byte[] checksum;
164
165     /*========================================================================*/
166     /* The following data is included in the checksum, and doesn't change too */
167     /* frequently (some fields may change as servers go up and down)          */
168     /*========================================================================*/
169
170     // IP address and port of listener
171     private InetSocketAddress socketAddress;
172
173     // site IP address and port
174     private InetSocketAddress siteSocketAddress = null;
175
176     /*============================================*/
177     /* Local information not included in checksum */
178     /*============================================*/
179
180     // destination socket information
181     private InetSocketAddress destSocketAddress;
182     private String destName;
183
184     // REST client fields
185     private HttpClient client;
186     private WebTarget target;
187     private ThreadPoolExecutor sendThreadPool = null;
188
189     // time when the 'count' field was last updated
190     private long lastUpdateTime;
191
192     // calculated field indicating the maximum time between updates
193     private long allowedGap = initialAllowedGap;
194
195     // indicates whether the 'Server' instance is active or not (synchronized)
196     private boolean active = true;
197
198     /*
199      * Tags for encoding of server data
200      */
201     static final int END_OF_PARAMETERS_TAG = 0;
202     static final int SOCKET_ADDRESS_TAG = 1;
203     static final int SITE_SOCKET_ADDRESS_TAG = 2;
204
205     // 'pingHosts' error
206     static final String PINGHOSTS_ERROR = "Server.pingHosts error";
207
208     /*==============================*/
209     /* Comparable<Server> interface */
210     /*==============================*/
211
212     /**
213      * Compare this instance to another one by comparing the 'uuid' field.
214      */
215     @Override
216     public int compareTo(Server other) {
217         return Util.uuidComparator.compare(uuid, other.uuid);
218     }
219
220     /**
221      * This method may be invoked from any thread, and is used as the main
222      * entry point when testing.
223      *
224      * @param args arguments contaning an '=' character are intepreted as
225      *     a property, other arguments are presumed to be a property file.
226      */
227     public static void main(String[] args) throws IOException {
228         Properties prop = new Properties();
229
230         for (String arg : args) {
231             // arguments with an '=' in them are a property definition;
232             // otherwise, they are a properties file name
233
234             if (arg.contains("=")) {
235                 prop.load(new StringReader(arg));
236             } else {
237                 prop.putAll(PropertyUtil.getProperties(arg));
238             }
239         }
240
241         String rval = startup(prop);
242         if (rval != null) {
243             logger.error("Server.startup failed: {}", rval);
244         }
245     }
246
247     /**
248      * This method may be invoked from any thread, and performs initialization.
249      *
250      * @param propertiesFile the name of a property file
251      */
252     public static String startup(String propertiesFile) {
253         Properties properties;
254         try {
255             properties = PropertyUtil.getProperties(propertiesFile);
256         } catch (IOException e) {
257             logger.error("Server.startup: exception reading properties", e);
258             properties = new Properties();
259         }
260         return startup(properties);
261     }
262
263     /**
264      * This method may be invoked from any thread, and performs initialization.
265      *
266      * @param properties contains properties used by the server
267      */
268     public static String startup(Properties properties) {
269         ServerPoolProperties.setProperties(properties);
270         logger.info("startup: properties={}", properties);
271
272         // fetch some static properties
273         initialAllowedGap = getProperty(SERVER_INITIAL_ALLOWED_GAP,
274                                         DEFAULT_SERVER_INITIAL_ALLOWED_GAP);
275         adaptiveGapAdjust = getProperty(SERVER_ADAPTIVE_GAP_ADJUST,
276                                         DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST);
277         connectTimeout =
278             String.valueOf(getProperty(SERVER_CONNECT_TIMEOUT,
279                                        DEFAULT_SERVER_CONNECT_TIMEOUT));
280         readTimeout = String.valueOf(getProperty(SERVER_READ_TIMEOUT,
281                                      DEFAULT_SERVER_READ_TIMEOUT));
282         corePoolSize = getProperty(SERVER_THREADS_CORE_POOL_SIZE,
283                                    DEFAULT_SERVER_THREADS_CORE_POOL_SIZE);
284         maximumPoolSize = getProperty(SERVER_THREADS_MAXIMUM_POOL_SIZE,
285                                       DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE);
286         keepAliveTime = getProperty(SERVER_THREADS_KEEP_ALIVE_TIME,
287                                     DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME);
288         useHttps = getProperty(SERVER_HTTPS, DEFAULT_HTTPS);
289         useSelfSignedCertificates = getProperty(SERVER_SELF_SIGNED_CERTIFICATES,
290                                                 DEFAULT_SELF_SIGNED_CERTIFICATES);
291         String hostListNames = getProperty(HOST_LIST, null);
292         if (hostListNames != null) {
293             hostList = hostListNames.split(",");
294         }
295
296         String possibleError = null;
297         try {
298             // fetch server information
299             String ipAddressString =
300                 getProperty(SERVER_IP_ADDRESS, DEFAULT_SERVER_IP_ADDRESS);
301             int port = getProperty(SERVER_PORT, DEFAULT_SERVER_PORT);
302
303             possibleError = "Unknown Host: " + ipAddressString;
304             InetAddress address = InetAddress.getByName(ipAddressString);
305             InetSocketAddress socketAddress = new InetSocketAddress(address, port);
306
307             possibleError = "HTTP server initialization error";
308             restServer = HttpServletServerFactoryInstance.getServerFactory().build(
309                          "SERVER-POOL",                              // name
310                          useHttps,                                   // https
311                          socketAddress.getAddress().getHostAddress(),// host (maybe 0.0.0.0)
312                          port,                                       // port (can no longer be 0)
313                          null,                                       // contextPath
314                          false,                                      // swagger
315                          false);                                     // managed
316             restServer.addServletClass(null, RestServerPool.class.getName());
317
318             // add any additional servlets
319             for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
320                 Collection<Class<?>> classes = feature.servletClasses();
321                 if (classes != null) {
322                     for (Class<?> clazz : classes) {
323                         restServer.addServletClass(null, clazz.getName());
324                     }
325                 }
326             }
327
328             // we may not know the port until after the server is started
329             possibleError = "HTTP server start error";
330             restServer.start();
331             possibleError = null;
332
333             // determine the address to use
334             if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) {
335                 address = InetAddress.getLocalHost();
336             }
337
338             thisServer = new Server(new InetSocketAddress(address, port));
339
340             // TBD: is this really appropriate?
341             thisServer.newServer();
342
343             // start background thread
344             MainLoop.startThread();
345             MainLoop.queueWork(() -> {
346                 // run this in the 'MainLoop' thread
347                 Leader.startup();
348                 Bucket.startup();
349             });
350             logger.info("Listening on port {}", port);
351
352             return null;
353         } catch (UnknownHostException e) {
354             logger.error("Server.startup: exception start server", e);
355             if (possibleError == null) {
356                 possibleError = e.toString();
357             }
358             return possibleError;
359         }
360     }
361
362     /**
363      * Shut down all threads associate with server pool.
364      */
365     public static void shutdown() {
366         Discovery.stopDiscovery();
367         MainLoop.stopThread();
368         TargetLock.shutdown();
369         Util.shutdown();
370
371         HashSet<Server> allServers = new HashSet<>();
372         allServers.addAll(servers.values());
373         allServers.addAll(failedServers.values());
374
375         for (Server server : allServers) {
376             if (server.sendThreadPool != null) {
377                 server.sendThreadPool.shutdown();
378             }
379         }
380         if (restServer != null) {
381             restServer.shutdown();
382         }
383     }
384
385     /**
386      * Return the Server instance associated with the current host.
387      *
388      * @return the Server instance associated with the current host
389      */
390     public static Server getThisServer() {
391         return thisServer;
392     }
393
394     /**
395      * Return the first Server instance in the 'servers' list.
396      *
397      * @return the first Server instance in the 'servers' list
398      *     (the one with the lowest UUID)
399      */
400     public static Server getFirstServer() {
401         return servers.firstEntry().getValue();
402     }
403
404     /**
405      * Lookup a Server instance associated with a UUID.
406      *
407      * @param uuid the key to the lookup
408      @ @return the associated 'Server' instance, or 'null' if none
409      */
410     public static Server getServer(UUID uuid) {
411         return servers.get(uuid);
412     }
413
414     /**
415      * Return a count of the number of servers.
416      *
417      * @return a count of the number of servers
418      */
419     public static int getServerCount() {
420         return servers.size();
421     }
422
423     /**
424      * Return the complete list of servers.
425      *
426      * @return the complete list of servers
427      */
428     public static Collection<Server> getServers() {
429         return servers.values();
430     }
431
432     /**
433      * This method is invoked from the 'startup' thread, and creates a new
434      * 'Server' instance for the current server.
435      *
436      * @param socketAddress the IP address and port the listener is bound to
437      */
438     private Server(InetSocketAddress socketAddress) {
439         this.uuid = UUID.randomUUID();
440         this.count = 1;
441         this.socketAddress = socketAddress;
442         this.lastUpdateTime = System.currentTimeMillis();
443
444         // site information
445
446         String siteIp = getProperty(SITE_IP_ADDRESS, null);
447         int sitePort = getProperty(SITE_PORT, 0);
448         if (siteIp != null && sitePort != 0) {
449             // we do have site information specified
450             try {
451                 siteSocketAddress = new InetSocketAddress(siteIp, sitePort);
452                 if (siteSocketAddress.getAddress() == null) {
453                     logger.error("Couldn't resolve site address: {}", siteIp);
454                     siteSocketAddress = null;
455                 }
456             } catch (IllegalArgumentException e) {
457                 logger.error("Illegal 'siteSocketAddress'", e);
458                 siteSocketAddress = null;
459             }
460         }
461
462         // TBD: calculate checksum
463     }
464
465     /**
466      * Initialize a 'Server' instance from a 'DataInputStream'. If it is new,
467      * it may get inserted in the table. If it is an update, fields in an
468      * existing 'Server' may be updated.
469      *
470      * @param is the 'DataInputStream'
471      */
472     Server(DataInputStream is) throws IOException {
473         // read in 16 byte UUID
474         uuid = Util.readUuid(is);
475
476         // read in 4 byte counter value
477         count = is.readInt();
478
479         // read in 16 byte MD5 checksum
480         checksum = new byte[16];
481         is.readFully(checksum);
482
483         // optional parameters
484         int tag;
485         while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
486             switch (tag) {
487                 case SOCKET_ADDRESS_TAG:
488                     socketAddress = readSocketAddress(is);
489                     break;
490                 case SITE_SOCKET_ADDRESS_TAG:
491                     siteSocketAddress = readSocketAddress(is);
492                     break;
493                 default:
494                     // ignore tag
495                     logger.error("Illegal tag: {}", tag);
496                     break;
497             }
498         }
499     }
500
501     /**
502      * Read an 'InetSocketAddress' from a 'DataInputStream'.
503      *
504      * @param is the 'DataInputStream'
505      * @return the 'InetSocketAddress'
506      */
507     private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException {
508
509         byte[] ipAddress = new byte[4];
510         is.read(ipAddress, 0, 4);
511         int port = is.readUnsignedShort();
512         return new InetSocketAddress(InetAddress.getByAddress(ipAddress), port);
513     }
514
515     /**
516      * {@inheritDoc}
517      */
518     @Override
519     public String toString() {
520         return "Server[" + uuid + "]";
521     }
522
523     /**
524      * Return the UUID associated with this Server.
525      *
526      * @return the UUID associated with this Server
527      */
528     public UUID getUuid() {
529         return uuid;
530     }
531
532     /**
533      * Return the external InetSocketAddress of the site.
534      *
535      * @return the external InetSocketAddress of the site
536      *     ('null' if it doesn't exist)
537      */
538     public InetSocketAddress getSiteSocketAddress() {
539         return siteSocketAddress;
540     }
541
542     /**
543      * This method may be called from any thread.
544      *
545      * @return 'true' if the this server is active, and 'false' if not
546      */
547     public synchronized boolean isActive() {
548         return active;
549     }
550
551     /**
552      * This method writes out the data associated with the current Server
553      * instance.
554      *
555      * @param os outout stream that should receive the data
556      */
557     void writeServerData(DataOutputStream os) throws IOException {
558         // write out 16 byte UUID
559         Util.writeUuid(os, uuid);
560
561         // write out 4 byte counter value
562         os.writeInt(count);
563
564         // write out 16 byte MD5 checksum
565         // TBD: should this be implemented?
566         os.write(checksum == null ? new byte[16] : checksum);
567
568         if (socketAddress != null) {
569             // write out socket address
570             os.writeByte(SOCKET_ADDRESS_TAG);
571             os.write(socketAddress.getAddress().getAddress(), 0, 4);
572             os.writeShort(socketAddress.getPort());
573         }
574
575         if (siteSocketAddress != null) {
576             // write out socket address
577             os.writeByte(SITE_SOCKET_ADDRESS_TAG);
578             os.write(siteSocketAddress.getAddress().getAddress(), 0, 4);
579             os.writeShort(siteSocketAddress.getPort());
580         }
581
582         os.writeByte(END_OF_PARAMETERS_TAG);
583     }
584
585     /**
586      * Do any processing needed to create a new server. This method is invoked
587      * from the 'MainLoop' thread in every case except for the current server,
588      * in which case it is invoked in 'startup' prior to creating 'MainLoop'.
589      */
590     private void newServer() {
591         Server failed = failedServers.get(uuid);
592         if (failed != null) {
593             // this one is on the failed list -- see if the counter has advanced
594             if ((count - failed.count) <= 0) {
595                 // the counter has not advanced -- ignore
596                 return;
597             }
598
599             // the counter has advanced -- somehow, this server has returned
600             failedServers.remove(uuid);
601             synchronized (this) {
602                 active = true;
603             }
604             logger.error("Server reawakened: {} ({})", uuid, socketAddress);
605         }
606
607         lastUpdateTime = System.currentTimeMillis();
608         servers.put(uuid, this);
609         updatedList.add(this);
610
611         // notify list will need to be rebuilt
612         notifyList = null;
613
614         if (socketAddress != null && this != thisServer) {
615             // initialize 'client' and 'target' fields
616             if (siteSocketAddress != null
617                     && !siteSocketAddress.equals(thisServer.siteSocketAddress)) {
618                 // destination is on a remote site
619                 destSocketAddress = siteSocketAddress;
620             } else {
621                 // destination is on the local site -- use direct addressing
622                 destSocketAddress = socketAddress;
623             }
624             destName = socketAddressToName(destSocketAddress);
625             try {
626                 // 'client' is used for REST messages to the destination
627                 client = buildClient(uuid.toString(), destSocketAddress, destName);
628
629                 // initialize the 'target' field
630                 target = getTarget(client);
631             } catch (KeyManagementException | NoSuchAlgorithmException
632                          | NoSuchFieldException | IllegalAccessException
633                          | ClassNotFoundException | HttpClientConfigException e) {
634                 logger.error("Server.newServer: problems creating 'client'", e);
635             }
636         }
637         logger.info("New server: {} ({})", uuid, socketAddress);
638         for (Events listener : Events.getListeners()) {
639             listener.newServer(this);
640         }
641     }
642
643     /**
644      * Check the server state in response to some issue. At present, only the
645      * 'destName' information is checked.
646      */
647     private void checkServer() {
648         // recalculate 'destName' (we have seen DNS issues)
649         String newDestName = socketAddressToName(destSocketAddress);
650         if (newDestName.equals(destName)) {
651             return;
652         }
653         logger.warn("Remote host name for {} has changed from {} to {}",
654                     destSocketAddress, destName, newDestName);
655
656         // shut down old client, and rebuild
657         client.shutdown();
658         client = null;
659         target = null;
660
661         // update 'destName', and rebuild the client
662         destName = newDestName;
663         try {
664             // 'client' is used for REST messages to the destination
665             client = buildClient(uuid.toString(), destSocketAddress, destName);
666
667             // initialize the 'target' field
668             target = getTarget(client);
669         } catch (KeyManagementException | NoSuchAlgorithmException
670                      | NoSuchFieldException | IllegalAccessException
671                      | ClassNotFoundException | HttpClientConfigException e) {
672             logger.error("Server.checkServer: problems recreating 'client'", e);
673         }
674     }
675
676     /**
677      * Update server data.
678      *
679      * @param serverData this is a temporary 'Server' instance created from
680      *     an incoming message, which is used to update fields within the
681      *     'Server' instance identified by 'this'
682      */
683     private void updateServer(Server serverData) {
684         if (serverData.count > count) {
685             // an update has occurred
686             count = serverData.count;
687
688             // TBD: calculate and verify checksum, more fields may be updated
689
690             // adjust 'allowedGap' accordingly
691             long currentTime = System.currentTimeMillis();
692             long gap = currentTime - lastUpdateTime;
693
694             // adjust 'allowedGap' accordingly
695             // TBD: need properties to support overrides
696             gap = gap * 3 / 2 + adaptiveGapAdjust;
697             if (gap > allowedGap) {
698                 // update 'allowedGap' immediately
699                 allowedGap = gap;
700             } else {
701                 // gradually pull the allowed gap down
702                 // TBD: need properties to support overrides
703                 allowedGap = (allowedGap * 15 + gap) / 16;
704             }
705             lastUpdateTime = currentTime;
706
707             updatedList.add(this);
708         }
709     }
710
711     /**
712      * a server has failed.
713      */
714     private void serverFailed() {
715         // mark as inactive
716         synchronized (this) {
717             active = false;
718         }
719
720         // remove it from the table
721         servers.remove(uuid);
722
723         // add it to the failed servers table
724         failedServers.put(uuid, this);
725
726         // clean up client information
727         if (client != null) {
728             client.shutdown();
729             client = null;
730             target = null;
731         }
732
733         // log an error message
734         logger.error("Server failure: {} ({})", uuid, socketAddress);
735         for (Events listener : Events.getListeners()) {
736             listener.serverFailed(this);
737         }
738     }
739
740     /**
741      * Fetch, and possibily calculate, the "notify list" associated with this
742      * server. This is the list of servers to forward a server and bucket
743      * information to, and is approximately log2(n) in length, where 'n' is
744      * the total number of servers.
745      * It is calculated by starting with all of the servers sorted by UUID --
746      * let's say the current server is at position 's'. The notify list will
747      * contain the server at positions:
748      *     (s + 1) % n
749      *     (s + 2) % n
750      *     (s + 4) % n
751      *          ...
752      * Using all powers of 2 less than 'n'. If the total server count is 50,
753      * this list has 6 entries.
754      * @return the notify list
755      */
756     static Collection<Server> getNotifyList() {
757         // The 'notifyList' value is initially 'null', and it is reset to 'null'
758         // every time a new host joins, or one leaves. That way, it is marked for
759         // recalculation, but only when needed.
760         if (notifyList == null) {
761             // next index we are looking for
762             int dest = 1;
763
764             // our current position in the Server table -- starting at 'thisServer'
765             UUID current = thisServer.uuid;
766
767             // site socket address of 'current'
768             InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress;
769
770             // hash set of all site socket addresses located
771             HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>();
772             siteSocketAddresses.add(thisSiteSocketAddress);
773
774             // the list we are building
775             notifyList = new LinkedList<Server>();
776
777             int index = 1;
778             for ( ; ; ) {
779                 // move to the next key (UUID) -- if we hit the end of the table,
780                 // wrap to the beginning
781                 current = servers.higherKey(current);
782                 if (current == null) {
783                     current = servers.firstKey();
784                 }
785                 if (current.equals(thisServer.uuid)) {
786                     // we have looped through the entire list
787                     break;
788                 }
789
790                 // fetch associated server & site socket address
791                 Server server = servers.get(current);
792                 InetSocketAddress currentSiteSocketAddress =
793                     server.siteSocketAddress;
794
795                 if (Objects.equals(thisSiteSocketAddress,
796                                    currentSiteSocketAddress)) {
797                     // same site -- see if we should add this one
798                     if (index == dest) {
799                         // this is the next index we are looking for --
800                         // add the server
801                         notifyList.add(server);
802
803                         // advance to the next offset (current-offset * 2)
804                         dest = dest << 1;
805                     }
806                     index += 1;
807                 } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) {
808                     // we need at least one member from each site
809                     notifyList.add(server);
810                     siteSocketAddresses.add(currentSiteSocketAddress);
811                 }
812             }
813         }
814         return notifyList;
815     }
816
817     /**
818      * See if there is a host name associated with a destination socket address.
819      *
820      * @param dest the socket address of the destination
821      * @return the host name associated with the IP address, or the IP address
822      *     if no associated host name is found.
823      */
824     private static String socketAddressToName(InetSocketAddress dest) {
825         // destination IP address
826         InetAddress inetAddress = dest.getAddress();
827         String destName = null;
828
829         // go through the 'hostList' to see if there is a matching name
830         for (String hostName : hostList) {
831             try {
832                 if (inetAddress.equals(InetAddress.getByName(hostName))) {
833                     // this one matches -- use the name instead of the IP address
834                     destName = hostName;
835                     break;
836                 }
837             } catch (UnknownHostException e) {
838                 logger.debug("Server.socketAddressToName error", e);
839             }
840         }
841
842         // default name = string value of IP address
843         return destName == null ? inetAddress.getHostAddress() : destName;
844     }
845
846     /**
847      * Create an 'HttpClient' instance for a particular host.
848      *
849      * @param name of the host (currently a UUID or host:port string)
850      * @param dest the socket address of the destination
851      * @param destName the string name to use for the destination
852      */
853     static HttpClient buildClient(String name, InetSocketAddress dest, String destName)
854         throws KeyManagementException, NoSuchAlgorithmException,
855         ClassNotFoundException, HttpClientConfigException {
856
857         return HttpClientFactoryInstance.getClientFactory().build(
858             BusTopicParams.builder()
859                 .clientName(name)                               // name
860                 .useHttps(useHttps)                             // https
861                 .allowSelfSignedCerts(useSelfSignedCertificates)// selfSignedCerts
862                 .hostname(destName)                             // host
863                 .port(dest.getPort())                           // port
864                 .managed(false)                                 // managed
865                 .build());
866     }
867
868     /**
869      * Extract the 'WebTarget' information from the 'HttpClient'.
870      *
871      * @param client the associated HttpClient instance
872      * @return a WebTarget referring to the previously-specified 'baseUrl'
873      */
874     static WebTarget getTarget(HttpClient client)
875         throws NoSuchFieldException, IllegalAccessException {
876         // need access to the internal field 'client'
877         // TBD: We need a way to get this information without reflection
878         Field field = client.getClass().getDeclaredField("client");
879         field.setAccessible(true);
880         Client rsClient = (Client)field.get(client);
881         field.setAccessible(false);
882
883         rsClient.property(ClientProperties.CONNECT_TIMEOUT, connectTimeout);
884         rsClient.property(ClientProperties.READ_TIMEOUT, readTimeout);
885
886         // For performance reasons, the root 'WebTarget' is generated only once
887         // at initialization time for each remote host.
888         return rsClient.target(client.getBaseUrl());
889     }
890
891     /**
892      * This method may be invoked from any thread, and is used to send a
893      * message to the destination server associated with this 'Server' instance.
894      *
895      * @param path the path relative to the base URL
896      * @param entity the "request entity" containing the body of the
897      *     HTTP POST request
898      */
899     public void post(final String path, final Entity<?> entity) {
900         post(path, entity, null);
901     }
902
903     /**
904      * This method may be invoked from any thread, and is used to send a
905      * message to the destination server associated with this 'Server' instance.
906      *
907      * @param path the path relative to the base URL
908      * @param entity the "request entity" containing the body of the
909      *     HTTP POST request (if 'null', an HTTP GET is used instead)
910      * @param responseCallback if non-null, this callback may be used to
911      *     modify the WebTarget, and/or receive the POST response message
912      */
913     public void post(final String path, final Entity<?> entity,
914                      PostResponse responseCallback) {
915         if (target == null) {
916             return;
917         }
918
919         getThreadPool().execute(() -> {
920             /**
921              * This method is running within the 'MainLoop' thread.
922              */
923             try {
924                 WebTarget webTarget = target.path(path);
925                 if (responseCallback != null) {
926                     // give callback a chance to modify 'WebTarget'
927                     webTarget = responseCallback.webTarget(webTarget);
928
929                     // send the response to the callback
930                     Response response;
931                     if (entity == null) {
932                         response = webTarget.request().get();
933                     } else {
934                         response = webTarget.request().post(entity);
935                     }
936                     responseCallback.response(response);
937                 } else {
938                     // just do the invoke, and ignore the response
939                     if (entity == null) {
940                         webTarget.request().get();
941                     } else {
942                         webTarget.request().post(entity);
943                     }
944                 }
945             } catch (Exception e) {
946                 logger.error("Failed to send to {} ({}, {})",
947                              uuid, destSocketAddress, destName);
948                 if (responseCallback != null) {
949                     responseCallback.exceptionResponse(e);
950                 }
951                 MainLoop.queueWork(() -> {
952                     // this runs in the 'MainLoop' thread
953
954                     // the DNS cache may have been out-of-date when this server
955                     // was first contacted -- fix the problem, if needed
956                     checkServer();
957                 });
958             }
959         });
960     }
961
962     /**
963      * This method may be invoked from any thread.
964      *
965      * @return the 'ThreadPoolExecutor' associated with this server
966      */
967     public synchronized ThreadPoolExecutor getThreadPool() {
968         if (sendThreadPool == null) {
969             // build a thread pool for this Server
970             sendThreadPool =
971                 new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
972                                        keepAliveTime, TimeUnit.MILLISECONDS,
973                                        new LinkedTransferQueue<Runnable>());
974             sendThreadPool.allowCoreThreadTimeOut(true);
975         }
976         return sendThreadPool;
977     }
978
979     /**
980      * Lower-level method supporting HTTP, which requires that the caller's
981      * thread tolerate blocking. This method may be called from any thread.
982      *
983      * @param path the path relative to the base URL
984      * @return a 'WebTarget' instance pointing to this path
985      */
986     public WebTarget getWebTarget(String path) {
987         return target == null ? null : target.path(path);
988     }
989
990     /**
991      * This method may be invoked from any thread, but its real intent is
992      * to decode an incoming 'admin' message (which is Base-64-encoded),
993      * and send it to the 'MainLoop' thread for processing.
994      *
995      * @param data the base-64-encoded data
996      */
997     static void adminRequest(byte[] data) {
998         final byte[] packet = Base64.getDecoder().decode(data);
999         Runnable task = () -> {
1000             try {
1001                 ByteArrayInputStream bis = new ByteArrayInputStream(packet);
1002                 DataInputStream dis = new DataInputStream(bis);
1003
1004                 while (dis.available() != 0) {
1005                     Server serverData = new Server(dis);
1006
1007                     // TBD: Compare with current server
1008
1009                     Server server = servers.get(serverData.uuid);
1010                     if (server == null) {
1011                         serverData.newServer();
1012                     } else {
1013                         server.updateServer(serverData);
1014                     }
1015                 }
1016             } catch (Exception e) {
1017                 logger.error("Server.adminRequest: can't decode packet", e);
1018             }
1019         };
1020         MainLoop.queueWork(task);
1021     }
1022
1023     /**
1024      * Send out information about servers 'updatedList' to all servers
1025      * in 'notifyList' (may need to build or rebuild 'notifyList').
1026      */
1027     static void sendOutData() throws IOException {
1028         // include 'thisServer' in the data -- first, advance the count
1029         thisServer.count += 1;
1030         if (thisServer.count == 0) {
1031             /*
1032              * counter wrapped (0 is a special case) --
1033              * actually, we could probably leave this out, because it would take
1034              * more than a century to wrap if the increment is 1 second
1035              */
1036             thisServer.count = 1;
1037         }
1038
1039         ByteArrayOutputStream bos = new ByteArrayOutputStream();
1040         DataOutputStream dos = new DataOutputStream(bos);
1041
1042         thisServer.lastUpdateTime = System.currentTimeMillis();
1043         thisServer.writeServerData(dos);
1044
1045         // include all hosts in the updated list
1046         for (Server server : updatedList) {
1047             server.writeServerData(dos);
1048         }
1049         updatedList.clear();
1050
1051         // create an 'Entity' that can be sent out to all hosts in the notify list
1052         Entity<String> entity = Entity.entity(
1053             new String(Base64.getEncoder().encode(bos.toByteArray()), StandardCharsets.UTF_8),
1054             MediaType.APPLICATION_OCTET_STREAM_TYPE);
1055         for (Server server : getNotifyList()) {
1056             server.post("admin", entity);
1057         }
1058     }
1059
1060     /**
1061      * Search for servers which have taken too long to respond.
1062      */
1063     static void searchForFailedServers() {
1064         long currentTime = System.currentTimeMillis();
1065
1066         // used to build a list of newly-failed servers
1067         LinkedList<Server> failed = new LinkedList<>();
1068         for (Server server : servers.values()) {
1069             if (server == thisServer) {
1070                 continue;
1071             }
1072             long gap = currentTime - server.lastUpdateTime;
1073             if (gap > server.allowedGap) {
1074                 // add it to the failed list -- we don't call 'serverFailed' yet,
1075                 // because this updates the server list, and leads to a
1076                 // 'ConcurrentModificationException'
1077                 failed.add(server);
1078             }
1079         }
1080
1081         // remove servers from our list
1082         if (!failed.isEmpty()) {
1083             for (Server server : failed) {
1084                 server.serverFailed();
1085             }
1086             notifyList = null;
1087         }
1088     }
1089
1090     /**
1091      * This method may be invoked from any thread:
1092      * Send information about 'thisServer' to the list of hosts.
1093      *
1094      * @param out the 'PrintStream' to use for displaying information
1095      * @param hosts a comma-separated list of entries containing
1096      *     'host:port' or just 'port' (current host is implied in this case)
1097      */
1098     static void pingHosts(PrintStream out, String hosts) {
1099         LinkedList<InetSocketAddress> addresses = new LinkedList<>();
1100         boolean error = false;
1101
1102         for (String host : hosts.split(",")) {
1103             try {
1104                 String[] segs = host.split(":");
1105
1106                 switch (segs.length) {
1107                     case 1:
1108                         addresses.add(new InetSocketAddress(InetAddress.getLocalHost(),
1109                                 Integer.parseInt(segs[0])));
1110                         break;
1111                     case 2:
1112                         addresses.add(new InetSocketAddress(segs[0],
1113                                 Integer.parseInt(segs[1])));
1114                         break;
1115                     default:
1116                         out.println(host + ": Invalid host/port value");
1117                         error = true;
1118                         break;
1119                 }
1120             } catch (NumberFormatException e) {
1121                 out.println(host + ": Invalid port value");
1122                 logger.error(PINGHOSTS_ERROR, e);
1123                 error = true;
1124             } catch (UnknownHostException e) {
1125                 out.println(host + ": Unknown host");
1126                 logger.error(PINGHOSTS_ERROR, e);
1127                 error = true;
1128             }
1129         }
1130         if (!error) {
1131             pingHosts(out, addresses);
1132         }
1133     }
1134
1135     /**
1136      * This method may be invoked from any thread:
1137      * Send information about 'thisServer' to the list of hosts.
1138      *
1139      * @param out the 'PrintStream' to use for displaying information
1140      * @param hosts a collection of 'InetSocketAddress' instances, which are
1141      *     the hosts to send the information to
1142      */
1143     static void pingHosts(final PrintStream out,
1144                           final Collection<InetSocketAddress> hosts) {
1145         FutureTask<Integer> ft = new FutureTask<>(() -> {
1146             ByteArrayOutputStream bos = new ByteArrayOutputStream();
1147             DataOutputStream dos = new DataOutputStream(bos);
1148
1149             // add information for this server only
1150             try {
1151                 thisServer.writeServerData(dos);
1152
1153                 // create an 'Entity' that can be sent out to all hosts
1154                 Entity<String> entity = Entity.entity(
1155                     new String(Base64.getEncoder().encode(bos.toByteArray()),
1156                         StandardCharsets.UTF_8),
1157                     MediaType.APPLICATION_OCTET_STREAM_TYPE);
1158
1159                 // loop through hosts
1160                 for (InetSocketAddress host : hosts) {
1161                     HttpClient httpClient = null;
1162
1163                     try {
1164                         httpClient = buildClient(host.toString(), host,
1165                                                  socketAddressToName(host));
1166                         getTarget(httpClient).path("admin").request().post(entity);
1167                         httpClient.shutdown();
1168                         httpClient = null;
1169                     } catch (KeyManagementException | NoSuchAlgorithmException e) {
1170                         out.println(host + ": Unable to create client connection");
1171                         logger.error(PINGHOSTS_ERROR, e);
1172                     } catch (NoSuchFieldException | IllegalAccessException e) {
1173                         out.println(host + ": Unable to get link to target");
1174                         logger.error(PINGHOSTS_ERROR, e);
1175                     } catch (Exception e) {
1176                         out.println(host + ": " + e);
1177                         logger.error(PINGHOSTS_ERROR, e);
1178                     }
1179                     if (httpClient != null) {
1180                         httpClient.shutdown();
1181                     }
1182                 }
1183             } catch (IOException e) {
1184                 out.println("Unable to generate 'ping' data: " + e);
1185                 logger.error(PINGHOSTS_ERROR, e);
1186             }
1187             return 0;
1188         });
1189
1190         MainLoop.queueWork(ft);
1191         try {
1192             ft.get(60, TimeUnit.SECONDS);
1193         } catch (InterruptedException e) {
1194             logger.error("Server.pingHosts: interrupted waiting for queued work", e);
1195             Thread.currentThread().interrupt();
1196         } catch (ExecutionException | TimeoutException e) {
1197             logger.error("Server.pingHosts: error waiting for queued work", e);
1198         }
1199     }
1200
1201     /**
1202      * This method may be invoked from any thread:
1203      * Dump out the current 'servers' table in a human-readable table form.
1204      *
1205      * @param out the 'PrintStream' to dump the table to
1206      */
1207     public static void dumpHosts(final PrintStream out) {
1208         FutureTask<Integer> ft = new FutureTask<>(() -> {
1209             dumpHostsInternal(out);
1210             return 0;
1211         });
1212         MainLoop.queueWork(ft);
1213         try {
1214             ft.get(60, TimeUnit.SECONDS);
1215         } catch (InterruptedException e) {
1216             logger.error("Server.dumpHosts: interrupted waiting for queued work", e);
1217             Thread.currentThread().interrupt();
1218         } catch (ExecutionException | TimeoutException e) {
1219             logger.error("Server.dumpHosts: error waiting for queued work", e);
1220         }
1221     }
1222
1223     /**
1224      * Dump out the current 'servers' table in a human-readable table form.
1225      *
1226      * @param out the 'PrintStream' to dump the table to
1227      */
1228     private static void dumpHostsInternal(PrintStream out) {
1229         // modifications to 'servers.values()' and 'notifyList'.
1230         HashSet<Server> localNotifyList = new HashSet<>(getNotifyList());
1231
1232         // see if we have any site information
1233         boolean siteData = false;
1234         for (Server server : servers.values()) {
1235             if (server.siteSocketAddress != null) {
1236                 siteData = true;
1237                 break;
1238             }
1239         }
1240
1241         String format = "%1s %-36s %-15s %5s %5s %12s %7s %7s\n";
1242         SimpleDateFormat dateFormat = new SimpleDateFormat("kk:mm:ss.SSS");
1243
1244         if (siteData) {
1245             format = "%1s %-36s %-15s %5s %-15s %5s %5s %12s %7s %7s\n";
1246             // @formatter:off
1247             out.printf(format, "", "UUID", "IP Address", "Port",
1248                        "Site IP Address", "Port",
1249                        "Count", "Update Time", "Elapsed", "Allowed");
1250             out.printf(format, "", "----", "----------", "----",
1251                        "---------------", "----",
1252                        "-----", "-----------", "-------", "-------");
1253             // @formatter:on
1254         } else {
1255             // @formatter:off
1256             out.printf(format, "", "UUID", "IP Address", "Port",
1257                        "Count", "Update Time", "Elapsed", "Allowed");
1258             out.printf(format, "", "----", "----------", "----",
1259                        "-----", "-----------", "-------", "-------");
1260             // @formatter:on
1261         }
1262
1263         long currentTime = System.currentTimeMillis();
1264         for (Server server : servers.values()) {
1265             String thisOne = "";
1266
1267             if (server == thisServer) {
1268                 thisOne = "*";
1269             } else if (localNotifyList.contains(server)) {
1270                 thisOne = "n";
1271             }
1272
1273             if (siteData) {
1274                 String siteIp = "";
1275                 String sitePort = "";
1276                 if (server.siteSocketAddress != null) {
1277                     siteIp =
1278                         server.siteSocketAddress.getAddress().getHostAddress();
1279                     sitePort = String.valueOf(server.siteSocketAddress.getPort());
1280                 }
1281
1282                 out.printf(format, thisOne, server.uuid,
1283                            server.socketAddress.getAddress().getHostAddress(),
1284                            server.socketAddress.getPort(),
1285                            siteIp, sitePort, server.count,
1286                            dateFormat.format(new Date(server.lastUpdateTime)),
1287                            currentTime - server.lastUpdateTime,
1288                            server.allowedGap);
1289             } else {
1290                 out.printf(format, thisOne, server.uuid,
1291                            server.socketAddress.getAddress().getHostAddress(),
1292                            server.socketAddress.getPort(), server.count,
1293                            dateFormat.format(new Date(server.lastUpdateTime)),
1294                            currentTime - server.lastUpdateTime,
1295                            server.allowedGap);
1296             }
1297         }
1298         out.println("Count: " + servers.size());
1299     }
1300
1301     /* ============================================================ */
1302
1303     /**
1304      * This interface supports the 'post' method, and provides the opportunity
1305      * to change the WebTarget and/or receive the POST response message.
1306      */
1307     interface PostResponse {
1308         /**
1309          * Callback that can be used to modify 'WebTarget', and do things like
1310          * add query parameters.
1311          *
1312          * @param webTarget the current WebTarget
1313          * @return the updated WebTarget
1314          */
1315         public default WebTarget webTarget(WebTarget webTarget) {
1316             return webTarget;
1317         }
1318
1319         /**
1320          * Callback that passes the POST response.
1321          *
1322          * @param response the POST response
1323          */
1324         public default void response(Response response) {
1325         }
1326
1327         /**
1328          * Callback that passes the POST exception response.
1329          *
1330          */
1331         public default void exceptionResponse(Exception exception) {
1332             Response.ResponseBuilder response;
1333             response = Response.serverError();
1334             this.response(response.build());
1335         }
1336     }
1337 }