Upgrade SDC from Titan to Janus Graph
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / janusgraph / JanusGraphClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.janusgraph;
22
23 import org.janusgraph.core.*;
24 import org.janusgraph.core.schema.ConsistencyModifier;
25 import org.janusgraph.core.schema.JanusGraphIndex;
26 import org.janusgraph.core.schema.JanusGraphManagement;
27 import org.janusgraph.diskstorage.BackendException;
28 import org.janusgraph.diskstorage.ResourceUnavailableException;
29 import org.janusgraph.diskstorage.locking.PermanentLockingException;
30 import org.janusgraph.graphdb.database.idassigner.IDPoolExhaustedException;
31 import fj.data.Either;
32 import org.apache.commons.configuration.BaseConfiguration;
33 import org.apache.tinkerpop.gremlin.structure.T;
34 import org.apache.tinkerpop.gremlin.structure.Vertex;
35 import org.openecomp.sdc.be.config.BeEcompErrorManager;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.be.dao.DAOJanusGraphStrategy;
38 import org.openecomp.sdc.be.dao.JanusGraphClientStrategy;
39 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.stereotype.Component;
43
44 import javax.annotation.PostConstruct;
45 import java.util.ArrayList;
46 import java.util.HashMap;
47 import java.util.concurrent.*;
48
49
50 @Component("janusgraph-client")
51 public class JanusGraphClient {
52         private static Logger logger = LoggerFactory.getLogger(JanusGraphClient.class.getName());
53         private static Logger healthLogger = LoggerFactory.getLogger("janusgraph.healthcheck");
54
55         private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
56         private static final String OK = "GOOD";
57
58         public JanusGraphClient() {
59         }
60
61         private class HealthCheckTask implements Callable<Vertex> {
62                 @Override
63                 public Vertex call() {
64
65                         JanusGraphVertex v = (JanusGraphVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
66                         JanusGraphVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
67                         healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
68                         graph.tx().commit();
69
70                         return v;
71                 }
72         }
73
74         private class HealthCheckScheduledTask implements Runnable {
75                 @Override
76                 public void run() {
77                         healthLogger.trace("Executing janusGraph Health Check Task - Start");
78                         boolean healthStatus = isGraphOpen();
79                         healthLogger.trace("Executing janusGraph Health Check Task - Status = {}", healthStatus);
80                         if (healthStatus != lastHealthState) {
81                                 logger.trace("janusGraph  Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
82                                 lastHealthState = healthStatus;
83                                 logAlarm();
84                         }
85                 }
86         }
87
88         private class ReconnectTask implements Runnable {
89                 @Override
90                 public void run() {
91                         logger.trace("Trying to reconnect to JanusGraph...");
92                         if (graph == null) {
93                                 createGraph(janusGraphCfgFile);
94                         }
95                 }
96         }
97
98         private JanusGraph graph;
99
100         // Health Check Variables
101
102         /**
103          * This executor will execute the health check task on a callable task that can be executed with a timeout.
104          */
105         ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
106                 @Override
107                 public Thread newThread(Runnable r) {
108                         return new Thread(r, "JanusGraph-Health-Check-Thread");
109                 }
110         });
111         private long healthCheckReadTimeout = 2;
112         HealthCheckTask healthCallableTask = new HealthCheckTask();
113         HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
114         boolean lastHealthState = false;
115
116         // Reconnection variables
117         private ScheduledExecutorService reconnectScheduler = null;
118         private ScheduledExecutorService healthCheckScheduler = null;
119         private Runnable reconnectTask = null;
120         private long reconnectInterval = 3;
121         @SuppressWarnings("rawtypes")
122         private Future reconnectFuture;
123
124         private String janusGraphCfgFile = null;
125         JanusGraphClientStrategy janusGraphClientStrategy;
126
127         public JanusGraphClient(JanusGraphClientStrategy janusGraphClientStrategy) {
128                 super();
129                 this.janusGraphClientStrategy = janusGraphClientStrategy;
130
131                 // Initialize a single threaded scheduler for health-check
132                 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
133                         @Override
134                         public Thread newThread(Runnable r) {
135                                 return new Thread(r, "JanusGraph-Health-Check-Task");
136                         }
137                 });
138
139                 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphHealthCheckReadTimeout(2);
140                 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphReconnectIntervalInSeconds(3);
141
142                 logger.info("** JanusGraphClient created");
143         }
144
145         @PostConstruct
146         public JanusGraphOperationStatus createGraph() {
147
148                 logger.info("** createGraph started **");
149
150                 if (ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphInMemoryGraph()) {
151                         BaseConfiguration conf = new BaseConfiguration();
152                         conf.setProperty("storage.backend", "inmemory");
153                         graph = JanusGraphFactory.open(conf);
154             createJanusGraphSchema();
155                         logger.info("** in memory graph created");
156                         return JanusGraphOperationStatus.OK;
157                 } else {
158                         this.janusGraphCfgFile = janusGraphClientStrategy.getConfigFile();
159                         if (janusGraphCfgFile == null || janusGraphCfgFile.isEmpty()) {
160                                 janusGraphCfgFile = "config/janusgraph.properties";
161                         }
162
163                         // yavivi
164                         // In case connection failed on init time, schedule a reconnect task
165                         // in the BG
166                         JanusGraphOperationStatus status = createGraph(janusGraphCfgFile);
167                         logger.debug("Create JanusGraph graph status {}", status);
168                         if (status != JanusGraphOperationStatus.OK) {
169                                 this.startReconnectTask();
170                         }
171
172                         return status;
173                 }
174         }
175
176         private void startHealthCheckTask() {
177                 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
178         }
179
180         /**
181          * This method will be invoked ONLY on init time in case JanusGraph storage is down.
182          */
183         private void startReconnectTask() {
184                 this.reconnectTask = new ReconnectTask();
185                 // Initialize a single threaded scheduler
186                 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
187                         @Override
188                         public Thread newThread(Runnable r) {
189                                 return new Thread(r, "JanusGraph-Reconnect-Task");
190                         }
191                 });
192
193                 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
194                 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
195         }
196
197         public void cleanupGraph() {
198                 if (graph != null) {
199                         // graph.shutdown();
200                         graph.close();
201                         try {
202                                 JanusGraphFactory.drop(graph);
203                         } catch (BackendException e) {
204                                 e.printStackTrace();
205                         }
206                 }
207         }
208
209         private boolean graphInitialized(){
210                 JanusGraphManagement graphMgmt = graph.openManagement();
211                 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
212         }
213         
214
215         public JanusGraphOperationStatus createGraph(String janusGraphCfgFile) {
216                 logger.info("** open graph with {} started", janusGraphCfgFile);
217                 try {
218                         logger.info("openGraph : try to load file {}", janusGraphCfgFile);
219                         graph = JanusGraphFactory.open(janusGraphCfgFile);
220                         if (graph.isClosed() || !graphInitialized()) {
221                                 logger.error("janusgraph graph was not initialized");
222                                 return JanusGraphOperationStatus.NOT_CREATED;
223                         }
224
225                 } catch (Exception e) {
226                         this.graph = null;
227                         logger.info("createGraph : failed to open JanusGraph graph with configuration file: {}", janusGraphCfgFile);
228                         logger.debug("createGraph : failed with exception.", e);
229                         return JanusGraphOperationStatus.NOT_CONNECTED;
230                 }
231
232                 logger.info("** JanusGraph graph created ");
233
234                 // Do some post creation actions
235                 this.onGraphOpened();
236
237                 return JanusGraphOperationStatus.OK;
238         }
239
240         private void onGraphOpened() {
241                 // if a reconnect task is running, cancel it.
242                 if (this.reconnectFuture != null) {
243                         logger.info("** Cancelling JanusGraph reconnect task");
244                         reconnectFuture.cancel(true);
245                 }
246
247                 // create health-check node
248                 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
249                         logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
250                         Vertex healthCheckNode = graph.addVertex();
251                         healthCheckNode.property(HEALTH_CHECK, OK);
252                         logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
253                         graph.tx().commit();
254                 } else {
255                         logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
256                 }
257                 this.startHealthCheckTask();
258         }
259
260
261         public Either<JanusGraph, JanusGraphOperationStatus> getGraph() {
262                 if (graph != null) {
263                         return Either.left(graph);
264                 } else {
265                         return Either.right(JanusGraphOperationStatus.NOT_CREATED);
266                 }
267         }
268
269         public JanusGraphOperationStatus commit() {
270                 if (graph != null) {
271                         try {
272                                 graph.tx().commit();
273                                 return JanusGraphOperationStatus.OK;
274                         } catch (Exception e) {
275                                 return handleJanusGraphException(e);
276                         }
277                 } else {
278                         return JanusGraphOperationStatus.NOT_CREATED;
279                 }
280         }
281
282         public JanusGraphOperationStatus rollback() {
283                 if (graph != null) {
284                         try {
285                                 // graph.rollback();
286                                 graph.tx().rollback();
287                                 return JanusGraphOperationStatus.OK;
288                         } catch (Exception e) {
289                                 return handleJanusGraphException(e);
290                         }
291                 } else {
292                         return JanusGraphOperationStatus.NOT_CREATED;
293                 }
294         }
295
296         public static JanusGraphOperationStatus handleJanusGraphException(Exception e) {
297                 if (e instanceof JanusGraphConfigurationException) {
298                         return JanusGraphOperationStatus.JANUSGRAPH_CONFIGURATION;
299                 }
300                 if (e instanceof SchemaViolationException) {
301                         return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
302                 }
303                 if (e instanceof PermanentLockingException) {
304                         return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
305                 }
306                 if (e instanceof IDPoolExhaustedException) {
307                         return JanusGraphOperationStatus.GENERAL_ERROR;
308                 }
309                 if (e instanceof InvalidElementException) {
310                         return JanusGraphOperationStatus.INVALID_ELEMENT;
311                 }
312                 if (e instanceof InvalidIDException) {
313                         return JanusGraphOperationStatus.INVALID_ID;
314                 }
315                 if (e instanceof QueryException) {
316                         return JanusGraphOperationStatus.INVALID_QUERY;
317                 }
318                 if (e instanceof ResourceUnavailableException) {
319                         return JanusGraphOperationStatus.RESOURCE_UNAVAILABLE;
320                 }
321                 if (e instanceof IllegalArgumentException) {
322                         // TODO check the error message??
323                         return JanusGraphOperationStatus.ILLEGAL_ARGUMENT;
324                 }
325
326                 return JanusGraphOperationStatus.GENERAL_ERROR;
327         }
328
329         public boolean getHealth() {
330                 return this.lastHealthState;
331         }
332
333         private boolean isGraphOpen() {
334                 healthLogger.trace("Invoking JanusGraph health check ...");
335                 Vertex v = null;
336                 if (graph != null) {
337                         try {
338                                 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
339                                 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
340                                 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
341                                 graph.tx().commit();
342                         } catch (Exception e) {
343                                 String message = e.getMessage();
344                                 if (message == null) {
345                                         message = e.getClass().getName();
346                                 }
347                                 logger.error("JanusGraph Health Check Failed. {}", message);
348                                 return false;
349                         }
350                         return true;
351                 } else {
352                         return false;
353                 }
354         }
355
356
357         public static void main(String[] args) throws InterruptedException {
358                 JanusGraphClient client = new JanusGraphClient(new DAOJanusGraphStrategy());
359                 client.createGraph();
360
361                 while (true) {
362                         boolean health = client.isGraphOpen();
363                         System.err.println("health=" + health);
364                         Thread.sleep(2000);
365                 }
366
367         }
368
369
370         private static final String JANUSGRAPH_HEALTH_CHECK = "janusgraphHealthCheck";
371
372         private void logAlarm() {
373                 if (lastHealthState) {
374                         BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphRecovery(JANUSGRAPH_HEALTH_CHECK);
375                 } else {
376                         BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphError(JANUSGRAPH_HEALTH_CHECK);
377                 }
378         }
379         
380         private void createJanusGraphSchema() {
381                 
382                 JanusGraphManagement graphMgt = graph.openManagement();
383                 JanusGraphIndex index = null;
384                 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
385                         PropertyKey propKey = null;
386                         if (!graphMgt.containsPropertyKey(prop.getProperty())) {
387                                 Class<?> clazz = prop.getClazz();
388                                 if (!clazz.isAssignableFrom(ArrayList.class) && !clazz.isAssignableFrom(HashMap.class)) {
389                                         propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
390                                 }
391                         } else {
392                                 propKey = graphMgt.getPropertyKey(prop.getProperty());
393                         }
394                         if (prop.isIndexed()) {
395                                 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
396                                         if (prop.isUnique()) {
397                                                 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
398                                                 // Ensures only one name per vertex
399                                                 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK);
400                                                 // Ensures name uniqueness in the graph
401                                                 graphMgt.setConsistency(index, ConsistencyModifier.LOCK);
402
403                                         } else {
404                                                 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();
405                                         }
406                                 }
407                         }
408                 }
409                 graphMgt.commit();
410         }
411
412 }