@Profile("!test")
public class AuditStackService {
+ private static final String MSO_AUDIT_LOCK_TIME = "mso.audit.lock-time";
+
private static final Logger logger = LoggerFactory.getLogger(AuditStackService.class);
private static final String DEFAULT_AUDIT_LOCK_TIME = "60000";
private static final String DEFAULT_MAX_CLIENTS_FOR_TOPIC = "10";
+
@Autowired
public Environment env;
for (int i = 0; i < externalTaskServiceUtils.getMaxClients(); i++) {
ExternalTaskClient client = externalTaskServiceUtils.createExternalTaskClient();
client.subscribe("InventoryAddAudit")
- .lockDuration(Long.parseLong(env.getProperty("mso.audit.lock-time", DEFAULT_AUDIT_LOCK_TIME)))
+ .lockDuration(Long.parseLong(env.getProperty(MSO_AUDIT_LOCK_TIME, DEFAULT_AUDIT_LOCK_TIME)))
.handler(auditCreateStack::executeExternalTask).open();
}
}
for (int i = 0; i < externalTaskServiceUtils.getMaxClients(); i++) {
ExternalTaskClient client = externalTaskServiceUtils.createExternalTaskClient();
client.subscribe("InventoryDeleteAudit")
- .lockDuration(Long.parseLong(env.getProperty("mso.audit.lock-time", DEFAULT_AUDIT_LOCK_TIME)))
+ .lockDuration(Long.parseLong(env.getProperty(MSO_AUDIT_LOCK_TIME, DEFAULT_AUDIT_LOCK_TIME)))
.handler(auditDeleteStack::executeExternalTask).open();
}
}
for (int i = 0; i < externalTaskServiceUtils.getMaxClients(); i++) {
ExternalTaskClient client = externalTaskServiceUtils.createExternalTaskClient();
client.subscribe("InventoryQueryAudit")
- .lockDuration(Long.parseLong(env.getProperty("mso.audit.lock-time", DEFAULT_AUDIT_LOCK_TIME)))
+ .lockDuration(Long.parseLong(env.getProperty(MSO_AUDIT_LOCK_TIME, DEFAULT_AUDIT_LOCK_TIME)))
.handler(auditQueryStack::executeExternalTask).open();
}
}
@PostConstruct
public void auditAAIInventory() throws Exception {
-
ExternalTaskClient client = externalTaskServiceUtils.createExternalTaskClient();
client.subscribe("InventoryCreate")
.lockDuration(Long.parseLong(env.getProperty("mso.audit.lock-time", "60000")))
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @since Version 1.0
*/
@SpringBootApplication(scanBasePackages = {"org.onap"})
+@EnableScheduling
public class AppcOrchestratorApplication {
private static final String LOGS_DIR = "logs_dir";
package org.onap.so.adapters.appc.orchestrator.service;
import javax.annotation.PostConstruct;
+import org.camunda.bpm.client.ExternalTaskClient;
import org.onap.so.utils.ExternalTaskServiceUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
@PostConstruct
public void appcOrchestrator() throws Exception {
for (int i = 0; i < externalTaskServiceUtils.getMaxClients(); i++) {
- externalTaskServiceUtils.createExternalTaskClient().subscribe("AppcService").lockDuration(604800000)
- .handler(appcOrchestrator::executeExternalTask).open();
+ ExternalTaskClient client = externalTaskServiceUtils.createExternalTaskClient();
+ client.subscribe("AppcService").lockDuration(604800000).handler(appcOrchestrator::executeExternalTask)
+ .open();
}
}
package org.onap.so.utils;
import java.security.GeneralSecurityException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.camunda.bpm.client.ExternalTaskClient;
import org.camunda.bpm.client.interceptor.ClientRequestInterceptor;
import org.camunda.bpm.client.interceptor.auth.BasicAuthProvider;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+
+
@Component
public class ExternalTaskServiceUtils {
@Autowired
public Environment env;
+ protected Set<ExternalTaskClient> taskClients = ConcurrentHashMap.newKeySet();
+
+
private static final Logger logger = LoggerFactory.getLogger(ExternalTaskServiceUtils.class);
public ExternalTaskClient createExternalTaskClient() throws Exception {
String auth = getAuth();
ClientRequestInterceptor interceptor = createClientInterceptor(auth);
- return ExternalTaskClient.create().baseUrl(env.getRequiredProperty("mso.workflow.endpoint")).maxTasks(1)
- .addInterceptor(interceptor).asyncResponseTimeout(120000).build();
+ ExternalTaskClient client =
+ ExternalTaskClient.create().baseUrl(env.getRequiredProperty("mso.workflow.endpoint")).maxTasks(1)
+ .addInterceptor(interceptor).asyncResponseTimeout(120000).build();
+ taskClients.add(client);
+ return client;
}
protected ClientRequestInterceptor createClientInterceptor(String auth) {
return Integer.parseInt(env.getProperty("workflow.topics.maxClients", "3"));
}
+ @Scheduled(fixedDelay = 30000)
+ public void checkAllClientsActive() {
+ getClients().stream().filter(client -> !client.isActive()).forEach(ExternalTaskClient::start);
+ }
+
+ protected Set<ExternalTaskClient> getClients() {
+ return taskClients;
+ }
}
--- /dev/null
+package org.onap.so.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.camunda.bpm.client.ExternalTaskClient;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.core.env.Environment;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ExternalTaskServiceUtilsTest {
+
+ @Spy
+ @InjectMocks
+ private ExternalTaskServiceUtils utils = new ExternalTaskServiceUtils();
+
+ @Mock
+ private ExternalTaskClient actualClient1;
+
+ @Mock
+ private ExternalTaskClient actualClient2;
+
+ @Mock
+ private ExternalTaskClient actualClient3;
+
+ @Mock
+ private ExternalTaskClient actualClient4;
+
+ @Test
+ public void testCheckActiveClients() throws Exception {
+ Set<ExternalTaskClient> taskClients = ConcurrentHashMap.newKeySet();
+ taskClients.add(actualClient1);
+ taskClients.add(actualClient2);
+ taskClients.add(actualClient3);
+ taskClients.add(actualClient4);
+ when(utils.getClients()).thenReturn(taskClients);
+ when(actualClient1.isActive()).thenReturn(false);
+ when(actualClient2.isActive()).thenReturn(true);
+ when(actualClient3.isActive()).thenReturn(false);
+ when(actualClient4.isActive()).thenReturn(true);
+ utils.checkAllClientsActive();
+ verify(actualClient1, times(1)).isActive();
+ verify(actualClient2, times(1)).isActive();
+ verify(actualClient3, times(1)).isActive();
+ verify(actualClient4, times(1)).isActive();
+ verify(actualClient1, times(1)).start();
+ verify(actualClient3, times(1)).start();
+ }
+
+}