Sync Integ to Master
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / impl / ESCatalogDAO.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.impl;
22
23 import java.util.List;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import javax.annotation.PostConstruct;
30
31 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
32 import org.elasticsearch.cluster.health.ClusterHealthStatus;
33 import org.elasticsearch.common.unit.TimeValue;
34 import org.openecomp.sdc.be.config.BeEcompErrorManager;
35 import org.openecomp.sdc.be.config.ConfigurationManager;
36 import org.openecomp.sdc.be.dao.api.ESGenericSearchDAO;
37 import org.openecomp.sdc.be.dao.api.ICatalogDAO;
38 import org.openecomp.sdc.be.dao.api.ResourceUploadStatus;
39 import org.openecomp.sdc.be.resources.data.ESArtifactData;
40 import org.openecomp.sdc.be.resources.exception.ResourceDAOException;
41 import org.openecomp.sdc.common.api.HealthCheckInfo.HealthCheckStatus;
42 import org.openecomp.sdc.common.config.EcompErrorName;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.springframework.stereotype.Component;
46
47 import com.fasterxml.jackson.core.JsonProcessingException;
48
49 import fj.data.Either;
50
51 @Component("resource-dao")
52 public class ESCatalogDAO extends ESGenericSearchDAO implements ICatalogDAO {
53
54         private static Logger log = LoggerFactory.getLogger(ESCatalogDAO.class.getName());
55         private static Logger healthCheckLogger = LoggerFactory.getLogger("elasticsearch.healthcheck");
56
57         ///// HealthCheck/////////
58         private static final String ES_HEALTH_CHECK_STR = "elasticsearchHealthCheck";
59
60         private ScheduledExecutorService healthCheckScheduler = Executors
61                         .newSingleThreadScheduledExecutor(new ThreadFactory() {
62                                 @Override
63                                 public Thread newThread(Runnable r) {
64                                         return new Thread(r, "ES-Health-Check-Thread");
65                                 }
66                         });
67
68         private class HealthCheckScheduledTask implements Runnable {
69                 @Override
70                 public void run() {
71                         healthCheckLogger.trace("Executing ELASTICSEARCH Health Check Task - Start");
72
73                         HealthCheckStatus healthStatus = null;
74                         try {
75                                 healthStatus = isInitCompleted() ? checkHealth() : HealthCheckStatus.DOWN;
76                         } catch (Exception e) {
77                                 log.error("Error while trying to connect to elasticsearch. host: {} | port: {} | error: {}", 
78                                                 getEsClient().getServerHost(), getEsClient().getServerPort(), e.getMessage(), e);
79                                 healthStatus = HealthCheckStatus.DOWN;
80                         }
81                         healthCheckLogger.trace("Executed ELASTICSEARCH Health Check Task - Status = {}", healthStatus);
82                         if (healthStatus != lastHealthState) {
83                                 log.trace("ELASTICSEARCH Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
84                                 lastHealthState = healthStatus;
85                                 logAlarm();
86                         }
87                 }
88         }
89
90         private HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
91         private volatile HealthCheckStatus lastHealthState = HealthCheckStatus.DOWN;
92
93         /**
94          * Get ES cluster status string rep
95          * 
96          * @return "GREEN", "YELLOW" or "RED"
97          */
98         private HealthCheckStatus checkHealth() {
99                 if (!isInitCompleted()) {
100                         return HealthCheckStatus.DOWN;
101                 }
102                 ClusterHealthRequest healthRequest = new ClusterHealthRequest("_all");
103                 healthRequest.masterNodeTimeout(TimeValue.timeValueSeconds(2));
104                 ClusterHealthStatus status = getClient().admin().cluster().health(healthRequest).actionGet().getStatus();
105                 healthCheckLogger.debug("ES cluster health status is {}", status);
106                 if (status == null || status.equals(ClusterHealthStatus.RED)) {
107                         return HealthCheckStatus.DOWN;
108                 }
109                 return HealthCheckStatus.UP;
110         }
111
112         private void logAlarm() {
113                 if (lastHealthState == HealthCheckStatus.UP) {
114                         BeEcompErrorManager.getInstance().logBeHealthCheckElasticSearchRecovery(ES_HEALTH_CHECK_STR);
115                 } else {
116                         BeEcompErrorManager.getInstance().logBeHealthCheckElasticSearchError(ES_HEALTH_CHECK_STR);
117                 }
118         }
119
120         @PostConstruct
121         public void initCompleted() {
122                 long interval = ConfigurationManager.getConfigurationManager().getConfiguration()
123                                 .getEsReconnectIntervalInSeconds(5);
124                 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, interval, TimeUnit.SECONDS);
125                 initCompleted = true;
126         }
127
128         // Index Checking Variables
129         private boolean initCompleted = false;
130
131         @Override
132         public void writeArtifact(ESArtifactData artifactData) throws ResourceDAOException {
133                 try {
134                         saveResourceData(artifactData);
135                 } catch (Exception e) {
136                         throw new ResourceDAOException("Error to save ArtifactData with " + artifactData.getId());
137                 }
138         }
139
140         @Override
141         public Either<ESArtifactData, ResourceUploadStatus> getArtifact(String id) {
142                 ESArtifactData resData = null;
143
144                 try {
145                         resData = findById(getTypeFromClass(ESArtifactData.class), id, ESArtifactData.class);
146                 } catch (Exception e) {
147                         resData = null;
148                         BeEcompErrorManager.getInstance().logBeDaoSystemError("Get Artifact from database");
149                         log.debug("ESCatalogDAO:getArtifact failed with exception ", e);
150                         return Either.right(ResourceUploadStatus.ERROR);
151                 }
152
153                 if (resData != null) {
154                         return Either.left(resData);
155                 } else {
156                         return Either.right(ResourceUploadStatus.NOT_EXIST);
157                 }
158         }
159
160         private <T> String getTypeFromClass(Class<T> clazz) {
161
162                 return clazz.getSimpleName().toLowerCase();
163         }
164
165         @Override
166         public void deleteArtifact(String id) {
167                 delete(getTypeFromClass(ESArtifactData.class), id);
168         }
169
170         @Override
171         public Either<List<ESArtifactData>, ResourceUploadStatus> getArtifacts(String[] ids) {
172                 List<ESArtifactData> resData = null;
173                 try {
174                         resData = findByIds(getTypeFromClass(ESArtifactData.class), ESArtifactData.class, ids);
175                 } catch (Exception e) {
176                         resData = null;
177                         return Either.right(ResourceUploadStatus.ERROR);
178                 }
179
180                 if (resData != null && !resData.isEmpty()) {
181                         return Either.left(resData);
182                 } else {
183                         return Either.right(ResourceUploadStatus.NOT_EXIST);
184                 }
185         }
186
187         private void saveResourceData(ESArtifactData data) throws JsonProcessingException {
188                 String typeName = getTypeFromClass(data.getClass());
189                 saveResourceData(typeName, data, data.getId());
190         }
191
192         @Override
193         public void deleteAllArtifacts() {
194                 String typeName = getTypeFromClass(ESArtifactData.class);
195                 String indexName = getIndexForType(typeName);
196                 deleteIndex(indexName);
197
198         }
199
200         public boolean isInitCompleted() {
201                 return initCompleted;
202         }
203
204         public HealthCheckStatus getHealth() {
205                 return lastHealthState;
206         }
207
208 }