Catalog alignment
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / janusgraph / JanusGraphClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.janusgraph;
22
23 import org.janusgraph.core.*;
24 import org.janusgraph.core.schema.ConsistencyModifier;
25 import org.janusgraph.core.schema.JanusGraphIndex;
26 import org.janusgraph.core.schema.JanusGraphManagement;
27 import org.janusgraph.diskstorage.BackendException;
28 import org.janusgraph.diskstorage.ResourceUnavailableException;
29 import org.janusgraph.diskstorage.locking.PermanentLockingException;
30 import org.janusgraph.graphdb.database.idassigner.IDPoolExhaustedException;
31 import fj.data.Either;
32 import org.apache.commons.configuration.BaseConfiguration;
33 import org.apache.tinkerpop.gremlin.structure.T;
34 import org.apache.tinkerpop.gremlin.structure.Vertex;
35 import org.openecomp.sdc.be.config.BeEcompErrorManager;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.be.dao.DAOJanusGraphStrategy;
38 import org.openecomp.sdc.be.dao.JanusGraphClientStrategy;
39 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.stereotype.Component;
43
44 import javax.annotation.PostConstruct;
45 import javax.annotation.PreDestroy;
46 import java.util.ArrayList;
47 import java.util.HashMap;
48 import java.util.concurrent.*;
49
50
51 @Component("janusgraph-client")
52 public class JanusGraphClient {
53         private static Logger logger = LoggerFactory.getLogger(JanusGraphClient.class.getName());
54         private static Logger healthLogger = LoggerFactory.getLogger("janusgraph.healthcheck");
55
56         private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
57         private static final String OK = "GOOD";
58
59         public JanusGraphClient() {
60         }
61
62         @PreDestroy
63         public void closeSession(){
64                 if ( graph.isOpen() ){
65                         graph.close();
66                         logger.info("** JanusGraphClient session closed");
67                 }
68         }
69         private class HealthCheckTask implements Callable<Vertex> {
70                 @Override
71                 public Vertex call() {
72
73                         JanusGraphVertex v = (JanusGraphVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
74                         JanusGraphVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
75                         healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
76                         graph.tx().commit();
77
78                         return v;
79                 }
80         }
81
82         private class HealthCheckScheduledTask implements Runnable {
83                 @Override
84                 public void run() {
85                         healthLogger.trace("Executing janusGraph Health Check Task - Start");
86                         boolean healthStatus = isGraphOpen();
87                         healthLogger.trace("Executing janusGraph Health Check Task - Status = {}", healthStatus);
88                         if (healthStatus != lastHealthState) {
89                                 logger.trace("janusGraph  Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
90                                 lastHealthState = healthStatus;
91                                 logAlarm();
92                         }
93                 }
94         }
95
96         private class ReconnectTask implements Runnable {
97                 @Override
98                 public void run() {
99                         logger.trace("Trying to reconnect to JanusGraph...");
100                         if (graph == null) {
101                                 createGraph(janusGraphCfgFile);
102                         }
103                 }
104         }
105
106         private JanusGraph graph;
107
108         // Health Check Variables
109
110         /**
111          * This executor will execute the health check task on a callable task that can be executed with a timeout.
112          */
113         ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
114                 @Override
115                 public Thread newThread(Runnable r) {
116                         return new Thread(r, "JanusGraph-Health-Check-Thread");
117                 }
118         });
119         private long healthCheckReadTimeout = 2;
120         HealthCheckTask healthCallableTask = new HealthCheckTask();
121         HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
122         boolean lastHealthState = false;
123
124         // Reconnection variables
125         private ScheduledExecutorService reconnectScheduler = null;
126         private ScheduledExecutorService healthCheckScheduler = null;
127         private Runnable reconnectTask = null;
128         private long reconnectInterval = 3;
129         @SuppressWarnings("rawtypes")
130         private Future reconnectFuture;
131
132         private String janusGraphCfgFile = null;
133         JanusGraphClientStrategy janusGraphClientStrategy;
134
135         public JanusGraphClient(JanusGraphClientStrategy janusGraphClientStrategy) {
136                 super();
137                 this.janusGraphClientStrategy = janusGraphClientStrategy;
138
139                 // Initialize a single threaded scheduler for health-check
140                 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
141                         @Override
142                         public Thread newThread(Runnable r) {
143                                 return new Thread(r, "JanusGraph-Health-Check-Task");
144                         }
145                 });
146
147                 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphHealthCheckReadTimeout(2);
148                 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphReconnectIntervalInSeconds(3);
149
150                 logger.info("** JanusGraphClient created");
151         }
152
153         @PostConstruct
154         public JanusGraphOperationStatus createGraph() {
155
156                 logger.info("** createGraph started **");
157
158                 if (ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphInMemoryGraph()) {
159                         BaseConfiguration conf = new BaseConfiguration();
160                         conf.setProperty("storage.backend", "inmemory");
161                         graph = JanusGraphFactory.open(conf);
162             createJanusGraphSchema();
163                         logger.info("** in memory graph created");
164                         return JanusGraphOperationStatus.OK;
165                 } else {
166                         this.janusGraphCfgFile = janusGraphClientStrategy.getConfigFile();
167                         if (janusGraphCfgFile == null || janusGraphCfgFile.isEmpty()) {
168                                 janusGraphCfgFile = "config/janusgraph.properties";
169                         }
170
171                         // yavivi
172                         // In case connection failed on init time, schedule a reconnect task
173                         // in the BG
174                         JanusGraphOperationStatus status = createGraph(janusGraphCfgFile);
175                         logger.debug("Create JanusGraph graph status {}", status);
176                         if (status != JanusGraphOperationStatus.OK) {
177                                 this.startReconnectTask();
178                         }
179
180                         return status;
181                 }
182         }
183
184         private void startHealthCheckTask() {
185                 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
186         }
187
188         /**
189          * This method will be invoked ONLY on init time in case JanusGraph storage is down.
190          */
191         private void startReconnectTask() {
192                 this.reconnectTask = new ReconnectTask();
193                 // Initialize a single threaded scheduler
194                 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
195                         @Override
196                         public Thread newThread(Runnable r) {
197                                 return new Thread(r, "JanusGraph-Reconnect-Task");
198                         }
199                 });
200
201                 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
202                 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
203         }
204
205         public void cleanupGraph() {
206                 if (graph != null) {
207                         // graph.shutdown();
208                         graph.close();
209                         try {
210                                 JanusGraphFactory.drop(graph);
211                         } catch (BackendException e) {
212                                 e.printStackTrace();
213                         }
214                 }
215         }
216
217         private boolean graphInitialized(){
218                 JanusGraphManagement graphMgmt = graph.openManagement();
219                 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
220         }
221         
222
223         public JanusGraphOperationStatus createGraph(String janusGraphCfgFile) {
224                 logger.info("** open graph with {} started", janusGraphCfgFile);
225                 try {
226                         logger.info("openGraph : try to load file {}", janusGraphCfgFile);
227                         graph = JanusGraphFactory.open(janusGraphCfgFile);
228                         if (graph.isClosed() || !graphInitialized()) {
229                                 logger.error("janusgraph graph was not initialized");
230                                 return JanusGraphOperationStatus.NOT_CREATED;
231                         }
232
233                 } catch (Exception e) {
234                         this.graph = null;
235                         logger.info("createGraph : failed to open JanusGraph graph with configuration file: {}", janusGraphCfgFile);
236                         logger.debug("createGraph : failed with exception.", e);
237                         return JanusGraphOperationStatus.NOT_CONNECTED;
238                 }
239
240                 logger.info("** JanusGraph graph created ");
241
242                 // Do some post creation actions
243                 this.onGraphOpened();
244
245                 return JanusGraphOperationStatus.OK;
246         }
247
248         private void onGraphOpened() {
249                 // if a reconnect task is running, cancel it.
250                 if (this.reconnectFuture != null) {
251                         logger.info("** Cancelling JanusGraph reconnect task");
252                         reconnectFuture.cancel(true);
253                 }
254
255                 // create health-check node
256                 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
257                         logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
258                         Vertex healthCheckNode = graph.addVertex();
259                         healthCheckNode.property(HEALTH_CHECK, OK);
260                         logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
261                         graph.tx().commit();
262                 } else {
263                         logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
264                 }
265                 this.startHealthCheckTask();
266         }
267
268
269         public Either<JanusGraph, JanusGraphOperationStatus> getGraph() {
270                 if (graph != null) {
271                         return Either.left(graph);
272                 } else {
273                         return Either.right(JanusGraphOperationStatus.NOT_CREATED);
274                 }
275         }
276
277         public JanusGraphOperationStatus commit() {
278                 if (graph != null) {
279                         try {
280                                 graph.tx().commit();
281                                 return JanusGraphOperationStatus.OK;
282                         } catch (Exception e) {
283                                 return handleJanusGraphException(e);
284                         }
285                 } else {
286                         return JanusGraphOperationStatus.NOT_CREATED;
287                 }
288         }
289
290         public JanusGraphOperationStatus rollback() {
291                 if (graph != null) {
292                         try {
293                                 // graph.rollback();
294                                 graph.tx().rollback();
295                                 return JanusGraphOperationStatus.OK;
296                         } catch (Exception e) {
297                                 return handleJanusGraphException(e);
298                         }
299                 } else {
300                         return JanusGraphOperationStatus.NOT_CREATED;
301                 }
302         }
303
304         public static JanusGraphOperationStatus handleJanusGraphException(Exception e) {
305                 if (e instanceof JanusGraphConfigurationException) {
306                         return JanusGraphOperationStatus.JANUSGRAPH_CONFIGURATION;
307                 }
308                 if (e instanceof SchemaViolationException) {
309                         return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
310                 }
311                 if (e instanceof PermanentLockingException) {
312                         return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
313                 }
314                 if (e instanceof IDPoolExhaustedException) {
315                         return JanusGraphOperationStatus.GENERAL_ERROR;
316                 }
317                 if (e instanceof InvalidElementException) {
318                         return JanusGraphOperationStatus.INVALID_ELEMENT;
319                 }
320                 if (e instanceof InvalidIDException) {
321                         return JanusGraphOperationStatus.INVALID_ID;
322                 }
323                 if (e instanceof QueryException) {
324                         return JanusGraphOperationStatus.INVALID_QUERY;
325                 }
326                 if (e instanceof ResourceUnavailableException) {
327                         return JanusGraphOperationStatus.RESOURCE_UNAVAILABLE;
328                 }
329                 if (e instanceof IllegalArgumentException) {
330                         // TODO check the error message??
331                         return JanusGraphOperationStatus.ILLEGAL_ARGUMENT;
332                 }
333
334                 return JanusGraphOperationStatus.GENERAL_ERROR;
335         }
336
337         public boolean getHealth() {
338                 return this.lastHealthState;
339         }
340
341         private boolean isGraphOpen() {
342                 healthLogger.trace("Invoking JanusGraph health check ...");
343                 Vertex v = null;
344                 if (graph != null) {
345                         try {
346                                 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
347                                 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
348                                 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
349                                 graph.tx().commit();
350                         } catch (Exception e) {
351                                 String message = e.getMessage();
352                                 if (message == null) {
353                                         message = e.getClass().getName();
354                                 }
355                                 logger.error("JanusGraph Health Check Failed. {}", message);
356                                 return false;
357                         }
358                         return true;
359                 } else {
360                         return false;
361                 }
362         }
363
364
365         public static void main(String[] args) throws InterruptedException {
366                 JanusGraphClient client = new JanusGraphClient(new DAOJanusGraphStrategy());
367                 client.createGraph();
368
369                 while (true) {
370                         boolean health = client.isGraphOpen();
371                         System.err.println("health=" + health);
372                         Thread.sleep(2000);
373                 }
374
375         }
376
377
378         private static final String JANUSGRAPH_HEALTH_CHECK = "janusgraphHealthCheck";
379
380         private void logAlarm() {
381                 if (lastHealthState) {
382                         BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphRecovery(JANUSGRAPH_HEALTH_CHECK);
383                 } else {
384                         BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphError(JANUSGRAPH_HEALTH_CHECK);
385                 }
386         }
387         
388         private void createJanusGraphSchema() {
389                 
390                 JanusGraphManagement graphMgt = graph.openManagement();
391                 JanusGraphIndex index = null;
392                 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
393                         PropertyKey propKey = null;
394                         if (!graphMgt.containsPropertyKey(prop.getProperty())) {
395                                 Class<?> clazz = prop.getClazz();
396                                 if (!clazz.isAssignableFrom(ArrayList.class) && !clazz.isAssignableFrom(HashMap.class)) {
397                                         propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
398                                 }
399                         } else {
400                                 propKey = graphMgt.getPropertyKey(prop.getProperty());
401                         }
402                         if (prop.isIndexed()) {
403                                 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
404                                         if (prop.isUnique()) {
405                                                 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
406                                                 // Ensures only one name per vertex
407                                                 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK);
408                                                 // Ensures name uniqueness in the graph
409                                                 graphMgt.setConsistency(index, ConsistencyModifier.LOCK);
410
411                                         } else {
412                                                 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();
413                                         }
414                                 }
415                         }
416                 }
417                 graphMgt.commit();
418         }
419
420 }