Add Initial Code Import
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / resources / streamReaders / CambriaJsonStreamReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.cambria.resources.streamReaders;
23
24 import java.io.InputStream;
25
26 import javax.servlet.http.HttpServletResponse;
27
28 import org.json.JSONException;
29 import org.json.JSONObject;
30 import org.json.JSONTokener;
31
32 import com.att.nsa.cambria.CambriaApiException;
33 import com.att.nsa.cambria.backends.Publisher.message;
34 import com.att.nsa.cambria.beans.LogDetails;
35 import com.att.nsa.cambria.resources.CambriaEventSet.reader;
36
37 /**
38  * 
39  * @author author
40  *
41  */
42 public class CambriaJsonStreamReader implements reader {
43         private final JSONTokener fTokens;
44         private final boolean fIsList;
45         private long fCount;
46         private final String fDefPart;
47         public static final String kKeyField = "cambria.partition";
48
49         /**
50          * 
51          * @param is
52          * @param defPart
53          * @throws CambriaApiException
54          */
55         public CambriaJsonStreamReader(InputStream is, String defPart) throws CambriaApiException {
56                 try {
57                         fTokens = new JSONTokener(is);
58                         fCount = 0;
59                         fDefPart = defPart;
60
61                         final int c = fTokens.next();
62                         if (c == '[') {
63                                 fIsList = true;
64                         } else if (c == '{') {
65                                 fTokens.back();
66                                 fIsList = false;
67                         } else {
68                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expecting an array or an object.");
69                         }
70                 } catch (JSONException e) {
71                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
72                 }
73         }
74
75         @Override
76         public message next() throws CambriaApiException {
77                 try {
78                         if (!fTokens.more()) {
79                                 return null;
80                         }
81
82                         final int c = fTokens.next();
83                         
84                         /*if (c ==','){
85                                 fCloseCount++;
86                                 System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount);
87                         }*/
88                         if (fIsList) {
89                                 if (c == ']' || (fCount > 0 && c == 10))
90                                         return null;
91
92
93                                 if (fCount > 0 && c != ',' && c!= 10) {
94                                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
95                                                         "Expected ',' or closing ']' after last object.");
96                                 }
97
98                                 if (fCount == 0 && c != '{' && c!= 10  && c!=32) {
99                                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected { to start an object.");
100                                 }
101                         } else if (fCount != 0 || c != '{') {
102                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected '{' to start an object.");
103                         }
104
105                         if (c == '{') {
106                                 fTokens.back();
107                         }
108                         final JSONObject o = new JSONObject(fTokens);
109                         fCount++;
110                         return new msg(o);
111                 } catch (JSONException e) {
112                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
113
114                 }
115         }
116
117         private class msg implements message {
118                 private final String fKey;
119                 private  String fMsg;
120                 private LogDetails logDetails;
121                 private boolean transactionEnabled;
122
123                 /**
124                  * constructor
125                  * 
126                  * @param o
127                  */
128                 //public msg(JSONObject o){}
129                 
130                 
131                 public msg(JSONObject o) {
132                         String key = o.optString(kKeyField, fDefPart);
133                         if (key == null) {
134                                 key = "" + System.currentTimeMillis();
135                         }
136                         fKey = key;
137                                         
138                                 fMsg = o.toString().trim();
139                         
140                 }
141
142                 @Override
143                 public String getKey() {
144                         return fKey;
145                 }
146
147                 @Override
148                 public String getMessage() {
149                         return fMsg;
150                 }
151
152                 @Override
153                 public boolean isTransactionEnabled() {
154                         return transactionEnabled;
155                 }
156
157                 @Override
158                 public void setTransactionEnabled(boolean transactionEnabled) {
159                         this.transactionEnabled = transactionEnabled;
160                 }
161
162                 @Override
163                 public void setLogDetails(LogDetails logDetails) {
164                         this.logDetails = logDetails;
165                 }
166
167                 @Override
168                 public LogDetails getLogDetails() {
169                         return logDetails;
170                 }
171         }
172 }