2088286dc37b5d859eae7e11149c5a8f5cd1db6c
[aai/aai-common.git] / aai-core / src / main / java / org / onap / aai / dbgen / GraphSONPartialReader.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *    http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.aai.dbgen;
21
22 import org.apache.tinkerpop.gremlin.structure.Direction;
23 import org.apache.tinkerpop.gremlin.structure.Edge;
24 import org.apache.tinkerpop.gremlin.structure.Graph;
25 import org.apache.tinkerpop.gremlin.structure.Property;
26 import org.apache.tinkerpop.gremlin.structure.T;
27 import org.apache.tinkerpop.gremlin.structure.Vertex;
28 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
29 import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
30 import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
31 import org.apache.tinkerpop.gremlin.structure.io.Mapper;
32 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
33 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader;
34 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
35 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion;
36 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
37 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
38 import org.apache.tinkerpop.gremlin.structure.util.Host;
39 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
40 import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
41 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
42 import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
43 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
44 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
45 import org.apache.tinkerpop.shaded.jackson.databind.node.JsonNodeType;
46 import org.onap.aai.dbmap.InMemoryGraph;
47
48 import com.att.eelf.configuration.EELFLogger;
49 import com.att.eelf.configuration.EELFManager;
50
51 import java.io.BufferedReader;
52 import java.io.ByteArrayInputStream;
53 import java.io.IOException;
54 import java.io.InputStream;
55 import java.io.InputStreamReader;
56 import java.io.OutputStream;
57 import java.util.HashMap;
58 import java.util.Iterator;
59 import java.util.Map;
60 import java.util.concurrent.atomic.AtomicLong;
61 import java.util.function.Function;
62 import java.util.stream.Stream;
63
64 /**
65  * This is a Wrapper around the GraphsonReader class
66  * The idea is to rewrite methods that are customized for A&AI
67  * GraphsonReader is a final class . hence the use of the Wrapper
68  * instead of inheriting-overwriting
69  *
70  * 
71  */
72 public final class GraphSONPartialReader implements GraphReader {
73     private final ObjectMapper mapper ;
74     private final long batchSize ;
75     private final GraphSONVersion version ;
76     private boolean unwrapAdjacencyList = false;
77     private final GraphSONReader reader;
78     
79     private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(InMemoryGraph.class);
80
81     final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {
82     };
83
84     private GraphSONPartialReader(final Builder builder) {
85         mapper = builder.mapper.createMapper();
86         batchSize = builder.batchSize;
87         unwrapAdjacencyList = builder.unwrapAdjacencyList;
88         version = ((GraphSONMapper)builder.mapper).getVersion();
89         reader = GraphSONReader.build().create();
90     }
91
92     /**
93      * Read data into a {@link Graph} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
94      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
95      *
96      * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
97      *                    {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
98      * @param graphToWriteTo the graph to write to when reading from the stream.
99      */
100     @Override
101     public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
102         // dual pass - create all vertices and store to cache the ids.  then create edges.  as long as we don't
103         // have vertex labels in the output we can't do this single pass
104         LOGGER.info("Read the Partial Graph");
105         final Map<StarGraph.StarVertex,Vertex> cache = new HashMap<>();
106         final AtomicLong counter = new AtomicLong(0);
107         
108         final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions();
109         final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
110         
111         readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN))).forEach(vertex -> {
112                 try{
113                         final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
114                     cache.put((StarGraph.StarVertex) attachable.get(), attachable.attach(Attachable.Method.create(graphToWriteTo)));
115                     if (supportsTx && counter.incrementAndGet() % batchSize == 0)
116                         graphToWriteTo.tx().commit();
117                 }
118                 catch(Exception ex){
119                         LOGGER.info("Error in reading vertex from graphson"+vertex.toString());
120                 }
121         });
122         
123         cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> {
124                 try{
125                         // can't use a standard Attachable attach method here because we have to use the cache for those
126                     // graphs that don't support userSuppliedIds on edges.  note that outVertex/inVertex methods return
127                     // StarAdjacentVertex whose equality should match StarVertex.
128                     final Vertex cachedOutV = cache.get(e.outVertex());
129                     final Vertex cachedInV = cache.get(e.inVertex());
130                     
131                     if(cachedOutV != null  && cachedInV != null){
132                          
133                             final Edge newEdge = edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id()) : cachedOutV.addEdge(e.label(), cachedInV);
134                             e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
135                         }
136                     else{
137                         LOGGER.debug("Ghost edges from "+ cachedOutV + " to "+ cachedInV);
138                         
139                     }
140                     if (supportsTx && counter.incrementAndGet() % batchSize == 0)
141                         graphToWriteTo.tx().commit();
142                 }
143                 catch(Exception ex){
144                         LOGGER.info("Error in writing vertex into graph"+e.toString());
145                 }
146         }));
147
148         if (supportsTx) graphToWriteTo.tx().commit();
149     }
150
151     /**
152      * Read {@link Vertex} objects from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
153      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
154      *
155      * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
156      *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
157      *                    {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
158      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
159      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
160      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
161      */
162     @Override
163     public Iterator<Vertex> readVertices(final InputStream inputStream,
164                                          final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
165                                          final Function<Attachable<Edge>, Edge> edgeAttachMethod,
166                                          final Direction attachEdgesOfThisDirection) throws IOException {
167        // return readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection))).iterator();
168         return reader.readVertices(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
169                         
170     }
171
172     /**
173      * Read a {@link Vertex}  from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
174      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
175      *
176      * @param inputStream a stream containing at least a single vertex as defined by the accompanying
177      *                    {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
178      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
179      */
180     @Override
181     public Vertex readVertex(final InputStream inputStream, final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
182         return reader.readVertex(inputStream, vertexAttachMethod);
183     }
184
185     /**
186      * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
187      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
188      *
189      * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
190      *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
191      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
192      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
193      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
194      */
195     @Override
196     public Vertex readVertex(final InputStream inputStream,
197                              final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
198                              final Function<Attachable<Edge>, Edge> edgeAttachMethod,
199                              final Direction attachEdgesOfThisDirection) throws IOException {
200         
201         return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
202     }
203
204     /**
205      * Read an {@link Edge} from output generated by {@link GraphSONWriter#writeEdge(OutputStream, Edge)} or via
206      * an {@link Edge} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
207      *
208      * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
209      *                    {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
210      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
211      */
212     @Override
213     public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod) throws IOException {
214         /*if (version == GraphSONVersion.v1_0) {
215             final Map<String, Object> edgeData = mapper.readValue(inputStream, mapTypeReference);
216
217             final Map<String, Object> edgeProperties = edgeData.containsKey(GraphSONTokens.PROPERTIES) ?
218                     (Map<String, Object>) edgeData.get(GraphSONTokens.PROPERTIES) : Collections.EMPTY_MAP;
219             final DetachedEdge edge = new DetachedEdge(edgeData.get(GraphSONTokens.ID),
220                     edgeData.get(GraphSONTokens.LABEL).toString(),
221                     edgeProperties,
222                     Pair.with(edgeData.get(GraphSONTokens.OUT), edgeData.get(GraphSONTokens.OUT_LABEL).toString()),
223                     Pair.with(edgeData.get(GraphSONTokens.IN), edgeData.get(GraphSONTokens.IN_LABEL).toString()));
224
225             return edgeAttachMethod.apply(edge);
226         } else {
227             return edgeAttachMethod.apply((DetachedEdge) mapper.readValue(inputStream, Edge.class));
228         }*/
229         return reader.readEdge(inputStream, edgeAttachMethod);
230     }
231
232     /**
233      * Read a {@link VertexProperty} from output generated by
234      * {@link GraphSONWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
235      * to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
236      *
237      * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
238      *                    {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
239      * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
240      *                                   {@link Host} object.
241      */
242     @Override
243     public VertexProperty readVertexProperty(final InputStream inputStream,
244                                              final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
245         /*if (version == GraphSONVersion.v1_0) {
246             final Map<String, Object> vpData = mapper.readValue(inputStream, mapTypeReference);
247             final Map<String, Object> metaProperties = (Map<String, Object>) vpData.get(GraphSONTokens.PROPERTIES);
248             final DetachedVertexProperty vp = new DetachedVertexProperty(vpData.get(GraphSONTokens.ID),
249                     vpData.get(GraphSONTokens.LABEL).toString(),
250                     vpData.get(GraphSONTokens.VALUE), metaProperties);
251             return vertexPropertyAttachMethod.apply(vp);
252         } else {
253             return vertexPropertyAttachMethod.apply((DetachedVertexProperty) mapper.readValue(inputStream, VertexProperty.class));
254         }*/
255         return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
256     }
257
258     /**
259      * Read a {@link Property} from output generated by  {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
260      * via an {@link Property} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
261      *
262      * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
263      *                    {@link GraphWriter#writeProperty(OutputStream, Property)} method.
264      * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
265      */
266     @Override
267     public Property readProperty(final InputStream inputStream,
268                                  final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
269         /*if (version == GraphSONVersion.v1_0) {
270             final Map<String, Object> propertyData = mapper.readValue(inputStream, mapTypeReference);
271             final DetachedProperty p = new DetachedProperty(propertyData.get(GraphSONTokens.KEY).toString(), propertyData.get(GraphSONTokens.VALUE));
272             return propertyAttachMethod.apply(p);
273         } else {
274             return propertyAttachMethod.apply((DetachedProperty) mapper.readValue(inputStream, Property.class));
275         }*/
276         return reader.readProperty(inputStream, propertyAttachMethod);
277     }
278
279     /**
280      * {@inheritDoc}
281      */
282     @Override
283     public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
284         return mapper.readValue(inputStream, clazz);
285     }
286
287     private Stream<String> readVertexStrings(final InputStream inputStream) throws IOException {
288         if (unwrapAdjacencyList) {
289                 final JsonNode root = mapper.readTree(inputStream);
290             final JsonNode vertices = root.get(GraphSONTokens.VERTICES);
291             if (!vertices.getNodeType().equals(JsonNodeType.ARRAY)) throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
292             return IteratorUtils.stream(vertices.elements()).map(Object::toString);
293         } else {
294                 final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
295             return br.lines();
296         }
297         
298     }
299
300     
301     public static Builder build() {
302         return new Builder();
303     }
304
305     public final static class Builder implements ReaderBuilder<GraphSONPartialReader> {
306         private long batchSize = 10000;
307
308         private Mapper<ObjectMapper> mapper = GraphSONMapper.build().create();
309         private boolean unwrapAdjacencyList = false;
310         
311
312         private Builder() {}
313
314         /**
315          * Number of mutations to perform before a commit is executed when using
316          * {@link GraphSONPartialReader#readGraph(InputStream, Graph)}.
317          */
318         public Builder batchSize(final long batchSize) {
319             this.batchSize = batchSize;
320             return this;
321         }
322
323         /**
324          * Override all of the {@link GraphSONMapper} builder
325          * options with this mapper.  If this value is set to something other than null then that value will be
326          * used to construct the writer.
327          */
328         public Builder mapper(final Mapper<ObjectMapper> mapper) {
329             this.mapper = mapper;
330             return this;
331         }
332
333         /**
334          * If the adjacency list is wrapped in a JSON object, as is done when writing a graph with
335          * {@link GraphSONWriter.Builder#wrapAdjacencyList} set to {@code true}, this setting needs to be set to
336          * {@code true} to properly read it.  By default, this value is {@code false} and the adjacency list is
337          * simply read as line delimited vertices.
338          * <p/>
339          * By setting this value to {@code true}, the generated JSON is no longer "splittable" by line and thus not
340          * suitable for OLAP processing.  Furthermore, reading this format of the JSON with
341          * {@link GraphSONPartialReader#readGraph(InputStream, Graph)} or
342          * {@link GraphSONPartialReader#readVertices(InputStream, Function, Function, Direction)} requires that the
343          * entire JSON object be read into memory, so it is best saved for "small" graphs.
344          */
345         public Builder unwrapAdjacencyList(final boolean unwrapAdjacencyList) {
346             this.unwrapAdjacencyList = unwrapAdjacencyList;
347             return this;
348         }
349
350         public GraphSONPartialReader create() {
351             return new GraphSONPartialReader(this);
352         }
353     }
354 }