AAI-1523 Batch reformat aai-core
[aai/aai-common.git] / aai-core / src / main / java / org / onap / aai / dbgen / GraphSONPartialReader.java
index 2088286..fdc8e81 100644 (file)
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
+
 package org.onap.aai.dbgen;
 
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -45,22 +62,6 @@ import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
 import org.apache.tinkerpop.shaded.jackson.databind.node.JsonNodeType;
 import org.onap.aai.dbmap.InMemoryGraph;
 
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
 /**
  * This is a Wrapper around the GraphsonReader class
  * The idea is to rewrite methods that are customized for A&AI
@@ -70,22 +71,21 @@ import java.util.stream.Stream;
  * 
  */
 public final class GraphSONPartialReader implements GraphReader {
-    private final ObjectMapper mapper ;
-    private final long batchSize ;
-    private final GraphSONVersion version ;
+    private final ObjectMapper mapper;
+    private final long batchSize;
+    private final GraphSONVersion version;
     private boolean unwrapAdjacencyList = false;
     private final GraphSONReader reader;
-    
+
     private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(InMemoryGraph.class);
 
-    final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {
-    };
+    final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};
 
     private GraphSONPartialReader(final Builder builder) {
         mapper = builder.mapper.createMapper();
         batchSize = builder.batchSize;
         unwrapAdjacencyList = builder.unwrapAdjacencyList;
-        version = ((GraphSONMapper)builder.mapper).getVersion();
+        version = ((GraphSONMapper) builder.mapper).getVersion();
         reader = GraphSONReader.build().create();
     }
 
@@ -94,58 +94,62 @@ public final class GraphSONPartialReader implements GraphReader {
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
      * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
-     *                    {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
+     *        {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
      * @param graphToWriteTo the graph to write to when reading from the stream.
      */
     @Override
     public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
-        // dual pass - create all vertices and store to cache the ids.  then create edges.  as long as we don't
+        // dual pass - create all vertices and store to cache the ids. then create edges. as long as we don't
         // have vertex labels in the output we can't do this single pass
-       LOGGER.info("Read the Partial Graph");
-       final Map<StarGraph.StarVertex,Vertex> cache = new HashMap<>();
+        LOGGER.info("Read the Partial Graph");
+        final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>();
         final AtomicLong counter = new AtomicLong(0);
-        
+
         final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions();
         final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
-        
-        readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN))).forEach(vertex -> {
-               try{
-                       final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
-                   cache.put((StarGraph.StarVertex) attachable.get(), attachable.attach(Attachable.Method.create(graphToWriteTo)));
-                   if (supportsTx && counter.incrementAndGet() % batchSize == 0)
-                       graphToWriteTo.tx().commit();
-               }
-               catch(Exception ex){
-                       LOGGER.info("Error in reading vertex from graphson"+vertex.toString());
-               }
-        });
-        
+
+        readVertexStrings(inputStream)
+                .<Vertex>map(FunctionUtils.wrapFunction(
+                        line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN)))
+                .forEach(vertex -> {
+                    try {
+                        final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
+                        cache.put((StarGraph.StarVertex) attachable.get(),
+                                attachable.attach(Attachable.Method.create(graphToWriteTo)));
+                        if (supportsTx && counter.incrementAndGet() % batchSize == 0)
+                            graphToWriteTo.tx().commit();
+                    } catch (Exception ex) {
+                        LOGGER.info("Error in reading vertex from graphson" + vertex.toString());
+                    }
+                });
+
         cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> {
-               try{
-                       // can't use a standard Attachable attach method here because we have to use the cache for those
-                   // graphs that don't support userSuppliedIds on edges.  note that outVertex/inVertex methods return
-                   // StarAdjacentVertex whose equality should match StarVertex.
-                   final Vertex cachedOutV = cache.get(e.outVertex());
-                   final Vertex cachedInV = cache.get(e.inVertex());
-                   
-                   if(cachedOutV != null  && cachedInV != null){
-                        
-                           final Edge newEdge = edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id()) : cachedOutV.addEdge(e.label(), cachedInV);
-                           e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
-                       }
-                   else{
-                       LOGGER.debug("Ghost edges from "+ cachedOutV + " to "+ cachedInV);
-                       
-                   }
-                   if (supportsTx && counter.incrementAndGet() % batchSize == 0)
-                       graphToWriteTo.tx().commit();
-               }
-               catch(Exception ex){
-                       LOGGER.info("Error in writing vertex into graph"+e.toString());
-               }
+            try {
+                // can't use a standard Attachable attach method here because we have to use the cache for those
+                // graphs that don't support userSuppliedIds on edges. note that outVertex/inVertex methods return
+                // StarAdjacentVertex whose equality should match StarVertex.
+                final Vertex cachedOutV = cache.get(e.outVertex());
+                final Vertex cachedInV = cache.get(e.inVertex());
+
+                if (cachedOutV != null && cachedInV != null) {
+
+                    final Edge newEdge =
+                            edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id())
+                                    : cachedOutV.addEdge(e.label(), cachedInV);
+                    e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
+                } else {
+                    LOGGER.debug("Ghost edges from " + cachedOutV + " to " + cachedInV);
+
+                }
+                if (supportsTx && counter.incrementAndGet() % batchSize == 0)
+                    graphToWriteTo.tx().commit();
+            } catch (Exception ex) {
+                LOGGER.info("Error in writing vertex into graph" + e.toString());
+            }
         }));
 
-        if (supportsTx) graphToWriteTo.tx().commit();
+        if (supportsTx)
+            graphToWriteTo.tx().commit();
     }
 
     /**
@@ -153,32 +157,35 @@ public final class GraphSONPartialReader implements GraphReader {
      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
      *
      * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
+     *        {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
+     *        {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
      */
     @Override
     public Iterator<Vertex> readVertices(final InputStream inputStream,
-                                         final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
-                                         final Function<Attachable<Edge>, Edge> edgeAttachMethod,
-                                         final Direction attachEdgesOfThisDirection) throws IOException {
-       // return readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection))).iterator();
+            final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
+            final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
+            throws IOException {
+        // return readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new
+        // ByteArrayInputStream(line.getBytes()), vertexAttachMethod, edgeAttachMethod,
+        // attachEdgesOfThisDirection))).iterator();
         return reader.readVertices(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
-                       
+
     }
 
     /**
-     * Read a {@link Vertex}  from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
+     * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
      *
      * @param inputStream a stream containing at least a single vertex as defined by the accompanying
-     *                    {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
+     *        {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
      */
     @Override
-    public Vertex readVertex(final InputStream inputStream, final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
+    public Vertex readVertex(final InputStream inputStream,
+            final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
         return reader.readVertex(inputStream, vertexAttachMethod);
     }
 
@@ -187,18 +194,18 @@ public final class GraphSONPartialReader implements GraphReader {
      * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
      *
      * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
+     *        {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
      */
     @Override
     public Vertex readVertex(final InputStream inputStream,
-                             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
-                             final Function<Attachable<Edge>, Edge> edgeAttachMethod,
-                             final Direction attachEdgesOfThisDirection) throws IOException {
-       
-       return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
+            final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
+            final Function<Attachable<Edge>, Edge> edgeAttachMethod, final Direction attachEdgesOfThisDirection)
+            throws IOException {
+
+        return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
     }
 
     /**
@@ -206,27 +213,30 @@ public final class GraphSONPartialReader implements GraphReader {
      * an {@link Edge} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
      *
      * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
-     *                    {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
+     *        {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      */
     @Override
-    public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod) throws IOException {
-        /*if (version == GraphSONVersion.v1_0) {
-            final Map<String, Object> edgeData = mapper.readValue(inputStream, mapTypeReference);
-
-            final Map<String, Object> edgeProperties = edgeData.containsKey(GraphSONTokens.PROPERTIES) ?
-                    (Map<String, Object>) edgeData.get(GraphSONTokens.PROPERTIES) : Collections.EMPTY_MAP;
-            final DetachedEdge edge = new DetachedEdge(edgeData.get(GraphSONTokens.ID),
-                    edgeData.get(GraphSONTokens.LABEL).toString(),
-                    edgeProperties,
-                    Pair.with(edgeData.get(GraphSONTokens.OUT), edgeData.get(GraphSONTokens.OUT_LABEL).toString()),
-                    Pair.with(edgeData.get(GraphSONTokens.IN), edgeData.get(GraphSONTokens.IN_LABEL).toString()));
-
-            return edgeAttachMethod.apply(edge);
-        } else {
-            return edgeAttachMethod.apply((DetachedEdge) mapper.readValue(inputStream, Edge.class));
-        }*/
-       return reader.readEdge(inputStream, edgeAttachMethod);
+    public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod)
+            throws IOException {
+        /*
+         * if (version == GraphSONVersion.v1_0) {
+         * final Map<String, Object> edgeData = mapper.readValue(inputStream, mapTypeReference);
+         * 
+         * final Map<String, Object> edgeProperties = edgeData.containsKey(GraphSONTokens.PROPERTIES) ?
+         * (Map<String, Object>) edgeData.get(GraphSONTokens.PROPERTIES) : Collections.EMPTY_MAP;
+         * final DetachedEdge edge = new DetachedEdge(edgeData.get(GraphSONTokens.ID),
+         * edgeData.get(GraphSONTokens.LABEL).toString(),
+         * edgeProperties,
+         * Pair.with(edgeData.get(GraphSONTokens.OUT), edgeData.get(GraphSONTokens.OUT_LABEL).toString()),
+         * Pair.with(edgeData.get(GraphSONTokens.IN), edgeData.get(GraphSONTokens.IN_LABEL).toString()));
+         * 
+         * return edgeAttachMethod.apply(edge);
+         * } else {
+         * return edgeAttachMethod.apply((DetachedEdge) mapper.readValue(inputStream, Edge.class));
+         * }
+         */
+        return reader.readEdge(inputStream, edgeAttachMethod);
     }
 
     /**
@@ -235,45 +245,51 @@ public final class GraphSONPartialReader implements GraphReader {
      * to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
      *
      * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
-     *                    {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
+     *        {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
      * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
-     *                                   {@link Host} object.
+     *        {@link Host} object.
      */
     @Override
     public VertexProperty readVertexProperty(final InputStream inputStream,
-                                             final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
-        /*if (version == GraphSONVersion.v1_0) {
-            final Map<String, Object> vpData = mapper.readValue(inputStream, mapTypeReference);
-            final Map<String, Object> metaProperties = (Map<String, Object>) vpData.get(GraphSONTokens.PROPERTIES);
-            final DetachedVertexProperty vp = new DetachedVertexProperty(vpData.get(GraphSONTokens.ID),
-                    vpData.get(GraphSONTokens.LABEL).toString(),
-                    vpData.get(GraphSONTokens.VALUE), metaProperties);
-            return vertexPropertyAttachMethod.apply(vp);
-        } else {
-            return vertexPropertyAttachMethod.apply((DetachedVertexProperty) mapper.readValue(inputStream, VertexProperty.class));
-        }*/
-       return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
+            final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
+        /*
+         * if (version == GraphSONVersion.v1_0) {
+         * final Map<String, Object> vpData = mapper.readValue(inputStream, mapTypeReference);
+         * final Map<String, Object> metaProperties = (Map<String, Object>) vpData.get(GraphSONTokens.PROPERTIES);
+         * final DetachedVertexProperty vp = new DetachedVertexProperty(vpData.get(GraphSONTokens.ID),
+         * vpData.get(GraphSONTokens.LABEL).toString(),
+         * vpData.get(GraphSONTokens.VALUE), metaProperties);
+         * return vertexPropertyAttachMethod.apply(vp);
+         * } else {
+         * return vertexPropertyAttachMethod.apply((DetachedVertexProperty) mapper.readValue(inputStream,
+         * VertexProperty.class));
+         * }
+         */
+        return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
     }
 
     /**
-     * Read a {@link Property} from output generated by  {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
+     * Read a {@link Property} from output generated by {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
      * via an {@link Property} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
      *
      * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
-     *                    {@link GraphWriter#writeProperty(OutputStream, Property)} method.
+     *        {@link GraphWriter#writeProperty(OutputStream, Property)} method.
      * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
      */
     @Override
     public Property readProperty(final InputStream inputStream,
-                                 final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
-        /*if (version == GraphSONVersion.v1_0) {
-            final Map<String, Object> propertyData = mapper.readValue(inputStream, mapTypeReference);
-            final DetachedProperty p = new DetachedProperty(propertyData.get(GraphSONTokens.KEY).toString(), propertyData.get(GraphSONTokens.VALUE));
-            return propertyAttachMethod.apply(p);
-        } else {
-            return propertyAttachMethod.apply((DetachedProperty) mapper.readValue(inputStream, Property.class));
-        }*/
-       return reader.readProperty(inputStream, propertyAttachMethod);
+            final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
+        /*
+         * if (version == GraphSONVersion.v1_0) {
+         * final Map<String, Object> propertyData = mapper.readValue(inputStream, mapTypeReference);
+         * final DetachedProperty p = new DetachedProperty(propertyData.get(GraphSONTokens.KEY).toString(),
+         * propertyData.get(GraphSONTokens.VALUE));
+         * return propertyAttachMethod.apply(p);
+         * } else {
+         * return propertyAttachMethod.apply((DetachedProperty) mapper.readValue(inputStream, Property.class));
+         * }
+         */
+        return reader.readProperty(inputStream, propertyAttachMethod);
     }
 
     /**
@@ -286,18 +302,18 @@ public final class GraphSONPartialReader implements GraphReader {
 
     private Stream<String> readVertexStrings(final InputStream inputStream) throws IOException {
         if (unwrapAdjacencyList) {
-               final JsonNode root = mapper.readTree(inputStream);
+            final JsonNode root = mapper.readTree(inputStream);
             final JsonNode vertices = root.get(GraphSONTokens.VERTICES);
-            if (!vertices.getNodeType().equals(JsonNodeType.ARRAY)) throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
+            if (!vertices.getNodeType().equals(JsonNodeType.ARRAY))
+                throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
             return IteratorUtils.stream(vertices.elements()).map(Object::toString);
         } else {
-               final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
+            final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
             return br.lines();
         }
-       
+
     }
 
-    
     public static Builder build() {
         return new Builder();
     }
@@ -307,9 +323,9 @@ public final class GraphSONPartialReader implements GraphReader {
 
         private Mapper<ObjectMapper> mapper = GraphSONMapper.build().create();
         private boolean unwrapAdjacencyList = false;
-        
 
-        private Builder() {}
+        private Builder() {
+        }
 
         /**
          * Number of mutations to perform before a commit is executed when using
@@ -322,7 +338,7 @@ public final class GraphSONPartialReader implements GraphReader {
 
         /**
          * Override all of the {@link GraphSONMapper} builder
-         * options with this mapper.  If this value is set to something other than null then that value will be
+         * options with this mapper. If this value is set to something other than null then that value will be
          * used to construct the writer.
          */
         public Builder mapper(final Mapper<ObjectMapper> mapper) {
@@ -333,11 +349,11 @@ public final class GraphSONPartialReader implements GraphReader {
         /**
          * If the adjacency list is wrapped in a JSON object, as is done when writing a graph with
          * {@link GraphSONWriter.Builder#wrapAdjacencyList} set to {@code true}, this setting needs to be set to
-         * {@code true} to properly read it.  By default, this value is {@code false} and the adjacency list is
+         * {@code true} to properly read it. By default, this value is {@code false} and the adjacency list is
          * simply read as line delimited vertices.
          * <p/>
          * By setting this value to {@code true}, the generated JSON is no longer "splittable" by line and thus not
-         * suitable for OLAP processing.  Furthermore, reading this format of the JSON with
+         * suitable for OLAP processing. Furthermore, reading this format of the JSON with
          * {@link GraphSONPartialReader#readGraph(InputStream, Graph)} or
          * {@link GraphSONPartialReader#readVertices(InputStream, Function, Function, Direction)} requires that the
          * entire JSON object be read into memory, so it is best saved for "small" graphs.