need Override equals and hashCode method
[policy/drools-pdp.git] / feature-server-pool / src / main / java / org / onap / policy / drools / serverpool / Server.java
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-server-pool
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.serverpool;
22
23 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_HTTPS;
24 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SELF_SIGNED_CERTIFICATES;
25 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST;
26 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_CONNECT_TIMEOUT;
27 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_INITIAL_ALLOWED_GAP;
28 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_IP_ADDRESS;
29 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_PORT;
30 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_READ_TIMEOUT;
31 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_CORE_POOL_SIZE;
32 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME;
33 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE;
34 import static org.onap.policy.drools.serverpool.ServerPoolProperties.HOST_LIST;
35 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_ADAPTIVE_GAP_ADJUST;
36 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_CONNECT_TIMEOUT;
37 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_HTTPS;
38 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_INITIAL_ALLOWED_GAP;
39 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_IP_ADDRESS;
40 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_PORT;
41 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_READ_TIMEOUT;
42 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_SELF_SIGNED_CERTIFICATES;
43 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_CORE_POOL_SIZE;
44 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_KEEP_ALIVE_TIME;
45 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_MAXIMUM_POOL_SIZE;
46 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_IP_ADDRESS;
47 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_PORT;
48 import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
49
50 import java.io.ByteArrayInputStream;
51 import java.io.ByteArrayOutputStream;
52 import java.io.DataInputStream;
53 import java.io.DataOutputStream;
54 import java.io.IOException;
55 import java.io.PrintStream;
56 import java.io.StringReader;
57 import java.lang.reflect.Field;
58 import java.net.InetAddress;
59 import java.net.InetSocketAddress;
60 import java.net.UnknownHostException;
61 import java.nio.charset.StandardCharsets;
62 import java.security.KeyManagementException;
63 import java.security.NoSuchAlgorithmException;
64 import java.text.SimpleDateFormat;
65 import java.util.Base64;
66 import java.util.Collection;
67 import java.util.Date;
68 import java.util.HashSet;
69 import java.util.LinkedList;
70 import java.util.Objects;
71 import java.util.Properties;
72 import java.util.TreeMap;
73 import java.util.TreeSet;
74 import java.util.UUID;
75 import java.util.concurrent.ExecutionException;
76 import java.util.concurrent.FutureTask;
77 import java.util.concurrent.LinkedTransferQueue;
78 import java.util.concurrent.ThreadPoolExecutor;
79 import java.util.concurrent.TimeUnit;
80 import java.util.concurrent.TimeoutException;
81 import javax.ws.rs.client.Client;
82 import javax.ws.rs.client.Entity;
83 import javax.ws.rs.client.WebTarget;
84 import javax.ws.rs.core.MediaType;
85 import javax.ws.rs.core.Response;
86 import org.glassfish.jersey.client.ClientProperties;
87 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
88 import org.onap.policy.common.endpoints.http.client.HttpClient;
89 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
90 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
91 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
92 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
93 import org.onap.policy.drools.utils.PropertyUtil;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
97 public class Server implements Comparable<Server> {
98     private static Logger logger = LoggerFactory.getLogger(Server.class);
99
100     // maps UUID to Server object for all known servers
101     private static TreeMap<UUID, Server> servers =
102         new TreeMap<>(Util.uuidComparator);
103
104     // maps UUID to Server object for all failed servers
105     // (so they aren't accidentally restored, due to updates from other hosts)
106     private static TreeMap<UUID, Server> failedServers =
107         new TreeMap<>(Util.uuidComparator);
108
109     // subset of servers to be notified (null means it needs to be rebuilt)
110     private static LinkedList<Server> notifyList = null;
111
112     // data to be sent out to notify list
113     private static TreeSet<Server> updatedList = new TreeSet<>();
114
115     // the server associated with the current host
116     private static Server thisServer = null;
117
118     // the current REST server
119     private static HttpServletServer restServer;
120
121     /*==================================================*/
122     /* Some properties extracted at initialization time */
123     /*==================================================*/
124
125     // initial value of gap to allow between pings
126     private static long initialAllowedGap;
127
128     // used in adaptive calculation of allowed gap between pings
129     private static long adaptiveGapAdjust;
130
131     // time to allow for TCP connect (long)
132     private static String connectTimeout;
133
134     // time to allow before TCP read timeout (long)
135     private static String readTimeout;
136
137     // outgoing per-server thread pool parameters
138     private static int corePoolSize;
139     private static int maximumPoolSize;
140     private static long keepAliveTime;
141
142     // https-related parameters
143     private static boolean useHttps;
144     private static boolean useSelfSignedCertificates;
145
146     // list of remote host names
147     private static String[] hostList = new String[0];
148
149     /*=========================================================*/
150     /* Fields included in every 'ping' message between servers */
151     /*=========================================================*/
152
153     // unique id for this server
154     private UUID uuid;
155
156     // counter periodically incremented to indicate the server is "alive"
157     private int count;
158
159     // 16 byte MD5 checksum over additional data that is NOT included in
160     // every 'ping' message -- used to determine whether the data is up-to-date
161     private byte[] checksum;
162
163     /*========================================================================*/
164     /* The following data is included in the checksum, and doesn't change too */
165     /* frequently (some fields may change as servers go up and down)          */
166     /*========================================================================*/
167
168     // IP address and port of listener
169     private InetSocketAddress socketAddress;
170
171     // site IP address and port
172     private InetSocketAddress siteSocketAddress = null;
173
174     /*============================================*/
175     /* Local information not included in checksum */
176     /*============================================*/
177
178     // destination socket information
179     private InetSocketAddress destSocketAddress;
180     private String destName;
181
182     // REST client fields
183     private HttpClient client;
184     private WebTarget target;
185     private ThreadPoolExecutor sendThreadPool = null;
186
187     // time when the 'count' field was last updated
188     private long lastUpdateTime;
189
190     // calculated field indicating the maximum time between updates
191     private long allowedGap = initialAllowedGap;
192
193     // indicates whether the 'Server' instance is active or not (synchronized)
194     private boolean active = true;
195
196     /*
197      * Tags for encoding of server data
198      */
199     static final int END_OF_PARAMETERS_TAG = 0;
200     static final int SOCKET_ADDRESS_TAG = 1;
201     static final int SITE_SOCKET_ADDRESS_TAG = 2;
202
203     // 'pingHosts' error
204     static final String PINGHOSTS_ERROR = "Server.pingHosts error";
205
206     /*==============================*/
207     /* Comparable<Server> interface */
208     /*==============================*/
209
210     /**
211      * Compare this instance to another one by comparing the 'uuid' field.
212      */
213     @Override
214     public int compareTo(Server other) {
215         return Util.uuidComparator.compare(uuid, other.uuid);
216     }
217
218     @Override
219     public int hashCode() {
220         return Objects.hash(uuid);
221     }
222
223     @Override
224     public boolean equals(Object obj) {
225         if (this == obj) {
226             return true;
227         }
228         if (!(obj instanceof Server)) {
229             return false;
230         }
231         Server other = (Server) obj;
232         return Objects.equals(uuid, other.uuid);
233     }
234
235     /**
236      * This method may be invoked from any thread, and is used as the main
237      * entry point when testing.
238      *
239      * @param args arguments contaning an '=' character are intepreted as
240      *     a property, other arguments are presumed to be a property file.
241      */
242     public static void main(String[] args) throws IOException {
243         Properties prop = new Properties();
244
245         for (String arg : args) {
246             // arguments with an '=' in them are a property definition;
247             // otherwise, they are a properties file name
248
249             if (arg.contains("=")) {
250                 prop.load(new StringReader(arg));
251             } else {
252                 prop.putAll(PropertyUtil.getProperties(arg));
253             }
254         }
255
256         String rval = startup(prop);
257         if (rval != null) {
258             logger.error("Server.startup failed: {}", rval);
259         }
260     }
261
262     /**
263      * This method may be invoked from any thread, and performs initialization.
264      *
265      * @param propertiesFile the name of a property file
266      */
267     public static String startup(String propertiesFile) {
268         Properties properties;
269         try {
270             properties = PropertyUtil.getProperties(propertiesFile);
271         } catch (IOException e) {
272             logger.error("Server.startup: exception reading properties", e);
273             properties = new Properties();
274         }
275         return startup(properties);
276     }
277
278     /**
279      * This method may be invoked from any thread, and performs initialization.
280      *
281      * @param properties contains properties used by the server
282      */
283     public static String startup(Properties properties) {
284         ServerPoolProperties.setProperties(properties);
285         logger.info("startup: properties={}", properties);
286
287         // fetch some static properties
288         initialAllowedGap = getProperty(SERVER_INITIAL_ALLOWED_GAP,
289                                         DEFAULT_SERVER_INITIAL_ALLOWED_GAP);
290         adaptiveGapAdjust = getProperty(SERVER_ADAPTIVE_GAP_ADJUST,
291                                         DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST);
292         connectTimeout =
293             String.valueOf(getProperty(SERVER_CONNECT_TIMEOUT,
294                                        DEFAULT_SERVER_CONNECT_TIMEOUT));
295         readTimeout = String.valueOf(getProperty(SERVER_READ_TIMEOUT,
296                                      DEFAULT_SERVER_READ_TIMEOUT));
297         corePoolSize = getProperty(SERVER_THREADS_CORE_POOL_SIZE,
298                                    DEFAULT_SERVER_THREADS_CORE_POOL_SIZE);
299         maximumPoolSize = getProperty(SERVER_THREADS_MAXIMUM_POOL_SIZE,
300                                       DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE);
301         keepAliveTime = getProperty(SERVER_THREADS_KEEP_ALIVE_TIME,
302                                     DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME);
303         useHttps = getProperty(SERVER_HTTPS, DEFAULT_HTTPS);
304         useSelfSignedCertificates = getProperty(SERVER_SELF_SIGNED_CERTIFICATES,
305                                                 DEFAULT_SELF_SIGNED_CERTIFICATES);
306         String hostListNames = getProperty(HOST_LIST, null);
307         if (hostListNames != null) {
308             hostList = hostListNames.split(",");
309         }
310
311         String possibleError = null;
312         try {
313             // fetch server information
314             String ipAddressString =
315                 getProperty(SERVER_IP_ADDRESS, DEFAULT_SERVER_IP_ADDRESS);
316             int port = getProperty(SERVER_PORT, DEFAULT_SERVER_PORT);
317
318             possibleError = "Unknown Host: " + ipAddressString;
319             InetAddress address = InetAddress.getByName(ipAddressString);
320             InetSocketAddress socketAddress = new InetSocketAddress(address, port);
321
322             possibleError = "HTTP server initialization error";
323             restServer = HttpServletServerFactoryInstance.getServerFactory().build(
324                          "SERVER-POOL",                                 // name
325                          useHttps,                                      // https
326                          socketAddress.getAddress().getHostAddress(),   // host (maybe 0.0.0.0)
327                          port,                                          // port (can no longer be 0)
328                          null,                                          // contextPath
329                          false,                                         // swagger
330                          false);                                        // managed
331             restServer.addServletClass(null, RestServerPool.class.getName());
332
333             // add any additional servlets
334             for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
335                 Collection<Class<?>> classes = feature.servletClasses();
336                 if (classes != null) {
337                     for (Class<?> clazz : classes) {
338                         restServer.addServletClass(null, clazz.getName());
339                     }
340                 }
341             }
342
343             // we may not know the port until after the server is started
344             possibleError = "HTTP server start error";
345             restServer.start();
346             possibleError = null;
347
348             // determine the address to use
349             if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) {
350                 address = InetAddress.getLocalHost();
351             }
352
353             thisServer = new Server(new InetSocketAddress(address, port));
354
355             // TBD: is this really appropriate?
356             thisServer.newServer();
357
358             // start background thread
359             MainLoop.startThread();
360             MainLoop.queueWork(() -> {
361                 // run this in the 'MainLoop' thread
362                 Leader.startup();
363                 Bucket.startup();
364             });
365             logger.info("Listening on port {}", port);
366
367             return null;
368         } catch (UnknownHostException e) {
369             logger.error("Server.startup: exception start server", e);
370             if (possibleError == null) {
371                 possibleError = e.toString();
372             }
373             return possibleError;
374         }
375     }
376
377     /**
378      * Shut down all threads associate with server pool.
379      */
380     public static void shutdown() {
381         Discovery.stopDiscovery();
382         MainLoop.stopThread();
383         TargetLock.shutdown();
384         Util.shutdown();
385
386         HashSet<Server> allServers = new HashSet<>();
387         allServers.addAll(servers.values());
388         allServers.addAll(failedServers.values());
389
390         for (Server server : allServers) {
391             if (server.sendThreadPool != null) {
392                 server.sendThreadPool.shutdown();
393             }
394         }
395         if (restServer != null) {
396             restServer.shutdown();
397         }
398     }
399
400     /**
401      * Return the Server instance associated with the current host.
402      *
403      * @return the Server instance associated with the current host
404      */
405     public static Server getThisServer() {
406         return thisServer;
407     }
408
409     /**
410      * Return the first Server instance in the 'servers' list.
411      *
412      * @return the first Server instance in the 'servers' list
413      *     (the one with the lowest UUID)
414      */
415     public static Server getFirstServer() {
416         return servers.firstEntry().getValue();
417     }
418
419     /**
420      * Lookup a Server instance associated with a UUID.
421      *
422      * @param uuid the key to the lookup
423      @ @return the associated 'Server' instance, or 'null' if none
424      */
425     public static Server getServer(UUID uuid) {
426         return servers.get(uuid);
427     }
428
429     /**
430      * Return a count of the number of servers.
431      *
432      * @return a count of the number of servers
433      */
434     public static int getServerCount() {
435         return servers.size();
436     }
437
438     /**
439      * Return the complete list of servers.
440      *
441      * @return the complete list of servers
442      */
443     public static Collection<Server> getServers() {
444         return servers.values();
445     }
446
447     /**
448      * This method is invoked from the 'startup' thread, and creates a new
449      * 'Server' instance for the current server.
450      *
451      * @param socketAddress the IP address and port the listener is bound to
452      */
453     private Server(InetSocketAddress socketAddress) {
454         this.uuid = UUID.randomUUID();
455         this.count = 1;
456         this.socketAddress = socketAddress;
457         this.lastUpdateTime = System.currentTimeMillis();
458
459         // site information
460
461         String siteIp = getProperty(SITE_IP_ADDRESS, null);
462         int sitePort = getProperty(SITE_PORT, 0);
463         if (siteIp != null && sitePort != 0) {
464             // we do have site information specified
465             try {
466                 siteSocketAddress = new InetSocketAddress(siteIp, sitePort);
467                 if (siteSocketAddress.getAddress() == null) {
468                     logger.error("Couldn't resolve site address: {}", siteIp);
469                     siteSocketAddress = null;
470                 }
471             } catch (IllegalArgumentException e) {
472                 logger.error("Illegal 'siteSocketAddress'", e);
473                 siteSocketAddress = null;
474             }
475         }
476
477         // TBD: calculate checksum
478     }
479
480     /**
481      * Initialize a 'Server' instance from a 'DataInputStream'. If it is new,
482      * it may get inserted in the table. If it is an update, fields in an
483      * existing 'Server' may be updated.
484      *
485      * @param is the 'DataInputStream'
486      */
487     Server(DataInputStream is) throws IOException {
488         // read in 16 byte UUID
489         uuid = Util.readUuid(is);
490
491         // read in 4 byte counter value
492         count = is.readInt();
493
494         // read in 16 byte MD5 checksum
495         checksum = new byte[16];
496         is.readFully(checksum);
497
498         // optional parameters
499         int tag;
500         while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
501             switch (tag) {
502                 case SOCKET_ADDRESS_TAG:
503                     socketAddress = readSocketAddress(is);
504                     break;
505                 case SITE_SOCKET_ADDRESS_TAG:
506                     siteSocketAddress = readSocketAddress(is);
507                     break;
508                 default:
509                     // ignore tag
510                     logger.error("Illegal tag: {}", tag);
511                     break;
512             }
513         }
514     }
515
516     /**
517      * Read an 'InetSocketAddress' from a 'DataInputStream'.
518      *
519      * @param is the 'DataInputStream'
520      * @return the 'InetSocketAddress'
521      */
522     private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException {
523
524         byte[] ipAddress = new byte[4];
525         is.read(ipAddress, 0, 4);
526         int port = is.readUnsignedShort();
527         return new InetSocketAddress(InetAddress.getByAddress(ipAddress), port);
528     }
529
530     /**
531      * {@inheritDoc}
532      */
533     @Override
534     public String toString() {
535         return "Server[" + uuid + "]";
536     }
537
538     /**
539      * Return the UUID associated with this Server.
540      *
541      * @return the UUID associated with this Server
542      */
543     public UUID getUuid() {
544         return uuid;
545     }
546
547     /**
548      * Return the external InetSocketAddress of the site.
549      *
550      * @return the external InetSocketAddress of the site
551      *     ('null' if it doesn't exist)
552      */
553     public InetSocketAddress getSiteSocketAddress() {
554         return siteSocketAddress;
555     }
556
557     /**
558      * This method may be called from any thread.
559      *
560      * @return 'true' if the this server is active, and 'false' if not
561      */
562     public synchronized boolean isActive() {
563         return active;
564     }
565
566     /**
567      * This method writes out the data associated with the current Server
568      * instance.
569      *
570      * @param os outout stream that should receive the data
571      */
572     void writeServerData(DataOutputStream os) throws IOException {
573         // write out 16 byte UUID
574         Util.writeUuid(os, uuid);
575
576         // write out 4 byte counter value
577         os.writeInt(count);
578
579         // write out 16 byte MD5 checksum
580         // TBD: should this be implemented?
581         os.write(checksum == null ? new byte[16] : checksum);
582
583         if (socketAddress != null) {
584             // write out socket address
585             os.writeByte(SOCKET_ADDRESS_TAG);
586             os.write(socketAddress.getAddress().getAddress(), 0, 4);
587             os.writeShort(socketAddress.getPort());
588         }
589
590         if (siteSocketAddress != null) {
591             // write out socket address
592             os.writeByte(SITE_SOCKET_ADDRESS_TAG);
593             os.write(siteSocketAddress.getAddress().getAddress(), 0, 4);
594             os.writeShort(siteSocketAddress.getPort());
595         }
596
597         os.writeByte(END_OF_PARAMETERS_TAG);
598     }
599
600     /**
601      * Do any processing needed to create a new server. This method is invoked
602      * from the 'MainLoop' thread in every case except for the current server,
603      * in which case it is invoked in 'startup' prior to creating 'MainLoop'.
604      */
605     private void newServer() {
606         Server failed = failedServers.get(uuid);
607         if (failed != null) {
608             // this one is on the failed list -- see if the counter has advanced
609             if ((count - failed.count) <= 0) {
610                 // the counter has not advanced -- ignore
611                 return;
612             }
613
614             // the counter has advanced -- somehow, this server has returned
615             failedServers.remove(uuid);
616             synchronized (this) {
617                 active = true;
618             }
619             logger.error("Server reawakened: {} ({})", uuid, socketAddress);
620         }
621
622         lastUpdateTime = System.currentTimeMillis();
623         servers.put(uuid, this);
624         updatedList.add(this);
625
626         // notify list will need to be rebuilt
627         notifyList = null;
628
629         if (socketAddress != null && this != thisServer) {
630             // initialize 'client' and 'target' fields
631             if (siteSocketAddress != null
632                     && !siteSocketAddress.equals(thisServer.siteSocketAddress)) {
633                 // destination is on a remote site
634                 destSocketAddress = siteSocketAddress;
635             } else {
636                 // destination is on the local site -- use direct addressing
637                 destSocketAddress = socketAddress;
638             }
639             destName = socketAddressToName(destSocketAddress);
640             try {
641                 // 'client' is used for REST messages to the destination
642                 client = buildClient(uuid.toString(), destSocketAddress, destName);
643
644                 // initialize the 'target' field
645                 target = getTarget(client);
646             } catch (KeyManagementException | NoSuchAlgorithmException
647                          | NoSuchFieldException | IllegalAccessException
648                          | ClassNotFoundException | HttpClientConfigException e) {
649                 logger.error("Server.newServer: problems creating 'client'", e);
650             }
651         }
652         logger.info("New server: {} ({})", uuid, socketAddress);
653         for (Events listener : Events.getListeners()) {
654             listener.newServer(this);
655         }
656     }
657
658     /**
659      * Check the server state in response to some issue. At present, only the
660      * 'destName' information is checked.
661      */
662     private void checkServer() {
663         // recalculate 'destName' (we have seen DNS issues)
664         String newDestName = socketAddressToName(destSocketAddress);
665         if (newDestName.equals(destName)) {
666             return;
667         }
668         logger.warn("Remote host name for {} has changed from {} to {}",
669                     destSocketAddress, destName, newDestName);
670
671         // shut down old client, and rebuild
672         client.shutdown();
673         client = null;
674         target = null;
675
676         // update 'destName', and rebuild the client
677         destName = newDestName;
678         try {
679             // 'client' is used for REST messages to the destination
680             client = buildClient(uuid.toString(), destSocketAddress, destName);
681
682             // initialize the 'target' field
683             target = getTarget(client);
684         } catch (KeyManagementException | NoSuchAlgorithmException
685                      | NoSuchFieldException | IllegalAccessException
686                      | ClassNotFoundException | HttpClientConfigException e) {
687             logger.error("Server.checkServer: problems recreating 'client'", e);
688         }
689     }
690
691     /**
692      * Update server data.
693      *
694      * @param serverData this is a temporary 'Server' instance created from
695      *     an incoming message, which is used to update fields within the
696      *     'Server' instance identified by 'this'
697      */
698     private void updateServer(Server serverData) {
699         if (serverData.count > count) {
700             // an update has occurred
701             count = serverData.count;
702
703             // TBD: calculate and verify checksum, more fields may be updated
704
705             // adjust 'allowedGap' accordingly
706             long currentTime = System.currentTimeMillis();
707             long gap = currentTime - lastUpdateTime;
708
709             // adjust 'allowedGap' accordingly
710             // TBD: need properties to support overrides
711             gap = gap * 3 / 2 + adaptiveGapAdjust;
712             if (gap > allowedGap) {
713                 // update 'allowedGap' immediately
714                 allowedGap = gap;
715             } else {
716                 // gradually pull the allowed gap down
717                 // TBD: need properties to support overrides
718                 allowedGap = (allowedGap * 15 + gap) / 16;
719             }
720             lastUpdateTime = currentTime;
721
722             updatedList.add(this);
723         }
724     }
725
726     /**
727      * a server has failed.
728      */
729     private void serverFailed() {
730         // mark as inactive
731         synchronized (this) {
732             active = false;
733         }
734
735         // remove it from the table
736         servers.remove(uuid);
737
738         // add it to the failed servers table
739         failedServers.put(uuid, this);
740
741         // clean up client information
742         if (client != null) {
743             client.shutdown();
744             client = null;
745             target = null;
746         }
747
748         // log an error message
749         logger.error("Server failure: {} ({})", uuid, socketAddress);
750         for (Events listener : Events.getListeners()) {
751             listener.serverFailed(this);
752         }
753     }
754
755     /**
756      * Fetch, and possibily calculate, the "notify list" associated with this
757      * server. This is the list of servers to forward a server and bucket
758      * information to, and is approximately log2(n) in length, where 'n' is
759      * the total number of servers.
760      * It is calculated by starting with all of the servers sorted by UUID --
761      * let's say the current server is at position 's'. The notify list will
762      * contain the server at positions:
763      *     (s + 1) % n
764      *     (s + 2) % n
765      *     (s + 4) % n
766      *          ...
767      * Using all powers of 2 less than 'n'. If the total server count is 50,
768      * this list has 6 entries.
769      * @return the notify list
770      */
771     static Collection<Server> getNotifyList() {
772         // The 'notifyList' value is initially 'null', and it is reset to 'null'
773         // every time a new host joins, or one leaves. That way, it is marked for
774         // recalculation, but only when needed.
775         if (notifyList == null) {
776             // next index we are looking for
777             int dest = 1;
778
779             // our current position in the Server table -- starting at 'thisServer'
780             UUID current = thisServer.uuid;
781
782             // site socket address of 'current'
783             InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress;
784
785             // hash set of all site socket addresses located
786             HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>();
787             siteSocketAddresses.add(thisSiteSocketAddress);
788
789             // the list we are building
790             notifyList = new LinkedList<Server>();
791
792             int index = 1;
793             for ( ; ; ) {
794                 // move to the next key (UUID) -- if we hit the end of the table,
795                 // wrap to the beginning
796                 current = servers.higherKey(current);
797                 if (current == null) {
798                     current = servers.firstKey();
799                 }
800                 if (current.equals(thisServer.uuid)) {
801                     // we have looped through the entire list
802                     break;
803                 }
804
805                 // fetch associated server & site socket address
806                 Server server = servers.get(current);
807                 InetSocketAddress currentSiteSocketAddress =
808                     server.siteSocketAddress;
809
810                 if (Objects.equals(thisSiteSocketAddress,
811                                    currentSiteSocketAddress)) {
812                     // same site -- see if we should add this one
813                     if (index == dest) {
814                         // this is the next index we are looking for --
815                         // add the server
816                         notifyList.add(server);
817
818                         // advance to the next offset (current-offset * 2)
819                         dest = dest << 1;
820                     }
821                     index += 1;
822                 } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) {
823                     // we need at least one member from each site
824                     notifyList.add(server);
825                     siteSocketAddresses.add(currentSiteSocketAddress);
826                 }
827             }
828         }
829         return notifyList;
830     }
831
832     /**
833      * See if there is a host name associated with a destination socket address.
834      *
835      * @param dest the socket address of the destination
836      * @return the host name associated with the IP address, or the IP address
837      *     if no associated host name is found.
838      */
839     private static String socketAddressToName(InetSocketAddress dest) {
840         // destination IP address
841         InetAddress inetAddress = dest.getAddress();
842         String destName = null;
843
844         // go through the 'hostList' to see if there is a matching name
845         for (String hostName : hostList) {
846             try {
847                 if (inetAddress.equals(InetAddress.getByName(hostName))) {
848                     // this one matches -- use the name instead of the IP address
849                     destName = hostName;
850                     break;
851                 }
852             } catch (UnknownHostException e) {
853                 logger.debug("Server.socketAddressToName error", e);
854             }
855         }
856
857         // default name = string value of IP address
858         return destName == null ? inetAddress.getHostAddress() : destName;
859     }
860
861     /**
862      * Create an 'HttpClient' instance for a particular host.
863      *
864      * @param name of the host (currently a UUID or host:port string)
865      * @param dest the socket address of the destination
866      * @param destName the string name to use for the destination
867      */
868     static HttpClient buildClient(String name, InetSocketAddress dest, String destName)
869         throws KeyManagementException, NoSuchAlgorithmException,
870         ClassNotFoundException, HttpClientConfigException {
871
872         return HttpClientFactoryInstance.getClientFactory().build(
873             BusTopicParams.builder()
874                 .clientName(name)                               // name
875                 .useHttps(useHttps)                             // https
876                 .allowSelfSignedCerts(useSelfSignedCertificates)// selfSignedCerts
877                 .hostname(destName)                             // host
878                 .port(dest.getPort())                           // port
879                 .managed(false)                                 // managed
880                 .build());
881     }
882
883     /**
884      * Extract the 'WebTarget' information from the 'HttpClient'.
885      *
886      * @param client the associated HttpClient instance
887      * @return a WebTarget referring to the previously-specified 'baseUrl'
888      */
889     static WebTarget getTarget(HttpClient client)
890         throws NoSuchFieldException, IllegalAccessException {
891         // need access to the internal field 'client'
892         // TBD: We need a way to get this information without reflection
893         Field field = client.getClass().getDeclaredField("client");
894         field.setAccessible(true);
895         Client rsClient = (Client) field.get(client);
896         field.setAccessible(false);
897
898         rsClient.property(ClientProperties.CONNECT_TIMEOUT, connectTimeout);
899         rsClient.property(ClientProperties.READ_TIMEOUT, readTimeout);
900
901         // For performance reasons, the root 'WebTarget' is generated only once
902         // at initialization time for each remote host.
903         return rsClient.target(client.getBaseUrl());
904     }
905
906     /**
907      * This method may be invoked from any thread, and is used to send a
908      * message to the destination server associated with this 'Server' instance.
909      *
910      * @param path the path relative to the base URL
911      * @param entity the "request entity" containing the body of the
912      *     HTTP POST request
913      */
914     public void post(final String path, final Entity<?> entity) {
915         post(path, entity, null);
916     }
917
918     /**
919      * This method may be invoked from any thread, and is used to send a
920      * message to the destination server associated with this 'Server' instance.
921      *
922      * @param path the path relative to the base URL
923      * @param entity the "request entity" containing the body of the
924      *     HTTP POST request (if 'null', an HTTP GET is used instead)
925      * @param responseCallback if non-null, this callback may be used to
926      *     modify the WebTarget, and/or receive the POST response message
927      */
928     public void post(final String path, final Entity<?> entity,
929                      PostResponse responseCallback) {
930         if (target == null) {
931             return;
932         }
933
934         getThreadPool().execute(() -> {
935             /*
936              * This method is running within the 'MainLoop' thread.
937              */
938             try {
939                 WebTarget webTarget = target.path(path);
940                 if (responseCallback != null) {
941                     // give callback a chance to modify 'WebTarget'
942                     webTarget = responseCallback.webTarget(webTarget);
943
944                     // send the response to the callback
945                     Response response;
946                     if (entity == null) {
947                         response = webTarget.request().get();
948                     } else {
949                         response = webTarget.request().post(entity);
950                     }
951                     responseCallback.response(response);
952                 } else {
953                     // just do the invoke, and ignore the response
954                     if (entity == null) {
955                         webTarget.request().get();
956                     } else {
957                         webTarget.request().post(entity);
958                     }
959                 }
960             } catch (Exception e) {
961                 logger.error("Failed to send to {} ({}, {})",
962                              uuid, destSocketAddress, destName);
963                 if (responseCallback != null) {
964                     responseCallback.exceptionResponse(e);
965                 }
966                 MainLoop.queueWork(() -> {
967                     // this runs in the 'MainLoop' thread
968
969                     // the DNS cache may have been out-of-date when this server
970                     // was first contacted -- fix the problem, if needed
971                     checkServer();
972                 });
973             }
974         });
975     }
976
977     /**
978      * This method may be invoked from any thread.
979      *
980      * @return the 'ThreadPoolExecutor' associated with this server
981      */
982     public synchronized ThreadPoolExecutor getThreadPool() {
983         if (sendThreadPool == null) {
984             // build a thread pool for this Server
985             sendThreadPool =
986                 new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
987                                        keepAliveTime, TimeUnit.MILLISECONDS,
988                                        new LinkedTransferQueue<Runnable>());
989             sendThreadPool.allowCoreThreadTimeOut(true);
990         }
991         return sendThreadPool;
992     }
993
994     /**
995      * Lower-level method supporting HTTP, which requires that the caller's
996      * thread tolerate blocking. This method may be called from any thread.
997      *
998      * @param path the path relative to the base URL
999      * @return a 'WebTarget' instance pointing to this path
1000      */
1001     public WebTarget getWebTarget(String path) {
1002         return target == null ? null : target.path(path);
1003     }
1004
1005     /**
1006      * This method may be invoked from any thread, but its real intent is
1007      * to decode an incoming 'admin' message (which is Base-64-encoded),
1008      * and send it to the 'MainLoop' thread for processing.
1009      *
1010      * @param data the base-64-encoded data
1011      */
1012     static void adminRequest(byte[] data) {
1013         final byte[] packet = Base64.getDecoder().decode(data);
1014         Runnable task = () -> {
1015             try {
1016                 ByteArrayInputStream bis = new ByteArrayInputStream(packet);
1017                 DataInputStream dis = new DataInputStream(bis);
1018
1019                 while (dis.available() != 0) {
1020                     Server serverData = new Server(dis);
1021
1022                     // TBD: Compare with current server
1023
1024                     Server server = servers.get(serverData.uuid);
1025                     if (server == null) {
1026                         serverData.newServer();
1027                     } else {
1028                         server.updateServer(serverData);
1029                     }
1030                 }
1031             } catch (Exception e) {
1032                 logger.error("Server.adminRequest: can't decode packet", e);
1033             }
1034         };
1035         MainLoop.queueWork(task);
1036     }
1037
1038     /**
1039      * Send out information about servers 'updatedList' to all servers
1040      * in 'notifyList' (may need to build or rebuild 'notifyList').
1041      */
1042     static void sendOutData() throws IOException {
1043         // include 'thisServer' in the data -- first, advance the count
1044         thisServer.count += 1;
1045         if (thisServer.count == 0) {
1046             /*
1047              * counter wrapped (0 is a special case) --
1048              * actually, we could probably leave this out, because it would take
1049              * more than a century to wrap if the increment is 1 second
1050              */
1051             thisServer.count = 1;
1052         }
1053
1054         ByteArrayOutputStream bos = new ByteArrayOutputStream();
1055         DataOutputStream dos = new DataOutputStream(bos);
1056
1057         thisServer.lastUpdateTime = System.currentTimeMillis();
1058         thisServer.writeServerData(dos);
1059
1060         // include all hosts in the updated list
1061         for (Server server : updatedList) {
1062             server.writeServerData(dos);
1063         }
1064         updatedList.clear();
1065
1066         // create an 'Entity' that can be sent out to all hosts in the notify list
1067         Entity<String> entity = Entity.entity(
1068             new String(Base64.getEncoder().encode(bos.toByteArray()), StandardCharsets.UTF_8),
1069             MediaType.APPLICATION_OCTET_STREAM_TYPE);
1070         for (Server server : getNotifyList()) {
1071             server.post("admin", entity);
1072         }
1073     }
1074
1075     /**
1076      * Search for servers which have taken too long to respond.
1077      */
1078     static void searchForFailedServers() {
1079         long currentTime = System.currentTimeMillis();
1080
1081         // used to build a list of newly-failed servers
1082         LinkedList<Server> failed = new LinkedList<>();
1083         for (Server server : servers.values()) {
1084             if (server == thisServer) {
1085                 continue;
1086             }
1087             long gap = currentTime - server.lastUpdateTime;
1088             if (gap > server.allowedGap) {
1089                 // add it to the failed list -- we don't call 'serverFailed' yet,
1090                 // because this updates the server list, and leads to a
1091                 // 'ConcurrentModificationException'
1092                 failed.add(server);
1093             }
1094         }
1095
1096         // remove servers from our list
1097         if (!failed.isEmpty()) {
1098             for (Server server : failed) {
1099                 server.serverFailed();
1100             }
1101             notifyList = null;
1102         }
1103     }
1104
1105     /**
1106      * This method may be invoked from any thread:
1107      * Send information about 'thisServer' to the list of hosts.
1108      *
1109      * @param out the 'PrintStream' to use for displaying information
1110      * @param hosts a comma-separated list of entries containing
1111      *     'host:port' or just 'port' (current host is implied in this case)
1112      */
1113     static void pingHosts(PrintStream out, String hosts) {
1114         LinkedList<InetSocketAddress> addresses = new LinkedList<>();
1115         boolean error = false;
1116
1117         for (String host : hosts.split(",")) {
1118             try {
1119                 String[] segs = host.split(":");
1120
1121                 switch (segs.length) {
1122                     case 1:
1123                         addresses.add(new InetSocketAddress(InetAddress.getLocalHost(),
1124                                 Integer.parseInt(segs[0])));
1125                         break;
1126                     case 2:
1127                         addresses.add(new InetSocketAddress(segs[0],
1128                                 Integer.parseInt(segs[1])));
1129                         break;
1130                     default:
1131                         out.println(host + ": Invalid host/port value");
1132                         error = true;
1133                         break;
1134                 }
1135             } catch (NumberFormatException e) {
1136                 out.println(host + ": Invalid port value");
1137                 logger.error(PINGHOSTS_ERROR, e);
1138                 error = true;
1139             } catch (UnknownHostException e) {
1140                 out.println(host + ": Unknown host");
1141                 logger.error(PINGHOSTS_ERROR, e);
1142                 error = true;
1143             }
1144         }
1145         if (!error) {
1146             pingHosts(out, addresses);
1147         }
1148     }
1149
1150     /**
1151      * This method may be invoked from any thread:
1152      * Send information about 'thisServer' to the list of hosts.
1153      *
1154      * @param out the 'PrintStream' to use for displaying information
1155      * @param hosts a collection of 'InetSocketAddress' instances, which are
1156      *     the hosts to send the information to
1157      */
1158     static void pingHosts(final PrintStream out,
1159                           final Collection<InetSocketAddress> hosts) {
1160         FutureTask<Integer> ft = new FutureTask<>(() -> {
1161             ByteArrayOutputStream bos = new ByteArrayOutputStream();
1162             DataOutputStream dos = new DataOutputStream(bos);
1163
1164             // add information for this server only
1165             try {
1166                 thisServer.writeServerData(dos);
1167
1168                 // create an 'Entity' that can be sent out to all hosts
1169                 Entity<String> entity = Entity.entity(
1170                     new String(Base64.getEncoder().encode(bos.toByteArray()),
1171                         StandardCharsets.UTF_8),
1172                     MediaType.APPLICATION_OCTET_STREAM_TYPE);
1173
1174                 // loop through hosts
1175                 for (InetSocketAddress host : hosts) {
1176                     HttpClient httpClient = null;
1177
1178                     try {
1179                         httpClient = buildClient(host.toString(), host,
1180                                                  socketAddressToName(host));
1181                         getTarget(httpClient).path("admin").request().post(entity);
1182                         httpClient.shutdown();
1183                         httpClient = null;
1184                     } catch (KeyManagementException | NoSuchAlgorithmException e) {
1185                         out.println(host + ": Unable to create client connection");
1186                         logger.error(PINGHOSTS_ERROR, e);
1187                     } catch (NoSuchFieldException | IllegalAccessException e) {
1188                         out.println(host + ": Unable to get link to target");
1189                         logger.error(PINGHOSTS_ERROR, e);
1190                     } catch (Exception e) {
1191                         out.println(host + ": " + e);
1192                         logger.error(PINGHOSTS_ERROR, e);
1193                     }
1194                     if (httpClient != null) {
1195                         httpClient.shutdown();
1196                     }
1197                 }
1198             } catch (IOException e) {
1199                 out.println("Unable to generate 'ping' data: " + e);
1200                 logger.error(PINGHOSTS_ERROR, e);
1201             }
1202             return 0;
1203         });
1204
1205         MainLoop.queueWork(ft);
1206         try {
1207             ft.get(60, TimeUnit.SECONDS);
1208         } catch (InterruptedException e) {
1209             logger.error("Server.pingHosts: interrupted waiting for queued work", e);
1210             Thread.currentThread().interrupt();
1211         } catch (ExecutionException | TimeoutException e) {
1212             logger.error("Server.pingHosts: error waiting for queued work", e);
1213         }
1214     }
1215
1216     /**
1217      * This method may be invoked from any thread:
1218      * Dump out the current 'servers' table in a human-readable table form.
1219      *
1220      * @param out the 'PrintStream' to dump the table to
1221      */
1222     public static void dumpHosts(final PrintStream out) {
1223         FutureTask<Integer> ft = new FutureTask<>(() -> {
1224             dumpHostsInternal(out);
1225             return 0;
1226         });
1227         MainLoop.queueWork(ft);
1228         try {
1229             ft.get(60, TimeUnit.SECONDS);
1230         } catch (InterruptedException e) {
1231             logger.error("Server.dumpHosts: interrupted waiting for queued work", e);
1232             Thread.currentThread().interrupt();
1233         } catch (ExecutionException | TimeoutException e) {
1234             logger.error("Server.dumpHosts: error waiting for queued work", e);
1235         }
1236     }
1237
1238     /**
1239      * Dump out the current 'servers' table in a human-readable table form.
1240      *
1241      * @param out the 'PrintStream' to dump the table to
1242      */
1243     private static void dumpHostsInternal(PrintStream out) {
1244         // modifications to 'servers.values()' and 'notifyList'.
1245         HashSet<Server> localNotifyList = new HashSet<>(getNotifyList());
1246
1247         // see if we have any site information
1248         boolean siteData = false;
1249         for (Server server : servers.values()) {
1250             if (server.siteSocketAddress != null) {
1251                 siteData = true;
1252                 break;
1253             }
1254         }
1255
1256         String format = "%1s %-36s %-15s %5s %5s %12s %7s %7s\n";
1257         SimpleDateFormat dateFormat = new SimpleDateFormat("kk:mm:ss.SSS");
1258
1259         if (siteData) {
1260             format = "%1s %-36s %-15s %5s %-15s %5s %5s %12s %7s %7s\n";
1261             // @formatter:off
1262             out.printf(format, "", "UUID", "IP Address", "Port",
1263                        "Site IP Address", "Port",
1264                        "Count", "Update Time", "Elapsed", "Allowed");
1265             out.printf(format, "", "----", "----------", "----",
1266                        "---------------", "----",
1267                        "-----", "-----------", "-------", "-------");
1268             // @formatter:on
1269         } else {
1270             // @formatter:off
1271             out.printf(format, "", "UUID", "IP Address", "Port",
1272                        "Count", "Update Time", "Elapsed", "Allowed");
1273             out.printf(format, "", "----", "----------", "----",
1274                        "-----", "-----------", "-------", "-------");
1275             // @formatter:on
1276         }
1277
1278         long currentTime = System.currentTimeMillis();
1279         for (Server server : servers.values()) {
1280             String thisOne = "";
1281
1282             if (server == thisServer) {
1283                 thisOne = "*";
1284             } else if (localNotifyList.contains(server)) {
1285                 thisOne = "n";
1286             }
1287
1288             if (siteData) {
1289                 String siteIp = "";
1290                 String sitePort = "";
1291                 if (server.siteSocketAddress != null) {
1292                     siteIp =
1293                         server.siteSocketAddress.getAddress().getHostAddress();
1294                     sitePort = String.valueOf(server.siteSocketAddress.getPort());
1295                 }
1296
1297                 out.printf(format, thisOne, server.uuid,
1298                            server.socketAddress.getAddress().getHostAddress(),
1299                            server.socketAddress.getPort(),
1300                            siteIp, sitePort, server.count,
1301                            dateFormat.format(new Date(server.lastUpdateTime)),
1302                            currentTime - server.lastUpdateTime,
1303                            server.allowedGap);
1304             } else {
1305                 out.printf(format, thisOne, server.uuid,
1306                            server.socketAddress.getAddress().getHostAddress(),
1307                            server.socketAddress.getPort(), server.count,
1308                            dateFormat.format(new Date(server.lastUpdateTime)),
1309                            currentTime - server.lastUpdateTime,
1310                            server.allowedGap);
1311             }
1312         }
1313         out.println("Count: " + servers.size());
1314     }
1315
1316     /* ============================================================ */
1317
1318     /**
1319      * This interface supports the 'post' method, and provides the opportunity
1320      * to change the WebTarget and/or receive the POST response message.
1321      */
1322     interface PostResponse {
1323         /**
1324          * Callback that can be used to modify 'WebTarget', and do things like
1325          * add query parameters.
1326          *
1327          * @param webTarget the current WebTarget
1328          * @return the updated WebTarget
1329          */
1330         public default WebTarget webTarget(WebTarget webTarget) {
1331             return webTarget;
1332         }
1333
1334         /**
1335          * Callback that passes the POST response.
1336          *
1337          * @param response the POST response
1338          */
1339         public default void response(Response response) {
1340         }
1341
1342         /**
1343          * Callback that passes the POST exception response.
1344          *
1345          */
1346         public default void exceptionResponse(Exception exception) {
1347             Response.ResponseBuilder response;
1348             response = Response.serverError();
1349             this.response(response.build());
1350         }
1351     }
1352 }