2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.dao.janusgraph;
23 import org.janusgraph.core.*;
24 import org.janusgraph.core.schema.ConsistencyModifier;
25 import org.janusgraph.core.schema.JanusGraphIndex;
26 import org.janusgraph.core.schema.JanusGraphManagement;
27 import org.janusgraph.diskstorage.BackendException;
28 import org.janusgraph.diskstorage.ResourceUnavailableException;
29 import org.janusgraph.diskstorage.locking.PermanentLockingException;
30 import org.janusgraph.graphdb.database.idassigner.IDPoolExhaustedException;
31 import fj.data.Either;
32 import org.apache.commons.configuration.BaseConfiguration;
33 import org.apache.tinkerpop.gremlin.structure.T;
34 import org.apache.tinkerpop.gremlin.structure.Vertex;
35 import org.openecomp.sdc.be.config.BeEcompErrorManager;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.be.dao.DAOJanusGraphStrategy;
38 import org.openecomp.sdc.be.dao.JanusGraphClientStrategy;
39 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.stereotype.Component;
44 import javax.annotation.PostConstruct;
45 import java.util.ArrayList;
46 import java.util.HashMap;
47 import java.util.concurrent.*;
50 @Component("janusgraph-client")
51 public class JanusGraphClient {
52 private static Logger logger = LoggerFactory.getLogger(JanusGraphClient.class.getName());
53 private static Logger healthLogger = LoggerFactory.getLogger("janusgraph.healthcheck");
55 private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
56 private static final String OK = "GOOD";
58 public JanusGraphClient() {
61 private class HealthCheckTask implements Callable<Vertex> {
63 public Vertex call() {
65 JanusGraphVertex v = (JanusGraphVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
66 JanusGraphVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
67 healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
74 private class HealthCheckScheduledTask implements Runnable {
77 healthLogger.trace("Executing janusGraph Health Check Task - Start");
78 boolean healthStatus = isGraphOpen();
79 healthLogger.trace("Executing janusGraph Health Check Task - Status = {}", healthStatus);
80 if (healthStatus != lastHealthState) {
81 logger.trace("janusGraph Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
82 lastHealthState = healthStatus;
88 private class ReconnectTask implements Runnable {
91 logger.trace("Trying to reconnect to JanusGraph...");
93 createGraph(janusGraphCfgFile);
98 private JanusGraph graph;
100 // Health Check Variables
103 * This executor will execute the health check task on a callable task that can be executed with a timeout.
105 ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
107 public Thread newThread(Runnable r) {
108 return new Thread(r, "JanusGraph-Health-Check-Thread");
111 private long healthCheckReadTimeout = 2;
112 HealthCheckTask healthCallableTask = new HealthCheckTask();
113 HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
114 boolean lastHealthState = false;
116 // Reconnection variables
117 private ScheduledExecutorService reconnectScheduler = null;
118 private ScheduledExecutorService healthCheckScheduler = null;
119 private Runnable reconnectTask = null;
120 private long reconnectInterval = 3;
121 @SuppressWarnings("rawtypes")
122 private Future reconnectFuture;
124 private String janusGraphCfgFile = null;
125 JanusGraphClientStrategy janusGraphClientStrategy;
127 public JanusGraphClient(JanusGraphClientStrategy janusGraphClientStrategy) {
129 this.janusGraphClientStrategy = janusGraphClientStrategy;
131 // Initialize a single threaded scheduler for health-check
132 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
134 public Thread newThread(Runnable r) {
135 return new Thread(r, "JanusGraph-Health-Check-Task");
139 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphHealthCheckReadTimeout(2);
140 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphReconnectIntervalInSeconds(3);
142 logger.info("** JanusGraphClient created");
146 public JanusGraphOperationStatus createGraph() {
148 logger.info("** createGraph started **");
150 if (ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphInMemoryGraph()) {
151 BaseConfiguration conf = new BaseConfiguration();
152 conf.setProperty("storage.backend", "inmemory");
153 graph = JanusGraphFactory.open(conf);
154 createJanusGraphSchema();
155 logger.info("** in memory graph created");
156 return JanusGraphOperationStatus.OK;
158 this.janusGraphCfgFile = janusGraphClientStrategy.getConfigFile();
159 if (janusGraphCfgFile == null || janusGraphCfgFile.isEmpty()) {
160 janusGraphCfgFile = "config/janusgraph.properties";
164 // In case connection failed on init time, schedule a reconnect task
166 JanusGraphOperationStatus status = createGraph(janusGraphCfgFile);
167 logger.debug("Create JanusGraph graph status {}", status);
168 if (status != JanusGraphOperationStatus.OK) {
169 this.startReconnectTask();
176 private void startHealthCheckTask() {
177 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
181 * This method will be invoked ONLY on init time in case JanusGraph storage is down.
183 private void startReconnectTask() {
184 this.reconnectTask = new ReconnectTask();
185 // Initialize a single threaded scheduler
186 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
188 public Thread newThread(Runnable r) {
189 return new Thread(r, "JanusGraph-Reconnect-Task");
193 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
194 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
197 public void cleanupGraph() {
202 JanusGraphFactory.drop(graph);
203 } catch (BackendException e) {
209 private boolean graphInitialized(){
210 JanusGraphManagement graphMgmt = graph.openManagement();
211 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
215 public JanusGraphOperationStatus createGraph(String janusGraphCfgFile) {
216 logger.info("** open graph with {} started", janusGraphCfgFile);
218 logger.info("openGraph : try to load file {}", janusGraphCfgFile);
219 graph = JanusGraphFactory.open(janusGraphCfgFile);
220 if (graph.isClosed() || !graphInitialized()) {
221 logger.error("janusgraph graph was not initialized");
222 return JanusGraphOperationStatus.NOT_CREATED;
225 } catch (Exception e) {
227 logger.info("createGraph : failed to open JanusGraph graph with configuration file: {}", janusGraphCfgFile);
228 logger.debug("createGraph : failed with exception.", e);
229 return JanusGraphOperationStatus.NOT_CONNECTED;
232 logger.info("** JanusGraph graph created ");
234 // Do some post creation actions
235 this.onGraphOpened();
237 return JanusGraphOperationStatus.OK;
240 private void onGraphOpened() {
241 // if a reconnect task is running, cancel it.
242 if (this.reconnectFuture != null) {
243 logger.info("** Cancelling JanusGraph reconnect task");
244 reconnectFuture.cancel(true);
247 // create health-check node
248 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
249 logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
250 Vertex healthCheckNode = graph.addVertex();
251 healthCheckNode.property(HEALTH_CHECK, OK);
252 logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
255 logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
257 this.startHealthCheckTask();
261 public Either<JanusGraph, JanusGraphOperationStatus> getGraph() {
263 return Either.left(graph);
265 return Either.right(JanusGraphOperationStatus.NOT_CREATED);
269 public JanusGraphOperationStatus commit() {
273 return JanusGraphOperationStatus.OK;
274 } catch (Exception e) {
275 return handleJanusGraphException(e);
278 return JanusGraphOperationStatus.NOT_CREATED;
282 public JanusGraphOperationStatus rollback() {
286 graph.tx().rollback();
287 return JanusGraphOperationStatus.OK;
288 } catch (Exception e) {
289 return handleJanusGraphException(e);
292 return JanusGraphOperationStatus.NOT_CREATED;
296 public static JanusGraphOperationStatus handleJanusGraphException(Exception e) {
297 if (e instanceof JanusGraphConfigurationException) {
298 return JanusGraphOperationStatus.JANUSGRAPH_CONFIGURATION;
300 if (e instanceof SchemaViolationException) {
301 return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
303 if (e instanceof PermanentLockingException) {
304 return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
306 if (e instanceof IDPoolExhaustedException) {
307 return JanusGraphOperationStatus.GENERAL_ERROR;
309 if (e instanceof InvalidElementException) {
310 return JanusGraphOperationStatus.INVALID_ELEMENT;
312 if (e instanceof InvalidIDException) {
313 return JanusGraphOperationStatus.INVALID_ID;
315 if (e instanceof QueryException) {
316 return JanusGraphOperationStatus.INVALID_QUERY;
318 if (e instanceof ResourceUnavailableException) {
319 return JanusGraphOperationStatus.RESOURCE_UNAVAILABLE;
321 if (e instanceof IllegalArgumentException) {
322 // TODO check the error message??
323 return JanusGraphOperationStatus.ILLEGAL_ARGUMENT;
326 return JanusGraphOperationStatus.GENERAL_ERROR;
329 public boolean getHealth() {
330 return this.lastHealthState;
333 private boolean isGraphOpen() {
334 healthLogger.trace("Invoking JanusGraph health check ...");
338 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
339 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
340 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
342 } catch (Exception e) {
343 String message = e.getMessage();
344 if (message == null) {
345 message = e.getClass().getName();
347 logger.error("JanusGraph Health Check Failed. {}", message);
357 public static void main(String[] args) throws InterruptedException {
358 JanusGraphClient client = new JanusGraphClient(new DAOJanusGraphStrategy());
359 client.createGraph();
362 boolean health = client.isGraphOpen();
363 System.err.println("health=" + health);
370 private static final String JANUSGRAPH_HEALTH_CHECK = "janusgraphHealthCheck";
372 private void logAlarm() {
373 if (lastHealthState) {
374 BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphRecovery(JANUSGRAPH_HEALTH_CHECK);
376 BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphError(JANUSGRAPH_HEALTH_CHECK);
380 private void createJanusGraphSchema() {
382 JanusGraphManagement graphMgt = graph.openManagement();
383 JanusGraphIndex index = null;
384 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
385 PropertyKey propKey = null;
386 if (!graphMgt.containsPropertyKey(prop.getProperty())) {
387 Class<?> clazz = prop.getClazz();
388 if (!clazz.isAssignableFrom(ArrayList.class) && !clazz.isAssignableFrom(HashMap.class)) {
389 propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
392 propKey = graphMgt.getPropertyKey(prop.getProperty());
394 if (prop.isIndexed()) {
395 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
396 if (prop.isUnique()) {
397 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
398 // Ensures only one name per vertex
399 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK);
400 // Ensures name uniqueness in the graph
401 graphMgt.setConsistency(index, ConsistencyModifier.LOCK);
404 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();