* limitations under the License.
* ============LICENSE_END=========================================================
*/
+
package org.onap.aai.dbgen;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.shaded.jackson.databind.node.JsonNodeType;
import org.onap.aai.dbmap.InMemoryGraph;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
/**
* This is a Wrapper around the GraphsonReader class
* The idea is to rewrite methods that are customized for A&AI
*
*/
public final class GraphSONPartialReader implements GraphReader {
- private final ObjectMapper mapper ;
- private final long batchSize ;
- private final GraphSONVersion version ;
+ private final ObjectMapper mapper;
+ private final long batchSize;
+ private final GraphSONVersion version;
private boolean unwrapAdjacencyList = false;
private final GraphSONReader reader;
-
+
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(InMemoryGraph.class);
- final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {
- };
+ final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};
private GraphSONPartialReader(final Builder builder) {
mapper = builder.mapper.createMapper();
batchSize = builder.batchSize;
unwrapAdjacencyList = builder.unwrapAdjacencyList;
- version = ((GraphSONMapper)builder.mapper).getVersion();
+ version = ((GraphSONMapper) builder.mapper).getVersion();
reader = GraphSONReader.build().create();
}
* {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
*
* @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
- * {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
+ * {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
* @param graphToWriteTo the graph to write to when reading from the stream.
*/
@Override
public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
- // dual pass - create all vertices and store to cache the ids. then create edges. as long as we don't
+ // dual pass - create all vertices and store to cache the ids. then create edges. as long as we don't
// have vertex labels in the output we can't do this single pass
- LOGGER.info("Read the Partial Graph");
- final Map<StarGraph.StarVertex,Vertex> cache = new HashMap<>();
+ LOGGER.info("Read the Partial Graph");
+ final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>();
final AtomicLong counter = new AtomicLong(0);
-
+
final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions();
final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
-
- readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN))).forEach(vertex -> {
- try{
- final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
- cache.put((StarGraph.StarVertex) attachable.get(), attachable.attach(Attachable.Method.create(graphToWriteTo)));
- if (supportsTx && counter.incrementAndGet() % batchSize == 0)
- graphToWriteTo.tx().commit();
- }
- catch(Exception ex){
- LOGGER.info("Error in reading vertex from graphson"+vertex.toString());
- }
- });
-
+
+ readVertexStrings(inputStream)
+ .<Vertex>map(FunctionUtils.wrapFunction(
+ line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN)))
+ .forEach(vertex -> {
+ try {
+ final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
+ cache.put((StarGraph.StarVertex) attachable.get(),
+ attachable.attach(Attachable.Method.create(graphToWriteTo)));
+ if (supportsTx && counter.incrementAndGet() % batchSize == 0)
+ graphToWriteTo.tx().commit();
+ } catch (Exception ex) {
+ LOGGER.info("Error in reading vertex from graphson" + vertex.toString());
+ }
+ });
+
cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> {
- try{
- // can't use a standard Attachable attach method here because we have to use the cache for those
- // graphs that don't support userSuppliedIds on edges. note that outVertex/inVertex methods return
- // StarAdjacentVertex whose equality should match StarVertex.
- final Vertex cachedOutV = cache.get(e.outVertex());
- final Vertex cachedInV = cache.get(e.inVertex());
-
- if(cachedOutV != null && cachedInV != null){
-
- final Edge newEdge = edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id()) : cachedOutV.addEdge(e.label(), cachedInV);
- e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
- }
- else{
- LOGGER.debug("Ghost edges from "+ cachedOutV + " to "+ cachedInV);
-
- }
- if (supportsTx && counter.incrementAndGet() % batchSize == 0)
- graphToWriteTo.tx().commit();
- }
- catch(Exception ex){
- LOGGER.info("Error in writing vertex into graph"+e.toString());
- }
+ try {
+ // can't use a standard Attachable attach method here because we have to use the cache for those
+ // graphs that don't support userSuppliedIds on edges. note that outVertex/inVertex methods return
+ // StarAdjacentVertex whose equality should match StarVertex.
+ final Vertex cachedOutV = cache.get(e.outVertex());
+ final Vertex cachedInV = cache.get(e.inVertex());
+
+ if (cachedOutV != null && cachedInV != null) {
+
+ final Edge newEdge =
+ edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id())
+ : cachedOutV.addEdge(e.label(), cachedInV);
+ e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
+ } else {
+ LOGGER.debug("Ghost edges from " + cachedOutV + " to " + cachedInV);
+
+ }
+ if (supportsTx && counter.incrementAndGet() % batchSize == 0)
+ graphToWriteTo.tx().commit();
+ } catch (Exception ex) {
+ LOGGER.info("Error in writing vertex into graph" + e.toString());
+ }
}));
- if (supportsTx) graphToWriteTo.tx().commit();
+ if (supportsTx)
+ graphToWriteTo.tx().commit();
}
/**
* {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
*
* @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
- * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
- * {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
+ * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
+ * {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
* @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
* @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
* @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
*/
@Override
public Iterator<Vertex> readVertices(final InputStream inputStream,
- final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
- final Function<Attachable<Edge>, Edge> edgeAttachMethod,
- final Direction attachEdgesOfThisDirection) throws IOException {
- // return readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection))).iterator();
+ final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
+ final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
+ throws IOException {
+ // return readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new
+ // ByteArrayInputStream(line.getBytes()), vertexAttachMethod, edgeAttachMethod,
+ // attachEdgesOfThisDirection))).iterator();
return reader.readVertices(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
-
+
}
/**
- * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
+ * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
* {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
*
* @param inputStream a stream containing at least a single vertex as defined by the accompanying
- * {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
+ * {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
* @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
*/
@Override
- public Vertex readVertex(final InputStream inputStream, final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
+ public Vertex readVertex(final InputStream inputStream,
+ final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
return reader.readVertex(inputStream, vertexAttachMethod);
}
* {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
*
* @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
- * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
+ * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
* @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
* @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
* @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
*/
@Override
public Vertex readVertex(final InputStream inputStream,
- final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
- final Function<Attachable<Edge>, Edge> edgeAttachMethod,
- final Direction attachEdgesOfThisDirection) throws IOException {
-
- return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
+ final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
+ final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
+ throws IOException {
+
+ return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
}
/**
* an {@link Edge} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
*
* @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
- * {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
+ * {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
* @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
*/
@Override
- public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod) throws IOException {
- /*if (version == GraphSONVersion.v1_0) {
- final Map<String, Object> edgeData = mapper.readValue(inputStream, mapTypeReference);
-
- final Map<String, Object> edgeProperties = edgeData.containsKey(GraphSONTokens.PROPERTIES) ?
- (Map<String, Object>) edgeData.get(GraphSONTokens.PROPERTIES) : Collections.EMPTY_MAP;
- final DetachedEdge edge = new DetachedEdge(edgeData.get(GraphSONTokens.ID),
- edgeData.get(GraphSONTokens.LABEL).toString(),
- edgeProperties,
- Pair.with(edgeData.get(GraphSONTokens.OUT), edgeData.get(GraphSONTokens.OUT_LABEL).toString()),
- Pair.with(edgeData.get(GraphSONTokens.IN), edgeData.get(GraphSONTokens.IN_LABEL).toString()));
-
- return edgeAttachMethod.apply(edge);
- } else {
- return edgeAttachMethod.apply((DetachedEdge) mapper.readValue(inputStream, Edge.class));
- }*/
- return reader.readEdge(inputStream, edgeAttachMethod);
+ public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod)
+ throws IOException {
+ /*
+ * if (version == GraphSONVersion.v1_0) {
+ * final Map<String, Object> edgeData = mapper.readValue(inputStream, mapTypeReference);
+ *
+ * final Map<String, Object> edgeProperties = edgeData.containsKey(GraphSONTokens.PROPERTIES) ?
+ * (Map<String, Object>) edgeData.get(GraphSONTokens.PROPERTIES) : Collections.EMPTY_MAP;
+ * final DetachedEdge edge = new DetachedEdge(edgeData.get(GraphSONTokens.ID),
+ * edgeData.get(GraphSONTokens.LABEL).toString(),
+ * edgeProperties,
+ * Pair.with(edgeData.get(GraphSONTokens.OUT), edgeData.get(GraphSONTokens.OUT_LABEL).toString()),
+ * Pair.with(edgeData.get(GraphSONTokens.IN), edgeData.get(GraphSONTokens.IN_LABEL).toString()));
+ *
+ * return edgeAttachMethod.apply(edge);
+ * } else {
+ * return edgeAttachMethod.apply((DetachedEdge) mapper.readValue(inputStream, Edge.class));
+ * }
+ */
+ return reader.readEdge(inputStream, edgeAttachMethod);
}
/**
* to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
*
* @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
- * {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
+ * {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
* @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
- * {@link Host} object.
+ * {@link Host} object.
*/
@Override
public VertexProperty readVertexProperty(final InputStream inputStream,
- final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
- /*if (version == GraphSONVersion.v1_0) {
- final Map<String, Object> vpData = mapper.readValue(inputStream, mapTypeReference);
- final Map<String, Object> metaProperties = (Map<String, Object>) vpData.get(GraphSONTokens.PROPERTIES);
- final DetachedVertexProperty vp = new DetachedVertexProperty(vpData.get(GraphSONTokens.ID),
- vpData.get(GraphSONTokens.LABEL).toString(),
- vpData.get(GraphSONTokens.VALUE), metaProperties);
- return vertexPropertyAttachMethod.apply(vp);
- } else {
- return vertexPropertyAttachMethod.apply((DetachedVertexProperty) mapper.readValue(inputStream, VertexProperty.class));
- }*/
- return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
+ final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
+ /*
+ * if (version == GraphSONVersion.v1_0) {
+ * final Map<String, Object> vpData = mapper.readValue(inputStream, mapTypeReference);
+ * final Map<String, Object> metaProperties = (Map<String, Object>) vpData.get(GraphSONTokens.PROPERTIES);
+ * final DetachedVertexProperty vp = new DetachedVertexProperty(vpData.get(GraphSONTokens.ID),
+ * vpData.get(GraphSONTokens.LABEL).toString(),
+ * vpData.get(GraphSONTokens.VALUE), metaProperties);
+ * return vertexPropertyAttachMethod.apply(vp);
+ * } else {
+ * return vertexPropertyAttachMethod.apply((DetachedVertexProperty) mapper.readValue(inputStream,
+ * VertexProperty.class));
+ * }
+ */
+ return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
}
/**
- * Read a {@link Property} from output generated by {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
+ * Read a {@link Property} from output generated by {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
* via an {@link Property} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
*
* @param inputStream a stream containing at least one {@link Property} as written by the accompanying
- * {@link GraphWriter#writeProperty(OutputStream, Property)} method.
+ * {@link GraphWriter#writeProperty(OutputStream, Property)} method.
* @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
*/
@Override
public Property readProperty(final InputStream inputStream,
- final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
- /*if (version == GraphSONVersion.v1_0) {
- final Map<String, Object> propertyData = mapper.readValue(inputStream, mapTypeReference);
- final DetachedProperty p = new DetachedProperty(propertyData.get(GraphSONTokens.KEY).toString(), propertyData.get(GraphSONTokens.VALUE));
- return propertyAttachMethod.apply(p);
- } else {
- return propertyAttachMethod.apply((DetachedProperty) mapper.readValue(inputStream, Property.class));
- }*/
- return reader.readProperty(inputStream, propertyAttachMethod);
+ final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
+ /*
+ * if (version == GraphSONVersion.v1_0) {
+ * final Map<String, Object> propertyData = mapper.readValue(inputStream, mapTypeReference);
+ * final DetachedProperty p = new DetachedProperty(propertyData.get(GraphSONTokens.KEY).toString(),
+ * propertyData.get(GraphSONTokens.VALUE));
+ * return propertyAttachMethod.apply(p);
+ * } else {
+ * return propertyAttachMethod.apply((DetachedProperty) mapper.readValue(inputStream, Property.class));
+ * }
+ */
+ return reader.readProperty(inputStream, propertyAttachMethod);
}
/**
private Stream<String> readVertexStrings(final InputStream inputStream) throws IOException {
if (unwrapAdjacencyList) {
- final JsonNode root = mapper.readTree(inputStream);
+ final JsonNode root = mapper.readTree(inputStream);
final JsonNode vertices = root.get(GraphSONTokens.VERTICES);
- if (!vertices.getNodeType().equals(JsonNodeType.ARRAY)) throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
+ if (!vertices.getNodeType().equals(JsonNodeType.ARRAY))
+ throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
return IteratorUtils.stream(vertices.elements()).map(Object::toString);
} else {
- final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
+ final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
return br.lines();
}
-
+
}
-
public static Builder build() {
return new Builder();
}
private Mapper<ObjectMapper> mapper = GraphSONMapper.build().create();
private boolean unwrapAdjacencyList = false;
-
- private Builder() {}
+ private Builder() {
+ }
/**
* Number of mutations to perform before a commit is executed when using
/**
* Override all of the {@link GraphSONMapper} builder
- * options with this mapper. If this value is set to something other than null then that value will be
+ * options with this mapper. If this value is set to something other than null then that value will be
* used to construct the writer.
*/
public Builder mapper(final Mapper<ObjectMapper> mapper) {
/**
* If the adjacency list is wrapped in a JSON object, as is done when writing a graph with
* {@link GraphSONWriter.Builder#wrapAdjacencyList} set to {@code true}, this setting needs to be set to
- * {@code true} to properly read it. By default, this value is {@code false} and the adjacency list is
+ * {@code true} to properly read it. By default, this value is {@code false} and the adjacency list is
* simply read as line delimited vertices.
* <p/>
* By setting this value to {@code true}, the generated JSON is no longer "splittable" by line and thus not
- * suitable for OLAP processing. Furthermore, reading this format of the JSON with
+ * suitable for OLAP processing. Furthermore, reading this format of the JSON with
* {@link GraphSONPartialReader#readGraph(InputStream, Graph)} or
* {@link GraphSONPartialReader#readVertices(InputStream, Function, Function, Direction)} requires that the
* entire JSON object be read into memory, so it is best saved for "small" graphs.