re base code
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / scheduledtasks / RecoveryThreadManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.scheduledtasks;
22
23 import com.google.common.annotations.VisibleForTesting;
24 import fj.data.Either;
25 import org.apache.commons.lang.math.NumberUtils;
26 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
27 import org.openecomp.sdc.be.components.distribution.engine.EnvironmentsEngine;
28 import org.openecomp.sdc.be.config.ConfigurationManager;
29 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
30 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
31 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
32 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
33 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
34 import org.openecomp.sdc.common.datastructure.Wrapper;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Component;
38
39 import javax.annotation.PostConstruct;
40 import javax.annotation.PreDestroy;
41 import javax.annotation.Resource;
42 import java.util.List;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.TimeUnit;
47 import java.util.stream.Collectors;
48
49 import static org.apache.commons.collections.CollectionUtils.isEmpty;
50 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.convertToFunction;
51
52 @Component("recoveryThreadManager")
53 public class RecoveryThreadManager extends AbstractScheduleTaskRunner {
54
55     private static final Logger log = Logger.getLogger(RecoveryThreadManager.class);
56     @VisibleForTesting
57     FixEnvironmentTask task = new FixEnvironmentTask();
58
59     @Resource
60     private OperationalEnvironmentDao operationalEnvironmentDao;
61
62     @Autowired
63     private EnvironmentsEngine environmentsEngine;
64
65     private ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(NumberUtils.INTEGER_ONE,
66             new BasicThreadFactory.Builder().namingPattern("EnvironmentCleanThread-%d").build());
67     @VisibleForTesting
68     Integer allowedTimeBeforeStaleSec;
69
70     @PostConstruct
71     public void init() {
72         log.debug("Enter init method of RecoveryThreadManager");
73         final DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager
74                 .getConfigurationManager().getDistributionEngineConfiguration();
75         Integer opEnvRecoveryIntervalSec = distributionEngineConfiguration.getOpEnvRecoveryIntervalSec();
76         scheduledService.scheduleAtFixedRate(task, NumberUtils.INTEGER_ZERO, opEnvRecoveryIntervalSec,
77                 TimeUnit.SECONDS);
78         this.allowedTimeBeforeStaleSec = distributionEngineConfiguration.getAllowedTimeBeforeStaleSec();
79         log.debug("End init method of AsdcComponentsCleaner");
80     }
81
82     @PreDestroy
83     public void destroy() {
84         shutdownExecutor();
85     }
86
87     protected class FixEnvironmentTask implements Runnable {
88         @Override
89         public void run() {
90             try {
91                 // Failed Envs
92                 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherFailedEnv = operationalEnvironmentDao
93                         .getByEnvironmentsStatus(EnvironmentStatusEnum.FAILED);
94                 eitherFailedEnv.bimap(convertToFunction(this::handleFailedeEnvironmentsRecords), convertToFunction(
95                         cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.FAILED, cassandraError)));
96
97                 // In-Progress Envs
98                 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherInProgressEnv = operationalEnvironmentDao
99                         .getByEnvironmentsStatus(EnvironmentStatusEnum.IN_PROGRESS);
100                 eitherInProgressEnv.bimap(convertToFunction(this::handleInProgressEnvironmentsRecords),
101                         convertToFunction(cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.IN_PROGRESS,
102                                 cassandraError)));
103
104                 // Envs To Connect to UEB topics
105                 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> eitherCompleteEnv = operationalEnvironmentDao
106                         .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
107                 eitherCompleteEnv.bimap(convertToFunction(this::handleCompleteEnvironmentsRecords), convertToFunction(
108                         cassandraError -> logFailedRetrieveRecord(EnvironmentStatusEnum.COMPLETED, cassandraError)));
109
110             } catch (Exception e) {
111                 log.debug("error while handling operational environments to be fixed :{}", e.getMessage(), e);
112             }
113         }
114
115         private void handleCompleteEnvironmentsRecords(List<OperationalEnvironmentEntry> completeEnvironmentsRecords) {
116             if (!isEmpty(completeEnvironmentsRecords)) {
117                 completeEnvironmentsRecords.stream().filter(env -> !environmentsEngine.isInMap(env))
118                         .forEach(opEnvEntry -> {
119                             environmentsEngine.createUebTopicsForEnvironment(opEnvEntry);
120                             environmentsEngine.addToMap(opEnvEntry);
121                         });
122             }
123
124         }
125
126         private void handleFailedeEnvironmentsRecords(List<OperationalEnvironmentEntry> failedEnvironmentsRecords) {
127             if (!isEmpty(failedEnvironmentsRecords)) {
128                 failedEnvironmentsRecords.parallelStream()
129                         .forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
130             }
131
132         }
133
134         private void handleInProgressEnvironmentsRecords(List<OperationalEnvironmentEntry> inProgressEnvList) {
135             if (!isEmpty(inProgressEnvList)) {
136
137                 long currentTimeMillis = System.currentTimeMillis();
138                 if (!isEmpty(inProgressEnvList)) {
139                     List<OperationalEnvironmentEntry> staleInProgressEnvList = inProgressEnvList.stream()
140                             .filter(record -> (record.getLastModified().getTime() + (allowedTimeBeforeStaleSec * 1000)) < currentTimeMillis)
141                             .collect(Collectors.toList());
142                     staleInProgressEnvList.parallelStream()
143                             .forEach(env -> environmentsEngine.buildOpEnv(new Wrapper<>(), env));
144                 }
145
146             }
147
148         }
149
150         private void logFailedRetrieveRecord(EnvironmentStatusEnum recordStatus, CassandraOperationStatus error) {
151             log.debug("error: {} while retrieving operational environments with status: {}", error, recordStatus);
152         }
153
154
155     }
156
157     @Override
158     public ExecutorService getExecutorService() {
159         return scheduledService;
160     }
161
162 }