2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # -------------------------------------------------------------------------
20 """Music Data Store API"""
25 from oslo_config import cfg
26 from oslo_log import log
28 from conductor.common import rest
29 from conductor.i18n import _LE, _LI # pylint: disable=W0212
31 LOG = log.getLogger(__name__)
38 cfg.StrOpt('server_url',
39 default='http://controller:8080/MUSIC/rest',
40 help='Base URL for Music REST API without a trailing slash.'),
41 cfg.ListOpt('hostnames',
42 deprecated_for_removal=True,
43 deprecated_reason='Use server_url instead',
44 help='List of hostnames (round-robin access)'),
46 deprecated_for_removal=True,
47 deprecated_reason='Use server_url instead',
50 deprecated_for_removal=True,
51 deprecated_reason='Use server_url instead',
53 cfg.FloatOpt('connect_timeout',
55 help='Socket connection timeout'),
56 cfg.FloatOpt('read_timeout',
58 help='Socket read timeout'),
59 cfg.IntOpt('lock_timeout',
62 cfg.IntOpt('replication_factor',
64 help='Replication factor'),
67 help='Log debug messages. Default value is False.'),
73 CONF.register_opts(MUSIC_API_OPTS, group='music_api')
76 class MusicAPI(object):
77 """Wrapper for Music API"""
79 lock_ids = None # Cache of lock ids, indexed by name
80 lock_timeout = None # Maximum time in seconds to acquire a lock
82 rest = None # API Endpoint
83 replication_factor = None # Number of Music nodes to replicate across
89 LOG.info(_LI("Initializing Music API"))
90 server_url = CONF.music_api.server_url.rstrip('/')
92 # host/port/path are deprecated and should not be used anymore.
93 # Defaults removed from oslo_config to give more incentive.
95 # No more round robin either. Just take the first entry.
96 host = next(iter(CONF.music_api.hostnames or []), 'controller')
97 port = CONF.music_api.port or 8080
98 path = CONF.music_api.path or '/MUSIC/rest'
99 server_url = 'http://{}:{}/{}'.format(
100 host, port, path.rstrip('/').lstrip('/'))
103 'server_url': server_url,
104 'log_debug': CONF.music_api.debug,
105 'connect_timeout': CONF.music_api.connect_timeout,
106 'read_timeout': CONF.music_api.read_timeout,
108 self.rest = rest.REST(**kwargs)
112 # TODO(jdandrea): Allow override at creation time.
113 self.lock_timeout = CONF.music_api.lock_timeout
114 self.replication_factor = CONF.music_api.replication_factor
120 if type(self.lock_ids) is dict:
121 for lock_name in self.lock_ids.keys():
122 self.lock_delete(lock_name)
125 def _row_url_path(keyspace, table, pk_name, pk_value):
126 """Returns a Music-compliant row URL path."""
127 path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % {
128 'keyspace': keyspace,
132 if pk_name and pk_value:
133 path += '?%s=%s' % (pk_name, pk_value)
137 def _lock_name_generate(keyspace, table, pk_value):
138 """Generate a lock name."""
140 # The Music API dictates that the lock name must be of the form
141 # keyspace.table.primary_key
142 lock_name = '%(keyspace)s.%(table)s.%(primary_key)s' % {
143 'keyspace': keyspace,
145 'primary_key': pk_value,
149 def _lock_id_create(self, lock_name):
150 """Returns the lock id. Use for acquiring and releasing."""
151 path = '/locks/create/%s' % lock_name
152 response = self.rest.request(method='post',
153 content_type='text/plain', path=path)
155 if response and response.ok:
156 lock_id = response.text
159 def _lock_id_acquire(self, lock_id):
160 """Acquire a lock by id. Returns True if successful."""
161 path = '/locks/acquire/%s' % lock_id
162 response = self.rest.request(method='get',
163 content_type='text/plain', path=path)
165 if response and response.ok:
166 status = (response.text.lower() == 'true')
169 def _lock_id_release(self, lock_id):
170 """Release a lock by id. Returns True if successful."""
171 path = '/locks/release/%s' % lock_id
172 response = self.rest.request(method='delete',
173 content_type='text/plain', path=path)
174 return response and response.ok
176 def payload_init(self, keyspace=None, table=None,
177 pk_value=None, atomic=False):
178 """Initialize payload for Music requests.
180 Supports atomic operations.
181 Returns a payload of data and lock_name (if any).
184 lock_name = self.lock_create(keyspace, table, pk_value)
188 lock_id = self.lock_ids.get(lock_name)
191 'type': 'atomic' if atomic else 'eventual',
195 return {'data': data, 'lock_name': lock_name}
def payload_delete(self, payload):
    """Delete payload for Music requests. Cleans up atomic operations.

    ``payload`` is a mapping that may carry a ``lock_name`` entry. When a
    lock name is present the lock is deleted; otherwise nothing happens.
    """
    # Doesn't actually delete the payload.
    # We just delete the lock inside of it!
    # This way payload_init/payload_delete is paired up neatly.
    lock_name = payload.get('lock_name') if payload else None

    # A payload without a lock name has nothing to clean up; do not ask
    # for the deletion of a lock literally named "None".
    if lock_name:
        self.lock_delete(lock_name)
def keyspace_create(self, keyspace):
    """Create ``keyspace`` through the Music REST API.

    The request body is the default payload extended with durable writes
    and a SimpleStrategy replication block that uses this instance's
    ``replication_factor``. Returns a truthy value when the POST
    succeeded.
    """
    body = self.payload_init().get('data')
    body['durabilityOfWrites'] = True
    body['replicationInfo'] = {
        'class': 'SimpleStrategy',
        'replication_factor': self.replication_factor,
    }

    if CONF.music_api.debug:
        LOG.debug("Creating keyspace {}".format(keyspace))
    response = self.rest.request(
        method='post', path='/keyspaces/%s' % keyspace, data=body)
    return response and response.ok
def keyspace_delete(self, keyspace):
    """Drop ``keyspace`` through the Music REST API.

    Sends a DELETE for the keyspace with the default payload as the
    request body. Returns a truthy value when the request succeeded.
    """
    body = self.payload_init().get('data')

    if CONF.music_api.debug:
        LOG.debug("Deleting keyspace {}".format(keyspace))
    response = self.rest.request(
        method='delete', path='/keyspaces/%s' % keyspace, data=body)
    return response and response.ok
234 def lock_create(self, keyspace, table, pk_value):
235 """Create and acquire a lock. Returns a lock name."""
237 # Generate the lock name, then create/acquire the lock id.
238 lock_name = self._lock_name_generate(keyspace, table, pk_value)
239 if CONF.music_api.debug:
240 LOG.debug("Creating lock {}".format(lock_name))
241 lock_id = self._lock_id_create(lock_name)
242 time_now = time.time()
243 while not self._lock_id_acquire(lock_id):
244 if time.time() - time_now > self.lock_timeout:
246 _LE('Lock id acquire timeout: %s') % lock_name)
248 # Cache the lock name/id.
249 self.lock_ids[lock_name] = lock_id
def lock_release(self, lock_name):
    """Release lock by name. Returns True if successful.

    The lock id is looked up in the ``lock_ids`` cache. When no lock name
    is given there is nothing to release and None is returned without
    issuing a request.
    """
    # No need to delete the lock. lock_create() will not complain
    # if a lock with the same name is created later.
    if CONF.music_api.debug:
        LOG.debug("Releasing lock {}".format(lock_name))

    # Guard: an empty name would otherwise turn into a release request
    # for whatever the cache returns for it (typically None).
    if not lock_name:
        return None
    return self._lock_id_release(self.lock_ids.get(lock_name))
def lock_delete(self, lock_name):
    """Delete a lock by name. Returns True if successful.

    On success the name is also dropped from the ``lock_ids`` cache, if
    it is cached there.
    """
    path = '/locks/delete/%s' % lock_name
    if CONF.music_api.debug:
        LOG.debug("Deleting lock {}".format(lock_name))
    response = self.rest.request(content_type='text/plain',
                                 method='delete', path=path)
    succeeded = response and response.ok
    if succeeded and self.lock_ids:
        # The name may not be cached (or the cache may still be the
        # class-level None); a successful delete must not end in a
        # KeyError/TypeError.
        self.lock_ids.pop(lock_name, None)
    return succeeded
def row_create(self, keyspace, table,  # pylint: disable=R0913
               pk_name, pk_value, values, atomic=False):
    """Create a row.

    ``values`` becomes the ``values`` entry of the request body. With
    ``atomic=True`` the payload is initialised in atomic mode. The
    payload is always cleaned up afterwards, even when the request
    raises. ``pk_name`` is accepted for signature parity with the other
    row methods and is not used here. Returns a truthy value when the
    POST succeeded.
    """
    payload = self.payload_init(keyspace, table, pk_value, atomic)
    try:
        data = payload.get('data')
        data['values'] = values

        path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % {
            'keyspace': keyspace,
            'table': table,
        }
        if CONF.music_api.debug:
            LOG.debug("Creating row with pk_value {} in table "
                      "{}, keyspace {}".format(pk_value, table, keyspace))
        response = self.rest.request(method='post', path=path, data=data)
    finally:
        # Pair every payload_init with a payload_delete so a failing
        # request cannot leave the lock behind.
        self.payload_delete(payload)
    return response and response.ok
def row_update(self, keyspace, table,  # pylint: disable=R0913
               pk_name, pk_value, values, atomic=False):
    """Update a row.

    The row is addressed by ``pk_name``/``pk_value`` and ``values``
    becomes the ``values`` entry of the request body. The payload is
    always cleaned up afterwards, even when the request raises. Returns
    a truthy value when the PUT succeeded.
    """
    payload = self.payload_init(keyspace, table, pk_value, atomic)
    try:
        data = payload.get('data')
        data['values'] = values

        path = self._row_url_path(keyspace, table, pk_name, pk_value)
        if CONF.music_api.debug:
            LOG.debug("Updating row with pk_value {} in table "
                      "{}, keyspace {}".format(pk_value, table, keyspace))
        response = self.rest.request(method='put', path=path, data=data)
    finally:
        # Pair every payload_init with a payload_delete so a failing
        # request cannot leave the lock behind.
        self.payload_delete(payload)
    return response and response.ok
def row_read(self, keyspace, table, pk_name=None, pk_value=None):
    """Read one or more rows. Not atomic.

    Without a primary key name/value pair the request addresses all rows
    of the table. Returns the decoded JSON body of the response, or the
    falsy response itself when the request produced none.
    """
    if CONF.music_api.debug:
        LOG.debug("Reading row with pk_value {} from table "
                  "{}, keyspace {}".format(pk_value, table, keyspace))
    response = self.rest.request(
        path=self._row_url_path(keyspace, table, pk_name, pk_value))
    return response and response.json()
def row_delete(self, keyspace, table, pk_name, pk_value, atomic=False):
    """Delete a row.

    The row is addressed by ``pk_name``/``pk_value``. The payload is
    always cleaned up afterwards, even when the request raises. Returns
    a truthy value when the DELETE succeeded.
    """
    payload = self.payload_init(keyspace, table, pk_value, atomic)
    try:
        data = payload.get('data')

        path = self._row_url_path(keyspace, table, pk_name, pk_value)
        if CONF.music_api.debug:
            LOG.debug("Deleting row with pk_value {} from table "
                      "{}, keyspace {}".format(pk_value, table, keyspace))
        response = self.rest.request(method='delete', path=path, data=data)
    finally:
        # Pair every payload_init with a payload_delete so a failing
        # request cannot leave the lock behind.
        self.payload_delete(payload)
    return response and response.ok
329 def _table_path_generate(keyspace, table):
330 path = '/keyspaces/%(keyspace)s/tables/%(table)s/' % {
331 'keyspace': keyspace,
def table_create(self, keyspace, table, schema):
    """Create ``table`` in ``keyspace``.

    ``schema`` is sent as the ``fields`` entry of the request body.
    Returns a truthy value when the POST succeeded.
    """
    body = self.payload_init().get('data')
    body['fields'] = schema

    if CONF.music_api.debug:
        LOG.debug("Creating table {}, keyspace {}".format(table, keyspace))
    response = self.rest.request(
        method='post',
        path=self._table_path_generate(keyspace, table),
        data=body)
    return response and response.ok
def table_delete(self, keyspace, table):
    """Delete ``table`` from ``keyspace``.

    Sends a DELETE for the table with the default payload as the request
    body. Returns a truthy value when the request succeeded.
    """
    body = self.payload_init().get('data')

    if CONF.music_api.debug:
        LOG.debug("Deleting table {}, keyspace {}".format(table, keyspace))
    response = self.rest.request(
        method='delete',
        path=self._table_path_generate(keyspace, table),
        data=body)
    return response and response.ok
360 """Returns version string."""
362 if CONF.music_api.debug:
363 LOG.debug("Requesting version info")
364 response = self.rest.request(method='get',
365 content_type='text/plain', path=path)
366 return response and response.text
369 class MockAPI(object):
370 """Wrapper for Music API"""
372 # Mock state for Music
379 LOG.info(_LI("Initializing Music Mock API"))
383 self.music['keyspaces'] = {}
388 def _keyspaces(self):
389 return self.music.get('keyspaces')
391 def _set_keyspace(self, keyspace):
392 self._keyspaces[keyspace] = {}
394 def _unset_keyspace(self, keyspace):
395 self._keyspaces.pop(keyspace)
397 def _set_table(self, keyspace, table):
398 self._keyspaces[keyspace][table] = {}
400 def _unset_table(self, keyspace, table):
401 self._keyspaces[keyspace].pop(table)
403 def _get_row(self, keyspace, table, key=None):
406 for row_key, row in self._keyspaces[keyspace][table].items():
407 if not key or key == row_key:
409 rows['row {}'.format(row_num)] = copy.deepcopy(row)
412 def _set_row(self, keyspace, table, key, row):
413 self._keyspaces[keyspace][table][key] = row
415 def _unset_row(self, keyspace, table, row):
416 self._keyspaces[keyspace][table].pop(row)
def keyspace_create(self, keyspace):
    """Creates a keyspace in the mock state.

    Returns True so callers get the same truthy success signal that the
    REST-backed implementation gives.
    """
    if CONF.music_api.debug:
        LOG.debug("Creating keyspace {}".format(keyspace))
    self._set_keyspace(keyspace)
    return True
def keyspace_delete(self, keyspace):
    """Drops a keyspace from the mock state.

    Returns True so callers get the same truthy success signal that the
    REST-backed implementation gives.
    """
    if CONF.music_api.debug:
        LOG.debug("Deleting keyspace {}".format(keyspace))
    self._unset_keyspace(keyspace)
    return True
def row_create(self, keyspace, table,  # pylint: disable=R0913
               pk_name, pk_value, values, atomic=False):
    """Create a row in the mock state.

    ``values`` is stored under ``pk_value`` in the given table.
    ``pk_name`` and ``atomic`` are accepted for signature parity with
    the REST-backed implementation and are not used. Returns True.
    """
    if CONF.music_api.debug:
        LOG.debug("Creating row with pk_value {} in table "
                  "{}, keyspace {}".format(pk_value, table, keyspace))
    self._set_row(keyspace, table, pk_value, values)
    return True
def row_update(self, keyspace, table,  # pylint: disable=R0913
               pk_name, pk_value, values, atomic=False):
    """Update a row in the mock state.

    ``values`` replaces whatever is stored under ``pk_value`` in the
    given table. ``pk_name`` and ``atomic`` are accepted for signature
    parity with the REST-backed implementation and are not used.
    Returns True.
    """
    if CONF.music_api.debug:
        LOG.debug("Updating row with pk_value {} in table "
                  "{}, keyspace {}".format(pk_value, table, keyspace))
    self._set_row(keyspace, table, pk_value, values)
    return True
def row_read(self, keyspace, table, pk_name=None, pk_value=None):
    """Read one or more rows. Not atomic.

    Returns whatever ``_get_row`` yields for ``pk_value`` in the given
    table. ``pk_name`` is accepted for signature parity with the
    REST-backed implementation and is not used.
    """
    if CONF.music_api.debug:
        LOG.debug("Reading row with pk_value {} from table "
                  "{}, keyspace {}".format(pk_value, table, keyspace))
    values = self._get_row(keyspace, table, pk_value)
    # Hand the looked-up rows back to the caller; a read that returns
    # nothing is useless.
    return values
def row_delete(self, keyspace, table, pk_name, pk_value, atomic=False):
    """Delete a row from the mock state.

    Removes the row stored under ``pk_value``. ``pk_name`` and
    ``atomic`` are accepted for signature parity with the REST-backed
    implementation and are not used. Returns True.
    """
    if CONF.music_api.debug:
        LOG.debug("Deleting row with pk_value {} from table "
                  "{}, keyspace {}".format(pk_value, table, keyspace))
    self._unset_row(keyspace, table, pk_value)
    return True
def table_create(self, keyspace, table, schema):
    """Creates a table in the mock state.

    ``schema`` is accepted for signature parity with the REST-backed
    implementation and is not stored. Returns True.
    """
    if CONF.music_api.debug:
        LOG.debug("Creating table {}, keyspace {}".format(table, keyspace))
    self._set_table(keyspace, table)
    return True
def table_delete(self, keyspace, table):
    """Deletes a table from the mock state.

    Returns True so callers get the same truthy success signal that the
    REST-backed implementation gives.
    """
    if CONF.music_api.debug:
        LOG.debug("Deleting table {}, keyspace {}".format(table, keyspace))
    self._unset_table(keyspace, table)
    return True
481 """Returns version string."""
482 if CONF.music_api.debug:
483 LOG.debug("Requesting version info")
488 """Wrapper for Music and Music Mock API"""
490 # FIXME(jdandrea): Follow more formal practices for defining/using mocks
491 if CONF.music_api.mock: