re base code
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / impl / ESCatalogDAO.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.impl;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import fj.data.Either;
25 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
26 import org.elasticsearch.cluster.health.ClusterHealthStatus;
27 import org.elasticsearch.common.unit.TimeValue;
28 import org.openecomp.sdc.be.config.BeEcompErrorManager;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.openecomp.sdc.be.dao.api.ESGenericSearchDAO;
31 import org.openecomp.sdc.be.dao.api.ICatalogDAO;
32 import org.openecomp.sdc.be.dao.api.ResourceUploadStatus;
33 import org.openecomp.sdc.be.resources.data.ESArtifactData;
34 import org.openecomp.sdc.be.resources.exception.ResourceDAOException;
35 import org.openecomp.sdc.common.api.HealthCheckInfo.HealthCheckStatus;
36 import org.openecomp.sdc.common.log.wrappers.Logger;
37 import org.springframework.stereotype.Component;
38
39 import javax.annotation.PostConstruct;
40 import java.util.List;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.TimeUnit;
45
46 @Component("resource-dao")
47 public class ESCatalogDAO extends ESGenericSearchDAO implements ICatalogDAO {
48
49         private static Logger log = Logger.getLogger(ESCatalogDAO.class.getName());
50
51         //TODO use LoggerMetric instead
52         private static Logger healthCheckLogger = Logger.getLogger("elasticsearch.healthcheck");
53
54         ///// HealthCheck/////////
55         private static final String ES_HEALTH_CHECK_STR = "elasticsearchHealthCheck";
56
57         private ScheduledExecutorService healthCheckScheduler = Executors
58                         .newSingleThreadScheduledExecutor(new ThreadFactory() {
59                                 @Override
60                                 public Thread newThread(Runnable r) {
61                                         return new Thread(r, "ES-Health-Check-Thread");
62                                 }
63                         });
64
65         private class HealthCheckScheduledTask implements Runnable {
66                 @Override
67                 public void run() {
68                         log.trace("Executing ELASTICSEARCH Health Check Task - Start");
69
70                         HealthCheckStatus healthStatus = null;
71                         try {
72                                 healthStatus = isInitCompleted() ? checkHealth() : HealthCheckStatus.DOWN;
73                         } catch (Exception e) {
74                                 log.error("Error while trying to connect to elasticsearch. host: {} | port: {} | error: {}", 
75                                                 getEsClient().getServerHost(), getEsClient().getServerPort(), e.getMessage(), e);
76                                 healthStatus = HealthCheckStatus.DOWN;
77                         }
78                         log.trace("Executed ELASTICSEARCH Health Check Task - Status = {}", healthStatus);
79                         if (healthStatus != lastHealthState) {
80                                 log.trace("ELASTICSEARCH Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
81                                 lastHealthState = healthStatus;
82                                 logAlarm();
83                         }
84                 }
85         }
86
87         private HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
88         private volatile HealthCheckStatus lastHealthState = HealthCheckStatus.DOWN;
89
90         /**
91          * Get ES cluster status string rep
92          * 
93          * @return "GREEN", "YELLOW" or "RED"
94          */
95         private HealthCheckStatus checkHealth() {
96                 if (!isInitCompleted()) {
97                         return HealthCheckStatus.DOWN;
98                 }
99                 ClusterHealthRequest healthRequest = new ClusterHealthRequest("_all");
100                 healthRequest.masterNodeTimeout(TimeValue.timeValueSeconds(2));
101                 ClusterHealthStatus status = getClient().admin().cluster().health(healthRequest).actionGet().getStatus();
102                 healthCheckLogger.debug("ES cluster health status is {}", status);
103                 if (status == null || status.equals(ClusterHealthStatus.RED)) {
104                         return HealthCheckStatus.DOWN;
105                 }
106                 return HealthCheckStatus.UP;
107         }
108
109         private void logAlarm() {
110                 if (lastHealthState == HealthCheckStatus.UP) {
111                         BeEcompErrorManager.getInstance().logBeHealthCheckElasticSearchRecovery(ES_HEALTH_CHECK_STR);
112                 } else {
113                         BeEcompErrorManager.getInstance().logBeHealthCheckElasticSearchError(ES_HEALTH_CHECK_STR);
114                 }
115         }
116
117         @PostConstruct
118         public void initCompleted() {
119                 long interval = ConfigurationManager.getConfigurationManager().getConfiguration()
120                                 .getEsReconnectIntervalInSeconds(5);
121                 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, interval, TimeUnit.SECONDS);
122                 initCompleted = true;
123         }
124
125         // Index Checking Variables
126         private boolean initCompleted = false;
127
128         @Override
129         public void writeArtifact(ESArtifactData artifactData) throws ResourceDAOException {
130                 try {
131                         saveResourceData(artifactData);
132                 } catch (Exception e) {
133                         throw new ResourceDAOException("Error to save ArtifactData with " + artifactData.getId());
134                 }
135         }
136
137         @Override
138         public Either<ESArtifactData, ResourceUploadStatus> getArtifact(String id) {
139                 ESArtifactData resData = null;
140
141                 try {
142                         resData = findById(getTypeFromClass(ESArtifactData.class), id, ESArtifactData.class);
143                 } catch (Exception e) {
144                         resData = null;
145                         BeEcompErrorManager.getInstance().logBeDaoSystemError("Get Artifact from database");
146                         log.debug("ESCatalogDAO:getArtifact failed with exception ", e);
147                         return Either.right(ResourceUploadStatus.ERROR);
148                 }
149
150                 if (resData != null) {
151                         return Either.left(resData);
152                 } else {
153                         return Either.right(ResourceUploadStatus.NOT_EXIST);
154                 }
155         }
156
157         private <T> String getTypeFromClass(Class<T> clazz) {
158
159                 return clazz.getSimpleName().toLowerCase();
160         }
161
162         @Override
163         public void deleteArtifact(String id) {
164                 delete(getTypeFromClass(ESArtifactData.class), id);
165         }
166
167         @Override
168         public Either<List<ESArtifactData>, ResourceUploadStatus> getArtifacts(String[] ids) {
169                 List<ESArtifactData> resData = null;
170                 try {
171                         resData = findByIds(getTypeFromClass(ESArtifactData.class), ESArtifactData.class, ids);
172                 } catch (Exception e) {
173                         resData = null;
174                         return Either.right(ResourceUploadStatus.ERROR);
175                 }
176
177                 if (resData != null && !resData.isEmpty()) {
178                         return Either.left(resData);
179                 } else {
180                         return Either.right(ResourceUploadStatus.NOT_EXIST);
181                 }
182         }
183
184         private void saveResourceData(ESArtifactData data) throws JsonProcessingException {
185                 String typeName = getTypeFromClass(data.getClass());
186                 saveResourceData(typeName, data, data.getId());
187         }
188
189         @Override
190         public void deleteAllArtifacts() {
191                 String typeName = getTypeFromClass(ESArtifactData.class);
192                 String indexName = getIndexForType(typeName);
193                 deleteIndex(indexName);
194
195         }
196
197         public boolean isInitCompleted() {
198                 return initCompleted;
199         }
200
201         public HealthCheckStatus getHealth() {
202                 return lastHealthState;
203         }
204
205 }