2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.aai.dbgen;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
import org.apache.tinkerpop.gremlin.structure.io.Mapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONWriter;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.apache.tinkerpop.shaded.jackson.databind.node.JsonNodeType;
65 * This is a Wrapper around the GraphsonReader class
66 * The idea is to rewrite methods that are customized for A&AI
67 * GraphsonReader is a final class . hence the use of the Wrapper
68 * instead of inheriting-overwriting
72 public final class GraphSONPartialReader implements GraphReader {
73 private final ObjectMapper mapper;
74 private final long batchSize;
75 private boolean unwrapAdjacencyList = false;
76 private final GraphSONReader reader;
78 private static final Logger LOGGER = LoggerFactory.getLogger(GraphSONPartialReader.class);
80 final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};
/**
 * Creates a reader from the settings collected by {@code builder}.
 * <p>
 * The wrapped {@link GraphSONReader} is configured with the same mapper, batch size and adjacency-list setting,
 * so the delegated read methods interpret input the same way this class does.
 *
 * @param builder the builder holding the mapper, batch size and adjacency-list settings
 */
private GraphSONPartialReader(final Builder builder) {
    mapper = builder.mapper.createMapper();
    batchSize = builder.batchSize;
    unwrapAdjacencyList = builder.unwrapAdjacencyList;
    // Propagate the builder's settings to the delegate. Building it with defaults would make readVertex,
    // readVertices, readEdge, ... (and readGraph, which parses every vertex through readVertex) silently ignore
    // a custom mapper and the unwrapAdjacencyList flag.
    reader = GraphSONReader.build()
            .mapper(builder.mapper)
            .batchSize(builder.batchSize)
            .unwrapAdjacencyList(builder.unwrapAdjacencyList)
            .create();
}
90 * Read data into a {@link Graph} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
91 * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
93 * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
94 * {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
95 * @param graphToWriteTo the graph to write to when reading from the stream.
98 public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
99 // dual pass - create all vertices and store to cache the ids. then create edges. as long as we don't
100 // have vertex labels in the output we can't do this single pass
101 LOGGER.info("Read the Partial Graph");
102 final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>();
103 final AtomicLong counter = new AtomicLong(0);
105 final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions();
106 final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
108 readVertexStrings(inputStream)
109 .<Vertex>map(FunctionUtils.wrapFunction(
110 line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN)))
113 final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
114 cache.put((StarGraph.StarVertex) attachable.get(),
115 attachable.attach(Attachable.Method.create(graphToWriteTo)));
116 if (supportsTx && counter.incrementAndGet() % batchSize == 0)
117 graphToWriteTo.tx().commit();
118 } catch (Exception ex) {
119 LOGGER.info(String.format("Error in reading vertex from graphson%s", vertex.toString()));
123 cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> {
125 // can't use a standard Attachable attach method here because we have to use the cache for those
126 // graphs that don't support userSuppliedIds on edges. note that outVertex/inVertex methods return
127 // StarAdjacentVertex whose equality should match StarVertex.
128 final Vertex cachedOutV = cache.get(e.outVertex());
129 final Vertex cachedInV = cache.get(e.inVertex());
131 if (cachedOutV != null && cachedInV != null) {
134 edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id())
135 : cachedOutV.addEdge(e.label(), cachedInV);
136 e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
138 LOGGER.debug(String.format("Ghost edges from %s to %s", cachedOutV, cachedInV));
141 if (supportsTx && counter.incrementAndGet() % batchSize == 0)
142 graphToWriteTo.tx().commit();
143 } catch (Exception ex) {
144 LOGGER.info(String.format("Error in writing vertex into graph%s", e.toString()));
149 graphToWriteTo.tx().commit();
153 * Read {@link Vertex} objects from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
154 * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
156 * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
157 * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
158 * {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
159 * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
160 * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
161 * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
164 public Iterator<Vertex> readVertices(final InputStream inputStream,
165 final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
166 final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
168 return reader.readVertices(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
173 * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
174 * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
176 * @param inputStream a stream containing at least a single vertex as defined by the accompanying
177 * {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
178 * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
181 public Vertex readVertex(final InputStream inputStream,
182 final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
183 return reader.readVertex(inputStream, vertexAttachMethod);
187 * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
188 * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
190 * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
191 * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
192 * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
193 * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
194 * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
197 public Vertex readVertex(final InputStream inputStream,
198 final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
199 final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
202 return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
206 * Read an {@link Edge} from output generated by {@link GraphSONWriter#writeEdge(OutputStream, Edge)} or via
207 * an {@link Edge} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
209 * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
210 * {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
211 * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
214 public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod)
216 return reader.readEdge(inputStream, edgeAttachMethod);
220 * Read a {@link VertexProperty} from output generated by
221 * {@link GraphSONWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
222 * to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
224 * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
225 * {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
226 * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
227 * {@link Host} object.
230 public VertexProperty readVertexProperty(final InputStream inputStream,
231 final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
232 return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
236 * Read a {@link Property} from output generated by {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
237 * via an {@link Property} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
239 * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
240 * {@link GraphWriter#writeProperty(OutputStream, Property)} method.
241 * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
244 public Property readProperty(final InputStream inputStream,
245 final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
246 return reader.readProperty(inputStream, propertyAttachMethod);
253 public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
254 return mapper.readValue(inputStream, clazz);
257 private Stream<String> readVertexStrings(final InputStream inputStream) throws IOException {
258 if (unwrapAdjacencyList) {
259 final JsonNode root = mapper.readTree(inputStream);
260 final JsonNode vertices = root.get(GraphSONTokens.VERTICES);
261 if (!vertices.getNodeType().equals(JsonNodeType.ARRAY))
262 throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
263 return IteratorUtils.stream(vertices.elements()).map(Object::toString);
265 final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
271 public static Builder build() {
272 return new Builder();
275 public static final class Builder implements ReaderBuilder<GraphSONPartialReader> {
276 private long batchSize = 10000;
278 private Mapper<ObjectMapper> mapper = GraphSONMapper.build().create();
279 private boolean unwrapAdjacencyList = false;
285 * Number of mutations to perform before a commit is executed when using
286 * {@link GraphSONPartialReader#readGraph(InputStream, Graph)}.
288 public Builder batchSize(final long batchSize) {
289 this.batchSize = batchSize;
294 * Override all of the {@link GraphSONMapper} builder
295 * options with this mapper. If this value is set to something other than null then that value will be
296 * used to construct the writer.
298 public Builder mapper(final Mapper<ObjectMapper> mapper) {
299 this.mapper = mapper;
304 * If the adjacency list is wrapped in a JSON object, as is done when writing a graph with
305 * {@link GraphSONWriter.Builder#wrapAdjacencyList} set to {@code true}, this setting needs to be set to
306 * {@code true} to properly read it. By default, this value is {@code false} and the adjacency list is
307 * simply read as line delimited vertices.
309 * By setting this value to {@code true}, the generated JSON is no longer "splittable" by line and thus not
310 * suitable for OLAP processing. Furthermore, reading this format of the JSON with
311 * {@link GraphSONPartialReader#readGraph(InputStream, Graph)} or
312 * {@link GraphSONPartialReader#readVertices(InputStream, Function, Function, Direction)} requires that the
313 * entire JSON object be read into memory, so it is best saved for "small" graphs.
315 public Builder unwrapAdjacencyList(final boolean unwrapAdjacencyList) {
316 this.unwrapAdjacencyList = unwrapAdjacencyList;
320 public GraphSONPartialReader create() {
321 return new GraphSONPartialReader(this);