Added all common modules in conductor directory
[optf/has.git] / conductor / conductor / common / music / api.py
1 #
2 # -------------------------------------------------------------------------
3 #   Copyright (c) 2015-2017 AT&T Intellectual Property
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 # -------------------------------------------------------------------------
18 #
19
20 """Music Data Store API"""
21
22 import copy
23 import time
24
25 from oslo_config import cfg
26 from oslo_log import log
27
28 from conductor.common import rest
29 from conductor.i18n import _LE, _LI  # pylint: disable=W0212
30
LOG = log.getLogger(__name__)

CONF = cfg.CONF

# Most recently constructed API object. The API classes in this module
# rebind this name from their initializers. It is given an explicit initial
# value because a bare ``global`` statement at module scope is a no-op and
# would leave the name undefined until the first object is constructed.
MUSIC_API = None

# Options registered under the [music_api] configuration group.
MUSIC_API_OPTS = [
    cfg.StrOpt('server_url',
               default='http://controller:8080/MUSIC/rest',
               help='Base URL for Music REST API without a trailing slash.'),
    # hostnames/port/path are deprecated in favour of server_url and
    # intentionally carry no defaults.
    cfg.ListOpt('hostnames',
                deprecated_for_removal=True,
                deprecated_reason='Use server_url instead',
                help='List of hostnames (round-robin access)'),
    cfg.IntOpt('port',
               deprecated_for_removal=True,
               deprecated_reason='Use server_url instead',
               help='Port'),
    cfg.StrOpt('path',
               deprecated_for_removal=True,
               deprecated_reason='Use server_url instead',
               help='Path'),
    cfg.FloatOpt('connect_timeout',
                 default=3.05,
                 help='Socket connection timeout'),
    cfg.FloatOpt('read_timeout',
                 default=12.05,
                 help='Socket read timeout'),
    cfg.IntOpt('lock_timeout',
               default=10,
               help='Lock timeout'),
    cfg.IntOpt('replication_factor',
               default=1,
               help='Replication factor'),
    cfg.BoolOpt('debug',
                default=False,
                help='Log debug messages. Default value is False.'),
    cfg.BoolOpt('mock',
                default=False,
                help='Use mock API'),
]

CONF.register_opts(MUSIC_API_OPTS, group='music_api')
74
75
class MusicAPI(object):
    """Wrapper for the Music REST API.

    Every operation is sent through ``self.rest``. Unless noted otherwise,
    a method returns ``response and response.ok``: the response's ``ok``
    flag, or the falsy response object itself when the request produced
    nothing usable.
    """

    lock_ids = None  # Cache of lock ids, indexed by name
    lock_timeout = None  # Maximum time in seconds to acquire a lock

    rest = None  # API Endpoint
    replication_factor = None  # Number of Music nodes to replicate across

    def __init__(self):
        """Build the REST client from the [music_api] options.

        Also publishes the new object as the module-level ``MUSIC_API``.
        """
        global MUSIC_API

        LOG.info(_LI("Initializing Music API"))
        server_url = CONF.music_api.server_url.rstrip('/')
        if not server_url:
            # host/port/path are deprecated and should not be used anymore.
            # Defaults removed from oslo_config to give more incentive.

            # No more round robin either. Just take the first entry.
            host = next(iter(CONF.music_api.hostnames or []), 'controller')
            port = CONF.music_api.port or 8080
            path = CONF.music_api.path or '/MUSIC/rest'
            server_url = 'http://{}:{}/{}'.format(
                host, port, path.strip('/'))

        kwargs = {
            'server_url': server_url,
            'log_debug': CONF.music_api.debug,
            'connect_timeout': CONF.music_api.connect_timeout,
            'read_timeout': CONF.music_api.read_timeout,
        }
        self.rest = rest.REST(**kwargs)

        self.lock_ids = {}

        # TODO(jdandrea): Allow override at creation time.
        self.lock_timeout = CONF.music_api.lock_timeout
        self.replication_factor = CONF.music_api.replication_factor

        MUSIC_API = self

    def __del__(self):
        """Delete every lock that is still cached on this object."""
        if isinstance(self.lock_ids, dict):
            # lock_delete() removes entries from the cache, so walk a
            # snapshot of the names; iterating the live dict raises
            # RuntimeError on Python 3.
            for lock_name in list(self.lock_ids):
                self.lock_delete(lock_name)

    @staticmethod
    def _row_url_path(keyspace, table, pk_name, pk_value):
        """Returns a Music-compliant row URL path.

        The ``?pk_name=pk_value`` query is appended only when both values
        are truthy; otherwise the path addresses all rows of the table.
        """
        path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % {
            'keyspace': keyspace,
            'table': table,
        }

        if pk_name and pk_value:
            path += '?%s=%s' % (pk_name, pk_value)
        return path

    @staticmethod
    def _lock_name_generate(keyspace, table, pk_value):
        """Generate a lock name."""

        # The Music API dictates that the lock name must be of the form
        # keyspace.table.primary_key
        lock_name = '%(keyspace)s.%(table)s.%(primary_key)s' % {
            'keyspace': keyspace,
            'table': table,
            'primary_key': pk_value,
        }
        return lock_name

    def _lock_id_create(self, lock_name):
        """Returns the lock id. Use for acquiring and releasing.

        Returns None when the request did not succeed.
        """
        path = '/locks/create/%s' % lock_name
        response = self.rest.request(method='post',
                                     content_type='text/plain', path=path)
        lock_id = None
        if response and response.ok:
            lock_id = response.text
        return lock_id

    def _lock_id_acquire(self, lock_id):
        """Acquire a lock by id. Returns True if successful."""
        path = '/locks/acquire/%s' % lock_id
        response = self.rest.request(method='get',
                                     content_type='text/plain', path=path)
        status = False
        if response and response.ok:
            # Success is signalled by a response body reading "true".
            status = (response.text.lower() == 'true')
        return status

    def _lock_id_release(self, lock_id):
        """Release a lock by id. Returns True if successful."""
        path = '/locks/release/%s' % lock_id
        response = self.rest.request(method='delete',
                                     content_type='text/plain', path=path)
        return response and response.ok

    def payload_init(self, keyspace=None, table=None,
                     pk_value=None, atomic=False):
        """Initialize payload for Music requests.

        Supports atomic operations: when ``atomic`` is true a lock is
        created and acquired first (see lock_create).
        Returns a dict holding the request ``data`` and the ``lock_name``
        (None when not atomic).
        """
        if atomic:
            lock_name = self.lock_create(keyspace, table, pk_value)
        else:
            lock_name = None

        lock_id = self.lock_ids.get(lock_name)
        data = {
            'consistencyInfo': {
                'type': 'atomic' if atomic else 'eventual',
                'lockId': lock_id,
            }
        }
        return {'data': data, 'lock_name': lock_name}

    def payload_delete(self, payload):
        """Delete payload for Music requests. Cleans up atomic operations."""

        # Doesn't actually delete the payload.
        # We just delete the lock inside of it!
        # This way payload_init/payload_delete is paired up neatly.
        lock_name = payload.get('lock_name')
        if lock_name:
            self.lock_delete(lock_name)

    def keyspace_create(self, keyspace):
        """Creates a keyspace."""
        payload = self.payload_init()
        data = payload.get('data')
        data['durabilityOfWrites'] = True
        data['replicationInfo'] = {
            'class': 'SimpleStrategy',
            'replication_factor': self.replication_factor,
        }

        path = '/keyspaces/%s' % keyspace
        if CONF.music_api.debug:
            LOG.debug("Creating keyspace {}".format(keyspace))
        response = self.rest.request(method='post', path=path, data=data)
        return response and response.ok

    def keyspace_delete(self, keyspace):
        """Drops a keyspace."""
        payload = self.payload_init()
        data = payload.get('data')

        path = '/keyspaces/%s' % keyspace
        if CONF.music_api.debug:
            LOG.debug("Deleting keyspace {}".format(keyspace))
        response = self.rest.request(method='delete', path=path, data=data)
        return response and response.ok

    def lock_create(self, keyspace, table, pk_value):
        """Create and acquire a lock. Returns a lock name.

        Raises IndexError when the lock could not be acquired within
        ``self.lock_timeout`` seconds.
        """

        # Generate the lock name, then create/acquire the lock id.
        lock_name = self._lock_name_generate(keyspace, table, pk_value)
        if CONF.music_api.debug:
            LOG.debug("Creating lock {}".format(lock_name))
        lock_id = self._lock_id_create(lock_name)
        time_now = time.time()
        # Acquisition is retried back-to-back (no pause between attempts)
        # until it succeeds or the timeout elapses.
        while not self._lock_id_acquire(lock_id):
            if time.time() - time_now > self.lock_timeout:
                raise IndexError(
                    _LE('Lock id acquire timeout: %s') % lock_name)

        # Cache the lock name/id.
        self.lock_ids[lock_name] = lock_id
        return lock_name

    def lock_release(self, lock_name):
        """Release lock by name. Returns True if successful.

        Returns None without contacting the server when ``lock_name`` is
        empty.
        """

        # No need to delete the lock. lock_create() will not complain
        # if a lock with the same name is created later.
        if CONF.music_api.debug:
            LOG.debug("Releasing lock {}".format(lock_name))
        if lock_name:
            return self._lock_id_release(self.lock_ids.get(lock_name))

    def lock_delete(self, lock_name):
        """Delete a lock by name. Returns True if successful."""
        path = '/locks/delete/%s' % lock_name
        if CONF.music_api.debug:
            LOG.debug("Deleting lock {}".format(lock_name))
        response = self.rest.request(content_type='text/plain',
                                     method='delete', path=path)
        if response and response.ok:
            # The name may not be in the cache (never cached, or already
            # removed); a successful delete must not fail with KeyError.
            self.lock_ids.pop(lock_name, None)
        return response and response.ok

    def row_create(self, keyspace, table,  # pylint: disable=R0913
                   pk_name, pk_value, values, atomic=False):
        """Create a row.

        ``pk_name`` is accepted for signature parity with the other row
        methods but is not used to build the request.
        """
        payload = self.payload_init(keyspace, table, pk_value, atomic)
        data = payload.get('data')
        data['values'] = values

        path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % {
            'keyspace': keyspace,
            'table': table,
        }
        if CONF.music_api.debug:
            LOG.debug("Creating row with pk_value {} in table "
                      "{}, keyspace {}".format(pk_value, table, keyspace))
        response = self.rest.request(method='post', path=path, data=data)
        self.payload_delete(payload)
        return response and response.ok

    def row_update(self, keyspace, table,  # pylint: disable=R0913
                   pk_name, pk_value, values, atomic=False):
        """Update a row."""
        payload = self.payload_init(keyspace, table, pk_value, atomic)
        data = payload.get('data')
        data['values'] = values

        path = self._row_url_path(keyspace, table, pk_name, pk_value)
        if CONF.music_api.debug:
            LOG.debug("Updating row with pk_value {} in table "
                      "{}, keyspace {}".format(pk_value, table, keyspace))
        response = self.rest.request(method='put', path=path, data=data)
        self.payload_delete(payload)
        return response and response.ok

    def row_read(self, keyspace, table, pk_name=None, pk_value=None):
        """Read one or more rows. Not atomic.

        Returns the decoded JSON body, or the falsy response itself.
        """
        path = self._row_url_path(keyspace, table, pk_name, pk_value)
        if CONF.music_api.debug:
            LOG.debug("Reading row with pk_value {} from table "
                      "{}, keyspace {}".format(pk_value, table, keyspace))
        response = self.rest.request(path=path)
        return response and response.json()

    def row_delete(self, keyspace, table, pk_name, pk_value, atomic=False):
        """Delete a row."""
        payload = self.payload_init(keyspace, table, pk_value, atomic)
        data = payload.get('data')

        path = self._row_url_path(keyspace, table, pk_name, pk_value)
        if CONF.music_api.debug:
            LOG.debug("Deleting row with pk_value {} from table "
                      "{}, keyspace {}".format(pk_value, table, keyspace))
        response = self.rest.request(method='delete', path=path, data=data)
        self.payload_delete(payload)
        return response and response.ok

    @staticmethod
    def _table_path_generate(keyspace, table):
        """Returns the URL path of a table (with a trailing slash)."""
        path = '/keyspaces/%(keyspace)s/tables/%(table)s/' % {
            'keyspace': keyspace,
            'table': table,
        }
        return path

    def table_create(self, keyspace, table, schema):
        """Creates a table. ``schema`` is sent as the ``fields`` entry."""
        payload = self.payload_init()
        data = payload.get('data')
        data['fields'] = schema

        path = self._table_path_generate(keyspace, table)
        if CONF.music_api.debug:
            LOG.debug("Creating table {}, keyspace {}".format(table, keyspace))
        response = self.rest.request(method='post', path=path, data=data)
        return response and response.ok

    def table_delete(self, keyspace, table):
        """Drops a table."""
        payload = self.payload_init()
        data = payload.get('data')

        path = self._table_path_generate(keyspace, table)
        if CONF.music_api.debug:
            LOG.debug("Deleting table {}, keyspace {}".format(table, keyspace))
        response = self.rest.request(method='delete', path=path, data=data)
        return response and response.ok

    def version(self):
        """Returns version string (the response text), or a falsy response."""
        path = '/version'
        if CONF.music_api.debug:
            LOG.debug("Requesting version info")
        response = self.rest.request(method='get',
                                     content_type='text/plain', path=path)
        return response and response.text
367
368
class MockAPI(object):
    """In-memory stand-in exposing the keyspace/table/row calls of the API.

    Nothing is sent over the network; all data lives in the ``music`` dict.
    """

    # Mock state for Music
    # NOTE: this dict is a class attribute, so it is shared by every
    # instance, and __init__ resets its 'keyspaces' entry in place.
    music = {
        'keyspaces': {}
    }

    def __init__(self):
        """Reset the mock state and publish this object as MUSIC_API."""
        global MUSIC_API

        LOG.info(_LI("Initializing Music Mock API"))

        self.music['keyspaces'] = {}

        MUSIC_API = self

    @property
    def _keyspaces(self):
        """Mapping of keyspace name -> {table name -> {row key -> row}}."""
        return self.music.get('keyspaces')

    def _set_keyspace(self, keyspace):
        """Create (or reset to empty) the named keyspace."""
        self._keyspaces[keyspace] = {}

    def _unset_keyspace(self, keyspace):
        """Remove the named keyspace; KeyError if it does not exist."""
        del self._keyspaces[keyspace]

    def _set_table(self, keyspace, table):
        """Create (or reset to empty) a table inside a keyspace."""
        tables = self._keyspaces[keyspace]
        tables[table] = {}

    def _unset_table(self, keyspace, table):
        """Remove a table from a keyspace; KeyError if it does not exist."""
        tables = self._keyspaces[keyspace]
        del tables[table]

    def _get_row(self, keyspace, table, key=None):
        """Return deep copies of matching rows keyed 'row 1', 'row 2', ...

        A falsy ``key`` selects every row of the table.
        """
        stored = self._keyspaces[keyspace][table]
        selected = [row for row_key, row in stored.items()
                    if not key or key == row_key]
        return {
            'row {}'.format(position): copy.deepcopy(row)
            for position, row in enumerate(selected, 1)
        }

    def _set_row(self, keyspace, table, key, row):
        """Store ``row`` under ``key``, replacing any previous value."""
        rows = self._keyspaces[keyspace][table]
        rows[key] = row

    def _unset_row(self, keyspace, table, row):
        """Remove the row stored under key ``row``; KeyError if absent."""
        rows = self._keyspaces[keyspace][table]
        del rows[row]

    def keyspace_create(self, keyspace):
        """Creates a keyspace. Always returns True."""
        if CONF.music_api.debug:
            message = "Creating keyspace {}".format(keyspace)
            LOG.debug(message)
        self._set_keyspace(keyspace)
        return True

    def keyspace_delete(self, keyspace):
        """Drops a keyspace. Returns True."""
        if CONF.music_api.debug:
            message = "Deleting keyspace {}".format(keyspace)
            LOG.debug(message)
        self._unset_keyspace(keyspace)
        return True

    def row_create(self, keyspace, table,  # pylint: disable=R0913
                   pk_name, pk_value, values, atomic=False):
        """Create a row keyed by ``pk_value``. Returns True.

        ``pk_name`` and ``atomic`` are accepted but ignored by the mock.
        """
        if CONF.music_api.debug:
            message = ("Creating row with pk_value {} in table "
                       "{}, keyspace {}").format(pk_value, table, keyspace)
            LOG.debug(message)
        self._set_row(keyspace, table, pk_value, values)
        return True

    def row_update(self, keyspace, table,  # pylint: disable=R0913
                   pk_name, pk_value, values, atomic=False):
        """Update a row by replacing what is stored under ``pk_value``.

        ``pk_name`` and ``atomic`` are accepted but ignored by the mock.
        """
        if CONF.music_api.debug:
            message = ("Updating row with pk_value {} in table "
                       "{}, keyspace {}").format(pk_value, table, keyspace)
            LOG.debug(message)
        self._set_row(keyspace, table, pk_value, values)
        return True

    def row_read(self, keyspace, table, pk_name=None, pk_value=None):
        """Read one or more rows. Not atomic.

        Returns a dict of row copies; ``pk_name`` is ignored by the mock.
        """
        if CONF.music_api.debug:
            message = ("Reading row with pk_value {} from table "
                       "{}, keyspace {}").format(pk_value, table, keyspace)
            LOG.debug(message)
        return self._get_row(keyspace, table, pk_value)

    def row_delete(self, keyspace, table, pk_name, pk_value, atomic=False):
        """Delete the row stored under ``pk_value``. Returns True."""
        if CONF.music_api.debug:
            message = ("Deleting row with pk_value {} from table "
                       "{}, keyspace {}").format(pk_value, table, keyspace)
            LOG.debug(message)
        self._unset_row(keyspace, table, pk_value)
        return True

    def table_create(self, keyspace, table, schema):
        """Creates a table. ``schema`` is ignored by the mock."""
        if CONF.music_api.debug:
            message = "Creating table {}, keyspace {}".format(table, keyspace)
            LOG.debug(message)
        self._set_table(keyspace, table)
        return True

    def table_delete(self, keyspace, table):
        """Drops a table. Returns True."""
        if CONF.music_api.debug:
            message = "Deleting table {}, keyspace {}".format(table, keyspace)
            LOG.debug(message)
        self._unset_table(keyspace, table)
        return True

    def version(self):
        """Returns the mock's fixed version string."""
        if CONF.music_api.debug:
            LOG.debug("Requesting version info")
        return "v1-mock"
485
486
def API():
    """Construct and return the configured Music API object.

    A MockAPI is built when the ``mock`` option of the [music_api] group
    is set; otherwise a MusicAPI is built.
    """

    # FIXME(jdandrea): Follow more formal practices for defining/using mocks
    implementation = MockAPI if CONF.music_api.mock else MusicAPI
    return implementation()