1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.resources.streamReaders;
24 import org.json.JSONException;
25 import org.json.JSONObject;
26 import org.json.JSONTokener;
27 import org.onap.dmaap.dmf.mr.CambriaApiException;
28 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
29 import org.onap.dmaap.dmf.mr.beans.LogDetails;
30 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet.reader;
32 import javax.servlet.http.HttpServletResponse;
33 import java.io.InputStream;
37 * @author anowarul.islam
40 public class CambriaJsonStreamReader implements reader {
// Tokener over the caller-supplied input stream; all reads in this class go through it.
41 private final JSONTokener fTokens;
// NOTE(review): presumably true when the payload is a top-level JSON array and false for a
// single object -- confirm against the constructor.
42 private final boolean fIsList;
// Fallback handed to optString() when a message carries no kKeyField entry.
44 private final String fDefPart;
// Name of the JSON field looked up in each parsed object to obtain the message key.
45 public static final String kKeyField = "cambria.partition";
51 * @throws CambriaApiException
// Creates a reader that pulls JSON messages from the stream `is` through a JSONTokener.
// NOTE(review): `defPart` is presumably stored in fDefPart as the fallback key
// (msg's constructor reads fDefPart) -- confirm against the assignment.
// Throws CambriaApiException with SC_BAD_REQUEST when the opening character is rejected
// or when the tokener raises a JSONException.
53 public CambriaJsonStreamReader(InputStream is, String defPart) throws CambriaApiException {
55 fTokens = new JSONTokener(is);
// Consume the first character; it decides how the payload is framed.
59 final int c = fTokens.next();
// '{' opens a single top-level object.
62 } else if (c == '{') {
// NOTE(review): presumably the fall-through for any opener other than '[' or '{' -- confirm.
66 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expecting an array or an object.");
68 } catch (JSONException e) {
// Only the parser's message is forwarded; the JSONException is not attached as a cause.
69 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
// Reads the next JSON object from the token stream.
// NOTE(review): presumably returns the parsed object wrapped in a msg, and null once the
// input is exhausted -- confirm against the return statements.
// Throws CambriaApiException (SC_BAD_REQUEST) on an unexpected separator/opening character
// or when JSON parsing fails.
74 public message next() throws CambriaApiException {
// Guard: the tokener reports no further input.
76 if (!fTokens.more()) {
// Pull one character to inspect the separator / opener before parsing an object.
80 final int c = fTokens.next();
// Matches a closing ']' or, once fCount is positive, a line feed (char code 10).
// NOTE(review): presumably signals end of the list -- confirm against the branch body.
84 if (c == ']' || (fCount > 0 && c == 10))
// With fCount positive, anything other than ',' or line feed (10) is rejected.
88 if (fCount > 0 && c != ',' && c!= 10) {
89 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
90 "Expected ',' or closing ']' after last object.");
// With fCount at zero, only '{', line feed (10) or space (32) are tolerated.
93 if (fCount == 0 && c != '{' && c!= 10 && c!=32) {
94 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected { to start an object.");
// Alternative branch: rejects the read when fCount is non-zero or the character is not '{'.
96 } else if (fCount != 0 || c != '{') {
97 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected '{' to start an object.");
// Let JSONObject consume one complete object directly from the shared tokener.
103 final JSONObject o = new JSONObject(fTokens);
106 } catch (JSONException e) {
// Parser failures surface as HTTP 400 carrying only the parser's message (no cause attached).
107 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
112 private class msg implements message {
// NOTE(review): presumably holds the key computed in the constructor -- confirm.
113 private final String fKey;
// Log details attached to this message through setLogDetails().
115 private LogDetails logDetails;
// Transaction flag exposed through isTransactionEnabled()/setTransactionEnabled().
116 private boolean transactionEnabled;
// Builds a message from one parsed JSON object.
126 public msg(JSONObject o) {
// Look up the kKeyField entry in the object; optString falls back to the enclosing reader's fDefPart.
127 String key = o.optString(kKeyField, fDefPart);
// Replaces the key with the current wall-clock time in milliseconds, rendered as a string.
// NOTE(review): presumably applied only when no key was resolved above -- confirm the guard.
129 key = "" + System.currentTimeMillis();
// The payload is the object's serialized JSON text with surrounding whitespace trimmed.
133 fMsg = o.toString().trim();
138 public String getKey() {
143 public String getMessage() {
// Returns the current value of the transactionEnabled flag.
148 public boolean isTransactionEnabled() {
149 return transactionEnabled;
// Stores the given value in the transactionEnabled flag.
153 public void setTransactionEnabled(boolean transactionEnabled) {
154 this.transactionEnabled = transactionEnabled;
// Attaches the given LogDetails to this message, replacing any previous value.
158 public void setLogDetails(LogDetails logDetails) {
159 this.logDetails = logDetails;
163 public LogDetails getLogDetails() {