p.write(CONTENT)
assert SHA512 == utils.cal_file_hash("", str(p), 'SHA512')
assert SHA256 == utils.cal_file_hash(p.dirname, p.basename, 'sha256')
+
+def test_cal_file_hash_remote(mocker):
+ class FakeRequest(object):
+ def __init__(self, *args):
+ self.status_code = 200
+ self.content = CONTENT
+ mocker.patch('requests.get', new=FakeRequest)
+ assert SHA256 == utils.cal_file_hash("", "http://fake", 'sha256')
if desc['Algorithm'] not in SUPPORTED_HASH_ALGO:
raise ManifestException("Unsupported hash algorithm: %s" % desc['Algorithm'])
# validate file digest hash
- # TODO need to support remote file
- if "://" not in desc['Source']:
- hash = utils.cal_file_hash(self.root, desc['Source'], desc['Algorithm'])
- if hash != desc['Hash']:
- raise ManifestException("Mismatched hash for file %s" % desc['Source'])
+ hash = utils.cal_file_hash(self.root, desc['Source'], desc['Algorithm'])
+ if hash != desc['Hash']:
+ raise ManifestException("Mismatched hash for file %s" % desc['Source'])
# nothing is wrong, let's store this
self.digests[desc['Source']] = (desc['Algorithm'], desc['Hash'])
elif key:
#
import hashlib
+from io import BytesIO
import os
+import urlparse
+
+import requests
+
def _hash_value_for_file(f, hash_function, block_size=2**20):
while True:
def cal_file_hash(root, path, algo):
- with open(os.path.join(root, path), 'rb') as fp:
- h = hashlib.new(algo)
+ h = hashlib.new(algo)
+ if urlparse.urlparse(path).scheme:
+ r = requests.get(path)
+ if r.status_code != 200:
+ raise ValueError('Server at {0} returned a {1} status code'
+ .format(path, r.status_code))
+ fp = BytesIO(r.content)
return _hash_value_for_file(fp, h)
-
+ else:
+ with open(os.path.join(root, path), 'rb') as fp:
+ return _hash_value_for_file(fp, h)