Merge "Updating drools pdp dependencies"
[policy/drools-pdp.git] / feature-server-pool / src / main / java / org / onap / policy / drools / serverpool / Server.java
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-server-pool
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.serverpool;
22
23 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_HTTPS;
24 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SELF_SIGNED_CERTIFICATES;
25 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST;
26 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_CONNECT_TIMEOUT;
27 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_INITIAL_ALLOWED_GAP;
28 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_IP_ADDRESS;
29 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_PORT;
30 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_READ_TIMEOUT;
31 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_CORE_POOL_SIZE;
32 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME;
33 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE;
34 import static org.onap.policy.drools.serverpool.ServerPoolProperties.HOST_LIST;
35 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_ADAPTIVE_GAP_ADJUST;
36 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_CONNECT_TIMEOUT;
37 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_HTTPS;
38 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_INITIAL_ALLOWED_GAP;
39 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_IP_ADDRESS;
40 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_PORT;
41 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_READ_TIMEOUT;
42 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_SELF_SIGNED_CERTIFICATES;
43 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_CORE_POOL_SIZE;
44 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_KEEP_ALIVE_TIME;
45 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_MAXIMUM_POOL_SIZE;
46 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_IP_ADDRESS;
47 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_PORT;
48 import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
49
50 import java.io.ByteArrayInputStream;
51 import java.io.ByteArrayOutputStream;
52 import java.io.DataInputStream;
53 import java.io.DataOutputStream;
54 import java.io.IOException;
55 import java.io.PrintStream;
56 import java.io.StringReader;
57 import java.lang.reflect.Field;
58 import java.net.InetAddress;
59 import java.net.InetSocketAddress;
60 import java.net.UnknownHostException;
61 import java.nio.charset.StandardCharsets;
62 import java.security.KeyManagementException;
63 import java.security.NoSuchAlgorithmException;
64 import java.text.SimpleDateFormat;
65 import java.util.Base64;
66 import java.util.Collection;
67 import java.util.Date;
68 import java.util.HashSet;
69 import java.util.LinkedList;
70 import java.util.Objects;
71 import java.util.Properties;
72 import java.util.TreeMap;
73 import java.util.TreeSet;
74 import java.util.UUID;
75 import java.util.concurrent.ExecutionException;
76 import java.util.concurrent.FutureTask;
77 import java.util.concurrent.LinkedTransferQueue;
78 import java.util.concurrent.ThreadPoolExecutor;
79 import java.util.concurrent.TimeUnit;
80 import java.util.concurrent.TimeoutException;
81 import javax.ws.rs.client.Client;
82 import javax.ws.rs.client.Entity;
83 import javax.ws.rs.client.WebTarget;
84 import javax.ws.rs.core.MediaType;
85 import javax.ws.rs.core.Response;
86 import org.glassfish.jersey.client.ClientProperties;
87 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
88 import org.onap.policy.common.endpoints.http.client.HttpClient;
89 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
90 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
91 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
92 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
93 import org.onap.policy.drools.utils.PropertyUtil;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
97 public class Server implements Comparable<Server> {
98     private static Logger logger = LoggerFactory.getLogger(Server.class);
99
100     // maps UUID to Server object for all known servers
101     private static TreeMap<UUID, Server> servers =
102         new TreeMap<>(Util.uuidComparator);
103
104     // maps UUID to Server object for all failed servers
105     // (so they aren't accidentally restored, due to updates from other hosts)
106     private static TreeMap<UUID, Server> failedServers =
107         new TreeMap<>(Util.uuidComparator);
108
109     // subset of servers to be notified (null means it needs to be rebuilt)
110     private static LinkedList<Server> notifyList = null;
111
112     // data to be sent out to notify list
113     private static TreeSet<Server> updatedList = new TreeSet<>();
114
115     // the server associated with the current host
116     private static Server thisServer = null;
117
118     // the current REST server
119     private static HttpServletServer restServer;
120
121     /*==================================================*/
122     /* Some properties extracted at initialization time */
123     /*==================================================*/
124
125     // initial value of gap to allow between pings
126     private static long initialAllowedGap;
127
128     // used in adaptive calculation of allowed gap between pings
129     private static long adaptiveGapAdjust;
130
131     // time to allow for TCP connect (long)
132     private static String connectTimeout;
133
134     // time to allow before TCP read timeout (long)
135     private static String readTimeout;
136
137     // outgoing per-server thread pool parameters
138     private static int corePoolSize;
139     private static int maximumPoolSize;
140     private static long keepAliveTime;
141
142     // https-related parameters
143     private static boolean useHttps;
144     private static boolean useSelfSignedCertificates;
145
146     // list of remote host names
147     private static String[] hostList = new String[0];
148
149     /*=========================================================*/
150     /* Fields included in every 'ping' message between servers */
151     /*=========================================================*/
152
153     // unique id for this server
154     private UUID uuid;
155
156     // counter periodically incremented to indicate the server is "alive"
157     private int count;
158
159     // 16 byte MD5 checksum over additional data that is NOT included in
160     // every 'ping' message -- used to determine whether the data is up-to-date
161     private byte[] checksum;
162
163     /*========================================================================*/
164     /* The following data is included in the checksum, and doesn't change too */
165     /* frequently (some fields may change as servers go up and down)          */
166     /*========================================================================*/
167
168     // IP address and port of listener
169     private InetSocketAddress socketAddress;
170
171     // site IP address and port
172     private InetSocketAddress siteSocketAddress = null;
173
174     /*============================================*/
175     /* Local information not included in checksum */
176     /*============================================*/
177
178     // destination socket information
179     private InetSocketAddress destSocketAddress;
180     private String destName;
181
182     // REST client fields
183     private HttpClient client;
184     private WebTarget target;
185     private ThreadPoolExecutor sendThreadPool = null;
186
187     // time when the 'count' field was last updated
188     private long lastUpdateTime;
189
190     // calculated field indicating the maximum time between updates
191     private long allowedGap = initialAllowedGap;
192
193     // indicates whether the 'Server' instance is active or not (synchronized)
194     private boolean active = true;
195
196     /*
197      * Tags for encoding of server data
198      */
199     static final int END_OF_PARAMETERS_TAG = 0;
200     static final int SOCKET_ADDRESS_TAG = 1;
201     static final int SITE_SOCKET_ADDRESS_TAG = 2;
202
203     // 'pingHosts' error
204     static final String PINGHOSTS_ERROR = "Server.pingHosts error";
205
206     // a string for print
207     static final String PRINTOUT_DASHES = "-------";
208
209     /*==============================*/
210     /* Comparable<Server> interface */
211     /*==============================*/
212
213     /**
214      * Compare this instance to another one by comparing the 'uuid' field.
215      */
216     @Override
217     public int compareTo(Server other) {
218         return Util.uuidComparator.compare(uuid, other.uuid);
219     }
220
221     @Override
222     public int hashCode() {
223         return Objects.hash(uuid);
224     }
225
226     @Override
227     public boolean equals(Object obj) {
228         if (this == obj) {
229             return true;
230         }
231         if (!(obj instanceof Server)) {
232             return false;
233         }
234         Server other = (Server) obj;
235         return Objects.equals(uuid, other.uuid);
236     }
237
238     /**
239      * This method may be invoked from any thread, and is used as the main
240      * entry point when testing.
241      *
242      * @param args arguments contaning an '=' character are intepreted as
243      *     a property, other arguments are presumed to be a property file.
244      */
245     public static void main(String[] args) throws IOException {
246         Properties prop = new Properties();
247
248         for (String arg : args) {
249             // arguments with an '=' in them are a property definition;
250             // otherwise, they are a properties file name
251
252             if (arg.contains("=")) {
253                 prop.load(new StringReader(arg));
254             } else {
255                 prop.putAll(PropertyUtil.getProperties(arg));
256             }
257         }
258
259         String rval = startup(prop);
260         if (rval != null) {
261             logger.error("Server.startup failed: {}", rval);
262         }
263     }
264
265     /**
266      * This method may be invoked from any thread, and performs initialization.
267      *
268      * @param propertiesFile the name of a property file
269      */
270     public static String startup(String propertiesFile) {
271         Properties properties;
272         try {
273             properties = PropertyUtil.getProperties(propertiesFile);
274         } catch (IOException e) {
275             logger.error("Server.startup: exception reading properties", e);
276             properties = new Properties();
277         }
278         return startup(properties);
279     }
280
281     /**
282      * This method may be invoked from any thread, and performs initialization.
283      *
284      * @param properties contains properties used by the server
285      */
286     public static String startup(Properties properties) {
287         ServerPoolProperties.setProperties(properties);
288         logger.info("startup: properties={}", properties);
289
290         // fetch some static properties
291         initialAllowedGap = getProperty(SERVER_INITIAL_ALLOWED_GAP,
292                                         DEFAULT_SERVER_INITIAL_ALLOWED_GAP);
293         adaptiveGapAdjust = getProperty(SERVER_ADAPTIVE_GAP_ADJUST,
294                                         DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST);
295         connectTimeout =
296             String.valueOf(getProperty(SERVER_CONNECT_TIMEOUT,
297                                        DEFAULT_SERVER_CONNECT_TIMEOUT));
298         readTimeout = String.valueOf(getProperty(SERVER_READ_TIMEOUT,
299                                      DEFAULT_SERVER_READ_TIMEOUT));
300         corePoolSize = getProperty(SERVER_THREADS_CORE_POOL_SIZE,
301                                    DEFAULT_SERVER_THREADS_CORE_POOL_SIZE);
302         maximumPoolSize = getProperty(SERVER_THREADS_MAXIMUM_POOL_SIZE,
303                                       DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE);
304         keepAliveTime = getProperty(SERVER_THREADS_KEEP_ALIVE_TIME,
305                                     DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME);
306         useHttps = getProperty(SERVER_HTTPS, DEFAULT_HTTPS);
307         useSelfSignedCertificates = getProperty(SERVER_SELF_SIGNED_CERTIFICATES,
308                                                 DEFAULT_SELF_SIGNED_CERTIFICATES);
309         String hostListNames = getProperty(HOST_LIST, null);
310         if (hostListNames != null) {
311             hostList = hostListNames.split(",");
312         }
313
314         String possibleError = null;
315         try {
316             // fetch server information
317             String ipAddressString =
318                 getProperty(SERVER_IP_ADDRESS, DEFAULT_SERVER_IP_ADDRESS);
319             int port = getProperty(SERVER_PORT, DEFAULT_SERVER_PORT);
320
321             possibleError = "Unknown Host: " + ipAddressString;
322             InetAddress address = InetAddress.getByName(ipAddressString);
323             InetSocketAddress socketAddress = new InetSocketAddress(address, port);
324
325             possibleError = "HTTP server initialization error";
326             restServer = HttpServletServerFactoryInstance.getServerFactory().build(
327                          "SERVER-POOL",                                 // name
328                          useHttps,                                      // https
329                          socketAddress.getAddress().getHostAddress(),   // host (maybe 0.0.0.0)
330                          port,                                          // port (can no longer be 0)
331                          null,                                          // contextPath
332                          false,                                         // swagger
333                          false);                                        // managed
334             restServer.addServletClass(null, RestServerPool.class.getName());
335
336             // add any additional servlets
337             for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
338                 Collection<Class<?>> classes = feature.servletClasses();
339                 if (classes != null) {
340                     for (Class<?> clazz : classes) {
341                         restServer.addServletClass(null, clazz.getName());
342                     }
343                 }
344             }
345
346             // we may not know the port until after the server is started
347             possibleError = "HTTP server start error";
348             restServer.start();
349             possibleError = null;
350
351             // determine the address to use
352             if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) {
353                 address = InetAddress.getLocalHost();
354             }
355
356             thisServer = new Server(new InetSocketAddress(address, port));
357
358             // TBD: is this really appropriate?
359             thisServer.newServer();
360
361             // start background thread
362             MainLoop.startThread();
363             MainLoop.queueWork(() -> {
364                 // run this in the 'MainLoop' thread
365                 Leader.startup();
366                 Bucket.startup();
367             });
368             logger.info("Listening on port {}", port);
369
370             return null;
371         } catch (UnknownHostException e) {
372             logger.error("Server.startup: exception start server", e);
373             if (possibleError == null) {
374                 possibleError = e.toString();
375             }
376             return possibleError;
377         }
378     }
379
380     /**
381      * Shut down all threads associate with server pool.
382      */
383     public static void shutdown() {
384         Discovery.stopDiscovery();
385         MainLoop.stopThread();
386         TargetLock.shutdown();
387         Util.shutdown();
388
389         HashSet<Server> allServers = new HashSet<>();
390         allServers.addAll(servers.values());
391         allServers.addAll(failedServers.values());
392
393         for (Server server : allServers) {
394             if (server.sendThreadPool != null) {
395                 server.sendThreadPool.shutdown();
396             }
397         }
398         if (restServer != null) {
399             restServer.shutdown();
400         }
401     }
402
403     /**
404      * Return the Server instance associated with the current host.
405      *
406      * @return the Server instance associated with the current host
407      */
408     public static Server getThisServer() {
409         return thisServer;
410     }
411
412     /**
413      * Return the first Server instance in the 'servers' list.
414      *
415      * @return the first Server instance in the 'servers' list
416      *     (the one with the lowest UUID)
417      */
418     public static Server getFirstServer() {
419         return servers.firstEntry().getValue();
420     }
421
422     /**
423      * Lookup a Server instance associated with a UUID.
424      *
425      * @param uuid the key to the lookup
426      @ @return the associated 'Server' instance, or 'null' if none
427      */
428     public static Server getServer(UUID uuid) {
429         return servers.get(uuid);
430     }
431
432     /**
433      * Return a count of the number of servers.
434      *
435      * @return a count of the number of servers
436      */
437     public static int getServerCount() {
438         return servers.size();
439     }
440
441     /**
442      * Return the complete list of servers.
443      *
444      * @return the complete list of servers
445      */
446     public static Collection<Server> getServers() {
447         return servers.values();
448     }
449
450     /**
451      * This method is invoked from the 'startup' thread, and creates a new
452      * 'Server' instance for the current server.
453      *
454      * @param socketAddress the IP address and port the listener is bound to
455      */
456     private Server(InetSocketAddress socketAddress) {
457         this.uuid = UUID.randomUUID();
458         this.count = 1;
459         this.socketAddress = socketAddress;
460         this.lastUpdateTime = System.currentTimeMillis();
461
462         // site information
463
464         String siteIp = getProperty(SITE_IP_ADDRESS, null);
465         int sitePort = getProperty(SITE_PORT, 0);
466         if (siteIp != null && sitePort != 0) {
467             // we do have site information specified
468             try {
469                 siteSocketAddress = new InetSocketAddress(siteIp, sitePort);
470                 if (siteSocketAddress.getAddress() == null) {
471                     logger.error("Couldn't resolve site address: {}", siteIp);
472                     siteSocketAddress = null;
473                 }
474             } catch (IllegalArgumentException e) {
475                 logger.error("Illegal 'siteSocketAddress'", e);
476                 siteSocketAddress = null;
477             }
478         }
479
480         // TBD: calculate checksum
481     }
482
483     /**
484      * Initialize a 'Server' instance from a 'DataInputStream'. If it is new,
485      * it may get inserted in the table. If it is an update, fields in an
486      * existing 'Server' may be updated.
487      *
488      * @param is the 'DataInputStream'
489      */
490     Server(DataInputStream is) throws IOException {
491         // read in 16 byte UUID
492         uuid = Util.readUuid(is);
493
494         // read in 4 byte counter value
495         count = is.readInt();
496
497         // read in 16 byte MD5 checksum
498         checksum = new byte[16];
499         is.readFully(checksum);
500
501         // optional parameters
502         int tag;
503         while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
504             switch (tag) {
505                 case SOCKET_ADDRESS_TAG:
506                     socketAddress = readSocketAddress(is);
507                     break;
508                 case SITE_SOCKET_ADDRESS_TAG:
509                     siteSocketAddress = readSocketAddress(is);
510                     break;
511                 default:
512                     // ignore tag
513                     logger.error("Illegal tag: {}", tag);
514                     break;
515             }
516         }
517     }
518
519     /**
520      * Read an 'InetSocketAddress' from a 'DataInputStream'.
521      *
522      * @param is the 'DataInputStream'
523      * @return the 'InetSocketAddress'
524      */
525     private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException {
526
527         byte[] ipAddress = new byte[4];
528         is.read(ipAddress, 0, 4);
529         int port = is.readUnsignedShort();
530         return new InetSocketAddress(InetAddress.getByAddress(ipAddress), port);
531     }
532
533     /**
534      * {@inheritDoc}
535      */
536     @Override
537     public String toString() {
538         return "Server[" + uuid + "]";
539     }
540
541     /**
542      * Return the UUID associated with this Server.
543      *
544      * @return the UUID associated with this Server
545      */
546     public UUID getUuid() {
547         return uuid;
548     }
549
550     /**
551      * Return the external InetSocketAddress of the site.
552      *
553      * @return the external InetSocketAddress of the site
554      *     ('null' if it doesn't exist)
555      */
556     public InetSocketAddress getSiteSocketAddress() {
557         return siteSocketAddress;
558     }
559
560     /**
561      * This method may be called from any thread.
562      *
563      * @return 'true' if the this server is active, and 'false' if not
564      */
565     public synchronized boolean isActive() {
566         return active;
567     }
568
569     /**
570      * This method writes out the data associated with the current Server
571      * instance.
572      *
573      * @param os outout stream that should receive the data
574      */
575     void writeServerData(DataOutputStream os) throws IOException {
576         // write out 16 byte UUID
577         Util.writeUuid(os, uuid);
578
579         // write out 4 byte counter value
580         os.writeInt(count);
581
582         // write out 16 byte MD5 checksum
583         // TBD: should this be implemented?
584         os.write(checksum == null ? new byte[16] : checksum);
585
586         if (socketAddress != null) {
587             // write out socket address
588             os.writeByte(SOCKET_ADDRESS_TAG);
589             os.write(socketAddress.getAddress().getAddress(), 0, 4);
590             os.writeShort(socketAddress.getPort());
591         }
592
593         if (siteSocketAddress != null) {
594             // write out socket address
595             os.writeByte(SITE_SOCKET_ADDRESS_TAG);
596             os.write(siteSocketAddress.getAddress().getAddress(), 0, 4);
597             os.writeShort(siteSocketAddress.getPort());
598         }
599
600         os.writeByte(END_OF_PARAMETERS_TAG);
601     }
602
603     /**
604      * Do any processing needed to create a new server. This method is invoked
605      * from the 'MainLoop' thread in every case except for the current server,
606      * in which case it is invoked in 'startup' prior to creating 'MainLoop'.
607      */
608     private void newServer() {
609         Server failed = failedServers.get(uuid);
610         if (failed != null) {
611             // this one is on the failed list -- see if the counter has advanced
612             if ((count - failed.count) <= 0) {
613                 // the counter has not advanced -- ignore
614                 return;
615             }
616
617             // the counter has advanced -- somehow, this server has returned
618             failedServers.remove(uuid);
619             synchronized (this) {
620                 active = true;
621             }
622             logger.error("Server reawakened: {} ({})", uuid, socketAddress);
623         }
624
625         lastUpdateTime = System.currentTimeMillis();
626         servers.put(uuid, this);
627         updatedList.add(this);
628
629         // notify list will need to be rebuilt
630         notifyList = null;
631
632         if (socketAddress != null && this != thisServer) {
633             // initialize 'client' and 'target' fields
634             if (siteSocketAddress != null
635                     && !siteSocketAddress.equals(thisServer.siteSocketAddress)) {
636                 // destination is on a remote site
637                 destSocketAddress = siteSocketAddress;
638             } else {
639                 // destination is on the local site -- use direct addressing
640                 destSocketAddress = socketAddress;
641             }
642             destName = socketAddressToName(destSocketAddress);
643             try {
644                 // 'client' is used for REST messages to the destination
645                 client = buildClient(uuid.toString(), destSocketAddress, destName);
646
647                 // initialize the 'target' field
648                 target = getTarget(client);
649             } catch (KeyManagementException | NoSuchAlgorithmException
650                          | NoSuchFieldException | IllegalAccessException
651                          | ClassNotFoundException | HttpClientConfigException e) {
652                 logger.error("Server.newServer: problems creating 'client'", e);
653             }
654         }
655         logger.info("New server: {} ({})", uuid, socketAddress);
656         for (Events listener : Events.getListeners()) {
657             listener.newServer(this);
658         }
659     }
660
661     /**
662      * Check the server state in response to some issue. At present, only the
663      * 'destName' information is checked.
664      */
665     private void checkServer() {
666         // recalculate 'destName' (we have seen DNS issues)
667         String newDestName = socketAddressToName(destSocketAddress);
668         if (newDestName.equals(destName)) {
669             return;
670         }
671         logger.warn("Remote host name for {} has changed from {} to {}",
672                     destSocketAddress, destName, newDestName);
673
674         // shut down old client, and rebuild
675         client.shutdown();
676         client = null;
677         target = null;
678
679         // update 'destName', and rebuild the client
680         destName = newDestName;
681         try {
682             // 'client' is used for REST messages to the destination
683             client = buildClient(uuid.toString(), destSocketAddress, destName);
684
685             // initialize the 'target' field
686             target = getTarget(client);
687         } catch (KeyManagementException | NoSuchAlgorithmException
688                      | NoSuchFieldException | IllegalAccessException
689                      | ClassNotFoundException | HttpClientConfigException e) {
690             logger.error("Server.checkServer: problems recreating 'client'", e);
691         }
692     }
693
694     /**
695      * Update server data.
696      *
697      * @param serverData this is a temporary 'Server' instance created from
698      *     an incoming message, which is used to update fields within the
699      *     'Server' instance identified by 'this'
700      */
701     private void updateServer(Server serverData) {
702         if (serverData.count > count) {
703             // an update has occurred
704             count = serverData.count;
705
706             // TBD: calculate and verify checksum, more fields may be updated
707
708             // adjust 'allowedGap' accordingly
709             long currentTime = System.currentTimeMillis();
710             long gap = currentTime - lastUpdateTime;
711
712             // adjust 'allowedGap' accordingly
713             // TBD: need properties to support overrides
714             gap = gap * 3 / 2 + adaptiveGapAdjust;
715             if (gap > allowedGap) {
716                 // update 'allowedGap' immediately
717                 allowedGap = gap;
718             } else {
719                 // gradually pull the allowed gap down
720                 // TBD: need properties to support overrides
721                 allowedGap = (allowedGap * 15 + gap) / 16;
722             }
723             lastUpdateTime = currentTime;
724
725             updatedList.add(this);
726         }
727     }
728
729     /**
730      * a server has failed.
731      */
732     private void serverFailed() {
733         // mark as inactive
734         synchronized (this) {
735             active = false;
736         }
737
738         // remove it from the table
739         servers.remove(uuid);
740
741         // add it to the failed servers table
742         failedServers.put(uuid, this);
743
744         // clean up client information
745         if (client != null) {
746             client.shutdown();
747             client = null;
748             target = null;
749         }
750
751         // log an error message
752         logger.error("Server failure: {} ({})", uuid, socketAddress);
753         for (Events listener : Events.getListeners()) {
754             listener.serverFailed(this);
755         }
756     }
757
758     /**
759      * Fetch, and possibily calculate, the "notify list" associated with this
760      * server. This is the list of servers to forward a server and bucket
761      * information to, and is approximately log2(n) in length, where 'n' is
762      * the total number of servers.
763      * It is calculated by starting with all of the servers sorted by UUID --
764      * let's say the current server is at position 's'. The notify list will
765      * contain the server at positions:
766      *     (s + 1) % n
767      *     (s + 2) % n
768      *     (s + 4) % n
769      *          ...
770      * Using all powers of 2 less than 'n'. If the total server count is 50,
771      * this list has 6 entries.
772      * @return the notify list
773      */
774     static Collection<Server> getNotifyList() {
775         // The 'notifyList' value is initially 'null', and it is reset to 'null'
776         // every time a new host joins, or one leaves. That way, it is marked for
777         // recalculation, but only when needed.
778         if (notifyList == null) {
779             // next index we are looking for
780             int dest = 1;
781
782             // our current position in the Server table -- starting at 'thisServer'
783             UUID current = thisServer.uuid;
784
785             // site socket address of 'current'
786             InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress;
787
788             // hash set of all site socket addresses located
789             HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>();
790             siteSocketAddresses.add(thisSiteSocketAddress);
791
792             // the list we are building
793             notifyList = new LinkedList<Server>();
794
795             int index = 1;
796             for ( ; ; ) {
797                 // move to the next key (UUID) -- if we hit the end of the table,
798                 // wrap to the beginning
799                 current = servers.higherKey(current);
800                 if (current == null) {
801                     current = servers.firstKey();
802                 }
803                 if (current.equals(thisServer.uuid)) {
804                     // we have looped through the entire list
805                     break;
806                 }
807
808                 // fetch associated server & site socket address
809                 Server server = servers.get(current);
810                 InetSocketAddress currentSiteSocketAddress =
811                     server.siteSocketAddress;
812
813                 if (Objects.equals(thisSiteSocketAddress,
814                                    currentSiteSocketAddress)) {
815                     // same site -- see if we should add this one
816                     if (index == dest) {
817                         // this is the next index we are looking for --
818                         // add the server
819                         notifyList.add(server);
820
821                         // advance to the next offset (current-offset * 2)
822                         dest = dest << 1;
823                     }
824                     index += 1;
825                 } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) {
826                     // we need at least one member from each site
827                     notifyList.add(server);
828                     siteSocketAddresses.add(currentSiteSocketAddress);
829                 }
830             }
831         }
832         return notifyList;
833     }
834
835     /**
836      * See if there is a host name associated with a destination socket address.
837      *
838      * @param dest the socket address of the destination
839      * @return the host name associated with the IP address, or the IP address
840      *     if no associated host name is found.
841      */
842     private static String socketAddressToName(InetSocketAddress dest) {
843         // destination IP address
844         InetAddress inetAddress = dest.getAddress();
845         String destName = null;
846
847         // go through the 'hostList' to see if there is a matching name
848         for (String hostName : hostList) {
849             try {
850                 if (inetAddress.equals(InetAddress.getByName(hostName))) {
851                     // this one matches -- use the name instead of the IP address
852                     destName = hostName;
853                     break;
854                 }
855             } catch (UnknownHostException e) {
856                 logger.debug("Server.socketAddressToName error", e);
857             }
858         }
859
860         // default name = string value of IP address
861         return destName == null ? inetAddress.getHostAddress() : destName;
862     }
863
864     /**
865      * Create an 'HttpClient' instance for a particular host.
866      *
867      * @param name of the host (currently a UUID or host:port string)
868      * @param dest the socket address of the destination
869      * @param destName the string name to use for the destination
870      */
871     static HttpClient buildClient(String name, InetSocketAddress dest, String destName)
872         throws KeyManagementException, NoSuchAlgorithmException,
873         ClassNotFoundException, HttpClientConfigException {
874
875         return HttpClientFactoryInstance.getClientFactory().build(
876             BusTopicParams.builder()
877                 .clientName(name)                               // name
878                 .useHttps(useHttps)                             // https
879                 .allowSelfSignedCerts(useSelfSignedCertificates)// selfSignedCerts
880                 .hostname(destName)                             // host
881                 .port(dest.getPort())                           // port
882                 .managed(false)                                 // managed
883                 .build());
884     }
885
886     /**
887      * Extract the 'WebTarget' information from the 'HttpClient'.
888      *
889      * @param client the associated HttpClient instance
890      * @return a WebTarget referring to the previously-specified 'baseUrl'
891      */
892     static WebTarget getTarget(HttpClient client)
893         throws NoSuchFieldException, IllegalAccessException {
894         // need access to the internal field 'client'
895         // TBD: We need a way to get this information without reflection
896         Field field = client.getClass().getDeclaredField("client");
897         field.setAccessible(true);
898         Client rsClient = (Client) field.get(client);
899         field.setAccessible(false);
900
901         rsClient.property(ClientProperties.CONNECT_TIMEOUT, connectTimeout);
902         rsClient.property(ClientProperties.READ_TIMEOUT, readTimeout);
903
904         // For performance reasons, the root 'WebTarget' is generated only once
905         // at initialization time for each remote host.
906         return rsClient.target(client.getBaseUrl());
907     }
908
909     /**
910      * This method may be invoked from any thread, and is used to send a
911      * message to the destination server associated with this 'Server' instance.
912      *
913      * @param path the path relative to the base URL
914      * @param entity the "request entity" containing the body of the
915      *     HTTP POST request
916      */
917     public void post(final String path, final Entity<?> entity) {
918         post(path, entity, null);
919     }
920
921     /**
922      * This method may be invoked from any thread, and is used to send a
923      * message to the destination server associated with this 'Server' instance.
924      *
925      * @param path the path relative to the base URL
926      * @param entity the "request entity" containing the body of the
927      *     HTTP POST request (if 'null', an HTTP GET is used instead)
928      * @param responseCallback if non-null, this callback may be used to
929      *     modify the WebTarget, and/or receive the POST response message
930      */
931     public void post(final String path, final Entity<?> entity,
932                      PostResponse responseCallback) {
933         if (target == null) {
934             return;
935         }
936
937         getThreadPool().execute(() -> {
938             /*
939              * This method is running within the 'MainLoop' thread.
940              */
941             try {
942                 WebTarget webTarget = target.path(path);
943                 if (responseCallback != null) {
944                     // give callback a chance to modify 'WebTarget'
945                     webTarget = responseCallback.webTarget(webTarget);
946
947                     // send the response to the callback
948                     Response response;
949                     if (entity == null) {
950                         response = webTarget.request().get();
951                     } else {
952                         response = webTarget.request().post(entity);
953                     }
954                     responseCallback.response(response);
955                 } else {
956                     // just do the invoke, and ignore the response
957                     if (entity == null) {
958                         webTarget.request().get();
959                     } else {
960                         webTarget.request().post(entity);
961                     }
962                 }
963             } catch (Exception e) {
964                 logger.error("Failed to send to {} ({}, {})",
965                              uuid, destSocketAddress, destName);
966                 if (responseCallback != null) {
967                     responseCallback.exceptionResponse(e);
968                 }
969                 // this runs in the 'MainLoop' thread
970
971                 // the DNS cache may have been out-of-date when this server
972                 // was first contacted -- fix the problem, if needed
973                 MainLoop.queueWork(this::checkServer);
974             }
975         });
976     }
977
978     /**
979      * This method may be invoked from any thread.
980      *
981      * @return the 'ThreadPoolExecutor' associated with this server
982      */
983     public synchronized ThreadPoolExecutor getThreadPool() {
984         if (sendThreadPool == null) {
985             // build a thread pool for this Server
986             sendThreadPool =
987                 new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
988                                        keepAliveTime, TimeUnit.MILLISECONDS,
989                                        new LinkedTransferQueue<>());
990             sendThreadPool.allowCoreThreadTimeOut(true);
991         }
992         return sendThreadPool;
993     }
994
995     /**
996      * Lower-level method supporting HTTP, which requires that the caller's
997      * thread tolerate blocking. This method may be called from any thread.
998      *
999      * @param path the path relative to the base URL
1000      * @return a 'WebTarget' instance pointing to this path
1001      */
1002     public WebTarget getWebTarget(String path) {
1003         return target == null ? null : target.path(path);
1004     }
1005
1006     /**
1007      * This method may be invoked from any thread, but its real intent is
1008      * to decode an incoming 'admin' message (which is Base-64-encoded),
1009      * and send it to the 'MainLoop' thread for processing.
1010      *
1011      * @param data the base-64-encoded data
1012      */
1013     static void adminRequest(byte[] data) {
1014         final byte[] packet = Base64.getDecoder().decode(data);
1015         Runnable task = () -> {
1016             try {
1017                 ByteArrayInputStream bis = new ByteArrayInputStream(packet);
1018                 DataInputStream dis = new DataInputStream(bis);
1019
1020                 while (dis.available() != 0) {
1021                     Server serverData = new Server(dis);
1022
1023                     // TBD: Compare with current server
1024
1025                     Server server = servers.get(serverData.uuid);
1026                     if (server == null) {
1027                         serverData.newServer();
1028                     } else {
1029                         server.updateServer(serverData);
1030                     }
1031                 }
1032             } catch (Exception e) {
1033                 logger.error("Server.adminRequest: can't decode packet", e);
1034             }
1035         };
1036         MainLoop.queueWork(task);
1037     }
1038
1039     /**
1040      * Send out information about servers 'updatedList' to all servers
1041      * in 'notifyList' (may need to build or rebuild 'notifyList').
1042      */
1043     static void sendOutData() throws IOException {
1044         // include 'thisServer' in the data -- first, advance the count
1045         thisServer.count += 1;
1046         if (thisServer.count == 0) {
1047             /*
1048              * counter wrapped (0 is a special case) --
1049              * actually, we could probably leave this out, because it would take
1050              * more than a century to wrap if the increment is 1 second
1051              */
1052             thisServer.count = 1;
1053         }
1054
1055         ByteArrayOutputStream bos = new ByteArrayOutputStream();
1056         DataOutputStream dos = new DataOutputStream(bos);
1057
1058         thisServer.lastUpdateTime = System.currentTimeMillis();
1059         thisServer.writeServerData(dos);
1060
1061         // include all hosts in the updated list
1062         for (Server server : updatedList) {
1063             server.writeServerData(dos);
1064         }
1065         updatedList.clear();
1066
1067         // create an 'Entity' that can be sent out to all hosts in the notify list
1068         Entity<String> entity = Entity.entity(
1069             new String(Base64.getEncoder().encode(bos.toByteArray()), StandardCharsets.UTF_8),
1070             MediaType.APPLICATION_OCTET_STREAM_TYPE);
1071         for (Server server : getNotifyList()) {
1072             server.post("admin", entity);
1073         }
1074     }
1075
1076     /**
1077      * Search for servers which have taken too long to respond.
1078      */
1079     static void searchForFailedServers() {
1080         long currentTime = System.currentTimeMillis();
1081
1082         // used to build a list of newly-failed servers
1083         LinkedList<Server> failed = new LinkedList<>();
1084         for (Server server : servers.values()) {
1085             if (server == thisServer) {
1086                 continue;
1087             }
1088             long gap = currentTime - server.lastUpdateTime;
1089             if (gap > server.allowedGap) {
1090                 // add it to the failed list -- we don't call 'serverFailed' yet,
1091                 // because this updates the server list, and leads to a
1092                 // 'ConcurrentModificationException'
1093                 failed.add(server);
1094             }
1095         }
1096
1097         // remove servers from our list
1098         if (!failed.isEmpty()) {
1099             for (Server server : failed) {
1100                 server.serverFailed();
1101             }
1102             notifyList = null;
1103         }
1104     }
1105
1106     /**
1107      * This method may be invoked from any thread:
1108      * Send information about 'thisServer' to the list of hosts.
1109      *
1110      * @param out the 'PrintStream' to use for displaying information
1111      * @param hosts a comma-separated list of entries containing
1112      *     'host:port' or just 'port' (current host is implied in this case)
1113      */
1114     static void pingHosts(PrintStream out, String hosts) {
1115         LinkedList<InetSocketAddress> addresses = new LinkedList<>();
1116         boolean error = false;
1117
1118         for (String host : hosts.split(",")) {
1119             try {
1120                 String[] segs = host.split(":");
1121
1122                 switch (segs.length) {
1123                     case 1:
1124                         addresses.add(new InetSocketAddress(InetAddress.getLocalHost(),
1125                                 Integer.parseInt(segs[0])));
1126                         break;
1127                     case 2:
1128                         addresses.add(new InetSocketAddress(segs[0],
1129                                 Integer.parseInt(segs[1])));
1130                         break;
1131                     default:
1132                         out.println(host + ": Invalid host/port value");
1133                         error = true;
1134                         break;
1135                 }
1136             } catch (NumberFormatException e) {
1137                 out.println(host + ": Invalid port value");
1138                 logger.error(PINGHOSTS_ERROR, e);
1139                 error = true;
1140             } catch (UnknownHostException e) {
1141                 out.println(host + ": Unknown host");
1142                 logger.error(PINGHOSTS_ERROR, e);
1143                 error = true;
1144             }
1145         }
1146         if (!error) {
1147             pingHosts(out, addresses);
1148         }
1149     }
1150
1151     /**
1152      * This method may be invoked from any thread:
1153      * Send information about 'thisServer' to the list of hosts.
1154      *
1155      * @param out the 'PrintStream' to use for displaying information
1156      * @param hosts a collection of 'InetSocketAddress' instances, which are
1157      *     the hosts to send the information to
1158      */
1159     static void pingHosts(final PrintStream out,
1160                           final Collection<InetSocketAddress> hosts) {
1161         FutureTask<Integer> ft = new FutureTask<>(() -> {
1162             ByteArrayOutputStream bos = new ByteArrayOutputStream();
1163             DataOutputStream dos = new DataOutputStream(bos);
1164
1165             // add information for this server only
1166             try {
1167                 thisServer.writeServerData(dos);
1168
1169                 // create an 'Entity' that can be sent out to all hosts
1170                 Entity<String> entity = Entity.entity(
1171                     new String(Base64.getEncoder().encode(bos.toByteArray()),
1172                         StandardCharsets.UTF_8),
1173                     MediaType.APPLICATION_OCTET_STREAM_TYPE);
1174                 pingHostsLoop(entity, out, hosts);
1175             } catch (IOException e) {
1176                 out.println("Unable to generate 'ping' data: " + e);
1177                 logger.error(PINGHOSTS_ERROR, e);
1178             }
1179             return 0;
1180         });
1181
1182         MainLoop.queueWork(ft);
1183         try {
1184             ft.get(60, TimeUnit.SECONDS);
1185         } catch (InterruptedException e) {
1186             logger.error("Server.pingHosts: interrupted waiting for queued work", e);
1187             Thread.currentThread().interrupt();
1188         } catch (ExecutionException | TimeoutException e) {
1189             logger.error("Server.pingHosts: error waiting for queued work", e);
1190         }
1191     }
1192
1193     /**
1194      * This method is used for pingHosts method to reduce its Cognitive Complexity.
1195      *
1196      * @param entity for sending out to all hosts
1197      * @param out the 'PrintStream' to use for displaying information
1198      * @param hosts a collection of 'InetSocketAddress' instances, which are
1199      *     the hosts to send the information to
1200      */
1201     static void pingHostsLoop(final Entity<String> entity,
1202                               final PrintStream out,
1203                               final Collection<InetSocketAddress> hosts) {
1204         // loop through hosts
1205         for (InetSocketAddress host : hosts) {
1206             HttpClient httpClient = null;
1207
1208             try {
1209                 httpClient = buildClient(host.toString(), host,
1210                                      socketAddressToName(host));
1211                 getTarget(httpClient).path("admin").request().post(entity);
1212                 httpClient.shutdown();
1213                 httpClient = null;
1214             } catch (KeyManagementException | NoSuchAlgorithmException e) {
1215                 out.println(host + ": Unable to create client connection");
1216                 logger.error(PINGHOSTS_ERROR, e);
1217             } catch (NoSuchFieldException | IllegalAccessException e) {
1218                 out.println(host + ": Unable to get link to target");
1219                 logger.error(PINGHOSTS_ERROR, e);
1220             } catch (Exception e) {
1221                 out.println(host + ": " + e);
1222                 logger.error(PINGHOSTS_ERROR, e);
1223             }
1224             if (httpClient != null) {
1225                 httpClient.shutdown();
1226             }
1227         }
1228     }
1229
1230     /**
1231      * This method may be invoked from any thread:
1232      * Dump out the current 'servers' table in a human-readable table form.
1233      *
1234      * @param out the 'PrintStream' to dump the table to
1235      */
1236     public static void dumpHosts(final PrintStream out) {
1237         FutureTask<Integer> ft = new FutureTask<>(() -> {
1238             dumpHostsInternal(out);
1239             return 0;
1240         });
1241         MainLoop.queueWork(ft);
1242         try {
1243             ft.get(60, TimeUnit.SECONDS);
1244         } catch (InterruptedException e) {
1245             logger.error("Server.dumpHosts: interrupted waiting for queued work", e);
1246             Thread.currentThread().interrupt();
1247         } catch (ExecutionException | TimeoutException e) {
1248             logger.error("Server.dumpHosts: error waiting for queued work", e);
1249         }
1250     }
1251
1252     /**
1253      * Dump out the current 'servers' table in a human-readable table form.
1254      *
1255      * @param out the 'PrintStream' to dump the table to
1256      */
1257     private static void dumpHostsInternal(PrintStream out) {
1258         // modifications to 'servers.values()' and 'notifyList'.
1259         HashSet<Server> localNotifyList = new HashSet<>(getNotifyList());
1260
1261         // see if we have any site information
1262         boolean siteData = false;
1263         for (Server server : servers.values()) {
1264             if (server.siteSocketAddress != null) {
1265                 siteData = true;
1266                 break;
1267             }
1268         }
1269
1270         String format = "%1s %-36s %-15s %5s %5s %12s %7s %7s\n";
1271         SimpleDateFormat dateFormat = new SimpleDateFormat("kk:mm:ss.SSS");
1272
1273         if (siteData) {
1274             format = "%1s %-36s %-15s %5s %-15s %5s %5s %12s %7s %7s\n";
1275             // @formatter:off
1276             out.printf(format, "", "UUID", "IP Address", "Port",
1277                        "Site IP Address", "Port",
1278                        "Count", "Update Time", "Elapsed", "Allowed");
1279             out.printf(format, "", "----", "----------", "----",
1280                        "---------------", "----",
1281                        "-----", "-----------", PRINTOUT_DASHES, PRINTOUT_DASHES);
1282             // @formatter:on
1283         } else {
1284             // @formatter:off
1285             out.printf(format, "", "UUID", "IP Address", "Port",
1286                        "Count", "Update Time", "Elapsed", "Allowed");
1287             out.printf(format, "", "----", "----------", "----",
1288                        "-----", "-----------", PRINTOUT_DASHES, PRINTOUT_DASHES);
1289             // @formatter:on
1290         }
1291
1292         long currentTime = System.currentTimeMillis();
1293         for (Server server : servers.values()) {
1294             String thisOne = "";
1295
1296             if (server == thisServer) {
1297                 thisOne = "*";
1298             } else if (localNotifyList.contains(server)) {
1299                 thisOne = "n";
1300             }
1301
1302             if (siteData) {
1303                 String siteIp = "";
1304                 String sitePort = "";
1305                 if (server.siteSocketAddress != null) {
1306                     siteIp =
1307                         server.siteSocketAddress.getAddress().getHostAddress();
1308                     sitePort = String.valueOf(server.siteSocketAddress.getPort());
1309                 }
1310
1311                 out.printf(format, thisOne, server.uuid,
1312                            server.socketAddress.getAddress().getHostAddress(),
1313                            server.socketAddress.getPort(),
1314                            siteIp, sitePort, server.count,
1315                            dateFormat.format(new Date(server.lastUpdateTime)),
1316                            currentTime - server.lastUpdateTime,
1317                            server.allowedGap);
1318             } else {
1319                 out.printf(format, thisOne, server.uuid,
1320                            server.socketAddress.getAddress().getHostAddress(),
1321                            server.socketAddress.getPort(), server.count,
1322                            dateFormat.format(new Date(server.lastUpdateTime)),
1323                            currentTime - server.lastUpdateTime,
1324                            server.allowedGap);
1325             }
1326         }
1327         out.println("Count: " + servers.size());
1328     }
1329
1330     /* ============================================================ */
1331
1332     /**
1333      * This interface supports the 'post' method, and provides the opportunity
1334      * to change the WebTarget and/or receive the POST response message.
1335      */
1336     interface PostResponse {
1337         /**
1338          * Callback that can be used to modify 'WebTarget', and do things like
1339          * add query parameters.
1340          *
1341          * @param webTarget the current WebTarget
1342          * @return the updated WebTarget
1343          */
1344         public default WebTarget webTarget(WebTarget webTarget) {
1345             return webTarget;
1346         }
1347
1348         /**
1349          * Callback that passes the POST response.
1350          *
1351          * @param response the POST response
1352          */
1353         public default void response(Response response) {
1354         }
1355
1356         /**
1357          * Callback that passes the POST exception response.
1358          *
1359          */
1360         public default void exceptionResponse(Exception exception) {
1361             Response.ResponseBuilder response;
1362             response = Response.serverError();
1363             this.response(response.build());
1364         }
1365     }
1366 }