2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.serverpool;
23 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_HTTPS;
24 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SELF_SIGNED_CERTIFICATES;
25 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST;
26 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_CONNECT_TIMEOUT;
27 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_INITIAL_ALLOWED_GAP;
28 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_IP_ADDRESS;
29 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_PORT;
30 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_READ_TIMEOUT;
31 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_CORE_POOL_SIZE;
32 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME;
33 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE;
34 import static org.onap.policy.drools.serverpool.ServerPoolProperties.HOST_LIST;
35 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_ADAPTIVE_GAP_ADJUST;
36 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_CONNECT_TIMEOUT;
37 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_HTTPS;
38 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_INITIAL_ALLOWED_GAP;
39 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_IP_ADDRESS;
40 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_PORT;
41 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_READ_TIMEOUT;
42 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_SELF_SIGNED_CERTIFICATES;
43 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_CORE_POOL_SIZE;
44 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_KEEP_ALIVE_TIME;
45 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_MAXIMUM_POOL_SIZE;
46 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_IP_ADDRESS;
47 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_PORT;
48 import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
50 import java.io.ByteArrayInputStream;
51 import java.io.ByteArrayOutputStream;
52 import java.io.DataInputStream;
53 import java.io.DataOutputStream;
54 import java.io.IOException;
55 import java.io.PrintStream;
56 import java.io.StringReader;
57 import java.lang.reflect.Field;
58 import java.net.InetAddress;
59 import java.net.InetSocketAddress;
60 import java.net.UnknownHostException;
61 import java.nio.charset.StandardCharsets;
62 import java.security.KeyManagementException;
63 import java.security.NoSuchAlgorithmException;
64 import java.text.SimpleDateFormat;
65 import java.util.Base64;
66 import java.util.Collection;
67 import java.util.Date;
68 import java.util.HashSet;
69 import java.util.LinkedList;
70 import java.util.Objects;
71 import java.util.Properties;
72 import java.util.TreeMap;
73 import java.util.TreeSet;
74 import java.util.UUID;
75 import java.util.concurrent.ExecutionException;
76 import java.util.concurrent.FutureTask;
77 import java.util.concurrent.LinkedTransferQueue;
78 import java.util.concurrent.ThreadPoolExecutor;
79 import java.util.concurrent.TimeUnit;
80 import java.util.concurrent.TimeoutException;
82 import javax.ws.rs.client.Client;
83 import javax.ws.rs.client.Entity;
84 import javax.ws.rs.client.WebTarget;
85 import javax.ws.rs.core.MediaType;
86 import javax.ws.rs.core.Response;
88 import org.glassfish.jersey.client.ClientProperties;
89 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
90 import org.onap.policy.common.endpoints.http.client.HttpClient;
91 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
92 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
93 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
94 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
95 import org.onap.policy.drools.utils.PropertyUtil;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
99 public class Server implements Comparable<Server> {
100 private static Logger logger = LoggerFactory.getLogger(Server.class);
102 // maps UUID to Server object for all known servers
103 private static TreeMap<UUID, Server> servers =
104 new TreeMap<>(Util.uuidComparator);
106 // maps UUID to Server object for all failed servers
107 // (so they aren't accidentally restored, due to updates from other hosts)
108 private static TreeMap<UUID, Server> failedServers =
109 new TreeMap<>(Util.uuidComparator);
111 // subset of servers to be notified (null means it needs to be rebuilt)
112 private static LinkedList<Server> notifyList = null;
114 // data to be sent out to notify list
115 private static TreeSet<Server> updatedList = new TreeSet<>();
117 // the server associated with the current host
118 private static Server thisServer = null;
120 // the current REST server
121 private static HttpServletServer restServer;
123 /*==================================================*/
124 /* Some properties extracted at initialization time */
125 /*==================================================*/
127 // initial value of gap to allow between pings
128 private static long initialAllowedGap;
130 // used in adaptive calculation of allowed gap between pings
131 private static long adaptiveGapAdjust;
133 // time to allow for TCP connect (long)
134 private static String connectTimeout;
136 // time to allow before TCP read timeout (long)
137 private static String readTimeout;
139 // outgoing per-server thread pool parameters
140 private static int corePoolSize;
141 private static int maximumPoolSize;
142 private static long keepAliveTime;
144 // https-related parameters
145 private static boolean useHttps;
146 private static boolean useSelfSignedCertificates;
148 // list of remote host names
149 private static String[] hostList = new String[0];
151 /*=========================================================*/
152 /* Fields included in every 'ping' message between servers */
153 /*=========================================================*/
155 // unique id for this server
158 // counter periodically incremented to indicate the server is "alive"
161 // 16 byte MD5 checksum over additional data that is NOT included in
162 // every 'ping' message -- used to determine whether the data is up-to-date
163 private byte[] checksum;
165 /*========================================================================*/
166 /* The following data is included in the checksum, and doesn't change too */
167 /* frequently (some fields may change as servers go up and down) */
168 /*========================================================================*/
170 // IP address and port of listener
171 private InetSocketAddress socketAddress;
173 // site IP address and port
174 private InetSocketAddress siteSocketAddress = null;
176 /*============================================*/
177 /* Local information not included in checksum */
178 /*============================================*/
180 // destination socket information
181 private InetSocketAddress destSocketAddress;
182 private String destName;
184 // REST client fields
185 private HttpClient client;
186 private WebTarget target;
187 private ThreadPoolExecutor sendThreadPool = null;
189 // time when the 'count' field was last updated
190 private long lastUpdateTime;
192 // calculated field indicating the maximum time between updates
193 private long allowedGap = initialAllowedGap;
195 // indicates whether the 'Server' instance is active or not (synchronized)
196 private boolean active = true;
199 * Tags for encoding of server data
201 static final int END_OF_PARAMETERS_TAG = 0;
202 static final int SOCKET_ADDRESS_TAG = 1;
203 static final int SITE_SOCKET_ADDRESS_TAG = 2;
206 static final String PINGHOSTS_ERROR = "Server.pingHosts error";
208 /*==============================*/
209 /* Comparable<Server> interface */
210 /*==============================*/
213 * Compare this instance to another one by comparing the 'uuid' field.
216 public int compareTo(Server other) {
217 return Util.uuidComparator.compare(uuid, other.uuid);
221 * This method may be invoked from any thread, and is used as the main
222 * entry point when testing.
224 * @param args arguments contaning an '=' character are intepreted as
225 * a property, other arguments are presumed to be a property file.
227 public static void main(String[] args) throws IOException {
228 Properties prop = new Properties();
230 for (String arg : args) {
231 // arguments with an '=' in them are a property definition;
232 // otherwise, they are a properties file name
234 if (arg.contains("=")) {
235 prop.load(new StringReader(arg));
237 prop.putAll(PropertyUtil.getProperties(arg));
241 String rval = startup(prop);
243 logger.error("Server.startup failed: {}", rval);
248 * This method may be invoked from any thread, and performs initialization.
250 * @param propertiesFile the name of a property file
252 public static String startup(String propertiesFile) {
253 Properties properties;
255 properties = PropertyUtil.getProperties(propertiesFile);
256 } catch (IOException e) {
257 logger.error("Server.startup: exception reading properties", e);
258 properties = new Properties();
260 return startup(properties);
264 * This method may be invoked from any thread, and performs initialization.
266 * @param properties contains properties used by the server
268 public static String startup(Properties properties) {
269 ServerPoolProperties.setProperties(properties);
270 logger.info("startup: properties={}", properties);
272 // fetch some static properties
273 initialAllowedGap = getProperty(SERVER_INITIAL_ALLOWED_GAP,
274 DEFAULT_SERVER_INITIAL_ALLOWED_GAP);
275 adaptiveGapAdjust = getProperty(SERVER_ADAPTIVE_GAP_ADJUST,
276 DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST);
278 String.valueOf(getProperty(SERVER_CONNECT_TIMEOUT,
279 DEFAULT_SERVER_CONNECT_TIMEOUT));
280 readTimeout = String.valueOf(getProperty(SERVER_READ_TIMEOUT,
281 DEFAULT_SERVER_READ_TIMEOUT));
282 corePoolSize = getProperty(SERVER_THREADS_CORE_POOL_SIZE,
283 DEFAULT_SERVER_THREADS_CORE_POOL_SIZE);
284 maximumPoolSize = getProperty(SERVER_THREADS_MAXIMUM_POOL_SIZE,
285 DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE);
286 keepAliveTime = getProperty(SERVER_THREADS_KEEP_ALIVE_TIME,
287 DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME);
288 useHttps = getProperty(SERVER_HTTPS, DEFAULT_HTTPS);
289 useSelfSignedCertificates = getProperty(SERVER_SELF_SIGNED_CERTIFICATES,
290 DEFAULT_SELF_SIGNED_CERTIFICATES);
291 String hostListNames = getProperty(HOST_LIST, null);
292 if (hostListNames != null) {
293 hostList = hostListNames.split(",");
296 String possibleError = null;
298 // fetch server information
299 String ipAddressString =
300 getProperty(SERVER_IP_ADDRESS, DEFAULT_SERVER_IP_ADDRESS);
301 int port = getProperty(SERVER_PORT, DEFAULT_SERVER_PORT);
303 possibleError = "Unknown Host: " + ipAddressString;
304 InetAddress address = InetAddress.getByName(ipAddressString);
305 InetSocketAddress socketAddress = new InetSocketAddress(address, port);
307 possibleError = "HTTP server initialization error";
308 restServer = HttpServletServerFactoryInstance.getServerFactory().build(
309 "SERVER-POOL", // name
311 socketAddress.getAddress().getHostAddress(),// host (maybe 0.0.0.0)
312 port, // port (can no longer be 0)
316 restServer.addServletClass(null, RestServerPool.class.getName());
318 // add any additional servlets
319 for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
320 Collection<Class<?>> classes = feature.servletClasses();
321 if (classes != null) {
322 for (Class<?> clazz : classes) {
323 restServer.addServletClass(null, clazz.getName());
328 // we may not know the port until after the server is started
329 possibleError = "HTTP server start error";
331 possibleError = null;
333 // determine the address to use
334 if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) {
335 address = InetAddress.getLocalHost();
338 thisServer = new Server(new InetSocketAddress(address, port));
340 // TBD: is this really appropriate?
341 thisServer.newServer();
343 // start background thread
344 MainLoop.startThread();
345 MainLoop.queueWork(() -> {
346 // run this in the 'MainLoop' thread
350 logger.info("Listening on port {}", port);
353 } catch (UnknownHostException e) {
354 logger.error("Server.startup: exception start server", e);
355 if (possibleError == null) {
356 possibleError = e.toString();
358 return possibleError;
363 * Shut down all threads associate with server pool.
365 public static void shutdown() {
366 Discovery.stopDiscovery();
367 MainLoop.stopThread();
368 TargetLock.shutdown();
371 HashSet<Server> allServers = new HashSet<>();
372 allServers.addAll(servers.values());
373 allServers.addAll(failedServers.values());
375 for (Server server : allServers) {
376 if (server.sendThreadPool != null) {
377 server.sendThreadPool.shutdown();
380 if (restServer != null) {
381 restServer.shutdown();
386 * Return the Server instance associated with the current host.
388 * @return the Server instance associated with the current host
390 public static Server getThisServer() {
395 * Return the first Server instance in the 'servers' list.
397 * @return the first Server instance in the 'servers' list
398 * (the one with the lowest UUID)
400 public static Server getFirstServer() {
401 return servers.firstEntry().getValue();
405 * Lookup a Server instance associated with a UUID.
407 * @param uuid the key to the lookup
408 @ @return the associated 'Server' instance, or 'null' if none
410 public static Server getServer(UUID uuid) {
411 return servers.get(uuid);
415 * Return a count of the number of servers.
417 * @return a count of the number of servers
419 public static int getServerCount() {
420 return servers.size();
424 * Return the complete list of servers.
426 * @return the complete list of servers
428 public static Collection<Server> getServers() {
429 return servers.values();
433 * This method is invoked from the 'startup' thread, and creates a new
434 * 'Server' instance for the current server.
436 * @param socketAddress the IP address and port the listener is bound to
438 private Server(InetSocketAddress socketAddress) {
439 this.uuid = UUID.randomUUID();
441 this.socketAddress = socketAddress;
442 this.lastUpdateTime = System.currentTimeMillis();
446 String siteIp = getProperty(SITE_IP_ADDRESS, null);
447 int sitePort = getProperty(SITE_PORT, 0);
448 if (siteIp != null && sitePort != 0) {
449 // we do have site information specified
451 siteSocketAddress = new InetSocketAddress(siteIp, sitePort);
452 if (siteSocketAddress.getAddress() == null) {
453 logger.error("Couldn't resolve site address: {}", siteIp);
454 siteSocketAddress = null;
456 } catch (IllegalArgumentException e) {
457 logger.error("Illegal 'siteSocketAddress'", e);
458 siteSocketAddress = null;
462 // TBD: calculate checksum
466 * Initialize a 'Server' instance from a 'DataInputStream'. If it is new,
467 * it may get inserted in the table. If it is an update, fields in an
468 * existing 'Server' may be updated.
470 * @param is the 'DataInputStream'
472 Server(DataInputStream is) throws IOException {
473 // read in 16 byte UUID
474 uuid = Util.readUuid(is);
476 // read in 4 byte counter value
477 count = is.readInt();
479 // read in 16 byte MD5 checksum
480 checksum = new byte[16];
481 is.readFully(checksum);
483 // optional parameters
485 while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
487 case SOCKET_ADDRESS_TAG:
488 socketAddress = readSocketAddress(is);
490 case SITE_SOCKET_ADDRESS_TAG:
491 siteSocketAddress = readSocketAddress(is);
495 logger.error("Illegal tag: {}", tag);
502 * Read an 'InetSocketAddress' from a 'DataInputStream'.
504 * @param is the 'DataInputStream'
505 * @return the 'InetSocketAddress'
507 private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException {
509 byte[] ipAddress = new byte[4];
510 is.read(ipAddress, 0, 4);
511 int port = is.readUnsignedShort();
512 return new InetSocketAddress(InetAddress.getByAddress(ipAddress), port);
519 public String toString() {
520 return "Server[" + uuid + "]";
524 * Return the UUID associated with this Server.
526 * @return the UUID associated with this Server
528 public UUID getUuid() {
533 * Return the external InetSocketAddress of the site.
535 * @return the external InetSocketAddress of the site
536 * ('null' if it doesn't exist)
538 public InetSocketAddress getSiteSocketAddress() {
539 return siteSocketAddress;
543 * This method may be called from any thread.
545 * @return 'true' if the this server is active, and 'false' if not
547 public synchronized boolean isActive() {
552 * This method writes out the data associated with the current Server
555 * @param os outout stream that should receive the data
557 void writeServerData(DataOutputStream os) throws IOException {
558 // write out 16 byte UUID
559 Util.writeUuid(os, uuid);
561 // write out 4 byte counter value
564 // write out 16 byte MD5 checksum
565 // TBD: should this be implemented?
566 os.write(checksum == null ? new byte[16] : checksum);
568 if (socketAddress != null) {
569 // write out socket address
570 os.writeByte(SOCKET_ADDRESS_TAG);
571 os.write(socketAddress.getAddress().getAddress(), 0, 4);
572 os.writeShort(socketAddress.getPort());
575 if (siteSocketAddress != null) {
576 // write out socket address
577 os.writeByte(SITE_SOCKET_ADDRESS_TAG);
578 os.write(siteSocketAddress.getAddress().getAddress(), 0, 4);
579 os.writeShort(siteSocketAddress.getPort());
582 os.writeByte(END_OF_PARAMETERS_TAG);
586 * Do any processing needed to create a new server. This method is invoked
587 * from the 'MainLoop' thread in every case except for the current server,
588 * in which case it is invoked in 'startup' prior to creating 'MainLoop'.
590 private void newServer() {
591 Server failed = failedServers.get(uuid);
592 if (failed != null) {
593 // this one is on the failed list -- see if the counter has advanced
594 if ((count - failed.count) <= 0) {
595 // the counter has not advanced -- ignore
599 // the counter has advanced -- somehow, this server has returned
600 failedServers.remove(uuid);
601 synchronized (this) {
604 logger.error("Server reawakened: {} ({})", uuid, socketAddress);
607 lastUpdateTime = System.currentTimeMillis();
608 servers.put(uuid, this);
609 updatedList.add(this);
611 // notify list will need to be rebuilt
614 if (socketAddress != null && this != thisServer) {
615 // initialize 'client' and 'target' fields
616 if (siteSocketAddress != null
617 && !siteSocketAddress.equals(thisServer.siteSocketAddress)) {
618 // destination is on a remote site
619 destSocketAddress = siteSocketAddress;
621 // destination is on the local site -- use direct addressing
622 destSocketAddress = socketAddress;
624 destName = socketAddressToName(destSocketAddress);
626 // 'client' is used for REST messages to the destination
627 client = buildClient(uuid.toString(), destSocketAddress, destName);
629 // initialize the 'target' field
630 target = getTarget(client);
631 } catch (KeyManagementException | NoSuchAlgorithmException
632 | NoSuchFieldException | IllegalAccessException
633 | ClassNotFoundException | HttpClientConfigException e) {
634 logger.error("Server.newServer: problems creating 'client'", e);
637 logger.info("New server: {} ({})", uuid, socketAddress);
638 for (Events listener : Events.getListeners()) {
639 listener.newServer(this);
644 * Check the server state in response to some issue. At present, only the
645 * 'destName' information is checked.
647 private void checkServer() {
648 // recalculate 'destName' (we have seen DNS issues)
649 String newDestName = socketAddressToName(destSocketAddress);
650 if (newDestName.equals(destName)) {
653 logger.warn("Remote host name for {} has changed from {} to {}",
654 destSocketAddress, destName, newDestName);
656 // shut down old client, and rebuild
661 // update 'destName', and rebuild the client
662 destName = newDestName;
664 // 'client' is used for REST messages to the destination
665 client = buildClient(uuid.toString(), destSocketAddress, destName);
667 // initialize the 'target' field
668 target = getTarget(client);
669 } catch (KeyManagementException | NoSuchAlgorithmException
670 | NoSuchFieldException | IllegalAccessException
671 | ClassNotFoundException | HttpClientConfigException e) {
672 logger.error("Server.checkServer: problems recreating 'client'", e);
677 * Update server data.
679 * @param serverData this is a temporary 'Server' instance created from
680 * an incoming message, which is used to update fields within the
681 * 'Server' instance identified by 'this'
683 private void updateServer(Server serverData) {
684 if (serverData.count > count) {
685 // an update has occurred
686 count = serverData.count;
688 // TBD: calculate and verify checksum, more fields may be updated
690 // adjust 'allowedGap' accordingly
691 long currentTime = System.currentTimeMillis();
692 long gap = currentTime - lastUpdateTime;
694 // adjust 'allowedGap' accordingly
695 // TBD: need properties to support overrides
696 gap = gap * 3 / 2 + adaptiveGapAdjust;
697 if (gap > allowedGap) {
698 // update 'allowedGap' immediately
701 // gradually pull the allowed gap down
702 // TBD: need properties to support overrides
703 allowedGap = (allowedGap * 15 + gap) / 16;
705 lastUpdateTime = currentTime;
707 updatedList.add(this);
712 * a server has failed.
714 private void serverFailed() {
716 synchronized (this) {
720 // remove it from the table
721 servers.remove(uuid);
723 // add it to the failed servers table
724 failedServers.put(uuid, this);
726 // clean up client information
727 if (client != null) {
733 // log an error message
734 logger.error("Server failure: {} ({})", uuid, socketAddress);
735 for (Events listener : Events.getListeners()) {
736 listener.serverFailed(this);
741 * Fetch, and possibily calculate, the "notify list" associated with this
742 * server. This is the list of servers to forward a server and bucket
743 * information to, and is approximately log2(n) in length, where 'n' is
744 * the total number of servers.
745 * It is calculated by starting with all of the servers sorted by UUID --
746 * let's say the current server is at position 's'. The notify list will
747 * contain the server at positions:
752 * Using all powers of 2 less than 'n'. If the total server count is 50,
753 * this list has 6 entries.
754 * @return the notify list
756 static Collection<Server> getNotifyList() {
757 // The 'notifyList' value is initially 'null', and it is reset to 'null'
758 // every time a new host joins, or one leaves. That way, it is marked for
759 // recalculation, but only when needed.
760 if (notifyList == null) {
761 // next index we are looking for
764 // our current position in the Server table -- starting at 'thisServer'
765 UUID current = thisServer.uuid;
767 // site socket address of 'current'
768 InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress;
770 // hash set of all site socket addresses located
771 HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>();
772 siteSocketAddresses.add(thisSiteSocketAddress);
774 // the list we are building
775 notifyList = new LinkedList<Server>();
779 // move to the next key (UUID) -- if we hit the end of the table,
780 // wrap to the beginning
781 current = servers.higherKey(current);
782 if (current == null) {
783 current = servers.firstKey();
785 if (current.equals(thisServer.uuid)) {
786 // we have looped through the entire list
790 // fetch associated server & site socket address
791 Server server = servers.get(current);
792 InetSocketAddress currentSiteSocketAddress =
793 server.siteSocketAddress;
795 if (Objects.equals(thisSiteSocketAddress,
796 currentSiteSocketAddress)) {
797 // same site -- see if we should add this one
799 // this is the next index we are looking for --
801 notifyList.add(server);
803 // advance to the next offset (current-offset * 2)
807 } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) {
808 // we need at least one member from each site
809 notifyList.add(server);
810 siteSocketAddresses.add(currentSiteSocketAddress);
818 * See if there is a host name associated with a destination socket address.
820 * @param dest the socket address of the destination
821 * @return the host name associated with the IP address, or the IP address
822 * if no associated host name is found.
824 private static String socketAddressToName(InetSocketAddress dest) {
825 // destination IP address
826 InetAddress inetAddress = dest.getAddress();
827 String destName = null;
829 // go through the 'hostList' to see if there is a matching name
830 for (String hostName : hostList) {
832 if (inetAddress.equals(InetAddress.getByName(hostName))) {
833 // this one matches -- use the name instead of the IP address
837 } catch (UnknownHostException e) {
838 logger.debug("Server.socketAddressToName error", e);
842 // default name = string value of IP address
843 return destName == null ? inetAddress.getHostAddress() : destName;
847 * Create an 'HttpClient' instance for a particular host.
849 * @param name of the host (currently a UUID or host:port string)
850 * @param dest the socket address of the destination
851 * @param destName the string name to use for the destination
853 static HttpClient buildClient(String name, InetSocketAddress dest, String destName)
854 throws KeyManagementException, NoSuchAlgorithmException,
855 ClassNotFoundException, HttpClientConfigException {
857 return HttpClientFactoryInstance.getClientFactory().build(
858 BusTopicParams.builder()
859 .clientName(name) // name
860 .useHttps(useHttps) // https
861 .allowSelfSignedCerts(useSelfSignedCertificates)// selfSignedCerts
862 .hostname(destName) // host
863 .port(dest.getPort()) // port
864 .managed(false) // managed
869 * Extract the 'WebTarget' information from the 'HttpClient'.
871 * @param client the associated HttpClient instance
872 * @return a WebTarget referring to the previously-specified 'baseUrl'
874 static WebTarget getTarget(HttpClient client)
875 throws NoSuchFieldException, IllegalAccessException {
876 // need access to the internal field 'client'
877 // TBD: We need a way to get this information without reflection
878 Field field = client.getClass().getDeclaredField("client");
879 field.setAccessible(true);
880 Client rsClient = (Client)field.get(client);
881 field.setAccessible(false);
883 rsClient.property(ClientProperties.CONNECT_TIMEOUT, connectTimeout);
884 rsClient.property(ClientProperties.READ_TIMEOUT, readTimeout);
886 // For performance reasons, the root 'WebTarget' is generated only once
887 // at initialization time for each remote host.
888 return rsClient.target(client.getBaseUrl());
892 * This method may be invoked from any thread, and is used to send a
893 * message to the destination server associated with this 'Server' instance.
895 * @param path the path relative to the base URL
896 * @param entity the "request entity" containing the body of the
899 public void post(final String path, final Entity<?> entity) {
900 post(path, entity, null);
904 * This method may be invoked from any thread, and is used to send a
905 * message to the destination server associated with this 'Server' instance.
907 * @param path the path relative to the base URL
908 * @param entity the "request entity" containing the body of the
909 * HTTP POST request (if 'null', an HTTP GET is used instead)
910 * @param responseCallback if non-null, this callback may be used to
911 * modify the WebTarget, and/or receive the POST response message
913 public void post(final String path, final Entity<?> entity,
914 PostResponse responseCallback) {
915 if (target == null) {
919 getThreadPool().execute(() -> {
921 * This method is running within the 'MainLoop' thread.
924 WebTarget webTarget = target.path(path);
925 if (responseCallback != null) {
926 // give callback a chance to modify 'WebTarget'
927 webTarget = responseCallback.webTarget(webTarget);
929 // send the response to the callback
931 if (entity == null) {
932 response = webTarget.request().get();
934 response = webTarget.request().post(entity);
936 responseCallback.response(response);
938 // just do the invoke, and ignore the response
939 if (entity == null) {
940 webTarget.request().get();
942 webTarget.request().post(entity);
945 } catch (Exception e) {
946 logger.error("Failed to send to {} ({}, {})",
947 uuid, destSocketAddress, destName);
948 if (responseCallback != null) {
949 responseCallback.exceptionResponse(e);
951 MainLoop.queueWork(() -> {
952 // this runs in the 'MainLoop' thread
954 // the DNS cache may have been out-of-date when this server
955 // was first contacted -- fix the problem, if needed
963 * This method may be invoked from any thread.
965 * @return the 'ThreadPoolExecutor' associated with this server
967 public synchronized ThreadPoolExecutor getThreadPool() {
968 if (sendThreadPool == null) {
969 // build a thread pool for this Server
971 new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
972 keepAliveTime, TimeUnit.MILLISECONDS,
973 new LinkedTransferQueue<Runnable>());
974 sendThreadPool.allowCoreThreadTimeOut(true);
976 return sendThreadPool;
980 * Lower-level method supporting HTTP, which requires that the caller's
981 * thread tolerate blocking. This method may be called from any thread.
983 * @param path the path relative to the base URL
984 * @return a 'WebTarget' instance pointing to this path
986 public WebTarget getWebTarget(String path) {
987 return target == null ? null : target.path(path);
991 * This method may be invoked from any thread, but its real intent is
992 * to decode an incoming 'admin' message (which is Base-64-encoded),
993 * and send it to the 'MainLoop' thread for processing.
995 * @param data the base-64-encoded data
997 static void adminRequest(byte[] data) {
998 final byte[] packet = Base64.getDecoder().decode(data);
999 Runnable task = () -> {
1001 ByteArrayInputStream bis = new ByteArrayInputStream(packet);
1002 DataInputStream dis = new DataInputStream(bis);
1004 while (dis.available() != 0) {
1005 Server serverData = new Server(dis);
1007 // TBD: Compare with current server
1009 Server server = servers.get(serverData.uuid);
1010 if (server == null) {
1011 serverData.newServer();
1013 server.updateServer(serverData);
1016 } catch (Exception e) {
1017 logger.error("Server.adminRequest: can't decode packet", e);
1020 MainLoop.queueWork(task);
1024 * Send out information about servers 'updatedList' to all servers
1025 * in 'notifyList' (may need to build or rebuild 'notifyList').
1027 static void sendOutData() throws IOException {
1028 // include 'thisServer' in the data -- first, advance the count
1029 thisServer.count += 1;
1030 if (thisServer.count == 0) {
1032 * counter wrapped (0 is a special case) --
1033 * actually, we could probably leave this out, because it would take
1034 * more than a century to wrap if the increment is 1 second
1036 thisServer.count = 1;
1039 ByteArrayOutputStream bos = new ByteArrayOutputStream();
1040 DataOutputStream dos = new DataOutputStream(bos);
1042 thisServer.lastUpdateTime = System.currentTimeMillis();
1043 thisServer.writeServerData(dos);
1045 // include all hosts in the updated list
1046 for (Server server : updatedList) {
1047 server.writeServerData(dos);
1049 updatedList.clear();
1051 // create an 'Entity' that can be sent out to all hosts in the notify list
1052 Entity<String> entity = Entity.entity(
1053 new String(Base64.getEncoder().encode(bos.toByteArray()), StandardCharsets.UTF_8),
1054 MediaType.APPLICATION_OCTET_STREAM_TYPE);
1055 for (Server server : getNotifyList()) {
1056 server.post("admin", entity);
1061 * Search for servers which have taken too long to respond.
1063 static void searchForFailedServers() {
1064 long currentTime = System.currentTimeMillis();
1066 // used to build a list of newly-failed servers
1067 LinkedList<Server> failed = new LinkedList<>();
1068 for (Server server : servers.values()) {
1069 if (server == thisServer) {
1072 long gap = currentTime - server.lastUpdateTime;
1073 if (gap > server.allowedGap) {
1074 // add it to the failed list -- we don't call 'serverFailed' yet,
1075 // because this updates the server list, and leads to a
1076 // 'ConcurrentModificationException'
1081 // remove servers from our list
1082 if (!failed.isEmpty()) {
1083 for (Server server : failed) {
1084 server.serverFailed();
1091 * This method may be invoked from any thread:
1092 * Send information about 'thisServer' to the list of hosts.
1094 * @param out the 'PrintStream' to use for displaying information
1095 * @param hosts a comma-separated list of entries containing
1096 * 'host:port' or just 'port' (current host is implied in this case)
1098 static void pingHosts(PrintStream out, String hosts) {
1099 LinkedList<InetSocketAddress> addresses = new LinkedList<>();
1100 boolean error = false;
1102 for (String host : hosts.split(",")) {
1104 String[] segs = host.split(":");
1106 switch (segs.length) {
1108 addresses.add(new InetSocketAddress(InetAddress.getLocalHost(),
1109 Integer.parseInt(segs[0])));
1112 addresses.add(new InetSocketAddress(segs[0],
1113 Integer.parseInt(segs[1])));
1116 out.println(host + ": Invalid host/port value");
1120 } catch (NumberFormatException e) {
1121 out.println(host + ": Invalid port value");
1122 logger.error(PINGHOSTS_ERROR, e);
1124 } catch (UnknownHostException e) {
1125 out.println(host + ": Unknown host");
1126 logger.error(PINGHOSTS_ERROR, e);
1131 pingHosts(out, addresses);
1136 * This method may be invoked from any thread:
1137 * Send information about 'thisServer' to the list of hosts.
1139 * @param out the 'PrintStream' to use for displaying information
1140 * @param hosts a collection of 'InetSocketAddress' instances, which are
1141 * the hosts to send the information to
1143 static void pingHosts(final PrintStream out,
1144 final Collection<InetSocketAddress> hosts) {
1145 FutureTask<Integer> ft = new FutureTask<>(() -> {
1146 ByteArrayOutputStream bos = new ByteArrayOutputStream();
1147 DataOutputStream dos = new DataOutputStream(bos);
1149 // add information for this server only
1151 thisServer.writeServerData(dos);
1153 // create an 'Entity' that can be sent out to all hosts
1154 Entity<String> entity = Entity.entity(
1155 new String(Base64.getEncoder().encode(bos.toByteArray()),
1156 StandardCharsets.UTF_8),
1157 MediaType.APPLICATION_OCTET_STREAM_TYPE);
1159 // loop through hosts
1160 for (InetSocketAddress host : hosts) {
1161 HttpClient httpClient = null;
1164 httpClient = buildClient(host.toString(), host,
1165 socketAddressToName(host));
1166 getTarget(httpClient).path("admin").request().post(entity);
1167 httpClient.shutdown();
1169 } catch (KeyManagementException | NoSuchAlgorithmException e) {
1170 out.println(host + ": Unable to create client connection");
1171 logger.error(PINGHOSTS_ERROR, e);
1172 } catch (NoSuchFieldException | IllegalAccessException e) {
1173 out.println(host + ": Unable to get link to target");
1174 logger.error(PINGHOSTS_ERROR, e);
1175 } catch (Exception e) {
1176 out.println(host + ": " + e);
1177 logger.error(PINGHOSTS_ERROR, e);
1179 if (httpClient != null) {
1180 httpClient.shutdown();
1183 } catch (IOException e) {
1184 out.println("Unable to generate 'ping' data: " + e);
1185 logger.error(PINGHOSTS_ERROR, e);
1190 MainLoop.queueWork(ft);
1192 ft.get(60, TimeUnit.SECONDS);
1193 } catch (InterruptedException e) {
1194 logger.error("Server.pingHosts: interrupted waiting for queued work", e);
1195 Thread.currentThread().interrupt();
1196 } catch (ExecutionException | TimeoutException e) {
1197 logger.error("Server.pingHosts: error waiting for queued work", e);
1202 * This method may be invoked from any thread:
1203 * Dump out the current 'servers' table in a human-readable table form.
1205 * @param out the 'PrintStream' to dump the table to
1207 public static void dumpHosts(final PrintStream out) {
1208 FutureTask<Integer> ft = new FutureTask<>(() -> {
1209 dumpHostsInternal(out);
1212 MainLoop.queueWork(ft);
1214 ft.get(60, TimeUnit.SECONDS);
1215 } catch (InterruptedException e) {
1216 logger.error("Server.dumpHosts: interrupted waiting for queued work", e);
1217 Thread.currentThread().interrupt();
1218 } catch (ExecutionException | TimeoutException e) {
1219 logger.error("Server.dumpHosts: error waiting for queued work", e);
1224 * Dump out the current 'servers' table in a human-readable table form.
1226 * @param out the 'PrintStream' to dump the table to
1228 private static void dumpHostsInternal(PrintStream out) {
1229 // modifications to 'servers.values()' and 'notifyList'.
1230 HashSet<Server> localNotifyList = new HashSet<>(getNotifyList());
1232 // see if we have any site information
1233 boolean siteData = false;
1234 for (Server server : servers.values()) {
1235 if (server.siteSocketAddress != null) {
1241 String format = "%1s %-36s %-15s %5s %5s %12s %7s %7s\n";
1242 SimpleDateFormat dateFormat = new SimpleDateFormat("kk:mm:ss.SSS");
1245 format = "%1s %-36s %-15s %5s %-15s %5s %5s %12s %7s %7s\n";
1247 out.printf(format, "", "UUID", "IP Address", "Port",
1248 "Site IP Address", "Port",
1249 "Count", "Update Time", "Elapsed", "Allowed");
1250 out.printf(format, "", "----", "----------", "----",
1251 "---------------", "----",
1252 "-----", "-----------", "-------", "-------");
1256 out.printf(format, "", "UUID", "IP Address", "Port",
1257 "Count", "Update Time", "Elapsed", "Allowed");
1258 out.printf(format, "", "----", "----------", "----",
1259 "-----", "-----------", "-------", "-------");
1263 long currentTime = System.currentTimeMillis();
1264 for (Server server : servers.values()) {
1265 String thisOne = "";
1267 if (server == thisServer) {
1269 } else if (localNotifyList.contains(server)) {
1275 String sitePort = "";
1276 if (server.siteSocketAddress != null) {
1278 server.siteSocketAddress.getAddress().getHostAddress();
1279 sitePort = String.valueOf(server.siteSocketAddress.getPort());
1282 out.printf(format, thisOne, server.uuid,
1283 server.socketAddress.getAddress().getHostAddress(),
1284 server.socketAddress.getPort(),
1285 siteIp, sitePort, server.count,
1286 dateFormat.format(new Date(server.lastUpdateTime)),
1287 currentTime - server.lastUpdateTime,
1290 out.printf(format, thisOne, server.uuid,
1291 server.socketAddress.getAddress().getHostAddress(),
1292 server.socketAddress.getPort(), server.count,
1293 dateFormat.format(new Date(server.lastUpdateTime)),
1294 currentTime - server.lastUpdateTime,
1298 out.println("Count: " + servers.size());
1301 /* ============================================================ */
1304 * This interface supports the 'post' method, and provides the opportunity
1305 * to change the WebTarget and/or receive the POST response message.
1307 interface PostResponse {
1309 * Callback that can be used to modify 'WebTarget', and do things like
1310 * add query parameters.
1312 * @param webTarget the current WebTarget
1313 * @return the updated WebTarget
1315 public default WebTarget webTarget(WebTarget webTarget) {
1320 * Callback that passes the POST response.
1322 * @param response the POST response
1324 public default void response(Response response) {
1328 * Callback that passes the POST exception response.
1331 public default void exceptionResponse(Exception exception) {
1332 Response.ResponseBuilder response;
1333 response = Response.serverError();
1334 this.response(response.build());