Fix simulator topics for lower case
[policy/models.git] / models-sim / models-sim-dmaap / src / main / java / org / onap / policy / models / sim / dmaap / rest / CambriaMessageBodyHandler.java
index e269ac0..0efab16 100644 (file)
-/*
- * ============LICENSE_START======================================================= ONAP
- * ================================================================================ Copyright (C) 2019 AT&T Intellectual
- * Property. All rights reserved. ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  * ============LICENSE_END=========================================================
  */
 
 package org.onap.policy.models.sim.dmaap.rest;
 
+import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.MultivaluedMap;
+import jakarta.ws.rs.ext.MessageBodyReader;
+import jakarta.ws.rs.ext.Provider;
 import java.io.BufferedReader;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.Reader;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Type;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Provider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
 
 /**
- * Provider that serializes and de-serializes JSON via gson.
+ * Provider that decodes "application/cambria" messages.
  */
 @Provider
 @Consumes(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA)
-@Produces(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA)
 public class CambriaMessageBodyHandler implements MessageBodyReader<Object> {
-    // Media type for Cambria
     public static final String MEDIA_TYPE_APPLICATION_CAMBRIA = "application/cambria";
 
-    public static final Logger logger = LoggerFactory.getLogger(CambriaMessageBodyHandler.class);
+    /**
+     * Maximum length of a message or partition.
+     */
+    private static final int MAX_LEN = 10000000;
+
+    /**
+     * Maximum digits in a length field.
+     */
+    private static final int MAX_DIGITS = 10;
 
     @Override
     public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
-        return MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString());
+        return (mediaType != null && MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString()));
     }
 
     @Override
-    public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
-            MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
-            throws IOException {
-
-        String cambriaString = "";
-        try (BufferedReader bufferedReader = new BufferedReader(
-                new InputStreamReader(entityStream))) {
-            String line;
-            while ((line = bufferedReader.readLine()) != null) {
-                cambriaString += line;
+    public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
+                    MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException {
+
+        try (var bufferedReader = new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) {
+            List<Object> messages = new LinkedList<>();
+            String msg;
+            while ((msg = readMessage(bufferedReader)) != null) {
+                messages.add(msg);
             }
 
-            return cambriaString.substring(cambriaString.indexOf('{'), cambriaString.length());
+            return messages;
+        }
+    }
+
+    /**
+     * Reads a message.
+     *
+     * @param reader source from which to read
+     * @return the message that was read, or {@code null} if there are no more messages
+     * @throws IOException if an error occurs
+     */
+    private String readMessage(Reader reader) throws IOException {
+        if (!skipWhitespace(reader)) {
+            return null;
+        }
+
+        int partlen = readLength(reader);
+        if (partlen > MAX_LEN) {
+            throw new IOException("invalid partition length");
+        }
+
+        int msglen = readLength(reader);
+        if (msglen > MAX_LEN) {
+            throw new IOException("invalid message length");
         }
+
+        // skip over the partition
+        reader.skip(partlen);
+
+        return readString(reader, msglen);
+    }
+
+    /**
+     * Skips whitespace.
+     *
+     * @param reader source from which to read
+     * @return {@code true} if there is another character after the whitespace,
+     *         {@code false} if the end of the stream has been reached
+     * @throws IOException if an error occurs
+     */
+    private boolean skipWhitespace(Reader reader) throws IOException {
+        int chr;
+
+        do {
+            reader.mark(1);
+            if ((chr = reader.read()) < 0) {
+                return false;
+            }
+        } while (Character.isWhitespace(chr));
+
+        // push the last character back onto the reader
+        reader.reset();
+
+        return true;
+    }
+
+    /**
+     * Reads a length field, which is a number followed by ".".
+     *
+     * @param reader source from which to read
+     * @return the length, or -1 if EOF has been reached
+     * @throws IOException if an error occurs
+     */
+    private int readLength(Reader reader) throws IOException {
+        var bldr = new StringBuilder(MAX_DIGITS);
+
+        int chr;
+        for (var x = 0; x < MAX_DIGITS; ++x) {
+            if ((chr = reader.read()) < 0) {
+                throw new EOFException("missing '.' in 'length' field");
+            }
+
+            if (chr == '.') {
+                String text = bldr.toString().trim();
+                return (text.isEmpty() ? 0 : Integer.parseInt(text));
+            }
+
+            if (!Character.isDigit(chr)) {
+                throw new IOException("invalid character in 'length' field");
+            }
+
+            bldr.append((char) chr);
+        }
+
+        throw new IOException("too many digits in 'length' field");
+    }
+
+    /**
+     * Reads a string.
+     *
+     * @param reader source from which to read
+     * @param len length of the string (i.e., number of characters to read)
+     * @return the string that was read
+     * @throws IOException if an error occurs
+     */
+    private String readString(Reader reader, int len) throws IOException {
+        var buf = new char[len];
+        IOUtils.readFully(reader, buf);
+
+        return new String(buf);
     }
 }