Merge "[AAI] Fix doc config files"
[aai/aai-common.git] / aai-core / src / main / java / org / onap / aai / dbgen / GraphSONPartialReader.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *    http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.aai.dbgen;
22
23 import java.io.BufferedReader;
24 import java.io.ByteArrayInputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.io.OutputStream;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.concurrent.atomic.AtomicLong;
33 import java.util.function.Function;
34 import java.util.stream.Stream;
35
36 import org.apache.tinkerpop.gremlin.structure.Direction;
37 import org.apache.tinkerpop.gremlin.structure.Edge;
38 import org.apache.tinkerpop.gremlin.structure.Graph;
39 import org.apache.tinkerpop.gremlin.structure.Property;
40 import org.apache.tinkerpop.gremlin.structure.T;
41 import org.apache.tinkerpop.gremlin.structure.Vertex;
42 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
43 import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
44 import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
45 import org.apache.tinkerpop.gremlin.structure.io.Mapper;
46 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
47 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader;
48 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
49 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONWriter;
50 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
51 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
52 import org.apache.tinkerpop.gremlin.structure.util.Host;
53 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
54 import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
55 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
56 import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
57 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
58 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
59 import org.apache.tinkerpop.shaded.jackson.databind.node.JsonNodeType;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * This is a Wrapper around the GraphsonReader class
65  * The idea is to rewrite methods that are customized for A&AI
66  * GraphsonReader is a final class . hence the use of the Wrapper
67  * instead of inheriting-overwriting
68  *
69  *
70  */
71 public final class GraphSONPartialReader implements GraphReader {
72     private final ObjectMapper mapper;
73     private final long batchSize;
74     private boolean unwrapAdjacencyList = false;
75     private final GraphSONReader reader;
76
77     private static final Logger LOGGER = LoggerFactory.getLogger(GraphSONPartialReader.class);
78
79     final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};
80
81     private GraphSONPartialReader(final Builder builder) {
82         mapper = builder.mapper.createMapper();
83         batchSize = builder.batchSize;
84         unwrapAdjacencyList = builder.unwrapAdjacencyList;
85         reader = GraphSONReader.build().create();
86     }
87
88     /**
89      * Read data into a {@link Graph} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
90      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
91      *
92      * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
93      *        {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
94      * @param graphToWriteTo the graph to write to when reading from the stream.
95      */
96     @Override
97     public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
98         // dual pass - create all vertices and store to cache the ids. then create edges. as long as we don't
99         // have vertex labels in the output we can't do this single pass
100         LOGGER.info("Read the Partial Graph");
101         final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>();
102         final AtomicLong counter = new AtomicLong(0);
103
104         final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions();
105         final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
106
107         readVertexStrings(inputStream)
108                 .<Vertex>map(FunctionUtils.wrapFunction(
109                         line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN)))
110                 .forEach(vertex -> {
111                     try {
112                         final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
113                         cache.put((StarGraph.StarVertex) attachable.get(),
114                                 attachable.attach(Attachable.Method.create(graphToWriteTo)));
115                         if (supportsTx && counter.incrementAndGet() % batchSize == 0)
116                             graphToWriteTo.tx().commit();
117                     } catch (Exception ex) {
118                         LOGGER.info(String.format("Error in reading vertex from graphson%s", vertex.toString()));
119                     }
120                 });
121
122         cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> {
123             try {
124                 // can't use a standard Attachable attach method here because we have to use the cache for those
125                 // graphs that don't support userSuppliedIds on edges. note that outVertex/inVertex methods return
126                 // StarAdjacentVertex whose equality should match StarVertex.
127                 final Vertex cachedOutV = cache.get(e.outVertex());
128                 final Vertex cachedInV = cache.get(e.inVertex());
129
130                 if (cachedOutV != null && cachedInV != null) {
131
132                     final Edge newEdge =
133                             edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id())
134                                     : cachedOutV.addEdge(e.label(), cachedInV);
135                     e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
136                 } else {
137                     LOGGER.debug(String.format("Ghost edges from %s to %s", cachedOutV, cachedInV));
138
139                 }
140                 if (supportsTx && counter.incrementAndGet() % batchSize == 0)
141                     graphToWriteTo.tx().commit();
142             } catch (Exception ex) {
143                 LOGGER.info(String.format("Error in writing vertex into graph%s", e.toString()));
144             }
145         }));
146
147         if (supportsTx)
148             graphToWriteTo.tx().commit();
149     }
150
151     /**
152      * Read {@link Vertex} objects from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
153      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
154      *
155      * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
156      *        {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
157      *        {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
158      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
159      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
160      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
161      */
162     @Override
163     public Iterator<Vertex> readVertices(final InputStream inputStream,
164             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
165             final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
166             throws IOException {
167         return reader.readVertices(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
168
169     }
170
171     /**
172      * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
173      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
174      *
175      * @param inputStream a stream containing at least a single vertex as defined by the accompanying
176      *        {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
177      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
178      */
179     @Override
180     public Vertex readVertex(final InputStream inputStream,
181             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
182         return reader.readVertex(inputStream, vertexAttachMethod);
183     }
184
185     /**
186      * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
187      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
188      *
189      * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
190      *        {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
191      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
192      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
193      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
194      */
195     @Override
196     public Vertex readVertex(final InputStream inputStream,
197             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
198             final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
199             throws IOException {
200
201         return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
202     }
203
204     /**
205      * Read an {@link Edge} from output generated by {@link GraphSONWriter#writeEdge(OutputStream, Edge)} or via
206      * an {@link Edge} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
207      *
208      * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
209      *        {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
210      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
211      */
212     @Override
213     public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod)
214             throws IOException {
215         return reader.readEdge(inputStream, edgeAttachMethod);
216     }
217
218     /**
219      * Read a {@link VertexProperty} from output generated by
220      * {@link GraphSONWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
221      * to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
222      *
223      * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
224      *        {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
225      * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
226      *        {@link Host} object.
227      */
228     @Override
229     public VertexProperty readVertexProperty(final InputStream inputStream,
230             final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
231         return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
232     }
233
234     /**
235      * Read a {@link Property} from output generated by {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
236      * via an {@link Property} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
237      *
238      * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
239      *        {@link GraphWriter#writeProperty(OutputStream, Property)} method.
240      * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
241      */
242     @Override
243     public Property readProperty(final InputStream inputStream,
244             final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
245         return reader.readProperty(inputStream, propertyAttachMethod);
246     }
247
248     /**
249      * {@inheritDoc}
250      */
251     @Override
252     public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
253         return mapper.readValue(inputStream, clazz);
254     }
255
256     private Stream<String> readVertexStrings(final InputStream inputStream) throws IOException {
257         if (unwrapAdjacencyList) {
258             final JsonNode root = mapper.readTree(inputStream);
259             final JsonNode vertices = root.get(GraphSONTokens.VERTICES);
260             if (!vertices.getNodeType().equals(JsonNodeType.ARRAY))
261                 throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
262             return IteratorUtils.stream(vertices.elements()).map(Object::toString);
263         } else {
264             final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
265             return br.lines();
266         }
267
268     }
269
270     public static Builder build() {
271         return new Builder();
272     }
273
274     public static final class Builder implements ReaderBuilder<GraphSONPartialReader> {
275         private long batchSize = 10000;
276
277         private Mapper<ObjectMapper> mapper = GraphSONMapper.build().create();
278         private boolean unwrapAdjacencyList = false;
279
280         private Builder() {
281         }
282
283         /**
284          * Number of mutations to perform before a commit is executed when using
285          * {@link GraphSONPartialReader#readGraph(InputStream, Graph)}.
286          */
287         public Builder batchSize(final long batchSize) {
288             this.batchSize = batchSize;
289             return this;
290         }
291
292         /**
293          * Override all of the {@link GraphSONMapper} builder
294          * options with this mapper. If this value is set to something other than null then that value will be
295          * used to construct the writer.
296          */
297         public Builder mapper(final Mapper<ObjectMapper> mapper) {
298             this.mapper = mapper;
299             return this;
300         }
301
302         /**
303          * If the adjacency list is wrapped in a JSON object, as is done when writing a graph with
304          * {@link GraphSONWriter.Builder#wrapAdjacencyList} set to {@code true}, this setting needs to be set to
305          * {@code true} to properly read it. By default, this value is {@code false} and the adjacency list is
306          * simply read as line delimited vertices.
307          * <p/>
308          * By setting this value to {@code true}, the generated JSON is no longer "splittable" by line and thus not
309          * suitable for OLAP processing. Furthermore, reading this format of the JSON with
310          * {@link GraphSONPartialReader#readGraph(InputStream, Graph)} or
311          * {@link GraphSONPartialReader#readVertices(InputStream, Function, Function, Direction)} requires that the
312          * entire JSON object be read into memory, so it is best saved for "small" graphs.
313          */
314         public Builder unwrapAdjacencyList(final boolean unwrapAdjacencyList) {
315             this.unwrapAdjacencyList = unwrapAdjacencyList;
316             return this;
317         }
318
319         public GraphSONPartialReader create() {
320             return new GraphSONPartialReader(this);
321         }
322     }
323 }