Update vulnerable package dependencies
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / scheduledtasks / RecoveryThreadManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.scheduledtasks;
21
22 import static org.apache.commons.collections.CollectionUtils.isEmpty;
23 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.convertToFunction;
24
25 import com.google.common.annotations.VisibleForTesting;
26 import fj.data.Either;
27 import java.util.List;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Collectors;
33 import javax.annotation.PostConstruct;
34 import javax.annotation.PreDestroy;
35 import javax.annotation.Resource;
36 import org.apache.commons.lang3.math.NumberUtils;
37 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
38 import org.openecomp.sdc.be.components.distribution.engine.EnvironmentsEngine;
39 import org.openecomp.sdc.be.config.ConfigurationManager;
40 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
41 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
42 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
43 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
44 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
45 import org.openecomp.sdc.common.datastructure.Wrapper;
46 import org.openecomp.sdc.common.log.wrappers.Logger;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.stereotype.Component;
49
50 @Component("recoveryThreadManager")
51 public class RecoveryThreadManager extends AbstractScheduleTaskRunner {
52
53     private static final Logger log = Logger.getLogger(RecoveryThreadManager.class);
54     @VisibleForTesting
55     FixEnvironmentTask task = new FixEnvironmentTask();
56     @VisibleForTesting
57     Integer allowedTimeBeforeStaleSec;
58     @Resource
59     private OperationalEnvironmentDao operationalEnvironmentDao;
60     @Autowired
61     private EnvironmentsEngine environmentsEngine;
62     private ScheduledExecutorService scheduledService = Executors
63         .newScheduledThreadPool(NumberUtils.INTEGER_ONE, new BasicThreadFactory.Builder().namingPattern("EnvironmentCleanThread-%d").build());
64
65     @PostConstruct
66     public void init() {
67         log.debug("Enter init method of RecoveryThreadManager");
68         final DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
69             .getDistributionEngineConfiguration();
70         Integer opEnvRecoveryIntervalSec = distributionEngineConfiguration.getOpEnvRecoveryIntervalSec();
71         scheduledService.scheduleAtFixedRate(task, NumberUtils.INTEGER_ZERO, opEnvRecoveryIntervalSec, TimeUnit.SECONDS);
72         this.allowedTimeBeforeStaleSec = distributionEngineConfiguration.getAllowedTimeBeforeStaleSec();
73         log.debug("End init method of AsdcComponentsCleaner");
74     }
75
76     @PreDestroy
77     public void destroy() {
78         shutdownExecutor();
79     }
80
81     @Override
82     public ExecutorService getExecutorService() {
83         return scheduledService;
84     }
85
86     protected class FixEnvironmentTask implements Runnable {
87
88         @Override
89         public void run() {
90             try {
91                 // Failed Envs
92                 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherFailedEnv = operationalEnvironmentDao
93                     .getByEnvironmentsStatus(EnvironmentStatusEnum.FAILED);
94                 eitherFailedEnv.bimap(convertToFunction(this::handleFailedeEnvironmentsRecords),
95                     convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.FAILED, cassandraError)));
96                 // In-Progress Envs
97                 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherInProgressEnv = operationalEnvironmentDao
98                     .getByEnvironmentsStatus(EnvironmentStatusEnum.IN_PROGRESS);
99                 eitherInProgressEnv.bimap(convertToFunction(this::handleInProgressEnvironmentsRecords),
100                     convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.IN_PROGRESS, cassandraError)));
101                 // Envs To Connect to UEB topics
102                 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherCompleteEnv = operationalEnvironmentDao
103                     .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
104                 eitherCompleteEnv.bimap(convertToFunction(this::handleCompleteEnvironmentsRecords),
105                     convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.COMPLETED, cassandraError)));
106             } catch (Exception e) {
107                 log.debug("error while handling operational environments to be fixed :{}", e.getMessage(), e);
108             }
109         }
110
111         private void handleCompleteEnvironmentsRecords(List<OperationalEnvironmentEntry> completeEnvironmentsRecords) {
112             if (!isEmpty(completeEnvironmentsRecords)) {
113                 completeEnvironmentsRecords.stream().filter(env -> !environmentsEngine.isInMap(env)).forEach(opEnvEntry -> {
114                     environmentsEngine.createUebTopicsForEnvironment(opEnvEntry);
115                     environmentsEngine.addToMap(opEnvEntry);
116                 });
117             }
118         }
119
120         private void handleFailedeEnvironmentsRecords(List<OperationalEnvironmentEntry> failedEnvironmentsRecords) {
121             if (!isEmpty(failedEnvironmentsRecords)) {
122                 failedEnvironmentsRecords.parallelStream().forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
123             }
124         }
125
126         private void handleInProgressEnvironmentsRecords(List<OperationalEnvironmentEntry> inProgressEnvList) {
127             if (!isEmpty(inProgressEnvList)) {
128                 long currentTimeMillis = System.currentTimeMillis();
129                 if (!isEmpty(inProgressEnvList)) {
130                     List<OperationalEnvironmentEntry> staleInProgressEnvList = inProgressEnvList.stream()
131                         .filter(record -> (record.getLastModified().getTime() + (allowedTimeBeforeStaleSec * 1000)) < currentTimeMillis)
132                         .collect(Collectors.toList());
133                     staleInProgressEnvList.parallelStream().forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
134                 }
135             }
136         }
137
138         private void logFailedRetrieveRecord(EnvironmentStatusEnum recordStatus, CassandraOperationStatus error) {
139             log.debug("error: {} while retrieving operational environments with status: {}", error, recordStatus);
140         }
141     }
142 }