2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.serverpool;
23 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_HTTPS;
24 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SELF_SIGNED_CERTIFICATES;
25 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST;
26 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_CONNECT_TIMEOUT;
27 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_INITIAL_ALLOWED_GAP;
28 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_IP_ADDRESS;
29 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_PORT;
30 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_READ_TIMEOUT;
31 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_CORE_POOL_SIZE;
32 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME;
33 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE;
34 import static org.onap.policy.drools.serverpool.ServerPoolProperties.HOST_LIST;
35 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_ADAPTIVE_GAP_ADJUST;
36 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_CONNECT_TIMEOUT;
37 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_HTTPS;
38 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_INITIAL_ALLOWED_GAP;
39 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_IP_ADDRESS;
40 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_PORT;
41 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_READ_TIMEOUT;
42 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_SELF_SIGNED_CERTIFICATES;
43 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_CORE_POOL_SIZE;
44 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_KEEP_ALIVE_TIME;
45 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_MAXIMUM_POOL_SIZE;
46 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_IP_ADDRESS;
47 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_PORT;
48 import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
50 import java.io.ByteArrayInputStream;
51 import java.io.ByteArrayOutputStream;
52 import java.io.DataInputStream;
53 import java.io.DataOutputStream;
54 import java.io.IOException;
55 import java.io.PrintStream;
56 import java.io.StringReader;
57 import java.lang.reflect.Field;
58 import java.net.InetAddress;
59 import java.net.InetSocketAddress;
60 import java.net.UnknownHostException;
61 import java.nio.charset.StandardCharsets;
62 import java.security.KeyManagementException;
63 import java.security.NoSuchAlgorithmException;
64 import java.text.SimpleDateFormat;
65 import java.util.Base64;
66 import java.util.Collection;
67 import java.util.Date;
68 import java.util.HashSet;
69 import java.util.LinkedList;
70 import java.util.Objects;
71 import java.util.Properties;
72 import java.util.TreeMap;
73 import java.util.TreeSet;
74 import java.util.UUID;
75 import java.util.concurrent.ExecutionException;
76 import java.util.concurrent.FutureTask;
77 import java.util.concurrent.LinkedTransferQueue;
78 import java.util.concurrent.ThreadPoolExecutor;
79 import java.util.concurrent.TimeUnit;
80 import java.util.concurrent.TimeoutException;
81 import javax.ws.rs.client.Client;
82 import javax.ws.rs.client.Entity;
83 import javax.ws.rs.client.WebTarget;
84 import javax.ws.rs.core.MediaType;
85 import javax.ws.rs.core.Response;
86 import org.glassfish.jersey.client.ClientProperties;
87 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
88 import org.onap.policy.common.endpoints.http.client.HttpClient;
89 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
90 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
91 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
92 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
93 import org.onap.policy.drools.utils.PropertyUtil;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
97 public class Server implements Comparable<Server> {
98 private static Logger logger = LoggerFactory.getLogger(Server.class);
100 // maps UUID to Server object for all known servers
101 private static TreeMap<UUID, Server> servers =
102 new TreeMap<>(Util.uuidComparator);
104 // maps UUID to Server object for all failed servers
105 // (so they aren't accidentally restored, due to updates from other hosts)
106 private static TreeMap<UUID, Server> failedServers =
107 new TreeMap<>(Util.uuidComparator);
109 // subset of servers to be notified (null means it needs to be rebuilt)
110 private static LinkedList<Server> notifyList = null;
112 // data to be sent out to notify list
113 private static TreeSet<Server> updatedList = new TreeSet<>();
115 // the server associated with the current host
116 private static Server thisServer = null;
118 // the current REST server
119 private static HttpServletServer restServer;
121 /*==================================================*/
122 /* Some properties extracted at initialization time */
123 /*==================================================*/
125 // initial value of gap to allow between pings
126 private static long initialAllowedGap;
128 // used in adaptive calculation of allowed gap between pings
129 private static long adaptiveGapAdjust;
131 // time to allow for TCP connect (long)
132 private static String connectTimeout;
134 // time to allow before TCP read timeout (long)
135 private static String readTimeout;
137 // outgoing per-server thread pool parameters
138 private static int corePoolSize;
139 private static int maximumPoolSize;
140 private static long keepAliveTime;
142 // https-related parameters
143 private static boolean useHttps;
144 private static boolean useSelfSignedCertificates;
146 // list of remote host names
147 private static String[] hostList = new String[0];
149 /*=========================================================*/
150 /* Fields included in every 'ping' message between servers */
151 /*=========================================================*/
153 // unique id for this server
156 // counter periodically incremented to indicate the server is "alive"
159 // 16 byte MD5 checksum over additional data that is NOT included in
160 // every 'ping' message -- used to determine whether the data is up-to-date
161 private byte[] checksum;
163 /*========================================================================*/
164 /* The following data is included in the checksum, and doesn't change too */
165 /* frequently (some fields may change as servers go up and down) */
166 /*========================================================================*/
168 // IP address and port of listener
169 private InetSocketAddress socketAddress;
171 // site IP address and port
172 private InetSocketAddress siteSocketAddress = null;
174 /*============================================*/
175 /* Local information not included in checksum */
176 /*============================================*/
178 // destination socket information
179 private InetSocketAddress destSocketAddress;
180 private String destName;
182 // REST client fields
183 private HttpClient client;
184 private WebTarget target;
185 private ThreadPoolExecutor sendThreadPool = null;
187 // time when the 'count' field was last updated
188 private long lastUpdateTime;
190 // calculated field indicating the maximum time between updates
191 private long allowedGap = initialAllowedGap;
193 // indicates whether the 'Server' instance is active or not (synchronized)
194 private boolean active = true;
197 * Tags for encoding of server data
199 static final int END_OF_PARAMETERS_TAG = 0;
200 static final int SOCKET_ADDRESS_TAG = 1;
201 static final int SITE_SOCKET_ADDRESS_TAG = 2;
204 static final String PINGHOSTS_ERROR = "Server.pingHosts error";
206 // a string for print
207 static final String PRINTOUT_DASHES = "-------";
209 /*==============================*/
210 /* Comparable<Server> interface */
211 /*==============================*/
214 * Compare this instance to another one by comparing the 'uuid' field.
217 public int compareTo(Server other) {
218 return Util.uuidComparator.compare(uuid, other.uuid);
222 public int hashCode() {
223 return Objects.hash(uuid);
227 public boolean equals(Object obj) {
231 if (!(obj instanceof Server)) {
234 Server other = (Server) obj;
235 return Objects.equals(uuid, other.uuid);
239 * This method may be invoked from any thread, and is used as the main
240 * entry point when testing.
242 * @param args arguments contaning an '=' character are intepreted as
243 * a property, other arguments are presumed to be a property file.
245 public static void main(String[] args) throws IOException {
246 Properties prop = new Properties();
248 for (String arg : args) {
249 // arguments with an '=' in them are a property definition;
250 // otherwise, they are a properties file name
252 if (arg.contains("=")) {
253 prop.load(new StringReader(arg));
255 prop.putAll(PropertyUtil.getProperties(arg));
259 String rval = startup(prop);
261 logger.error("Server.startup failed: {}", rval);
266 * This method may be invoked from any thread, and performs initialization.
268 * @param propertiesFile the name of a property file
270 public static String startup(String propertiesFile) {
271 Properties properties;
273 properties = PropertyUtil.getProperties(propertiesFile);
274 } catch (IOException e) {
275 logger.error("Server.startup: exception reading properties", e);
276 properties = new Properties();
278 return startup(properties);
282 * This method may be invoked from any thread, and performs initialization.
284 * @param properties contains properties used by the server
286 public static String startup(Properties properties) {
287 ServerPoolProperties.setProperties(properties);
288 logger.info("startup: properties={}", properties);
290 // fetch some static properties
291 initialAllowedGap = getProperty(SERVER_INITIAL_ALLOWED_GAP,
292 DEFAULT_SERVER_INITIAL_ALLOWED_GAP);
293 adaptiveGapAdjust = getProperty(SERVER_ADAPTIVE_GAP_ADJUST,
294 DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST);
296 String.valueOf(getProperty(SERVER_CONNECT_TIMEOUT,
297 DEFAULT_SERVER_CONNECT_TIMEOUT));
298 readTimeout = String.valueOf(getProperty(SERVER_READ_TIMEOUT,
299 DEFAULT_SERVER_READ_TIMEOUT));
300 corePoolSize = getProperty(SERVER_THREADS_CORE_POOL_SIZE,
301 DEFAULT_SERVER_THREADS_CORE_POOL_SIZE);
302 maximumPoolSize = getProperty(SERVER_THREADS_MAXIMUM_POOL_SIZE,
303 DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE);
304 keepAliveTime = getProperty(SERVER_THREADS_KEEP_ALIVE_TIME,
305 DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME);
306 useHttps = getProperty(SERVER_HTTPS, DEFAULT_HTTPS);
307 useSelfSignedCertificates = getProperty(SERVER_SELF_SIGNED_CERTIFICATES,
308 DEFAULT_SELF_SIGNED_CERTIFICATES);
309 String hostListNames = getProperty(HOST_LIST, null);
310 if (hostListNames != null) {
311 hostList = hostListNames.split(",");
314 String possibleError = null;
316 // fetch server information
317 String ipAddressString =
318 getProperty(SERVER_IP_ADDRESS, DEFAULT_SERVER_IP_ADDRESS);
319 int port = getProperty(SERVER_PORT, DEFAULT_SERVER_PORT);
321 possibleError = "Unknown Host: " + ipAddressString;
322 InetAddress address = InetAddress.getByName(ipAddressString);
323 InetSocketAddress socketAddress = new InetSocketAddress(address, port);
325 possibleError = "HTTP server initialization error";
326 restServer = HttpServletServerFactoryInstance.getServerFactory().build(
327 "SERVER-POOL", // name
329 socketAddress.getAddress().getHostAddress(), // host (maybe 0.0.0.0)
330 port, // port (can no longer be 0)
334 restServer.addServletClass(null, RestServerPool.class.getName());
336 // add any additional servlets
337 for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
338 Collection<Class<?>> classes = feature.servletClasses();
339 if (classes != null) {
340 for (Class<?> clazz : classes) {
341 restServer.addServletClass(null, clazz.getName());
346 // we may not know the port until after the server is started
347 possibleError = "HTTP server start error";
349 possibleError = null;
351 // determine the address to use
352 if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) {
353 address = InetAddress.getLocalHost();
356 thisServer = new Server(new InetSocketAddress(address, port));
358 // TBD: is this really appropriate?
359 thisServer.newServer();
361 // start background thread
362 MainLoop.startThread();
363 MainLoop.queueWork(() -> {
364 // run this in the 'MainLoop' thread
368 logger.info("Listening on port {}", port);
371 } catch (UnknownHostException e) {
372 logger.error("Server.startup: exception start server", e);
373 if (possibleError == null) {
374 possibleError = e.toString();
376 return possibleError;
381 * Shut down all threads associate with server pool.
383 public static void shutdown() {
384 Discovery.stopDiscovery();
385 MainLoop.stopThread();
386 TargetLock.shutdown();
389 HashSet<Server> allServers = new HashSet<>();
390 allServers.addAll(servers.values());
391 allServers.addAll(failedServers.values());
393 for (Server server : allServers) {
394 if (server.sendThreadPool != null) {
395 server.sendThreadPool.shutdown();
398 if (restServer != null) {
399 restServer.shutdown();
404 * Return the Server instance associated with the current host.
406 * @return the Server instance associated with the current host
408 public static Server getThisServer() {
413 * Return the first Server instance in the 'servers' list.
415 * @return the first Server instance in the 'servers' list
416 * (the one with the lowest UUID)
418 public static Server getFirstServer() {
419 return servers.firstEntry().getValue();
423 * Lookup a Server instance associated with a UUID.
425 * @param uuid the key to the lookup
426 @ @return the associated 'Server' instance, or 'null' if none
428 public static Server getServer(UUID uuid) {
429 return servers.get(uuid);
433 * Return a count of the number of servers.
435 * @return a count of the number of servers
437 public static int getServerCount() {
438 return servers.size();
442 * Return the complete list of servers.
444 * @return the complete list of servers
446 public static Collection<Server> getServers() {
447 return servers.values();
451 * This method is invoked from the 'startup' thread, and creates a new
452 * 'Server' instance for the current server.
454 * @param socketAddress the IP address and port the listener is bound to
456 private Server(InetSocketAddress socketAddress) {
457 this.uuid = UUID.randomUUID();
459 this.socketAddress = socketAddress;
460 this.lastUpdateTime = System.currentTimeMillis();
464 String siteIp = getProperty(SITE_IP_ADDRESS, null);
465 int sitePort = getProperty(SITE_PORT, 0);
466 if (siteIp != null && sitePort != 0) {
467 // we do have site information specified
469 siteSocketAddress = new InetSocketAddress(siteIp, sitePort);
470 if (siteSocketAddress.getAddress() == null) {
471 logger.error("Couldn't resolve site address: {}", siteIp);
472 siteSocketAddress = null;
474 } catch (IllegalArgumentException e) {
475 logger.error("Illegal 'siteSocketAddress'", e);
476 siteSocketAddress = null;
480 // TBD: calculate checksum
484 * Initialize a 'Server' instance from a 'DataInputStream'. If it is new,
485 * it may get inserted in the table. If it is an update, fields in an
486 * existing 'Server' may be updated.
488 * @param is the 'DataInputStream'
490 Server(DataInputStream is) throws IOException {
491 // read in 16 byte UUID
492 uuid = Util.readUuid(is);
494 // read in 4 byte counter value
495 count = is.readInt();
497 // read in 16 byte MD5 checksum
498 checksum = new byte[16];
499 is.readFully(checksum);
501 // optional parameters
503 while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
505 case SOCKET_ADDRESS_TAG:
506 socketAddress = readSocketAddress(is);
508 case SITE_SOCKET_ADDRESS_TAG:
509 siteSocketAddress = readSocketAddress(is);
513 logger.error("Illegal tag: {}", tag);
520 * Read an 'InetSocketAddress' from a 'DataInputStream'.
522 * @param is the 'DataInputStream'
523 * @return the 'InetSocketAddress'
525 private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException {
527 byte[] ipAddress = new byte[4];
528 is.read(ipAddress, 0, 4);
529 int port = is.readUnsignedShort();
530 return new InetSocketAddress(InetAddress.getByAddress(ipAddress), port);
537 public String toString() {
538 return "Server[" + uuid + "]";
542 * Return the UUID associated with this Server.
544 * @return the UUID associated with this Server
546 public UUID getUuid() {
551 * Return the external InetSocketAddress of the site.
553 * @return the external InetSocketAddress of the site
554 * ('null' if it doesn't exist)
556 public InetSocketAddress getSiteSocketAddress() {
557 return siteSocketAddress;
561 * This method may be called from any thread.
563 * @return 'true' if the this server is active, and 'false' if not
565 public synchronized boolean isActive() {
570 * This method writes out the data associated with the current Server
573 * @param os outout stream that should receive the data
575 void writeServerData(DataOutputStream os) throws IOException {
576 // write out 16 byte UUID
577 Util.writeUuid(os, uuid);
579 // write out 4 byte counter value
582 // write out 16 byte MD5 checksum
583 // TBD: should this be implemented?
584 os.write(checksum == null ? new byte[16] : checksum);
586 if (socketAddress != null) {
587 // write out socket address
588 os.writeByte(SOCKET_ADDRESS_TAG);
589 os.write(socketAddress.getAddress().getAddress(), 0, 4);
590 os.writeShort(socketAddress.getPort());
593 if (siteSocketAddress != null) {
594 // write out socket address
595 os.writeByte(SITE_SOCKET_ADDRESS_TAG);
596 os.write(siteSocketAddress.getAddress().getAddress(), 0, 4);
597 os.writeShort(siteSocketAddress.getPort());
600 os.writeByte(END_OF_PARAMETERS_TAG);
604 * Do any processing needed to create a new server. This method is invoked
605 * from the 'MainLoop' thread in every case except for the current server,
606 * in which case it is invoked in 'startup' prior to creating 'MainLoop'.
608 private void newServer() {
609 Server failed = failedServers.get(uuid);
610 if (failed != null) {
611 // this one is on the failed list -- see if the counter has advanced
612 if ((count - failed.count) <= 0) {
613 // the counter has not advanced -- ignore
617 // the counter has advanced -- somehow, this server has returned
618 failedServers.remove(uuid);
619 synchronized (this) {
622 logger.error("Server reawakened: {} ({})", uuid, socketAddress);
625 lastUpdateTime = System.currentTimeMillis();
626 servers.put(uuid, this);
627 updatedList.add(this);
629 // notify list will need to be rebuilt
632 if (socketAddress != null && this != thisServer) {
633 // initialize 'client' and 'target' fields
634 if (siteSocketAddress != null
635 && !siteSocketAddress.equals(thisServer.siteSocketAddress)) {
636 // destination is on a remote site
637 destSocketAddress = siteSocketAddress;
639 // destination is on the local site -- use direct addressing
640 destSocketAddress = socketAddress;
642 destName = socketAddressToName(destSocketAddress);
644 // 'client' is used for REST messages to the destination
645 client = buildClient(uuid.toString(), destSocketAddress, destName);
647 // initialize the 'target' field
648 target = getTarget(client);
649 } catch (KeyManagementException | NoSuchAlgorithmException
650 | NoSuchFieldException | IllegalAccessException
651 | ClassNotFoundException | HttpClientConfigException e) {
652 logger.error("Server.newServer: problems creating 'client'", e);
655 logger.info("New server: {} ({})", uuid, socketAddress);
656 for (Events listener : Events.getListeners()) {
657 listener.newServer(this);
662 * Check the server state in response to some issue. At present, only the
663 * 'destName' information is checked.
665 private void checkServer() {
666 // recalculate 'destName' (we have seen DNS issues)
667 String newDestName = socketAddressToName(destSocketAddress);
668 if (newDestName.equals(destName)) {
671 logger.warn("Remote host name for {} has changed from {} to {}",
672 destSocketAddress, destName, newDestName);
674 // shut down old client, and rebuild
679 // update 'destName', and rebuild the client
680 destName = newDestName;
682 // 'client' is used for REST messages to the destination
683 client = buildClient(uuid.toString(), destSocketAddress, destName);
685 // initialize the 'target' field
686 target = getTarget(client);
687 } catch (KeyManagementException | NoSuchAlgorithmException
688 | NoSuchFieldException | IllegalAccessException
689 | ClassNotFoundException | HttpClientConfigException e) {
690 logger.error("Server.checkServer: problems recreating 'client'", e);
695 * Update server data.
697 * @param serverData this is a temporary 'Server' instance created from
698 * an incoming message, which is used to update fields within the
699 * 'Server' instance identified by 'this'
701 private void updateServer(Server serverData) {
702 if (serverData.count > count) {
703 // an update has occurred
704 count = serverData.count;
706 // TBD: calculate and verify checksum, more fields may be updated
708 // adjust 'allowedGap' accordingly
709 long currentTime = System.currentTimeMillis();
710 long gap = currentTime - lastUpdateTime;
712 // adjust 'allowedGap' accordingly
713 // TBD: need properties to support overrides
714 gap = gap * 3 / 2 + adaptiveGapAdjust;
715 if (gap > allowedGap) {
716 // update 'allowedGap' immediately
719 // gradually pull the allowed gap down
720 // TBD: need properties to support overrides
721 allowedGap = (allowedGap * 15 + gap) / 16;
723 lastUpdateTime = currentTime;
725 updatedList.add(this);
730 * a server has failed.
732 private void serverFailed() {
734 synchronized (this) {
738 // remove it from the table
739 servers.remove(uuid);
741 // add it to the failed servers table
742 failedServers.put(uuid, this);
744 // clean up client information
745 if (client != null) {
751 // log an error message
752 logger.error("Server failure: {} ({})", uuid, socketAddress);
753 for (Events listener : Events.getListeners()) {
754 listener.serverFailed(this);
759 * Fetch, and possibily calculate, the "notify list" associated with this
760 * server. This is the list of servers to forward a server and bucket
761 * information to, and is approximately log2(n) in length, where 'n' is
762 * the total number of servers.
763 * It is calculated by starting with all of the servers sorted by UUID --
764 * let's say the current server is at position 's'. The notify list will
765 * contain the server at positions:
770 * Using all powers of 2 less than 'n'. If the total server count is 50,
771 * this list has 6 entries.
772 * @return the notify list
774 static Collection<Server> getNotifyList() {
775 // The 'notifyList' value is initially 'null', and it is reset to 'null'
776 // every time a new host joins, or one leaves. That way, it is marked for
777 // recalculation, but only when needed.
778 if (notifyList == null) {
779 // next index we are looking for
782 // our current position in the Server table -- starting at 'thisServer'
783 UUID current = thisServer.uuid;
785 // site socket address of 'current'
786 InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress;
788 // hash set of all site socket addresses located
789 HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>();
790 siteSocketAddresses.add(thisSiteSocketAddress);
792 // the list we are building
793 notifyList = new LinkedList<Server>();
797 // move to the next key (UUID) -- if we hit the end of the table,
798 // wrap to the beginning
799 current = servers.higherKey(current);
800 if (current == null) {
801 current = servers.firstKey();
803 if (current.equals(thisServer.uuid)) {
804 // we have looped through the entire list
808 // fetch associated server & site socket address
809 Server server = servers.get(current);
810 InetSocketAddress currentSiteSocketAddress =
811 server.siteSocketAddress;
813 if (Objects.equals(thisSiteSocketAddress,
814 currentSiteSocketAddress)) {
815 // same site -- see if we should add this one
817 // this is the next index we are looking for --
819 notifyList.add(server);
821 // advance to the next offset (current-offset * 2)
825 } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) {
826 // we need at least one member from each site
827 notifyList.add(server);
828 siteSocketAddresses.add(currentSiteSocketAddress);
836 * See if there is a host name associated with a destination socket address.
838 * @param dest the socket address of the destination
839 * @return the host name associated with the IP address, or the IP address
840 * if no associated host name is found.
842 private static String socketAddressToName(InetSocketAddress dest) {
843 // destination IP address
844 InetAddress inetAddress = dest.getAddress();
845 String destName = null;
847 // go through the 'hostList' to see if there is a matching name
848 for (String hostName : hostList) {
850 if (inetAddress.equals(InetAddress.getByName(hostName))) {
851 // this one matches -- use the name instead of the IP address
855 } catch (UnknownHostException e) {
856 logger.debug("Server.socketAddressToName error", e);
860 // default name = string value of IP address
861 return destName == null ? inetAddress.getHostAddress() : destName;
865 * Create an 'HttpClient' instance for a particular host.
867 * @param name of the host (currently a UUID or host:port string)
868 * @param dest the socket address of the destination
869 * @param destName the string name to use for the destination
871 static HttpClient buildClient(String name, InetSocketAddress dest, String destName)
872 throws KeyManagementException, NoSuchAlgorithmException,
873 ClassNotFoundException, HttpClientConfigException {
875 return HttpClientFactoryInstance.getClientFactory().build(
876 BusTopicParams.builder()
877 .clientName(name) // name
878 .useHttps(useHttps) // https
879 .allowSelfSignedCerts(useSelfSignedCertificates)// selfSignedCerts
880 .hostname(destName) // host
881 .port(dest.getPort()) // port
882 .managed(false) // managed
887 * Extract the 'WebTarget' information from the 'HttpClient'.
889 * @param client the associated HttpClient instance
890 * @return a WebTarget referring to the previously-specified 'baseUrl'
892 static WebTarget getTarget(HttpClient client)
893 throws NoSuchFieldException, IllegalAccessException {
894 // need access to the internal field 'client'
895 // TBD: We need a way to get this information without reflection
896 Field field = client.getClass().getDeclaredField("client");
897 field.setAccessible(true);
898 Client rsClient = (Client) field.get(client);
899 field.setAccessible(false);
901 rsClient.property(ClientProperties.CONNECT_TIMEOUT, connectTimeout);
902 rsClient.property(ClientProperties.READ_TIMEOUT, readTimeout);
904 // For performance reasons, the root 'WebTarget' is generated only once
905 // at initialization time for each remote host.
906 return rsClient.target(client.getBaseUrl());
910 * This method may be invoked from any thread, and is used to send a
911 * message to the destination server associated with this 'Server' instance.
913 * @param path the path relative to the base URL
914 * @param entity the "request entity" containing the body of the
917 public void post(final String path, final Entity<?> entity) {
918 post(path, entity, null);
922 * This method may be invoked from any thread, and is used to send a
923 * message to the destination server associated with this 'Server' instance.
925 * @param path the path relative to the base URL
926 * @param entity the "request entity" containing the body of the
927 * HTTP POST request (if 'null', an HTTP GET is used instead)
928 * @param responseCallback if non-null, this callback may be used to
929 * modify the WebTarget, and/or receive the POST response message
931 public void post(final String path, final Entity<?> entity,
932 PostResponse responseCallback) {
933 if (target == null) {
937 getThreadPool().execute(() -> {
939 * This method is running within the 'MainLoop' thread.
942 WebTarget webTarget = target.path(path);
943 if (responseCallback != null) {
944 // give callback a chance to modify 'WebTarget'
945 webTarget = responseCallback.webTarget(webTarget);
947 // send the response to the callback
949 if (entity == null) {
950 response = webTarget.request().get();
952 response = webTarget.request().post(entity);
954 responseCallback.response(response);
956 // just do the invoke, and ignore the response
957 if (entity == null) {
958 webTarget.request().get();
960 webTarget.request().post(entity);
963 } catch (Exception e) {
964 logger.error("Failed to send to {} ({}, {})",
965 uuid, destSocketAddress, destName);
966 if (responseCallback != null) {
967 responseCallback.exceptionResponse(e);
969 // this runs in the 'MainLoop' thread
971 // the DNS cache may have been out-of-date when this server
972 // was first contacted -- fix the problem, if needed
973 MainLoop.queueWork(this::checkServer);
979 * This method may be invoked from any thread.
981 * @return the 'ThreadPoolExecutor' associated with this server
983 public synchronized ThreadPoolExecutor getThreadPool() {
984 if (sendThreadPool == null) {
985 // build a thread pool for this Server
987 new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
988 keepAliveTime, TimeUnit.MILLISECONDS,
989 new LinkedTransferQueue<>());
990 sendThreadPool.allowCoreThreadTimeOut(true);
992 return sendThreadPool;
996 * Lower-level method supporting HTTP, which requires that the caller's
997 * thread tolerate blocking. This method may be called from any thread.
999 * @param path the path relative to the base URL
1000 * @return a 'WebTarget' instance pointing to this path
1002 public WebTarget getWebTarget(String path) {
1003 return target == null ? null : target.path(path);
1007 * This method may be invoked from any thread, but its real intent is
1008 * to decode an incoming 'admin' message (which is Base-64-encoded),
1009 * and send it to the 'MainLoop' thread for processing.
1011 * @param data the base-64-encoded data
1013 static void adminRequest(byte[] data) {
1014 final byte[] packet = Base64.getDecoder().decode(data);
1015 Runnable task = () -> {
1017 ByteArrayInputStream bis = new ByteArrayInputStream(packet);
1018 DataInputStream dis = new DataInputStream(bis);
1020 while (dis.available() != 0) {
1021 Server serverData = new Server(dis);
1023 // TBD: Compare with current server
1025 Server server = servers.get(serverData.uuid);
1026 if (server == null) {
1027 serverData.newServer();
1029 server.updateServer(serverData);
1032 } catch (Exception e) {
1033 logger.error("Server.adminRequest: can't decode packet", e);
1036 MainLoop.queueWork(task);
1040 * Send out information about servers 'updatedList' to all servers
1041 * in 'notifyList' (may need to build or rebuild 'notifyList').
1043 static void sendOutData() throws IOException {
1044 // include 'thisServer' in the data -- first, advance the count
1045 thisServer.count += 1;
1046 if (thisServer.count == 0) {
1048 * counter wrapped (0 is a special case) --
1049 * actually, we could probably leave this out, because it would take
1050 * more than a century to wrap if the increment is 1 second
1052 thisServer.count = 1;
1055 ByteArrayOutputStream bos = new ByteArrayOutputStream();
1056 DataOutputStream dos = new DataOutputStream(bos);
1058 thisServer.lastUpdateTime = System.currentTimeMillis();
1059 thisServer.writeServerData(dos);
1061 // include all hosts in the updated list
1062 for (Server server : updatedList) {
1063 server.writeServerData(dos);
1065 updatedList.clear();
1067 // create an 'Entity' that can be sent out to all hosts in the notify list
1068 Entity<String> entity = Entity.entity(
1069 new String(Base64.getEncoder().encode(bos.toByteArray()), StandardCharsets.UTF_8),
1070 MediaType.APPLICATION_OCTET_STREAM_TYPE);
1071 for (Server server : getNotifyList()) {
1072 server.post("admin", entity);
1077 * Search for servers which have taken too long to respond.
1079 static void searchForFailedServers() {
1080 long currentTime = System.currentTimeMillis();
1082 // used to build a list of newly-failed servers
1083 LinkedList<Server> failed = new LinkedList<>();
1084 for (Server server : servers.values()) {
1085 if (server == thisServer) {
1088 long gap = currentTime - server.lastUpdateTime;
1089 if (gap > server.allowedGap) {
1090 // add it to the failed list -- we don't call 'serverFailed' yet,
1091 // because this updates the server list, and leads to a
1092 // 'ConcurrentModificationException'
1097 // remove servers from our list
1098 if (!failed.isEmpty()) {
1099 for (Server server : failed) {
1100 server.serverFailed();
1107 * This method may be invoked from any thread:
1108 * Send information about 'thisServer' to the list of hosts.
1110 * @param out the 'PrintStream' to use for displaying information
1111 * @param hosts a comma-separated list of entries containing
1112 * 'host:port' or just 'port' (current host is implied in this case)
1114 static void pingHosts(PrintStream out, String hosts) {
1115 LinkedList<InetSocketAddress> addresses = new LinkedList<>();
1116 boolean error = false;
1118 for (String host : hosts.split(",")) {
1120 String[] segs = host.split(":");
1122 switch (segs.length) {
1124 addresses.add(new InetSocketAddress(InetAddress.getLocalHost(),
1125 Integer.parseInt(segs[0])));
1128 addresses.add(new InetSocketAddress(segs[0],
1129 Integer.parseInt(segs[1])));
1132 out.println(host + ": Invalid host/port value");
1136 } catch (NumberFormatException e) {
1137 out.println(host + ": Invalid port value");
1138 logger.error(PINGHOSTS_ERROR, e);
1140 } catch (UnknownHostException e) {
1141 out.println(host + ": Unknown host");
1142 logger.error(PINGHOSTS_ERROR, e);
1147 pingHosts(out, addresses);
1152 * This method may be invoked from any thread:
1153 * Send information about 'thisServer' to the list of hosts.
1155 * @param out the 'PrintStream' to use for displaying information
1156 * @param hosts a collection of 'InetSocketAddress' instances, which are
1157 * the hosts to send the information to
1159 static void pingHosts(final PrintStream out,
1160 final Collection<InetSocketAddress> hosts) {
1161 FutureTask<Integer> ft = new FutureTask<>(() -> {
1162 ByteArrayOutputStream bos = new ByteArrayOutputStream();
1163 DataOutputStream dos = new DataOutputStream(bos);
1165 // add information for this server only
1167 thisServer.writeServerData(dos);
1169 // create an 'Entity' that can be sent out to all hosts
1170 Entity<String> entity = Entity.entity(
1171 new String(Base64.getEncoder().encode(bos.toByteArray()),
1172 StandardCharsets.UTF_8),
1173 MediaType.APPLICATION_OCTET_STREAM_TYPE);
1174 pingHostsLoop(entity, out, hosts);
1175 } catch (IOException e) {
1176 out.println("Unable to generate 'ping' data: " + e);
1177 logger.error(PINGHOSTS_ERROR, e);
1182 MainLoop.queueWork(ft);
1184 ft.get(60, TimeUnit.SECONDS);
1185 } catch (InterruptedException e) {
1186 logger.error("Server.pingHosts: interrupted waiting for queued work", e);
1187 Thread.currentThread().interrupt();
1188 } catch (ExecutionException | TimeoutException e) {
1189 logger.error("Server.pingHosts: error waiting for queued work", e);
1194 * This method is used for pingHosts method to reduce its Cognitive Complexity.
1196 * @param entity for sending out to all hosts
1197 * @param out the 'PrintStream' to use for displaying information
1198 * @param hosts a collection of 'InetSocketAddress' instances, which are
1199 * the hosts to send the information to
1201 static void pingHostsLoop(final Entity<String> entity,
1202 final PrintStream out,
1203 final Collection<InetSocketAddress> hosts) {
1204 // loop through hosts
1205 for (InetSocketAddress host : hosts) {
1206 HttpClient httpClient = null;
1209 httpClient = buildClient(host.toString(), host,
1210 socketAddressToName(host));
1211 getTarget(httpClient).path("admin").request().post(entity);
1212 httpClient.shutdown();
1214 } catch (KeyManagementException | NoSuchAlgorithmException e) {
1215 out.println(host + ": Unable to create client connection");
1216 logger.error(PINGHOSTS_ERROR, e);
1217 } catch (NoSuchFieldException | IllegalAccessException e) {
1218 out.println(host + ": Unable to get link to target");
1219 logger.error(PINGHOSTS_ERROR, e);
1220 } catch (Exception e) {
1221 out.println(host + ": " + e);
1222 logger.error(PINGHOSTS_ERROR, e);
1224 if (httpClient != null) {
1225 httpClient.shutdown();
1231 * This method may be invoked from any thread:
1232 * Dump out the current 'servers' table in a human-readable table form.
1234 * @param out the 'PrintStream' to dump the table to
1236 public static void dumpHosts(final PrintStream out) {
1237 FutureTask<Integer> ft = new FutureTask<>(() -> {
1238 dumpHostsInternal(out);
1241 MainLoop.queueWork(ft);
1243 ft.get(60, TimeUnit.SECONDS);
1244 } catch (InterruptedException e) {
1245 logger.error("Server.dumpHosts: interrupted waiting for queued work", e);
1246 Thread.currentThread().interrupt();
1247 } catch (ExecutionException | TimeoutException e) {
1248 logger.error("Server.dumpHosts: error waiting for queued work", e);
1253 * Dump out the current 'servers' table in a human-readable table form.
1255 * @param out the 'PrintStream' to dump the table to
1257 private static void dumpHostsInternal(PrintStream out) {
1258 // modifications to 'servers.values()' and 'notifyList'.
1259 HashSet<Server> localNotifyList = new HashSet<>(getNotifyList());
1261 // see if we have any site information
1262 boolean siteData = false;
1263 for (Server server : servers.values()) {
1264 if (server.siteSocketAddress != null) {
1270 String format = "%1s %-36s %-15s %5s %5s %12s %7s %7s\n";
1271 SimpleDateFormat dateFormat = new SimpleDateFormat("kk:mm:ss.SSS");
1274 format = "%1s %-36s %-15s %5s %-15s %5s %5s %12s %7s %7s\n";
1276 out.printf(format, "", "UUID", "IP Address", "Port",
1277 "Site IP Address", "Port",
1278 "Count", "Update Time", "Elapsed", "Allowed");
1279 out.printf(format, "", "----", "----------", "----",
1280 "---------------", "----",
1281 "-----", "-----------", PRINTOUT_DASHES, PRINTOUT_DASHES);
1285 out.printf(format, "", "UUID", "IP Address", "Port",
1286 "Count", "Update Time", "Elapsed", "Allowed");
1287 out.printf(format, "", "----", "----------", "----",
1288 "-----", "-----------", PRINTOUT_DASHES, PRINTOUT_DASHES);
1292 long currentTime = System.currentTimeMillis();
1293 for (Server server : servers.values()) {
1294 String thisOne = "";
1296 if (server == thisServer) {
1298 } else if (localNotifyList.contains(server)) {
1304 String sitePort = "";
1305 if (server.siteSocketAddress != null) {
1307 server.siteSocketAddress.getAddress().getHostAddress();
1308 sitePort = String.valueOf(server.siteSocketAddress.getPort());
1311 out.printf(format, thisOne, server.uuid,
1312 server.socketAddress.getAddress().getHostAddress(),
1313 server.socketAddress.getPort(),
1314 siteIp, sitePort, server.count,
1315 dateFormat.format(new Date(server.lastUpdateTime)),
1316 currentTime - server.lastUpdateTime,
1319 out.printf(format, thisOne, server.uuid,
1320 server.socketAddress.getAddress().getHostAddress(),
1321 server.socketAddress.getPort(), server.count,
1322 dateFormat.format(new Date(server.lastUpdateTime)),
1323 currentTime - server.lastUpdateTime,
1327 out.println("Count: " + servers.size());
1330 /* ============================================================ */
1333 * This interface supports the 'post' method, and provides the opportunity
1334 * to change the WebTarget and/or receive the POST response message.
1336 interface PostResponse {
1338 * Callback that can be used to modify 'WebTarget', and do things like
1339 * add query parameters.
1341 * @param webTarget the current WebTarget
1342 * @return the updated WebTarget
1344 public default WebTarget webTarget(WebTarget webTarget) {
1349 * Callback that passes the POST response.
1351 * @param response the POST response
1353 public default void response(Response response) {
1357 * Callback that passes the POST exception response.
1360 public default void exceptionResponse(Exception exception) {
1361 Response.ResponseBuilder response;
1362 response = Response.serverError();
1363 this.response(response.build());