Sync Integ to Master
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / titan / TitanGraphClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.titan;
22
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.annotation.PostConstruct;
34
35 import org.apache.commons.configuration.BaseConfiguration;
36 import org.apache.tinkerpop.gremlin.structure.T;
37 import org.apache.tinkerpop.gremlin.structure.Vertex;
38 import org.openecomp.sdc.be.config.BeEcompErrorManager;
39 import org.openecomp.sdc.be.config.ConfigurationManager;
40 import org.openecomp.sdc.be.dao.DAOTitanStrategy;
41 import org.openecomp.sdc.be.dao.TitanClientStrategy;
42 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
43 import org.openecomp.sdc.common.config.EcompErrorName;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.stereotype.Component;
47
48 import com.thinkaurelius.titan.core.InvalidElementException;
49 import com.thinkaurelius.titan.core.InvalidIDException;
50 import com.thinkaurelius.titan.core.PropertyKey;
51 import com.thinkaurelius.titan.core.QueryException;
52 import com.thinkaurelius.titan.core.SchemaViolationException;
53 import com.thinkaurelius.titan.core.TitanConfigurationException;
54 import com.thinkaurelius.titan.core.TitanFactory;
55 import com.thinkaurelius.titan.core.TitanGraph;
56 import com.thinkaurelius.titan.core.TitanVertex;
57 import com.thinkaurelius.titan.core.TitanVertexProperty;
58 import com.thinkaurelius.titan.core.schema.ConsistencyModifier;
59 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
60 import com.thinkaurelius.titan.core.schema.TitanManagement;
61 import com.thinkaurelius.titan.core.util.TitanCleanup;
62 import com.thinkaurelius.titan.diskstorage.ResourceUnavailableException;
63 import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
64 import com.thinkaurelius.titan.graphdb.database.idassigner.IDPoolExhaustedException;
65
66 import fj.data.Either;
67
68
69 @Component("titan-client")
70 public class TitanGraphClient {
71
72         private static Logger logger = LoggerFactory.getLogger(TitanGraphClient.class.getName());
73         private static Logger healthLogger = LoggerFactory.getLogger("titan.healthcheck");
74
75         private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
76         private static final String OK = "GOOD";
77
78         public TitanGraphClient() {
79         }
80
81         private class HealthCheckTask implements Callable<Vertex> {
82                 @Override
83                 public Vertex call() {
84
85                         TitanVertex v = (TitanVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
86                         TitanVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
87                         healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
88                         graph.tx().commit();
89
90                         // Vertex v = graph.getVertices(HEALTH_CHECK, OK).iterator().next();
91                         // v.setProperty("healthcheck", OK + "_" +
92                         // System.currentTimeMillis());
93                         // graph.commit();
94                         // healthLogger.trace("Health Check Node
95                         // Found..."+v.getProperty(HEALTH_CHECK) );
96                         return v;
97                 }
98         }
99
100         private class HealthCheckScheduledTask implements Runnable {
101                 @Override
102                 public void run() {
103                         healthLogger.trace("Executing TITAN Health Check Task - Start");
104                         boolean healthStatus = isGraphOpen();
105                         healthLogger.trace("Executing TITAN Health Check Task - Status = {}", healthStatus);
106                         if (healthStatus != lastHealthState) {
107                                 logger.trace("TITAN  Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
108                                 lastHealthState = healthStatus;
109                                 logAlarm();
110                         }
111                 }
112         }
113
114         private class ReconnectTask implements Runnable {
115                 @Override
116                 public void run() {
117                         logger.trace("Trying to reconnect to Titan...");
118                         if (graph == null) {
119                                 createGraph(titanCfgFile);
120                         }
121                 }
122         }
123
124         private TitanGraph graph;
125
126         // Health Check Variables
127
128         /**
129          * This executor will execute the health check task on a callable task that can be executed with a timeout.
130          */
131         ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
132                 @Override
133                 public Thread newThread(Runnable r) {
134                         return new Thread(r, "Titan-Health-Check-Thread");
135                 }
136         });
137         private long healthCheckReadTimeout = 2;
138         HealthCheckTask healthCallableTask = new HealthCheckTask();
139         HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
140         boolean lastHealthState = false;
141
142         // Reconnection variables
143         private ScheduledExecutorService reconnectScheduler = null;
144         private ScheduledExecutorService healthCheckScheduler = null;
145         private Runnable reconnectTask = null;
146         private long reconnectInterval = 3;
147         @SuppressWarnings("rawtypes")
148         private Future reconnectFuture;
149
150         private String titanCfgFile = null;
151         TitanClientStrategy titanClientStrategy;
152
153         public TitanGraphClient(TitanClientStrategy titanClientStrategy) {
154                 super();
155                 this.titanClientStrategy = titanClientStrategy;
156
157                 // Initialize a single threaded scheduler for health-check
158                 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
159                         @Override
160                         public Thread newThread(Runnable r) {
161                                 return new Thread(r, "Titan-Health-Check-Task");
162                         }
163                 });
164
165                 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanHealthCheckReadTimeout(2);
166                 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getTitanReconnectIntervalInSeconds(3);
167
168                 logger.info("** TitanGraphClient created");
169         }
170
171         @PostConstruct
172         public TitanOperationStatus createGraph() {
173
174                 logger.info("** createGraph started **");
175
176                 if (ConfigurationManager.getConfigurationManager().getConfiguration().getTitanInMemoryGraph()) {
177                         BaseConfiguration conf = new BaseConfiguration();
178                         conf.setProperty("storage.backend", "inmemory");
179                         graph = TitanFactory.open(conf);
180             createTitanSchema(); 
181                         logger.info("** in memory graph created");
182                         return TitanOperationStatus.OK;
183                 } else {
184                         this.titanCfgFile = titanClientStrategy.getConfigFile();
185                         if (titanCfgFile == null || titanCfgFile.isEmpty()) {
186                                 titanCfgFile = "config/titan.properties";
187                         }
188
189                         // yavivi
190                         // In case connection failed on init time, schedule a reconnect task
191                         // in the BG
192                         TitanOperationStatus status = createGraph(titanCfgFile);
193                         logger.debug("Create Titan graph status {}", status);
194                         if (status != TitanOperationStatus.OK) {
195                                 this.startReconnectTask();
196                         }
197
198                         return status;
199                 }
200         }
201
202         private void startHealthCheckTask() {
203                 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
204         }
205
206         /**
207          * This method will be invoked ONLY on init time in case Titan storage is down.
208          */
209         private void startReconnectTask() {
210                 this.reconnectTask = new ReconnectTask();
211                 // Initialize a single threaded scheduler
212                 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
213                         @Override
214                         public Thread newThread(Runnable r) {
215                                 return new Thread(r, "Titan-Reconnect-Task");
216                         }
217                 });
218
219                 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
220                 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
221         }
222
223         public void cleanupGraph() {
224                 if (graph != null) {
225                         // graph.shutdown();
226                         graph.close();
227                         TitanCleanup.clear(graph);
228                 }
229         }
230
231         private boolean graphInitialized(){
232                 TitanManagement graphMgmt = graph.openManagement();
233                 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
234         }
235         
236
237         public TitanOperationStatus createGraph(String titanCfgFile) {
238                 logger.info("** open graph with {} started", titanCfgFile);
239                 try {
240                         logger.info("openGraph : try to load file {}", titanCfgFile);
241                         graph = TitanFactory.open(titanCfgFile);
242                         if (graph.isClosed() || !graphInitialized()) {
243                                 logger.error("titan graph was not initialized");
244                                 return TitanOperationStatus.NOT_CREATED;
245                         }
246
247                 } catch (Exception e) {
248                         this.graph = null;
249                         logger.info("createGraph : failed to open Titan graph with configuration file: {}", titanCfgFile, e);
250                         return TitanOperationStatus.NOT_CONNECTED;
251                 }
252
253                 logger.info("** Titan graph created ");
254
255                 // Do some post creation actions
256                 this.onGraphOpened();
257
258                 return TitanOperationStatus.OK;
259         }
260
261         private void onGraphOpened() {
262                 // if a reconnect task is running, cancel it.
263                 if (this.reconnectFuture != null) {
264                         logger.info("** Cancelling Titan reconnect task");
265                         reconnectFuture.cancel(true);
266                 }
267
268                 // create health-check node
269                 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
270                         logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
271                         Vertex healthCheckNode = graph.addVertex();
272                         healthCheckNode.property(HEALTH_CHECK, OK);
273                         logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
274                         graph.tx().commit();
275                 } else {
276                         logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
277                 }
278                 this.startHealthCheckTask();
279         }
280
281
282         public Either<TitanGraph, TitanOperationStatus> getGraph() {
283                 if (graph != null) {
284                         return Either.left(graph);
285                 } else {
286                         return Either.right(TitanOperationStatus.NOT_CREATED);
287                 }
288         }
289
290         public TitanOperationStatus commit() {
291                 if (graph != null) {
292                         try {
293                                 graph.tx().commit();
294                                 return TitanOperationStatus.OK;
295                         } catch (Exception e) {
296                                 return handleTitanException(e);
297                         }
298                 } else {
299                         return TitanOperationStatus.NOT_CREATED;
300                 }
301         }
302
303         public TitanOperationStatus rollback() {
304                 if (graph != null) {
305                         try {
306                                 // graph.rollback();
307                                 graph.tx().rollback();
308                                 return TitanOperationStatus.OK;
309                         } catch (Exception e) {
310                                 return handleTitanException(e);
311                         }
312                 } else {
313                         return TitanOperationStatus.NOT_CREATED;
314                 }
315         }
316
317         public static TitanOperationStatus handleTitanException(Exception e) {
318                 if (e instanceof TitanConfigurationException) {
319                         return TitanOperationStatus.TITAN_CONFIGURATION;
320                 }
321                 if (e instanceof SchemaViolationException) {
322                         return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
323                 }
324                 if (e instanceof PermanentLockingException) {
325                         return TitanOperationStatus.TITAN_SCHEMA_VIOLATION;
326                 }
327                 if (e instanceof IDPoolExhaustedException) {
328                         return TitanOperationStatus.GENERAL_ERROR;
329                 }
330                 if (e instanceof InvalidElementException) {
331                         return TitanOperationStatus.INVALID_ELEMENT;
332                 }
333                 if (e instanceof InvalidIDException) {
334                         return TitanOperationStatus.INVALID_ID;
335                 }
336                 if (e instanceof QueryException) {
337                         return TitanOperationStatus.INVALID_QUERY;
338                 }
339                 if (e instanceof ResourceUnavailableException) {
340                         return TitanOperationStatus.RESOURCE_UNAVAILABLE;
341                 }
342                 if (e instanceof IllegalArgumentException) {
343                         // TODO check the error message??
344                         return TitanOperationStatus.ILLEGAL_ARGUMENT;
345                 }
346
347                 return TitanOperationStatus.GENERAL_ERROR;
348         }
349
350         public boolean getHealth() {
351                 return this.lastHealthState;
352         }
353
354         private boolean isGraphOpen() {
355                 healthLogger.trace("Invoking Titan health check ...");
356                 Vertex v = null;
357                 if (graph != null) {
358                         try {
359                                 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
360                                 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
361                                 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
362                                 graph.tx().commit();
363                         } catch (Exception e) {
364                                 String message = e.getMessage();
365                                 if (message == null) {
366                                         message = e.getClass().getName();
367                                 }
368                                 logger.error("Titan Health Check Failed. {}", message);
369                                 return false;
370                         }
371                         return true;
372                 } else {
373                         return false;
374                 }
375         }
376
377
378         public static void main(String[] args) throws InterruptedException {
379                 TitanGraphClient client = new TitanGraphClient(new DAOTitanStrategy());
380                 client.createGraph();
381
382                 while (true) {
383                         boolean health = client.isGraphOpen();
384                         System.err.println("health=" + health);
385                         Thread.sleep(2000);
386                 }
387
388         }
389
390
391         private static final String TITAN_HEALTH_CHECK_STR = "titanHealthCheck";
392
393         private void logAlarm() {
394                 if (lastHealthState == true) {
395                         BeEcompErrorManager.getInstance().logBeHealthCheckTitanRecovery(TITAN_HEALTH_CHECK_STR);
396                 } else {
397                         BeEcompErrorManager.getInstance().logBeHealthCheckTitanError(TITAN_HEALTH_CHECK_STR);
398                 }
399         }
400         
401         private void createTitanSchema() {
402                 
403                 TitanManagement graphMgt = graph.openManagement();
404                 TitanGraphIndex index = null;
405                 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
406                         PropertyKey propKey = null;
407                         if (!graphMgt.containsPropertyKey(prop.getProperty())) {
408                                 Class<?> clazz = prop.getClazz();
409                                 if (!ArrayList.class.getName().equals(clazz.getName()) && !HashMap.class.getName().equals(clazz.getName())) {
410                                         propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
411                                 }
412                         } else {
413                                 propKey = graphMgt.getPropertyKey(prop.getProperty());
414                         }
415                         if (prop.isIndexed()) {
416                                 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
417                                         if (prop.isUnique()) {
418                                                 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
419
420                                                 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK); // Ensures
421                                                                                                                                                                         // only
422                                                                                                                                                                         // one
423                                                                                                                                                                         // name
424                                                                                                                                                                         // per
425                                                                                                                                                                         // vertex
426                                                 graphMgt.setConsistency(index, ConsistencyModifier.LOCK); // Ensures
427                                                                                                                                                                         // name
428                                                                                                                                                                         // uniqueness
429                                                                                                                                                                         // in
430                                                                                                                                                                         // the
431                                                                                                                                                                         // graph
432
433                                         } else {
434                                                 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();
435                                         }
436                                 }
437                         }
438                 }
439                 graphMgt.commit();
440         }
441
442 }