2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.serverpool;
23 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_HTTPS;
24 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SELF_SIGNED_CERTIFICATES;
25 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST;
26 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_CONNECT_TIMEOUT;
27 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_INITIAL_ALLOWED_GAP;
28 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_IP_ADDRESS;
29 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_PORT;
30 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_READ_TIMEOUT;
31 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_CORE_POOL_SIZE;
32 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME;
33 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE;
34 import static org.onap.policy.drools.serverpool.ServerPoolProperties.HOST_LIST;
35 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_ADAPTIVE_GAP_ADJUST;
36 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_CONNECT_TIMEOUT;
37 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_HTTPS;
38 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_INITIAL_ALLOWED_GAP;
39 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_IP_ADDRESS;
40 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_PORT;
41 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_READ_TIMEOUT;
42 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_SELF_SIGNED_CERTIFICATES;
43 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_CORE_POOL_SIZE;
44 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_KEEP_ALIVE_TIME;
45 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SERVER_THREADS_MAXIMUM_POOL_SIZE;
46 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_IP_ADDRESS;
47 import static org.onap.policy.drools.serverpool.ServerPoolProperties.SITE_PORT;
48 import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
50 import java.io.ByteArrayInputStream;
51 import java.io.ByteArrayOutputStream;
52 import java.io.DataInputStream;
53 import java.io.DataOutputStream;
54 import java.io.IOException;
55 import java.io.PrintStream;
56 import java.io.StringReader;
57 import java.lang.reflect.Field;
58 import java.net.InetAddress;
59 import java.net.InetSocketAddress;
60 import java.net.UnknownHostException;
61 import java.nio.charset.StandardCharsets;
62 import java.security.KeyManagementException;
63 import java.security.NoSuchAlgorithmException;
64 import java.text.SimpleDateFormat;
65 import java.util.Base64;
66 import java.util.Collection;
67 import java.util.Date;
68 import java.util.HashSet;
69 import java.util.LinkedList;
70 import java.util.Objects;
71 import java.util.Properties;
72 import java.util.TreeMap;
73 import java.util.TreeSet;
74 import java.util.UUID;
75 import java.util.concurrent.ExecutionException;
76 import java.util.concurrent.FutureTask;
77 import java.util.concurrent.LinkedTransferQueue;
78 import java.util.concurrent.ThreadPoolExecutor;
79 import java.util.concurrent.TimeUnit;
80 import java.util.concurrent.TimeoutException;
81 import javax.ws.rs.client.Client;
82 import javax.ws.rs.client.Entity;
83 import javax.ws.rs.client.WebTarget;
84 import javax.ws.rs.core.MediaType;
85 import javax.ws.rs.core.Response;
86 import org.glassfish.jersey.client.ClientProperties;
87 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
88 import org.onap.policy.common.endpoints.http.client.HttpClient;
89 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
90 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
91 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
92 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
93 import org.onap.policy.drools.utils.PropertyUtil;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
97 public class Server implements Comparable<Server> {
98 private static Logger logger = LoggerFactory.getLogger(Server.class);
100 // maps UUID to Server object for all known servers
101 private static TreeMap<UUID, Server> servers =
102 new TreeMap<>(Util.uuidComparator);
104 // maps UUID to Server object for all failed servers
105 // (so they aren't accidentally restored, due to updates from other hosts)
106 private static TreeMap<UUID, Server> failedServers =
107 new TreeMap<>(Util.uuidComparator);
109 // subset of servers to be notified (null means it needs to be rebuilt)
110 private static LinkedList<Server> notifyList = null;
112 // data to be sent out to notify list
113 private static TreeSet<Server> updatedList = new TreeSet<>();
115 // the server associated with the current host
116 private static Server thisServer = null;
118 // the current REST server
119 private static HttpServletServer restServer;
121 /*==================================================*/
122 /* Some properties extracted at initialization time */
123 /*==================================================*/
125 // initial value of gap to allow between pings
126 private static long initialAllowedGap;
128 // used in adaptive calculation of allowed gap between pings
129 private static long adaptiveGapAdjust;
131 // time to allow for TCP connect (long)
132 private static String connectTimeout;
134 // time to allow before TCP read timeout (long)
135 private static String readTimeout;
137 // outgoing per-server thread pool parameters
138 private static int corePoolSize;
139 private static int maximumPoolSize;
140 private static long keepAliveTime;
142 // https-related parameters
143 private static boolean useHttps;
144 private static boolean useSelfSignedCertificates;
146 // list of remote host names
147 private static String[] hostList = new String[0];
149 /*=========================================================*/
150 /* Fields included in every 'ping' message between servers */
151 /*=========================================================*/
153 // unique id for this server
156 // counter periodically incremented to indicate the server is "alive"
159 // 16 byte MD5 checksum over additional data that is NOT included in
160 // every 'ping' message -- used to determine whether the data is up-to-date
161 private byte[] checksum;
163 /*========================================================================*/
164 /* The following data is included in the checksum, and doesn't change too */
165 /* frequently (some fields may change as servers go up and down) */
166 /*========================================================================*/
168 // IP address and port of listener
169 private InetSocketAddress socketAddress;
171 // site IP address and port
172 private InetSocketAddress siteSocketAddress = null;
174 /*============================================*/
175 /* Local information not included in checksum */
176 /*============================================*/
178 // destination socket information
179 private InetSocketAddress destSocketAddress;
180 private String destName;
182 // REST client fields
183 private HttpClient client;
184 private WebTarget target;
185 private ThreadPoolExecutor sendThreadPool = null;
187 // time when the 'count' field was last updated
188 private long lastUpdateTime;
190 // calculated field indicating the maximum time between updates
191 private long allowedGap = initialAllowedGap;
193 // indicates whether the 'Server' instance is active or not (synchronized)
194 private boolean active = true;
197 * Tags for encoding of server data
199 static final int END_OF_PARAMETERS_TAG = 0;
200 static final int SOCKET_ADDRESS_TAG = 1;
201 static final int SITE_SOCKET_ADDRESS_TAG = 2;
204 static final String PINGHOSTS_ERROR = "Server.pingHosts error";
206 /*==============================*/
207 /* Comparable<Server> interface */
208 /*==============================*/
211 * Compare this instance to another one by comparing the 'uuid' field.
214 public int compareTo(Server other) {
215 return Util.uuidComparator.compare(uuid, other.uuid);
219 public int hashCode() {
220 return Objects.hash(uuid);
224 public boolean equals(Object obj) {
228 if (!(obj instanceof Server)) {
231 Server other = (Server) obj;
232 return Objects.equals(uuid, other.uuid);
236 * This method may be invoked from any thread, and is used as the main
237 * entry point when testing.
239 * @param args arguments contaning an '=' character are intepreted as
240 * a property, other arguments are presumed to be a property file.
242 public static void main(String[] args) throws IOException {
243 Properties prop = new Properties();
245 for (String arg : args) {
246 // arguments with an '=' in them are a property definition;
247 // otherwise, they are a properties file name
249 if (arg.contains("=")) {
250 prop.load(new StringReader(arg));
252 prop.putAll(PropertyUtil.getProperties(arg));
256 String rval = startup(prop);
258 logger.error("Server.startup failed: {}", rval);
263 * This method may be invoked from any thread, and performs initialization.
265 * @param propertiesFile the name of a property file
267 public static String startup(String propertiesFile) {
268 Properties properties;
270 properties = PropertyUtil.getProperties(propertiesFile);
271 } catch (IOException e) {
272 logger.error("Server.startup: exception reading properties", e);
273 properties = new Properties();
275 return startup(properties);
279 * This method may be invoked from any thread, and performs initialization.
281 * @param properties contains properties used by the server
283 public static String startup(Properties properties) {
284 ServerPoolProperties.setProperties(properties);
285 logger.info("startup: properties={}", properties);
287 // fetch some static properties
288 initialAllowedGap = getProperty(SERVER_INITIAL_ALLOWED_GAP,
289 DEFAULT_SERVER_INITIAL_ALLOWED_GAP);
290 adaptiveGapAdjust = getProperty(SERVER_ADAPTIVE_GAP_ADJUST,
291 DEFAULT_SERVER_ADAPTIVE_GAP_ADJUST);
293 String.valueOf(getProperty(SERVER_CONNECT_TIMEOUT,
294 DEFAULT_SERVER_CONNECT_TIMEOUT));
295 readTimeout = String.valueOf(getProperty(SERVER_READ_TIMEOUT,
296 DEFAULT_SERVER_READ_TIMEOUT));
297 corePoolSize = getProperty(SERVER_THREADS_CORE_POOL_SIZE,
298 DEFAULT_SERVER_THREADS_CORE_POOL_SIZE);
299 maximumPoolSize = getProperty(SERVER_THREADS_MAXIMUM_POOL_SIZE,
300 DEFAULT_SERVER_THREADS_MAXIMUM_POOL_SIZE);
301 keepAliveTime = getProperty(SERVER_THREADS_KEEP_ALIVE_TIME,
302 DEFAULT_SERVER_THREADS_KEEP_ALIVE_TIME);
303 useHttps = getProperty(SERVER_HTTPS, DEFAULT_HTTPS);
304 useSelfSignedCertificates = getProperty(SERVER_SELF_SIGNED_CERTIFICATES,
305 DEFAULT_SELF_SIGNED_CERTIFICATES);
306 String hostListNames = getProperty(HOST_LIST, null);
307 if (hostListNames != null) {
308 hostList = hostListNames.split(",");
311 String possibleError = null;
313 // fetch server information
314 String ipAddressString =
315 getProperty(SERVER_IP_ADDRESS, DEFAULT_SERVER_IP_ADDRESS);
316 int port = getProperty(SERVER_PORT, DEFAULT_SERVER_PORT);
318 possibleError = "Unknown Host: " + ipAddressString;
319 InetAddress address = InetAddress.getByName(ipAddressString);
320 InetSocketAddress socketAddress = new InetSocketAddress(address, port);
322 possibleError = "HTTP server initialization error";
323 restServer = HttpServletServerFactoryInstance.getServerFactory().build(
324 "SERVER-POOL", // name
326 socketAddress.getAddress().getHostAddress(), // host (maybe 0.0.0.0)
327 port, // port (can no longer be 0)
331 restServer.addServletClass(null, RestServerPool.class.getName());
333 // add any additional servlets
334 for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
335 Collection<Class<?>> classes = feature.servletClasses();
336 if (classes != null) {
337 for (Class<?> clazz : classes) {
338 restServer.addServletClass(null, clazz.getName());
343 // we may not know the port until after the server is started
344 possibleError = "HTTP server start error";
346 possibleError = null;
348 // determine the address to use
349 if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) {
350 address = InetAddress.getLocalHost();
353 thisServer = new Server(new InetSocketAddress(address, port));
355 // TBD: is this really appropriate?
356 thisServer.newServer();
358 // start background thread
359 MainLoop.startThread();
360 MainLoop.queueWork(() -> {
361 // run this in the 'MainLoop' thread
365 logger.info("Listening on port {}", port);
368 } catch (UnknownHostException e) {
369 logger.error("Server.startup: exception start server", e);
370 if (possibleError == null) {
371 possibleError = e.toString();
373 return possibleError;
378 * Shut down all threads associate with server pool.
380 public static void shutdown() {
381 Discovery.stopDiscovery();
382 MainLoop.stopThread();
383 TargetLock.shutdown();
386 HashSet<Server> allServers = new HashSet<>();
387 allServers.addAll(servers.values());
388 allServers.addAll(failedServers.values());
390 for (Server server : allServers) {
391 if (server.sendThreadPool != null) {
392 server.sendThreadPool.shutdown();
395 if (restServer != null) {
396 restServer.shutdown();
401 * Return the Server instance associated with the current host.
403 * @return the Server instance associated with the current host
405 public static Server getThisServer() {
410 * Return the first Server instance in the 'servers' list.
412 * @return the first Server instance in the 'servers' list
413 * (the one with the lowest UUID)
415 public static Server getFirstServer() {
416 return servers.firstEntry().getValue();
420 * Lookup a Server instance associated with a UUID.
422 * @param uuid the key to the lookup
423 @ @return the associated 'Server' instance, or 'null' if none
425 public static Server getServer(UUID uuid) {
426 return servers.get(uuid);
430 * Return a count of the number of servers.
432 * @return a count of the number of servers
434 public static int getServerCount() {
435 return servers.size();
439 * Return the complete list of servers.
441 * @return the complete list of servers
443 public static Collection<Server> getServers() {
444 return servers.values();
448 * This method is invoked from the 'startup' thread, and creates a new
449 * 'Server' instance for the current server.
451 * @param socketAddress the IP address and port the listener is bound to
453 private Server(InetSocketAddress socketAddress) {
454 this.uuid = UUID.randomUUID();
456 this.socketAddress = socketAddress;
457 this.lastUpdateTime = System.currentTimeMillis();
461 String siteIp = getProperty(SITE_IP_ADDRESS, null);
462 int sitePort = getProperty(SITE_PORT, 0);
463 if (siteIp != null && sitePort != 0) {
464 // we do have site information specified
466 siteSocketAddress = new InetSocketAddress(siteIp, sitePort);
467 if (siteSocketAddress.getAddress() == null) {
468 logger.error("Couldn't resolve site address: {}", siteIp);
469 siteSocketAddress = null;
471 } catch (IllegalArgumentException e) {
472 logger.error("Illegal 'siteSocketAddress'", e);
473 siteSocketAddress = null;
477 // TBD: calculate checksum
481 * Initialize a 'Server' instance from a 'DataInputStream'. If it is new,
482 * it may get inserted in the table. If it is an update, fields in an
483 * existing 'Server' may be updated.
485 * @param is the 'DataInputStream'
487 Server(DataInputStream is) throws IOException {
488 // read in 16 byte UUID
489 uuid = Util.readUuid(is);
491 // read in 4 byte counter value
492 count = is.readInt();
494 // read in 16 byte MD5 checksum
495 checksum = new byte[16];
496 is.readFully(checksum);
498 // optional parameters
500 while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
502 case SOCKET_ADDRESS_TAG:
503 socketAddress = readSocketAddress(is);
505 case SITE_SOCKET_ADDRESS_TAG:
506 siteSocketAddress = readSocketAddress(is);
510 logger.error("Illegal tag: {}", tag);
517 * Read an 'InetSocketAddress' from a 'DataInputStream'.
519 * @param is the 'DataInputStream'
520 * @return the 'InetSocketAddress'
522 private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException {
524 byte[] ipAddress = new byte[4];
525 is.read(ipAddress, 0, 4);
526 int port = is.readUnsignedShort();
527 return new InetSocketAddress(InetAddress.getByAddress(ipAddress), port);
534 public String toString() {
535 return "Server[" + uuid + "]";
539 * Return the UUID associated with this Server.
541 * @return the UUID associated with this Server
543 public UUID getUuid() {
548 * Return the external InetSocketAddress of the site.
550 * @return the external InetSocketAddress of the site
551 * ('null' if it doesn't exist)
553 public InetSocketAddress getSiteSocketAddress() {
554 return siteSocketAddress;
558 * This method may be called from any thread.
560 * @return 'true' if the this server is active, and 'false' if not
562 public synchronized boolean isActive() {
567 * This method writes out the data associated with the current Server
570 * @param os outout stream that should receive the data
572 void writeServerData(DataOutputStream os) throws IOException {
573 // write out 16 byte UUID
574 Util.writeUuid(os, uuid);
576 // write out 4 byte counter value
579 // write out 16 byte MD5 checksum
580 // TBD: should this be implemented?
581 os.write(checksum == null ? new byte[16] : checksum);
583 if (socketAddress != null) {
584 // write out socket address
585 os.writeByte(SOCKET_ADDRESS_TAG);
586 os.write(socketAddress.getAddress().getAddress(), 0, 4);
587 os.writeShort(socketAddress.getPort());
590 if (siteSocketAddress != null) {
591 // write out socket address
592 os.writeByte(SITE_SOCKET_ADDRESS_TAG);
593 os.write(siteSocketAddress.getAddress().getAddress(), 0, 4);
594 os.writeShort(siteSocketAddress.getPort());
597 os.writeByte(END_OF_PARAMETERS_TAG);
601 * Do any processing needed to create a new server. This method is invoked
602 * from the 'MainLoop' thread in every case except for the current server,
603 * in which case it is invoked in 'startup' prior to creating 'MainLoop'.
605 private void newServer() {
606 Server failed = failedServers.get(uuid);
607 if (failed != null) {
608 // this one is on the failed list -- see if the counter has advanced
609 if ((count - failed.count) <= 0) {
610 // the counter has not advanced -- ignore
614 // the counter has advanced -- somehow, this server has returned
615 failedServers.remove(uuid);
616 synchronized (this) {
619 logger.error("Server reawakened: {} ({})", uuid, socketAddress);
622 lastUpdateTime = System.currentTimeMillis();
623 servers.put(uuid, this);
624 updatedList.add(this);
626 // notify list will need to be rebuilt
629 if (socketAddress != null && this != thisServer) {
630 // initialize 'client' and 'target' fields
631 if (siteSocketAddress != null
632 && !siteSocketAddress.equals(thisServer.siteSocketAddress)) {
633 // destination is on a remote site
634 destSocketAddress = siteSocketAddress;
636 // destination is on the local site -- use direct addressing
637 destSocketAddress = socketAddress;
639 destName = socketAddressToName(destSocketAddress);
641 // 'client' is used for REST messages to the destination
642 client = buildClient(uuid.toString(), destSocketAddress, destName);
644 // initialize the 'target' field
645 target = getTarget(client);
646 } catch (KeyManagementException | NoSuchAlgorithmException
647 | NoSuchFieldException | IllegalAccessException
648 | ClassNotFoundException | HttpClientConfigException e) {
649 logger.error("Server.newServer: problems creating 'client'", e);
652 logger.info("New server: {} ({})", uuid, socketAddress);
653 for (Events listener : Events.getListeners()) {
654 listener.newServer(this);
659 * Check the server state in response to some issue. At present, only the
660 * 'destName' information is checked.
662 private void checkServer() {
663 // recalculate 'destName' (we have seen DNS issues)
664 String newDestName = socketAddressToName(destSocketAddress);
665 if (newDestName.equals(destName)) {
668 logger.warn("Remote host name for {} has changed from {} to {}",
669 destSocketAddress, destName, newDestName);
671 // shut down old client, and rebuild
676 // update 'destName', and rebuild the client
677 destName = newDestName;
679 // 'client' is used for REST messages to the destination
680 client = buildClient(uuid.toString(), destSocketAddress, destName);
682 // initialize the 'target' field
683 target = getTarget(client);
684 } catch (KeyManagementException | NoSuchAlgorithmException
685 | NoSuchFieldException | IllegalAccessException
686 | ClassNotFoundException | HttpClientConfigException e) {
687 logger.error("Server.checkServer: problems recreating 'client'", e);
692 * Update server data.
694 * @param serverData this is a temporary 'Server' instance created from
695 * an incoming message, which is used to update fields within the
696 * 'Server' instance identified by 'this'
698 private void updateServer(Server serverData) {
699 if (serverData.count > count) {
700 // an update has occurred
701 count = serverData.count;
703 // TBD: calculate and verify checksum, more fields may be updated
705 // adjust 'allowedGap' accordingly
706 long currentTime = System.currentTimeMillis();
707 long gap = currentTime - lastUpdateTime;
709 // adjust 'allowedGap' accordingly
710 // TBD: need properties to support overrides
711 gap = gap * 3 / 2 + adaptiveGapAdjust;
712 if (gap > allowedGap) {
713 // update 'allowedGap' immediately
716 // gradually pull the allowed gap down
717 // TBD: need properties to support overrides
718 allowedGap = (allowedGap * 15 + gap) / 16;
720 lastUpdateTime = currentTime;
722 updatedList.add(this);
727 * a server has failed.
729 private void serverFailed() {
731 synchronized (this) {
735 // remove it from the table
736 servers.remove(uuid);
738 // add it to the failed servers table
739 failedServers.put(uuid, this);
741 // clean up client information
742 if (client != null) {
748 // log an error message
749 logger.error("Server failure: {} ({})", uuid, socketAddress);
750 for (Events listener : Events.getListeners()) {
751 listener.serverFailed(this);
756 * Fetch, and possibily calculate, the "notify list" associated with this
757 * server. This is the list of servers to forward a server and bucket
758 * information to, and is approximately log2(n) in length, where 'n' is
759 * the total number of servers.
760 * It is calculated by starting with all of the servers sorted by UUID --
761 * let's say the current server is at position 's'. The notify list will
762 * contain the server at positions:
767 * Using all powers of 2 less than 'n'. If the total server count is 50,
768 * this list has 6 entries.
769 * @return the notify list
771 static Collection<Server> getNotifyList() {
772 // The 'notifyList' value is initially 'null', and it is reset to 'null'
773 // every time a new host joins, or one leaves. That way, it is marked for
774 // recalculation, but only when needed.
775 if (notifyList == null) {
776 // next index we are looking for
779 // our current position in the Server table -- starting at 'thisServer'
780 UUID current = thisServer.uuid;
782 // site socket address of 'current'
783 InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress;
785 // hash set of all site socket addresses located
786 HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>();
787 siteSocketAddresses.add(thisSiteSocketAddress);
789 // the list we are building
790 notifyList = new LinkedList<Server>();
794 // move to the next key (UUID) -- if we hit the end of the table,
795 // wrap to the beginning
796 current = servers.higherKey(current);
797 if (current == null) {
798 current = servers.firstKey();
800 if (current.equals(thisServer.uuid)) {
801 // we have looped through the entire list
805 // fetch associated server & site socket address
806 Server server = servers.get(current);
807 InetSocketAddress currentSiteSocketAddress =
808 server.siteSocketAddress;
810 if (Objects.equals(thisSiteSocketAddress,
811 currentSiteSocketAddress)) {
812 // same site -- see if we should add this one
814 // this is the next index we are looking for --
816 notifyList.add(server);
818 // advance to the next offset (current-offset * 2)
822 } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) {
823 // we need at least one member from each site
824 notifyList.add(server);
825 siteSocketAddresses.add(currentSiteSocketAddress);
833 * See if there is a host name associated with a destination socket address.
835 * @param dest the socket address of the destination
836 * @return the host name associated with the IP address, or the IP address
837 * if no associated host name is found.
839 private static String socketAddressToName(InetSocketAddress dest) {
840 // destination IP address
841 InetAddress inetAddress = dest.getAddress();
842 String destName = null;
844 // go through the 'hostList' to see if there is a matching name
845 for (String hostName : hostList) {
847 if (inetAddress.equals(InetAddress.getByName(hostName))) {
848 // this one matches -- use the name instead of the IP address
852 } catch (UnknownHostException e) {
853 logger.debug("Server.socketAddressToName error", e);
857 // default name = string value of IP address
858 return destName == null ? inetAddress.getHostAddress() : destName;
862 * Create an 'HttpClient' instance for a particular host.
864 * @param name of the host (currently a UUID or host:port string)
865 * @param dest the socket address of the destination
866 * @param destName the string name to use for the destination
868 static HttpClient buildClient(String name, InetSocketAddress dest, String destName)
869 throws KeyManagementException, NoSuchAlgorithmException,
870 ClassNotFoundException, HttpClientConfigException {
872 return HttpClientFactoryInstance.getClientFactory().build(
873 BusTopicParams.builder()
874 .clientName(name) // name
875 .useHttps(useHttps) // https
876 .allowSelfSignedCerts(useSelfSignedCertificates)// selfSignedCerts
877 .hostname(destName) // host
878 .port(dest.getPort()) // port
879 .managed(false) // managed
884 * Extract the 'WebTarget' information from the 'HttpClient'.
886 * @param client the associated HttpClient instance
887 * @return a WebTarget referring to the previously-specified 'baseUrl'
889 static WebTarget getTarget(HttpClient client)
890 throws NoSuchFieldException, IllegalAccessException {
891 // need access to the internal field 'client'
892 // TBD: We need a way to get this information without reflection
893 Field field = client.getClass().getDeclaredField("client");
894 field.setAccessible(true);
895 Client rsClient = (Client) field.get(client);
896 field.setAccessible(false);
898 rsClient.property(ClientProperties.CONNECT_TIMEOUT, connectTimeout);
899 rsClient.property(ClientProperties.READ_TIMEOUT, readTimeout);
901 // For performance reasons, the root 'WebTarget' is generated only once
902 // at initialization time for each remote host.
903 return rsClient.target(client.getBaseUrl());
907 * This method may be invoked from any thread, and is used to send a
908 * message to the destination server associated with this 'Server' instance.
910 * @param path the path relative to the base URL
911 * @param entity the "request entity" containing the body of the
914 public void post(final String path, final Entity<?> entity) {
915 post(path, entity, null);
919 * This method may be invoked from any thread, and is used to send a
920 * message to the destination server associated with this 'Server' instance.
922 * @param path the path relative to the base URL
923 * @param entity the "request entity" containing the body of the
924 * HTTP POST request (if 'null', an HTTP GET is used instead)
925 * @param responseCallback if non-null, this callback may be used to
926 * modify the WebTarget, and/or receive the POST response message
928 public void post(final String path, final Entity<?> entity,
929 PostResponse responseCallback) {
930 if (target == null) {
934 getThreadPool().execute(() -> {
936 * This method is running within the 'MainLoop' thread.
939 WebTarget webTarget = target.path(path);
940 if (responseCallback != null) {
941 // give callback a chance to modify 'WebTarget'
942 webTarget = responseCallback.webTarget(webTarget);
944 // send the response to the callback
946 if (entity == null) {
947 response = webTarget.request().get();
949 response = webTarget.request().post(entity);
951 responseCallback.response(response);
953 // just do the invoke, and ignore the response
954 if (entity == null) {
955 webTarget.request().get();
957 webTarget.request().post(entity);
960 } catch (Exception e) {
961 logger.error("Failed to send to {} ({}, {})",
962 uuid, destSocketAddress, destName);
963 if (responseCallback != null) {
964 responseCallback.exceptionResponse(e);
966 MainLoop.queueWork(() -> {
967 // this runs in the 'MainLoop' thread
969 // the DNS cache may have been out-of-date when this server
970 // was first contacted -- fix the problem, if needed
978 * This method may be invoked from any thread.
980 * @return the 'ThreadPoolExecutor' associated with this server
982 public synchronized ThreadPoolExecutor getThreadPool() {
983 if (sendThreadPool == null) {
984 // build a thread pool for this Server
986 new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
987 keepAliveTime, TimeUnit.MILLISECONDS,
988 new LinkedTransferQueue<Runnable>());
989 sendThreadPool.allowCoreThreadTimeOut(true);
991 return sendThreadPool;
995 * Lower-level method supporting HTTP, which requires that the caller's
996 * thread tolerate blocking. This method may be called from any thread.
998 * @param path the path relative to the base URL
999 * @return a 'WebTarget' instance pointing to this path
1001 public WebTarget getWebTarget(String path) {
1002 return target == null ? null : target.path(path);
1006 * This method may be invoked from any thread, but its real intent is
1007 * to decode an incoming 'admin' message (which is Base-64-encoded),
1008 * and send it to the 'MainLoop' thread for processing.
1010 * @param data the base-64-encoded data
1012 static void adminRequest(byte[] data) {
1013 final byte[] packet = Base64.getDecoder().decode(data);
1014 Runnable task = () -> {
1016 ByteArrayInputStream bis = new ByteArrayInputStream(packet);
1017 DataInputStream dis = new DataInputStream(bis);
1019 while (dis.available() != 0) {
1020 Server serverData = new Server(dis);
1022 // TBD: Compare with current server
1024 Server server = servers.get(serverData.uuid);
1025 if (server == null) {
1026 serverData.newServer();
1028 server.updateServer(serverData);
1031 } catch (Exception e) {
1032 logger.error("Server.adminRequest: can't decode packet", e);
1035 MainLoop.queueWork(task);
1039 * Send out information about servers 'updatedList' to all servers
1040 * in 'notifyList' (may need to build or rebuild 'notifyList').
1042 static void sendOutData() throws IOException {
1043 // include 'thisServer' in the data -- first, advance the count
1044 thisServer.count += 1;
1045 if (thisServer.count == 0) {
1047 * counter wrapped (0 is a special case) --
1048 * actually, we could probably leave this out, because it would take
1049 * more than a century to wrap if the increment is 1 second
1051 thisServer.count = 1;
1054 ByteArrayOutputStream bos = new ByteArrayOutputStream();
1055 DataOutputStream dos = new DataOutputStream(bos);
1057 thisServer.lastUpdateTime = System.currentTimeMillis();
1058 thisServer.writeServerData(dos);
1060 // include all hosts in the updated list
1061 for (Server server : updatedList) {
1062 server.writeServerData(dos);
1064 updatedList.clear();
1066 // create an 'Entity' that can be sent out to all hosts in the notify list
1067 Entity<String> entity = Entity.entity(
1068 new String(Base64.getEncoder().encode(bos.toByteArray()), StandardCharsets.UTF_8),
1069 MediaType.APPLICATION_OCTET_STREAM_TYPE);
1070 for (Server server : getNotifyList()) {
1071 server.post("admin", entity);
1076 * Search for servers which have taken too long to respond.
1078 static void searchForFailedServers() {
1079 long currentTime = System.currentTimeMillis();
1081 // used to build a list of newly-failed servers
1082 LinkedList<Server> failed = new LinkedList<>();
1083 for (Server server : servers.values()) {
1084 if (server == thisServer) {
1087 long gap = currentTime - server.lastUpdateTime;
1088 if (gap > server.allowedGap) {
1089 // add it to the failed list -- we don't call 'serverFailed' yet,
1090 // because this updates the server list, and leads to a
1091 // 'ConcurrentModificationException'
1096 // remove servers from our list
1097 if (!failed.isEmpty()) {
1098 for (Server server : failed) {
1099 server.serverFailed();
1106 * This method may be invoked from any thread:
1107 * Send information about 'thisServer' to the list of hosts.
1109 * @param out the 'PrintStream' to use for displaying information
1110 * @param hosts a comma-separated list of entries containing
1111 * 'host:port' or just 'port' (current host is implied in this case)
1113 static void pingHosts(PrintStream out, String hosts) {
1114 LinkedList<InetSocketAddress> addresses = new LinkedList<>();
1115 boolean error = false;
1117 for (String host : hosts.split(",")) {
1119 String[] segs = host.split(":");
1121 switch (segs.length) {
1123 addresses.add(new InetSocketAddress(InetAddress.getLocalHost(),
1124 Integer.parseInt(segs[0])));
1127 addresses.add(new InetSocketAddress(segs[0],
1128 Integer.parseInt(segs[1])));
1131 out.println(host + ": Invalid host/port value");
1135 } catch (NumberFormatException e) {
1136 out.println(host + ": Invalid port value");
1137 logger.error(PINGHOSTS_ERROR, e);
1139 } catch (UnknownHostException e) {
1140 out.println(host + ": Unknown host");
1141 logger.error(PINGHOSTS_ERROR, e);
1146 pingHosts(out, addresses);
1151 * This method may be invoked from any thread:
1152 * Send information about 'thisServer' to the list of hosts.
1154 * @param out the 'PrintStream' to use for displaying information
1155 * @param hosts a collection of 'InetSocketAddress' instances, which are
1156 * the hosts to send the information to
1158 static void pingHosts(final PrintStream out,
1159 final Collection<InetSocketAddress> hosts) {
1160 FutureTask<Integer> ft = new FutureTask<>(() -> {
1161 ByteArrayOutputStream bos = new ByteArrayOutputStream();
1162 DataOutputStream dos = new DataOutputStream(bos);
1164 // add information for this server only
1166 thisServer.writeServerData(dos);
1168 // create an 'Entity' that can be sent out to all hosts
1169 Entity<String> entity = Entity.entity(
1170 new String(Base64.getEncoder().encode(bos.toByteArray()),
1171 StandardCharsets.UTF_8),
1172 MediaType.APPLICATION_OCTET_STREAM_TYPE);
1174 // loop through hosts
1175 for (InetSocketAddress host : hosts) {
1176 HttpClient httpClient = null;
1179 httpClient = buildClient(host.toString(), host,
1180 socketAddressToName(host));
1181 getTarget(httpClient).path("admin").request().post(entity);
1182 httpClient.shutdown();
1184 } catch (KeyManagementException | NoSuchAlgorithmException e) {
1185 out.println(host + ": Unable to create client connection");
1186 logger.error(PINGHOSTS_ERROR, e);
1187 } catch (NoSuchFieldException | IllegalAccessException e) {
1188 out.println(host + ": Unable to get link to target");
1189 logger.error(PINGHOSTS_ERROR, e);
1190 } catch (Exception e) {
1191 out.println(host + ": " + e);
1192 logger.error(PINGHOSTS_ERROR, e);
1194 if (httpClient != null) {
1195 httpClient.shutdown();
1198 } catch (IOException e) {
1199 out.println("Unable to generate 'ping' data: " + e);
1200 logger.error(PINGHOSTS_ERROR, e);
1205 MainLoop.queueWork(ft);
1207 ft.get(60, TimeUnit.SECONDS);
1208 } catch (InterruptedException e) {
1209 logger.error("Server.pingHosts: interrupted waiting for queued work", e);
1210 Thread.currentThread().interrupt();
1211 } catch (ExecutionException | TimeoutException e) {
1212 logger.error("Server.pingHosts: error waiting for queued work", e);
1217 * This method may be invoked from any thread:
1218 * Dump out the current 'servers' table in a human-readable table form.
1220 * @param out the 'PrintStream' to dump the table to
1222 public static void dumpHosts(final PrintStream out) {
1223 FutureTask<Integer> ft = new FutureTask<>(() -> {
1224 dumpHostsInternal(out);
1227 MainLoop.queueWork(ft);
1229 ft.get(60, TimeUnit.SECONDS);
1230 } catch (InterruptedException e) {
1231 logger.error("Server.dumpHosts: interrupted waiting for queued work", e);
1232 Thread.currentThread().interrupt();
1233 } catch (ExecutionException | TimeoutException e) {
1234 logger.error("Server.dumpHosts: error waiting for queued work", e);
1239 * Dump out the current 'servers' table in a human-readable table form.
1241 * @param out the 'PrintStream' to dump the table to
1243 private static void dumpHostsInternal(PrintStream out) {
1244 // modifications to 'servers.values()' and 'notifyList'.
1245 HashSet<Server> localNotifyList = new HashSet<>(getNotifyList());
1247 // see if we have any site information
1248 boolean siteData = false;
1249 for (Server server : servers.values()) {
1250 if (server.siteSocketAddress != null) {
1256 String format = "%1s %-36s %-15s %5s %5s %12s %7s %7s\n";
1257 SimpleDateFormat dateFormat = new SimpleDateFormat("kk:mm:ss.SSS");
1260 format = "%1s %-36s %-15s %5s %-15s %5s %5s %12s %7s %7s\n";
1262 out.printf(format, "", "UUID", "IP Address", "Port",
1263 "Site IP Address", "Port",
1264 "Count", "Update Time", "Elapsed", "Allowed");
1265 out.printf(format, "", "----", "----------", "----",
1266 "---------------", "----",
1267 "-----", "-----------", "-------", "-------");
1271 out.printf(format, "", "UUID", "IP Address", "Port",
1272 "Count", "Update Time", "Elapsed", "Allowed");
1273 out.printf(format, "", "----", "----------", "----",
1274 "-----", "-----------", "-------", "-------");
1278 long currentTime = System.currentTimeMillis();
1279 for (Server server : servers.values()) {
1280 String thisOne = "";
1282 if (server == thisServer) {
1284 } else if (localNotifyList.contains(server)) {
1290 String sitePort = "";
1291 if (server.siteSocketAddress != null) {
1293 server.siteSocketAddress.getAddress().getHostAddress();
1294 sitePort = String.valueOf(server.siteSocketAddress.getPort());
1297 out.printf(format, thisOne, server.uuid,
1298 server.socketAddress.getAddress().getHostAddress(),
1299 server.socketAddress.getPort(),
1300 siteIp, sitePort, server.count,
1301 dateFormat.format(new Date(server.lastUpdateTime)),
1302 currentTime - server.lastUpdateTime,
1305 out.printf(format, thisOne, server.uuid,
1306 server.socketAddress.getAddress().getHostAddress(),
1307 server.socketAddress.getPort(), server.count,
1308 dateFormat.format(new Date(server.lastUpdateTime)),
1309 currentTime - server.lastUpdateTime,
1313 out.println("Count: " + servers.size());
1316 /* ============================================================ */
1319 * This interface supports the 'post' method, and provides the opportunity
1320 * to change the WebTarget and/or receive the POST response message.
1322 interface PostResponse {
1324 * Callback that can be used to modify 'WebTarget', and do things like
1325 * add query parameters.
1327 * @param webTarget the current WebTarget
1328 * @return the updated WebTarget
1330 public default WebTarget webTarget(WebTarget webTarget) {
1335 * Callback that passes the POST response.
1337 * @param response the POST response
1339 public default void response(Response response) {
1343 * Callback that passes the POST exception response.
1346 public default void exceptionResponse(Exception exception) {
1347 Response.ResponseBuilder response;
1348 response = Response.serverError();
1349 this.response(response.build());