fdc8e814704c95b8a284cf67fa90917be22b9fb8
[aai/aai-common.git] / aai-core / src / main / java / org / onap / aai / dbgen / GraphSONPartialReader.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *    http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.aai.dbgen;
22
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
import org.apache.tinkerpop.gremlin.structure.io.Mapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.apache.tinkerpop.shaded.jackson.databind.node.JsonNodeType;
import org.onap.aai.dbmap.InMemoryGraph;
64
65 /**
66  * This is a Wrapper around the GraphsonReader class
67  * The idea is to rewrite methods that are customized for A&AI
68  * GraphsonReader is a final class . hence the use of the Wrapper
69  * instead of inheriting-overwriting
70  *
71  * 
72  */
73 public final class GraphSONPartialReader implements GraphReader {
74     private final ObjectMapper mapper;
75     private final long batchSize;
76     private final GraphSONVersion version;
77     private boolean unwrapAdjacencyList = false;
78     private final GraphSONReader reader;
79
80     private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(InMemoryGraph.class);
81
82     final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};
83
84     private GraphSONPartialReader(final Builder builder) {
85         mapper = builder.mapper.createMapper();
86         batchSize = builder.batchSize;
87         unwrapAdjacencyList = builder.unwrapAdjacencyList;
88         version = ((GraphSONMapper) builder.mapper).getVersion();
89         reader = GraphSONReader.build().create();
90     }
91
92     /**
93      * Read data into a {@link Graph} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
94      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
95      *
96      * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
97      *        {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
98      * @param graphToWriteTo the graph to write to when reading from the stream.
99      */
100     @Override
101     public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
102         // dual pass - create all vertices and store to cache the ids. then create edges. as long as we don't
103         // have vertex labels in the output we can't do this single pass
104         LOGGER.info("Read the Partial Graph");
105         final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>();
106         final AtomicLong counter = new AtomicLong(0);
107
108         final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions();
109         final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
110
111         readVertexStrings(inputStream)
112                 .<Vertex>map(FunctionUtils.wrapFunction(
113                         line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN)))
114                 .forEach(vertex -> {
115                     try {
116                         final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
117                         cache.put((StarGraph.StarVertex) attachable.get(),
118                                 attachable.attach(Attachable.Method.create(graphToWriteTo)));
119                         if (supportsTx && counter.incrementAndGet() % batchSize == 0)
120                             graphToWriteTo.tx().commit();
121                     } catch (Exception ex) {
122                         LOGGER.info("Error in reading vertex from graphson" + vertex.toString());
123                     }
124                 });
125
126         cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> {
127             try {
128                 // can't use a standard Attachable attach method here because we have to use the cache for those
129                 // graphs that don't support userSuppliedIds on edges. note that outVertex/inVertex methods return
130                 // StarAdjacentVertex whose equality should match StarVertex.
131                 final Vertex cachedOutV = cache.get(e.outVertex());
132                 final Vertex cachedInV = cache.get(e.inVertex());
133
134                 if (cachedOutV != null && cachedInV != null) {
135
136                     final Edge newEdge =
137                             edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id())
138                                     : cachedOutV.addEdge(e.label(), cachedInV);
139                     e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
140                 } else {
141                     LOGGER.debug("Ghost edges from " + cachedOutV + " to " + cachedInV);
142
143                 }
144                 if (supportsTx && counter.incrementAndGet() % batchSize == 0)
145                     graphToWriteTo.tx().commit();
146             } catch (Exception ex) {
147                 LOGGER.info("Error in writing vertex into graph" + e.toString());
148             }
149         }));
150
151         if (supportsTx)
152             graphToWriteTo.tx().commit();
153     }
154
155     /**
156      * Read {@link Vertex} objects from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
157      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
158      *
159      * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
160      *        {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
161      *        {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
162      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
163      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
164      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
165      */
166     @Override
167     public Iterator<Vertex> readVertices(final InputStream inputStream,
168             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
169             final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
170             throws IOException {
171         // return readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new
172         // ByteArrayInputStream(line.getBytes()), vertexAttachMethod, edgeAttachMethod,
173         // attachEdgesOfThisDirection))).iterator();
174         return reader.readVertices(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
175
176     }
177
178     /**
179      * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
180      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
181      *
182      * @param inputStream a stream containing at least a single vertex as defined by the accompanying
183      *        {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
184      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
185      */
186     @Override
187     public Vertex readVertex(final InputStream inputStream,
188             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
189         return reader.readVertex(inputStream, vertexAttachMethod);
190     }
191
192     /**
193      * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
194      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
195      *
196      * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
197      *        {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
198      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
199      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
200      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
201      */
202     @Override
203     public Vertex readVertex(final InputStream inputStream,
204             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
205             final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
206             throws IOException {
207
208         return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
209     }
210
211     /**
212      * Read an {@link Edge} from output generated by {@link GraphSONWriter#writeEdge(OutputStream, Edge)} or via
213      * an {@link Edge} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
214      *
215      * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
216      *        {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
217      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
218      */
219     @Override
220     public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod)
221             throws IOException {
222         /*
223          * if (version == GraphSONVersion.v1_0) {
224          * final Map<String, Object> edgeData = mapper.readValue(inputStream, mapTypeReference);
225          * 
226          * final Map<String, Object> edgeProperties = edgeData.containsKey(GraphSONTokens.PROPERTIES) ?
227          * (Map<String, Object>) edgeData.get(GraphSONTokens.PROPERTIES) : Collections.EMPTY_MAP;
228          * final DetachedEdge edge = new DetachedEdge(edgeData.get(GraphSONTokens.ID),
229          * edgeData.get(GraphSONTokens.LABEL).toString(),
230          * edgeProperties,
231          * Pair.with(edgeData.get(GraphSONTokens.OUT), edgeData.get(GraphSONTokens.OUT_LABEL).toString()),
232          * Pair.with(edgeData.get(GraphSONTokens.IN), edgeData.get(GraphSONTokens.IN_LABEL).toString()));
233          * 
234          * return edgeAttachMethod.apply(edge);
235          * } else {
236          * return edgeAttachMethod.apply((DetachedEdge) mapper.readValue(inputStream, Edge.class));
237          * }
238          */
239         return reader.readEdge(inputStream, edgeAttachMethod);
240     }
241
242     /**
243      * Read a {@link VertexProperty} from output generated by
244      * {@link GraphSONWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
245      * to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
246      *
247      * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
248      *        {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
249      * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
250      *        {@link Host} object.
251      */
252     @Override
253     public VertexProperty readVertexProperty(final InputStream inputStream,
254             final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
255         /*
256          * if (version == GraphSONVersion.v1_0) {
257          * final Map<String, Object> vpData = mapper.readValue(inputStream, mapTypeReference);
258          * final Map<String, Object> metaProperties = (Map<String, Object>) vpData.get(GraphSONTokens.PROPERTIES);
259          * final DetachedVertexProperty vp = new DetachedVertexProperty(vpData.get(GraphSONTokens.ID),
260          * vpData.get(GraphSONTokens.LABEL).toString(),
261          * vpData.get(GraphSONTokens.VALUE), metaProperties);
262          * return vertexPropertyAttachMethod.apply(vp);
263          * } else {
264          * return vertexPropertyAttachMethod.apply((DetachedVertexProperty) mapper.readValue(inputStream,
265          * VertexProperty.class));
266          * }
267          */
268         return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
269     }
270
271     /**
272      * Read a {@link Property} from output generated by {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
273      * via an {@link Property} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
274      *
275      * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
276      *        {@link GraphWriter#writeProperty(OutputStream, Property)} method.
277      * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
278      */
279     @Override
280     public Property readProperty(final InputStream inputStream,
281             final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
282         /*
283          * if (version == GraphSONVersion.v1_0) {
284          * final Map<String, Object> propertyData = mapper.readValue(inputStream, mapTypeReference);
285          * final DetachedProperty p = new DetachedProperty(propertyData.get(GraphSONTokens.KEY).toString(),
286          * propertyData.get(GraphSONTokens.VALUE));
287          * return propertyAttachMethod.apply(p);
288          * } else {
289          * return propertyAttachMethod.apply((DetachedProperty) mapper.readValue(inputStream, Property.class));
290          * }
291          */
292         return reader.readProperty(inputStream, propertyAttachMethod);
293     }
294
295     /**
296      * {@inheritDoc}
297      */
298     @Override
299     public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
300         return mapper.readValue(inputStream, clazz);
301     }
302
303     private Stream<String> readVertexStrings(final InputStream inputStream) throws IOException {
304         if (unwrapAdjacencyList) {
305             final JsonNode root = mapper.readTree(inputStream);
306             final JsonNode vertices = root.get(GraphSONTokens.VERTICES);
307             if (!vertices.getNodeType().equals(JsonNodeType.ARRAY))
308                 throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
309             return IteratorUtils.stream(vertices.elements()).map(Object::toString);
310         } else {
311             final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
312             return br.lines();
313         }
314
315     }
316
317     public static Builder build() {
318         return new Builder();
319     }
320
321     public final static class Builder implements ReaderBuilder<GraphSONPartialReader> {
322         private long batchSize = 10000;
323
324         private Mapper<ObjectMapper> mapper = GraphSONMapper.build().create();
325         private boolean unwrapAdjacencyList = false;
326
327         private Builder() {
328         }
329
330         /**
331          * Number of mutations to perform before a commit is executed when using
332          * {@link GraphSONPartialReader#readGraph(InputStream, Graph)}.
333          */
334         public Builder batchSize(final long batchSize) {
335             this.batchSize = batchSize;
336             return this;
337         }
338
339         /**
340          * Override all of the {@link GraphSONMapper} builder
341          * options with this mapper. If this value is set to something other than null then that value will be
342          * used to construct the writer.
343          */
344         public Builder mapper(final Mapper<ObjectMapper> mapper) {
345             this.mapper = mapper;
346             return this;
347         }
348
349         /**
350          * If the adjacency list is wrapped in a JSON object, as is done when writing a graph with
351          * {@link GraphSONWriter.Builder#wrapAdjacencyList} set to {@code true}, this setting needs to be set to
352          * {@code true} to properly read it. By default, this value is {@code false} and the adjacency list is
353          * simply read as line delimited vertices.
354          * <p/>
355          * By setting this value to {@code true}, the generated JSON is no longer "splittable" by line and thus not
356          * suitable for OLAP processing. Furthermore, reading this format of the JSON with
357          * {@link GraphSONPartialReader#readGraph(InputStream, Graph)} or
358          * {@link GraphSONPartialReader#readVertices(InputStream, Function, Function, Direction)} requires that the
359          * entire JSON object be read into memory, so it is best saved for "small" graphs.
360          */
361         public Builder unwrapAdjacencyList(final boolean unwrapAdjacencyList) {
362             this.unwrapAdjacencyList = unwrapAdjacencyList;
363             return this;
364         }
365
366         public GraphSONPartialReader create() {
367             return new GraphSONPartialReader(this);
368         }
369     }
370 }