2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.components.scheduledtasks;
22 import static org.apache.commons.collections.CollectionUtils.isEmpty;
23 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.convertToFunction;
25 import com.google.common.annotations.VisibleForTesting;
26 import fj.data.Either;
27 import java.util.List;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Collectors;
33 import javax.annotation.PostConstruct;
34 import javax.annotation.PreDestroy;
35 import javax.annotation.Resource;
36 import org.apache.commons.lang3.math.NumberUtils;
37 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
38 import org.openecomp.sdc.be.components.distribution.engine.EnvironmentsEngine;
39 import org.openecomp.sdc.be.config.ConfigurationManager;
40 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
41 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
42 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
43 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
44 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
45 import org.openecomp.sdc.common.datastructure.Wrapper;
46 import org.openecomp.sdc.common.log.wrappers.Logger;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.stereotype.Component;
50 @Component("recoveryThreadManager")
51 public class RecoveryThreadManager extends AbstractScheduleTaskRunner {
53 private static final Logger log = Logger.getLogger(RecoveryThreadManager.class);
55 FixEnvironmentTask task = new FixEnvironmentTask();
57 Integer allowedTimeBeforeStaleSec;
59 private OperationalEnvironmentDao operationalEnvironmentDao;
61 private EnvironmentsEngine environmentsEngine;
62 private ScheduledExecutorService scheduledService = Executors
63 .newScheduledThreadPool(NumberUtils.INTEGER_ONE, new BasicThreadFactory.Builder().namingPattern("EnvironmentCleanThread-%d").build());
// Start-up sequence: load the distribution engine configuration, publish the staleness
// threshold, then start the periodic recovery task.
log.debug("Enter init method of RecoveryThreadManager");
final DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
    .getDistributionEngineConfiguration();
// The threshold must be assigned BEFORE the task is scheduled: the task starts with zero
// initial delay on the scheduler thread and unboxes this field, so assigning it afterwards
// would race with the first run (null threshold -> NullPointerException inside the task).
this.allowedTimeBeforeStaleSec = distributionEngineConfiguration.getAllowedTimeBeforeStaleSec();
Integer opEnvRecoveryIntervalSec = distributionEngineConfiguration.getOpEnvRecoveryIntervalSec();
// First run immediately, then once every opEnvRecoveryIntervalSec seconds.
scheduledService.scheduleAtFixedRate(task, NumberUtils.INTEGER_ZERO, opEnvRecoveryIntervalSec, TimeUnit.SECONDS);
log.debug("End init method of RecoveryThreadManager");
77 public void destroy() {
82 public ExecutorService getExecutorService() {
83 return scheduledService;
86 protected class FixEnvironmentTask implements Runnable {
92 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherFailedEnv = operationalEnvironmentDao
93 .getByEnvironmentsStatus(EnvironmentStatusEnum.FAILED);
94 eitherFailedEnv.bimap(convertToFunction(this::handleFailedeEnvironmentsRecords),
95 convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.FAILED, cassandraError)));
97 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherInProgressEnv = operationalEnvironmentDao
98 .getByEnvironmentsStatus(EnvironmentStatusEnum.IN_PROGRESS);
99 eitherInProgressEnv.bimap(convertToFunction(this::handleInProgressEnvironmentsRecords),
100 convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.IN_PROGRESS, cassandraError)));
101 // Envs To Connect to UEB topics
102 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherCompleteEnv = operationalEnvironmentDao
103 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
104 eitherCompleteEnv.bimap(convertToFunction(this::handleCompleteEnvironmentsRecords),
105 convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.COMPLETED, cassandraError)));
106 } catch (Exception e) {
107 log.debug("error while handling operational environments to be fixed :{}", e.getMessage(), e);
111 private void handleCompleteEnvironmentsRecords(List<OperationalEnvironmentEntry> completeEnvironmentsRecords) {
112 if (!isEmpty(completeEnvironmentsRecords)) {
113 completeEnvironmentsRecords.stream().filter(env -> !environmentsEngine.isInMap(env)).forEach(opEnvEntry -> {
114 environmentsEngine.createUebTopicsForEnvironment(opEnvEntry);
115 environmentsEngine.addToMap(opEnvEntry);
120 private void handleFailedeEnvironmentsRecords(List<OperationalEnvironmentEntry> failedEnvironmentsRecords) {
121 if (!isEmpty(failedEnvironmentsRecords)) {
122 failedEnvironmentsRecords.parallelStream().forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
126 private void handleInProgressEnvironmentsRecords(List<OperationalEnvironmentEntry> inProgressEnvList) {
127 if (!isEmpty(inProgressEnvList)) {
128 long currentTimeMillis = System.currentTimeMillis();
129 if (!isEmpty(inProgressEnvList)) {
130 List<OperationalEnvironmentEntry> staleInProgressEnvList = inProgressEnvList.stream()
131 .filter(record -> (record.getLastModified().getTime() + (allowedTimeBeforeStaleSec * 1000)) < currentTimeMillis)
132 .collect(Collectors.toList());
133 staleInProgressEnvList.parallelStream().forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
138 private void logFailedRetrieveRecord(EnvironmentStatusEnum recordStatus, CassandraOperationStatus error) {
139 log.debug("error: {} while retrieving operational environments with status: {}", error, recordStatus);