2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # -------------------------------------------------------------------------
25 from conductor.common.music.model import base
28 def current_time_millis():
29 """Current time in milliseconds."""
30 return int(round(time.time() * 1000))
33 class Message(base.Base):
36 DO NOT use this class directly! With messaging, the table
37 name must be the message topic, thus this class has a
38 __tablename__ and __keyspace__ of None.
40 Only create Message-based classes using:
41 base.create_dynamic_model(keyspace=KEYSPACE,
42 baseclass=Message, classname=TOPIC_NAME).
43 The table will be automatically created if it doesn't exist.
49 id = None # pylint: disable=C0103
63 ACTIONS = [CALL, CAST, ]
67 COMPLETED = "completed"
69 STATUS = [ENQUEUED, COMPLETED, ERROR, ]
70 FINISHED = [COMPLETED, ERROR, ]
76 'id': 'text', # Message ID in UUID4 format
77 'action': 'text', # Message type (call, cast)
78 'created': 'bigint', # Creation time in msec from epoch
79 'updated': 'bigint', # Last update time in msec from epoch
80 'ctxt': 'text', # JSON request context dictionary
81 'method': 'text', # RPC method name
82 'args': 'text', # JSON argument dictionary
83 'status': 'text', # Status (enqueued, complete, error)
84 'response': 'text', # Response JSON
85 'failure': 'text', # Failure JSON (used for exceptions)
86 'PRIMARY KEY': '(id)',
92 """Use atomic operations"""
93 return False # FIXME: this should be True for atomic operations
97 """Primary key name"""
101 """Primary key value"""
106 return self.status == self.ENQUEUED
110 return self.status in self.FINISHED
114 return self.status == self.COMPLETED
119 Side-effect: Sets the updated field to the current time.
121 self.updated = current_time_millis()
122 super(Message, self).update()
127 'action': self.action,
128 'created': self.created,
129 'updated': self.updated,
130 'ctxt': json.dumps(self.ctxt),
131 'method': self.method,
132 'args': json.dumps(self.args),
133 'status': self.status,
134 'response': json.dumps(self.response),
135 'failure': self.failure, # already serialized by oslo_messaging
138 def __init__(self, action, ctxt, method, args,
139 created=None, updated=None, status=None,
140 response=None, failure=None, _insert=True):
142 super(Message, self).__init__()
144 self.created = created or current_time_millis()
145 self.updated = updated or current_time_millis()
147 self.status = status or self.ENQUEUED
149 self.ctxt = ctxt or {}
150 self.args = args or {}
151 self.response = response or {}
152 self.failure = failure or ""
155 self.ctxt = json.loads(ctxt)
156 self.args = json.loads(args)
157 self.response = json.loads(response)
158 self.failure = failure # oslo_messaging will deserialize this
161 """Object representation"""
162 return '<Message Topic %r>' % self.__tablename__
165 """JSON representation"""
167 json_['id'] = self.id
168 json_['action'] = self.action
169 # TODO(jdandrea): Format timestamps as ISO
170 json_['created'] = self.created
171 json_['updated'] = self.updated
172 json_['ctxt'] = self.ctxt
173 json_['method'] = self.method
174 json_['args'] = self.args
175 json_['status'] = self.status
176 json_['response'] = self.response
177 json_['failure'] = self.failure