Added all common modules in conductor directory
[optf/has.git] / conductor / conductor / common / music / messaging / message.py
1 #
2 # -------------------------------------------------------------------------
3 #   Copyright (c) 2015-2017 AT&T Intellectual Property
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 # -------------------------------------------------------------------------
18 #
19
20 """Message Model"""
21
22 import json
23 import time
24
25 from conductor.common.music.model import base
26
27
28 def current_time_millis():
29     """Current time in milliseconds."""
30     return int(round(time.time() * 1000))
31
32
33 class Message(base.Base):
34     """Message model.
35
36     DO NOT use this class directly! With messaging, the table
37     name must be the message topic, thus this class has a
38     __tablename__ and __keyspace__ of None.
39
40     Only create Message-based classes using:
41     base.create_dynamic_model(keyspace=KEYSPACE,
42         baseclass=Message, classname=TOPIC_NAME).
43     The table will be automatically created if it doesn't exist.
44     """
45
46     __tablename__ = None
47     __keyspace__ = None
48
49     id = None  # pylint: disable=C0103
50     action = None
51     created = None
52     updated = None
53     ctxt = None
54     method = None
55     args = None
56     status = None
57     response = None
58     failure = None
59
60     # Actions
61     CALL = "call"
62     CAST = "cast"
63     ACTIONS = [CALL, CAST, ]
64
65     # Status
66     ENQUEUED = "enqueued"
67     COMPLETED = "completed"
68     ERROR = "error"
69     STATUS = [ENQUEUED, COMPLETED, ERROR, ]
70     FINISHED = [COMPLETED, ERROR, ]
71
72     @classmethod
73     def schema(cls):
74         """Return schema."""
75         schema = {
76             'id': 'text',  # Message ID in UUID4 format
77             'action': 'text',  # Message type (call, cast)
78             'created': 'bigint',  # Creation time in msec from epoch
79             'updated': 'bigint',  # Last update time in msec from epoch
80             'ctxt': 'text',  # JSON request context dictionary
81             'method': 'text',  # RPC method name
82             'args': 'text',  # JSON argument dictionary
83             'status': 'text',  # Status (enqueued, complete, error)
84             'response': 'text',  # Response JSON
85             'failure': 'text',  # Failure JSON (used for exceptions)
86             'PRIMARY KEY': '(id)',
87         }
88         return schema
89
90     @classmethod
91     def atomic(cls):
92         """Use atomic operations"""
93         return False  # FIXME: this should be True for atomic operations
94
95     @classmethod
96     def pk_name(cls):
97         """Primary key name"""
98         return 'id'
99
100     def pk_value(self):
101         """Primary key value"""
102         return self.id
103
104     @property
105     def enqueued(self):
106         return self.status == self.ENQUEUED
107
108     @property
109     def finished(self):
110         return self.status in self.FINISHED
111
112     @property
113     def ok(self):
114         return self.status == self.COMPLETED
115
116     def update(self):
117         """Update message
118
119         Side-effect: Sets the updated field to the current time.
120         """
121         self.updated = current_time_millis()
122         super(Message, self).update()
123
124     def values(self):
125         """Values"""
126         return {
127             'action': self.action,
128             'created': self.created,
129             'updated': self.updated,
130             'ctxt': json.dumps(self.ctxt),
131             'method': self.method,
132             'args': json.dumps(self.args),
133             'status': self.status,
134             'response': json.dumps(self.response),
135             'failure': self.failure,  # already serialized by oslo_messaging
136         }
137
138     def __init__(self, action, ctxt, method, args,
139                  created=None, updated=None, status=None,
140                  response=None, failure=None, _insert=True):
141         """Initializer"""
142         super(Message, self).__init__()
143         self.action = action
144         self.created = created or current_time_millis()
145         self.updated = updated or current_time_millis()
146         self.method = method
147         self.status = status or self.ENQUEUED
148         if _insert:
149             self.ctxt = ctxt or {}
150             self.args = args or {}
151             self.response = response or {}
152             self.failure = failure or ""
153             self.insert()
154         else:
155             self.ctxt = json.loads(ctxt)
156             self.args = json.loads(args)
157             self.response = json.loads(response)
158             self.failure = failure  # oslo_messaging will deserialize this
159
160     def __repr__(self):
161         """Object representation"""
162         return '<Message Topic %r>' % self.__tablename__
163
164     def __json__(self):
165         """JSON representation"""
166         json_ = {}
167         json_['id'] = self.id
168         json_['action'] = self.action
169         # TODO(jdandrea): Format timestamps as ISO
170         json_['created'] = self.created
171         json_['updated'] = self.updated
172         json_['ctxt'] = self.ctxt
173         json_['method'] = self.method
174         json_['args'] = self.args
175         json_['status'] = self.status
176         json_['response'] = self.response
177         json_['failure'] = self.failure
178         return json_