// Read the command definitions
try {
commands = new JSONHandler<CLICommands>().read(CLICommands.class, parameters.getMetadataStream());
- LOGGER.debug("found " + commands.getCommandSet().size() + " commands");
} catch (final Exception e) {
LOGGER.error("start of Apex command line editor failed, error reading command metadata from "
+ parameters.getMetadataLocation());
}
// The JSON processing returns null if there is an empty file
- if (null == commands) {
+ if (commands == null || commands.getCommandSet().isEmpty()) {
LOGGER.error("start of Apex command line editor failed, no commands found in "
+ parameters.getApexPropertiesLocation());
errorCount++;
return;
}
+ LOGGER.debug("found " + commands.getCommandSet().size() + " commands");
+
// Read the Apex properties
try {
apexModelProperties = new JSONHandler<ApexModelProperties>().read(ApexModelProperties.class,
parameters.getApexPropertiesStream());
- LOGGER.debug("model properties are: " + apexModelProperties.toString());
} catch (final Exception e) {
LOGGER.error("start of Apex command line editor failed, error reading Apex model properties from "
+ parameters.getApexPropertiesLocation());
return;
}
+ LOGGER.debug("model properties are: " + apexModelProperties.toString());
+
// Find the system commands
final Set<KeywordNode> systemCommandNodes = new TreeSet<>();
for (final CLICommand command : commands.getCommandSet()) {
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
import org.onap.policy.apex.model.utilities.TextFileUtils;
public class TestCLIEditorEventsContext {
// CHECKSTYLE:OFF: MagicNumber
+ private static final Path SRC_MAIN_FOLDER = Paths.get("src/main/resources/");
+ private static final Path SRC_TEST_FOLDER = Paths.get("src/test/resources/");
+
+ private static final Path SUB_FOLDER = SRC_MAIN_FOLDER.resolve("examples/scripts/");
+
+ private static final String SPACES = "\\s+";
+ private static final String EMPTY_STRING = "";
+
+ private static final Path APEX_AVRO_POLICY_FILE = SUB_FOLDER.resolve("TestPolicyAvroEventContext.apex");
+ private static final Path APEX_JAVA_POLICY_FILE = SUB_FOLDER.resolve("TestPolicyJavaEventContext.apex");
+
+ private static final String FILE_NAME = "TestPolicyJavaEventsAndContext";
+ private static final String JSON_FILE = FILE_NAME + ".json";
+ private static final String LOG_FILE = FILE_NAME + ".log";
+
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
/**
* Test java context model.
*
*/
@Test
public void testJavaContextModel() throws IOException, ApexModelException {
- final File tempLogFile = File.createTempFile("TestPolicyJavaEventsAndContext", ".log");
- final File tempModelFile = File.createTempFile("TestPolicyJavaEventsAndContext", ".json");
- final String[] cliArgs =
- new String[] {"-c", "src/main/resources/examples/scripts/TestPolicyJavaEventContext.apex", "-l",
- tempLogFile.getAbsolutePath(), "-o", tempModelFile.getAbsolutePath()};
+ final File tempLogFile = temporaryFolder.newFile(LOG_FILE);
+ final File tempModelFile = temporaryFolder.newFile(JSON_FILE);
+
+ final String[] cliArgs = new String[] {"-c", APEX_JAVA_POLICY_FILE.toString(), "-l",
+ tempLogFile.getAbsolutePath(), "-o", tempModelFile.getAbsolutePath()};
final ApexCLIEditorMain cliEditor = new ApexCLIEditorMain(cliArgs);
assertEquals(0, cliEditor.getErrorCount());
final String modelString = TextFileUtils.getTextFileAsString(tempModelFile.getCanonicalPath());
// As a sanity check, count the number of non white space characters in log and model files
- final int logCharCount = logString.replaceAll("\\s+", "").length();
- final int modelCharCount = modelString.replaceAll("\\s+", "").length();
+ final int logCharCount = logString.replaceAll(SPACES, EMPTY_STRING).length();
+ final int modelCharCount = modelString.replaceAll(SPACES, EMPTY_STRING).length();
assertEquals(25911, logCharCount);
assertEquals(46138, modelCharCount);
-
- tempLogFile.delete();
- tempModelFile.delete();
}
/**
*/
@Test
public void testAvroContextModel() throws IOException, ApexModelException {
- final File tempLogFile = File.createTempFile("TestPolicyAvroEventsAndContext", ".log");
- final File tempModelFile = File.createTempFile("TestPolicyAvroEventsAndContext", ".json");
- final String[] cliArgs =
- new String[] {"-c", "src/main/resources/examples/scripts/TestPolicyAvroEventContext.apex", "-l",
- tempLogFile.getAbsolutePath(), "-o", tempModelFile.getAbsolutePath()};
+ final File tempLogFile = temporaryFolder.newFile(LOG_FILE);
+ final File tempModelFile = temporaryFolder.newFile(JSON_FILE);
+
+ final String[] cliArgs = new String[] {"-c", APEX_AVRO_POLICY_FILE.toString(), "-l",
+ tempLogFile.getAbsolutePath(), "-o", tempModelFile.getAbsolutePath()};
final ApexCLIEditorMain cliEditor = new ApexCLIEditorMain(cliArgs);
assertEquals(0, cliEditor.getErrorCount());
final String modelString = TextFileUtils.getTextFileAsString(tempModelFile.getCanonicalPath());
// As a sanity check, count the number of non white space characters in log and model files
- final int logCharCount = logString.replaceAll("\\s+", "").length();
- final int modelCharCount = modelString.replaceAll("\\s+", "").length();
+ final int logCharCount = logString.replaceAll(SPACES, EMPTY_STRING).length();
+ final int modelCharCount = modelString.replaceAll(SPACES, EMPTY_STRING).length();
assertEquals(30315, logCharCount);
assertEquals(52930, modelCharCount);
- tempLogFile.delete();
- tempModelFile.delete();
}
+
+ @Test
+ public void test_emptyMetadataCommandFileWithEmptyJsonTag_errorcountGreaterThanOne() throws IOException {
+
+ final File tempLogFile = temporaryFolder.newFile(LOG_FILE);
+ final File tempModelFile = temporaryFolder.newFile(JSON_FILE);
+
+ final String modelFile = SRC_TEST_FOLDER.resolve("model").resolve("empty_commands.json").toString();
+ final String apexPropertiesLocation =
+ SRC_MAIN_FOLDER.resolve("etc/editor").resolve("ApexModelProperties.json").toString();
+
+ final String[] cliArgs =
+ new String[] {"-c", APEX_AVRO_POLICY_FILE.toString(), "-l", tempLogFile.getAbsolutePath(), "-o",
+ tempModelFile.getAbsolutePath(), "-m", modelFile, "-a", apexPropertiesLocation};
+
+ final ApexCLIEditorMain objUnderTest = new ApexCLIEditorMain(cliArgs);
+ assertEquals(1, objUnderTest.getErrorCount());
+
+ }
+
+ @Test
+ public void test_emptyMetadataCommandFile_errorcountGreaterThanOne() throws IOException {
+
+ final File tempLogFile = temporaryFolder.newFile(LOG_FILE);
+ final File tempModelFile = temporaryFolder.newFile(JSON_FILE);
+
+ final File modelFile = temporaryFolder.newFile("empty_commands.json");
+
+ final String apexPropertiesLocation =
+ SRC_MAIN_FOLDER.resolve("etc/editor").resolve("ApexModelProperties.json").toString();
+
+ final String[] cliArgs = new String[] {"-c", APEX_AVRO_POLICY_FILE.toString(), "-l",
+ tempLogFile.getAbsolutePath(), "-o", tempModelFile.getAbsolutePath(), "-m", modelFile.getAbsolutePath(),
+ "-a", apexPropertiesLocation};
+
+ final ApexCLIEditorMain objUnderTest = new ApexCLIEditorMain(cliArgs);
+ assertEquals(1, objUnderTest.getErrorCount());
+
+ }
+
}
item = new TestContextItem003(0L);
}
lTypeAlbum.put("testValue", item);
- } catch (final Exception exception) {
- throw exception;
} finally {
- try {
- lTypeAlbum.unlockForWriting("testValue");
- } catch (final ContextException contextException) {
- throw contextException;
- }
+ lTypeAlbum.unlockForWriting("testValue");
}
}
}
package org.onap.policy.apex.core.deployment;
-import com.google.common.eventbus.Subscribe;
-
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
+import com.google.common.eventbus.Subscribe;
+
/**
- * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a
- * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the
- * client thread and a receiving queue to queue messages received from the Apex engine.
+ * The Class DeploymentClient handles the client side of an EngDep communication session with an
+ * Apex server. It runs a thread to handle message sending and session monitoring. It uses a sending
+ * queue to queue messages for sending by the client thread and a receiving queue to queue messages
+ * received from the Apex engine.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
try {
- // Establish a connection to the Apex server for EngDep message communication over Web Sockets
+ // Establish a connection to the Apex server for EngDep message communication over Web
+ // Sockets
service = factory.createClient(new URI("ws://" + host + ":" + port));
service.addMessageListener(new DeploymentClientListener());
LOGGER.error("engine<-->deployment client thread exception", e);
return;
}
-
// Loop forever, sending messages as they appear on the queue
while (true) {
try {
} catch (final InterruptedException e) {
// Message sending has been interrupted, we are finished
LOGGER.debug("engine<-->deployment client interrupted");
+ // restore the interrupt status
+ thisThread.interrupt();
break;
}
}
}
/**
- * The listener interface for receiving deploymentClient events. The class that is interested in processing a
- * deploymentClient event implements this interface, and the object created with that class is registered with a
- * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event
- * occurs, that object's appropriate method is invoked.
+ * The listener interface for receiving deploymentClient events. The class that is interested in
+ * processing a deploymentClient event implements this interface, and the object created with
+ * that class is registered with a component using the component's
+ * {@code addDeploymentClientListener} method. When the deploymentClient event occurs, that
+ * object's appropriate method is invoked.
*
* @see DeploymentClientEvent
*/
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core.
- * infrastructure.messaging.impl.ws.messageblock. MessageBlock)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.
+ * policy.apex.core. infrastructure.messaging.impl.ws.messageblock. MessageBlock)
*/
@Subscribe
@Override
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.
+ * String)
*/
@Override
public void onMessage(final String messageString) {
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep protocol to
- * communicate with the engine, with the EngDep protocol being carried on Java web sockets.
+ * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the
+ * EngDep protocol to communicate with the engine, with the EngDep protocol being carried on Java
+ * web sockets.
*
- * This deployer is a simple command line deployer that reads the communication parameters and the location of the XML
- * model file as arguments.
+ * This deployer is a simple command line deployer that reads the communication parameters and the
+ * location of the XML model file as arguments.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
// Get a reference to the logger
private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class);
- // The default message timeout and timeout increment (the amount of time between polls) in milliseconds
+ // The default message timeout and timeout increment (the amount of time between polls) in
+ // milliseconds
private static final int CLIENT_START_WAIT_INTERVAL = 100;
private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000;
private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100;
try {
LOGGER.debug("handshaking with server {}:{} . . .", hostName, port);
- // Use the deployment client to handle the EngDep communication towards the Apex server. It runs a thread to
+ // Use the deployment client to handle the EngDep communication towards the Apex server.
+ // It runs a thread to
// monitor the session and to send
// messages
client = new DeploymentClient(hostName, port);
*
* @param modelFileName the name of the model file containing the model to deploy
* @param ignoreConflicts true if conflicts between context in polices is to be ignored
- * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @param force true if the model is to be applied even if it is incompatible with the existing
+ * model
* @throws ApexException on Apex errors
* @throws IOException on IO exceptions from the operating system
*/
* @param modelFileName the name of the model file containing the model to deploy
* @param modelInputStream the stream that holds the Apex model
* @param ignoreConflicts true if conflicts between context in polices is to be ignored
- * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @param force true if the model is to be applied even if it is incompatible with the existing
+ * model
* @throws ApexException on model deployment errors
*/
public void deployModel(final String modelFileName, final InputStream modelInputStream,
*
* @param apexPolicyModel the name of the model to deploy
* @param ignoreConflicts true if conflicts between context in polices is to be ignored
- * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @param force true if the model is to be applied even if it is incompatible with the existing
+ * model
* @throws ApexException on model deployment errors
*/
public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force)
try {
receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e);
throw new ApexDeploymentException(
"reception of response from server interrupted " + hostName + ':' + port, e);
import org.onap.policy.apex.model.utilities.Assertions;
/**
- * This class is the output of a state, and is used by the engine to decide what the next state for execution is.
+ * This class is the output of a state, and is used by the engine to decide what the next state for
+ * execution is.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
for (final Entry<String, Object> incomingFieldEntry : eventFieldMap.entrySet()) {
final String fieldName = incomingFieldEntry.getKey();
final AxField fieldDef = incomingFieldDefinitionMap.get(fieldName);
- try {
-
- // Check if this field is a field in the event
- if (!outputEventDef.getFields().contains(fieldDef)) {
- throw new StateMachineException(
- "field \"" + fieldName + "\" does not exist on event \"" + outputEventDef.getID() + "\"");
- }
- } catch (final Exception e) {
- e.printStackTrace();
+
+ // Check if this field is a field in the event
+ if (!outputEventDef.getFields().contains(fieldDef)) {
+ throw new StateMachineException(
+ "field \"" + fieldName + "\" does not exist on event \"" + outputEventDef.getID() + "\"");
}
// Set the value in the output event
}
/**
- * This method copies any fields that exist on the input event that also exist on the output event if they are not
- * set on the output event.
+ * This method copies any fields that exist on the input event that also exist on the output
+ * event if they are not set on the output event.
*
* @param incomingEvent The incoming event to copy from
*/
// CHECKSTYLE:OFF: checkstyle:visibilityModifier Logic has access to this field
/** The full definition information for the state. */
- public AxState state;
+ public final AxState state;
// CHECKSTYLE:ON: checkstyle:visibilityModifier
* The full definition of the task we are presenting a facade to, executing logic has full access to the task
* definition.
*/
- public AxTask task;
+ public final AxTask task;
// CHECKSTYLE:ON: checkstyle:visibilityModifier
* fields to determine what state output to select. Once a state finalizer has selected a state output, it must
* marshal these fields so that they match the fields required for the event defined in the state output.
*/
- public Map<String, Object> fields;
+ public final Map<String, Object> fields;
// A message specified in the logic
private String message;
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the
- * DataHandler instance that has subscribed to the RawMessageHandler instance.
+ * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards
+ * the messages to the DataHandler instance that has subscribed to the RawMessageHandler instance.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
* @param <MESSAGE> the generic type of message being received
return;
}
- // Read the messages from the web socket and place them on the message queue for handling by the queue
+ // Read the messages from the web socket and place them on the message queue for handling by
+ // the queue
// processing thread
ObjectInputStream ois = null;
try {
final MessageHolder<MESSAGE> messageHolder = (MessageHolder<MESSAGE>) ois.readObject();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("message {} recieved from the client {} ", messageHolder.toString(),
+ LOGGER.debug("message {} recieved from the client {} ", messageHolder,
messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress());
}
}
/**
- * This method is called when a string message is received on a web socket and is to be forwarded to a listener.
+ * This method is called when a string message is received on a web socket and is to be
+ * forwarded to a listener.
*
* @param messageString the message string
*/
dataHandler.post(messageBlock);
}
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
dataHandler.post(stringMessage);
}
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
try {
Thread.sleep(QUEUE_POLL_TIMEOUT);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
}
/**
- * This method is called when a message is received on a web socket and is to be forwarded to a listener.
+ * This method is called when a message is received on a web socket and is to be forwarded to a
+ * listener.
*
* @param data the message data containing a message
*/
if (listener == null) {
throw new IllegalArgumentException("The listener object cannot be null");
}
- if (dataHandler == null) {
- throw new IllegalStateException("Data handler not initialized");
- }
}
}
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
/**
- * The Class MessagingClient is the class that wraps web socket handling, message sending, and message reception on the
- * client side of a web socket in Apex.
+ * The Class MessagingClient is the class that wraps web socket handling, message sending, and
+ * message reception on the client side of a web socket in Apex.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
* @param <MESSAGE> the generic type
// The length of time to wait for a connection to a web socket server before aborting
private static final int CONNECTION_TIMEOUT_TIME_MS = 3000;
- // The length of time to wait before checking if a connection to a web socket server has worked or not
+ // The length of time to wait before checking if a connection to a web socket server has worked
+ // or not
private static final int CONNECTION_TRY_INTERVAL_MS = 100;
/**
- * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the web socket and
- * handle incoming message forwarding.
+ * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the
+ * web socket and handle incoming message forwarding.
*
* @param serverUri The URI of the service
*/
public void startConnection() {
// Open the web socket
final WebSocket connection = super.getConnection();
- if (connection != null && !connection.isOpen()) {
+
+ if (connection == null) {
+ throw new IllegalStateException("Could not connect to the server");
+ }
+ if (!connection.isOpen()) {
connect();
}
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core.
- * infrastructure. messaging.MessageHolder)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex
+ * .core. infrastructure. messaging.MessageHolder)
*/
@Override
public void send(final MessageHolder<MESSAGE> commands) {
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
*/
@Override
public void send(final String messageString) {
} catch (final IOException ioe) {
LOGGER.catching(ioe);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
// This can happen in normal operation so ignore
}
isStarted = false;
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core.
- * infrastructure. messaging.MessageHolder)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex
+ * .core. infrastructure. messaging.MessageHolder)
*/
@Override
public void send(final MessageHolder<MESSAGE> message) {
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
*/
@Override
public void send(final String messageString) {
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free ports, translating
- * host names to addresses, serializing objects and flushing object streams.
+ * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free
+ * ports, translating host names to addresses, serializing objects and flushing object streams.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
*/
// The port number of the lowest user port, ports 0-1023 are system ports
private static final int LOWEST_USER_PORT = 1024;
+ /**
+ * Port number is an unsigned 16-bit integer, so maximum port is 65535
+ */
+ private static final int MAX_PORT_RANGE = 65535;
+
// Logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessagingUtils.class);
private MessagingUtils() {}
/**
- * This method searches the availability of the port, if the requested port not available, this method will throw an
- * exception.
+ * This method searches the availability of the port, if the requested port not available, this
+ * method will throw an exception.
*
* @param port the port to check
* @return the port verified as being free
public static int checkPort(final int port) {
LOGGER.entry("Checking availability of port {}", port);
- Socket s = null;
- try {
- // Try to connect to the port, if we can connect then the port is occupied
- s = new Socket("localhost", port);
- LOGGER.debug("Port {} is not available", port);
-
- throw new RuntimeException("could not allocate requested port: " + port);
- } catch (final IOException e) {
- // We found a free port
+ if (isPortAvailable(port)) {
LOGGER.debug("Port {} is available ", port);
return port;
- } finally {
- // Close the socket used to check if the port was free
- if (s != null) {
- try {
- s.close();
- } catch (final IOException e) {
- LOGGER.catching(e);
- LOGGER.warn("could not allocate requested port " + port, e);
- }
- }
}
+ LOGGER.debug("Port {} is not available", port);
+ throw new RuntimeException("could not allocate requested port: " + port);
}
/**
- * This method searches the availability of the port, if the requested port not available,this method will increment
- * the port number and check the availability of that port, this process will continue until it find port available.
+ * This method searches the availability of the port, if the requested port not available,this
+ * method will increment the port number and check the availability of that port, this process
+ * will continue until it reaches max port range which is {@link MAX_PORT_RANGE}.
*
* @param port the first port to check
* @return the port that was found
public static int findPort(final int port) {
LOGGER.entry("Checking availability of port {}", port);
- Socket s = null;
- try {
- // Try to connect to the port, if we can connect then the port is occupied
- s = new Socket("localhost", port);
- LOGGER.debug("Port {} is not available", port);
+ int availablePort = port;
- // Recurse and try the next port
- return findPort(port + 1);
- } catch (final IOException e) {
- // We found a free port
- LOGGER.debug("Port {} is available ", port);
- return port;
- } finally {
- // Close the socket used to check if the port was free
- if (s != null) {
- try {
- s.close();
- } catch (final IOException e) {
- LOGGER.catching(e);
- LOGGER.warn("could not allocate requested port " + port, e);
- throw new RuntimeException("could not allocate requested port " + port, e);
- }
+ while (availablePort <= MAX_PORT_RANGE) {
+ if (isPortAvailable(availablePort)) {
+ LOGGER.debug("Port {} is available ", availablePort);
+ return availablePort;
}
+ LOGGER.debug("Port {} is not available", availablePort);
+ availablePort++;
+ }
+ throw new RuntimeException("could not find free available");
+ }
+
+ /**
+ * Check if port is available or not
+ *
+ * @param port
+ * @return true if port is available
+ */
+ public static boolean isPortAvailable(final int port) {
+ try (final Socket socket = new Socket("localhost", port)) {
+ return false;
+ } catch (final IOException ignoredException) {
+ LOGGER.trace("Port {} is available", port, ignoredException);
+ return true;
}
}
}
/**
- * This method searches the availability of the port, if the requested port not available,this method will increment
- * the port number and check the availability, this process will continue until it find port available.
+ * This method searches the availability of the port, if the requested port not available,this
+ * method will increment the port number and check the availability, this process will continue
+ * until it find port available.
*
* @param port the first port to check
* @return the port that was found
try {
Thread.sleep(milliseconds);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
return false;
}
package org.onap.policy.apex.plugins.context.distribution.infinispan;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
+import java.io.IOException;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.onap.policy.apex.context.ContextException;
-import org.onap.policy.apex.model.utilities.ResourceUtils;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
setSystemProperties(infinispanDistributorParameters);
- // First, try and open a local input stream for Infinispan configuration
- InputStream infinispanConfigStream =
- getLocalInfinispanConfigurationStream(infinispanDistributorParameters.getConfigFile());
-
- // Check if a local file was found, if not then go to the class path
- if (infinispanConfigStream == null) {
- // If a local file is not specified, then check for an infinispan configuration file on
- // the class path
- infinispanConfigStream =
- getClasspathInfinispanConfigurationStream(infinispanDistributorParameters.getConfigFile());
- }
-
- // Check if we found configuration for Infinispan
- if (infinispanConfigStream == null) {
+ try {
+ LOGGER.debug("starting infinispan cache manager using specified configuration . . .");
+ cacheManager = new DefaultCacheManager(infinispanDistributorParameters.getConfigFile());
+ LOGGER.debug("started infinispan cache manager using specified configuration");
+ } catch (final IOException ioException) {
final String errorMessage =
"failed to start infinispan cache manager, no infinispan configuration found on local file system or in classpath, "
+ "try setting Infinspan \"configFile\" parameter";
LOGGER.error(errorMessage);
- throw new ContextException(errorMessage);
- }
-
- try {
- LOGGER.debug("starting infinispan cache manager using specified configuration . . .");
- cacheManager = new DefaultCacheManager(infinispanConfigStream);
- LOGGER.debug("started infinispan cache manager using specified configuration");
+ throw new ContextException(errorMessage, ioException);
} catch (final Exception e) {
LOGGER.error("failed to start infinispan cache manager using specified configuration", e);
throw new ContextException("failed to start infinispan cache manager using specified configuration", e);
System.setProperty("jgroups.bind_addr", infinispanDistributorParameters.getjGroupsBindAddress());
}
- /**
- * Get an Infinispan configuration stream from the local file system.
- *
- * @param infinispanConfigFileName The file name to open
- * @return The file opened as a stream
- * @throws ContextException If the local file could not be found or is invalid
- */
- private InputStream getLocalInfinispanConfigurationStream(final String infinispanConfigFileName)
- throws ContextException {
- LOGGER.debug("checking infinispan configuration file exists at \"" + infinispanConfigFileName + "\". . .");
-
- // Check if the file exists
- final File infinispanConfigFile = new File(infinispanConfigFileName);
- if (!infinispanConfigFile.exists()) {
- return null;
- }
-
- // Check the file
- if (!infinispanConfigFile.isFile() || !infinispanConfigFile.canRead()) {
- LOGGER.error("infinispan configuration file at \"" + infinispanConfigFileName
- + "\" does not exist or is invalid");
- throw new ContextException("infinispan configuration file at \"" + infinispanConfigFileName
- + "\" does not exist or is invalid");
- }
-
- try {
- final InputStream infinispanConfigStream = new FileInputStream(infinispanConfigFile);
- LOGGER.debug("infinispan configuration file exists at \"" + infinispanConfigFileName + "\"");
- return infinispanConfigStream;
- } catch (final Exception e) {
- LOGGER.error("infinispan configuration file at \"" + infinispanConfigFileName
- + "\" does not exist or is invalid", e);
- throw new ContextException("infinispan configuration file at \"" + infinispanConfigFileName
- + "\" does not exist or is invalid", e);
- }
- }
-
- /**
- * Get an Infinispan configuration stream from the class path.
- *
- * @param apexInfinispanConfigFile the apex infinispan config file
- * @return The file opened as a stream
- */
- private InputStream getClasspathInfinispanConfigurationStream(final String apexInfinispanConfigFile) {
- LOGGER.debug(
- "checking infinispan configuration file exists at resource \"" + apexInfinispanConfigFile + "\". . .");
- final InputStream infinispanConfigStream = ResourceUtils.getResourceAsStream(apexInfinispanConfigFile);
-
- if (infinispanConfigStream != null) {
- LOGGER.debug("infinispan configuration file exists at resource \"" + apexInfinispanConfigFile + "\"");
- } else {
- LOGGER.debug("infinispan configuration file at resource \"" + apexInfinispanConfigFile + "\" not found");
- }
- return infinispanConfigStream;
- }
-
/**
* Private class to implement the shutdown hook for this infinispan manager.
*/
lockParameters.getZookeeperConnectSleepTime() * lockParameters.getZookeeperContextRetries(),
TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.warn("could not connect to Zookeeper server at \"" + curatorZookeeperAddress
+ "\", wait for connection timed out");
throw new ContextException("could not connect to Zookeeper server at \"" + curatorZookeeperAddress
if (Schema.Type.NULL.equals(schema.getType())) {
schema = types.get(1);
}
- if (Schema.Type.NULL.equals(schema.getType()) || Schema.Type.NULL.equals(schema.getType())) {
+ if (Schema.Type.NULL.equals(schema.getType())) {
final String resultSting = userKey.getID()
+ ": Apex currently only supports UNION schema2 with 2 options, only one can be NULL, and the other cannot be another UNION";
LOGGER.warn(resultSting);
objectString = (String) object;
}
} catch (final ClassCastException e) {
- final String returnString = getUserKey().getID() + ": object \"" + object.toString() + "\" of type \""
+ final String returnString = getUserKey().getID() + ": object \"" + object + "\" of type \""
+ object.getClass().getCanonicalName() + "\" must be assignable to \""
+ getSchemaClass().getCanonicalName()
+ "\" or be a Json string representation of it for Avro unmarshalling";
}
}
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("message listener execution has been interrupted");
break;
}
try {
queue.put(apexEvent);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.warn("Failed to queue the event: " + apexEvent, e);
}
}
LOGGER.trace("event sent : " + apexEvent.toString());
}
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
break;
} catch (final Exception e) {
// Pass the event to the activator for forwarding to Apex
engineServiceHandler.forwardEvent(apexEvent);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
break;
} catch (final Exception e) {
try {
event = eventProcessingQueue.take();
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
break;
}
// Check the carrier technology parameter class
if (carrierTechnologyParameterClassName == null || carrierTechnologyParameterClassName.length() == 0) {
- final String errorMessage =
- "carrier technology \"" + carrierTechnologyLabel + "\" parameter \"" + PARAMETER_CLASS_NAME
- + "\" value \"" + classNameJsonPrimitive.getAsString() + "\" invalid in JSON file";
+ final String errorMessage = "carrier technology \"" + carrierTechnologyLabel + "\" parameter \""
+ + PARAMETER_CLASS_NAME + "\" value \""
+ + (classNameJsonPrimitive != null ? classNameJsonPrimitive.getAsString() : "null")
+ + "\" invalid in JSON file";
LOGGER.warn(errorMessage);
throw new ApexParameterRuntimeException(errorMessage);
}
if (eventProtocolParameterClassName == null || eventProtocolParameterClassName.length() == 0) {
final String errorMessage =
"event protocol \"" + eventProtocolLabel + "\" parameter \"" + PARAMETER_CLASS_NAME + "\" value \""
- + classNameJsonPrimitive.getAsString() + "\" invalid in JSON file";
+ + (classNameJsonPrimitive != null ? classNameJsonPrimitive.getAsString() : "null")
+ + "\" invalid in JSON file";
LOGGER.warn(errorMessage);
throw new ApexParameterRuntimeException(errorMessage);
}