* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.net.URI;
import java.net.UnknownHostException;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
// Number of messages processed
private long messagesSent = 0;
private long messagesReceived = 0;
+ @Getter
+ @Setter
+ private AtomicReference<CountDownLatch> countDownLatch = new AtomicReference<>();
/**
* Instantiates a new deployment client.
public DeploymentClient(final String host, final int port) {
this.host = host;
this.port = port;
+ countDownLatch.set(new CountDownLatch(1));
}
/**
service.startConnection();
started = true;
+ countDownLatch.get().countDown();
LOGGER.debug("engine<-->deployment client thread started");
} catch (final Exception e) {
LOGGER.error("engine<-->deployment client thread exception", e);
service.stopConnection();
}
started = false;
+ countDownLatch.set(new CountDownLatch(1));
LOGGER.debug("engine<-->deployment test client stopped . . .");
}
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.io.InputStream;
import java.net.URL;
import java.util.concurrent.TimeUnit;
-import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.core.protocols.Message;
import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
// The default message timeout and timeout increment (the amount of time between
// polls) in
// milliseconds
- private static final int CLIENT_START_WAIT_INTERVAL = 100;
private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000;
private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100;
clientThread.start();
// Wait for the connection to come up
- while (!client.isStarted()) {
- if (clientThread.isAlive()) {
- ThreadUtilities.sleep(CLIENT_START_WAIT_INTERVAL);
- } else {
- throw new ApexDeploymentException("could not handshake with server " + hostName + ":" + port);
- }
+ if (!client.getCountDownLatch().get().await(5L, TimeUnit.SECONDS)) {
+ throw new ApexDeploymentException("could not handshake with server " + hostName + ":" + port);
}
LOGGER.debug("opened connection to server {}:{} . . .", hostName, port);
* ============LICENSE_START=======================================================
* Copyright (C) 2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019-2020 Nordix Foundation.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
thisThread.setName(DeploymentClient.class.getName() + "-" + getHost() + ":" + getPort());
started = true;
-
+ getCountDownLatch().get().countDown();
// Loop forever, sending messages as they appear on the queue
await().atLeast(50, TimeUnit.MILLISECONDS).until(() -> !(started && !thisThread.isInterrupted()));
// Thread has been interrupted
thisThread.interrupt();
}
started = false;
+ getCountDownLatch().set(new CountDownLatch(1));
}
/**