2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.scheduledtasks;
23 import com.google.common.annotations.VisibleForTesting;
24 import fj.data.Either;
25 import org.apache.commons.lang.math.NumberUtils;
26 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
27 import org.openecomp.sdc.be.components.distribution.engine.EnvironmentsEngine;
28 import org.openecomp.sdc.be.config.ConfigurationManager;
29 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
30 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
31 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
32 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
33 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
34 import org.openecomp.sdc.common.datastructure.Wrapper;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Component;
39 import javax.annotation.PostConstruct;
40 import javax.annotation.PreDestroy;
41 import javax.annotation.Resource;
42 import java.util.List;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.TimeUnit;
47 import java.util.stream.Collectors;
49 import static org.apache.commons.collections.CollectionUtils.isEmpty;
50 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.convertToFunction;
52 @Component("recoveryThreadManager")
53 public class RecoveryThreadManager extends AbstractScheduleTaskRunner {
55 private static final Logger log = Logger.getLogger(RecoveryThreadManager.class);
57 FixEnvironmentTask task = new FixEnvironmentTask();
60 private OperationalEnvironmentDao operationalEnvironmentDao;
63 private EnvironmentsEngine environmentsEngine;
65 private ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(NumberUtils.INTEGER_ONE,
66 new BasicThreadFactory.Builder().namingPattern("EnvironmentCleanThread-%d").build());
68 Integer allowedTimeBeforeStaleSec;
72 log.debug("Enter init method of RecoveryThreadManager");
73 final DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager
74 .getConfigurationManager().getDistributionEngineConfiguration();
75 Integer opEnvRecoveryIntervalSec = distributionEngineConfiguration.getOpEnvRecoveryIntervalSec();
76 scheduledService.scheduleAtFixedRate(task, NumberUtils.INTEGER_ZERO, opEnvRecoveryIntervalSec,
78 this.allowedTimeBeforeStaleSec = distributionEngineConfiguration.getAllowedTimeBeforeStaleSec();
79 log.debug("End init method of AsdcComponentsCleaner");
83 public void destroy() {
87 protected class FixEnvironmentTask implements Runnable {
92 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherFailedEnv = operationalEnvironmentDao
93 .getByEnvironmentsStatus(EnvironmentStatusEnum.FAILED);
94 eitherFailedEnv.bimap(convertToFunction(this::handleFailedeEnvironmentsRecords), convertToFunction(
95 cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.FAILED, cassandraError)));
98 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherInProgressEnv = operationalEnvironmentDao
99 .getByEnvironmentsStatus(EnvironmentStatusEnum.IN_PROGRESS);
100 eitherInProgressEnv.bimap(convertToFunction(this::handleInProgressEnvironmentsRecords),
101 convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.IN_PROGRESS,
104 // Envs To Connect to UEB topics
105 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherCompleteEnv = operationalEnvironmentDao
106 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
107 eitherCompleteEnv.bimap(convertToFunction(this::handleCompleteEnvironmentsRecords), convertToFunction(
108 cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.COMPLETED, cassandraError)));
110 } catch (Exception e) {
111 log.debug("error while handling operational environments to be fixed :{}", e.getMessage(), e);
115 private void handleCompleteEnvironmentsRecords(List<OperationalEnvironmentEntry> completeEnvironmentsRecords) {
116 if (!isEmpty(completeEnvironmentsRecords)) {
117 completeEnvironmentsRecords.stream().filter(env -> !environmentsEngine.isInMap(env))
118 .forEach(opEnvEntry -> {
119 environmentsEngine.createUebTopicsForEnvironment(opEnvEntry);
120 environmentsEngine.addToMap(opEnvEntry);
126 private void handleFailedeEnvironmentsRecords(List<OperationalEnvironmentEntry> failedEnvironmentsRecords) {
127 if (!isEmpty(failedEnvironmentsRecords)) {
128 failedEnvironmentsRecords.parallelStream()
129 .forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
134 private void handleInProgressEnvironmentsRecords(List<OperationalEnvironmentEntry> inProgressEnvList) {
135 if (!isEmpty(inProgressEnvList)) {
137 long currentTimeMillis = System.currentTimeMillis();
138 if (!isEmpty(inProgressEnvList)) {
139 List<OperationalEnvironmentEntry> staleInProgressEnvList = inProgressEnvList.stream()
140 .filter(record -> (record.getLastModified().getTime() + (allowedTimeBeforeStaleSec * 1000)) < currentTimeMillis)
141 .collect(Collectors.toList());
142 staleInProgressEnvList.parallelStream()
143 .forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
150 private void logFailedRetrieveRecord(EnvironmentStatusEnum recordStatus, CassandraOperationStatus error) {
151 log.debug("error: {} while retrieving operational environments with status: {}", error, recordStatus);
158 public ExecutorService getExecutorService() {
159 return scheduledService;