# Copyright (c) 2018 The Pooch Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
#
# This code is part of the Fatiando a Terra project (https://www.fatiando.org)
#
"""
Calculating and checking file hashes.
"""
import hashlib
import functools
from pathlib import Path

# From the docs: https://docs.python.org/3/library/hashlib.html#hashlib.new
#   The named constructors are much faster than new() and should be
#   preferred.
# Need to fallback on new() for some algorithms.
ALGORITHMS_AVAILABLE = {
    alg: getattr(hashlib, alg, functools.partial(hashlib.new, alg))
    for alg in hashlib.algorithms_available
}

# xxhash is optional: only the import itself is guarded so that an unrelated
# failure while registering the algorithms is never silently swallowed.
try:
    import xxhash
except ImportError:
    pass
else:
    # xxhash doesn't have a list of available algorithms yet.
    # https://github.com/ifduyue/python-xxhash/issues/48
    # The xxh3 algorithms are only available for version>=2.0, so register
    # only the constructors that the installed version actually provides.
    ALGORITHMS_AVAILABLE.update(
        {
            alg: getattr(xxhash, alg)
            for alg in ["xxh128", "xxh64", "xxh32", "xxh3_128", "xxh3_64"]
            if getattr(xxhash, alg, None) is not None
        }
    )


def file_hash(fname, alg="sha256"):
    """
    Calculate the hash of a given file.

    Useful for checking if a file has changed or been corrupted.

    Parameters
    ----------
    fname : str or PathLike
        The name of the file. It is passed directly to :func:`open`.
    alg : str
        The type of the hashing algorithm. Must be one of the keys of
        ``ALGORITHMS_AVAILABLE``.

    Returns
    -------
    hash : str
        The hash of the file (hexadecimal digest).

    Raises
    ------
    ValueError
        If *alg* is not one of the keys of ``ALGORITHMS_AVAILABLE``.

    Examples
    --------

    >>> fname = "test-file-for-hash.txt"
    >>> with open(fname, "w") as f:
    ...     __ = f.write("content of the file")
    >>> print(file_hash(fname))
    0fc74468e6a9a829f103d069aeb2bb4f8646bad58bf146bb0e3379b759ec4a00
    >>> import os
    >>> os.remove(fname)

    """
    if alg not in ALGORITHMS_AVAILABLE:
        raise ValueError(
            f"Algorithm '{alg}' not available to the pooch library. "
            "Only the following algorithms are available "
            f"{list(ALGORITHMS_AVAILABLE.keys())}."
        )
    # Calculate the hash in chunks to avoid overloading the memory
    chunksize = 65536
    hasher = ALGORITHMS_AVAILABLE[alg]()
    with open(fname, "rb") as fin:
        # read() returns b"" at the end of the file, which stops the iteration.
        for buff in iter(functools.partial(fin.read, chunksize), b""):
            hasher.update(buff)
    return hasher.hexdigest()


def hash_algorithm(hash_string):
    """
    Parse the name of the hash method from the hash string.

    The hash string should have the following form ``algorithm:hash``, where
    algorithm can be the name of any algorithm known to :mod:`hashlib`.

    If the algorithm is omitted or the hash string is None, will default to
    ``"sha256"``.

    Parameters
    ----------
    hash_string : str or None
        The hash string with optional algorithm prepended.

    Returns
    -------
    hash_algorithm : str
        The name of the algorithm, always in lowercase.

    Examples
    --------

    >>> print(hash_algorithm("qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    sha256
    >>> print(hash_algorithm("md5:qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    md5
    >>> print(hash_algorithm("sha256:qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    sha256
    >>> print(hash_algorithm("SHA256:qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    sha256
    >>> print(hash_algorithm("xxh3_64:qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    xxh3_64
    >>> print(hash_algorithm(None))
    sha256

    """
    default = "sha256"
    # Both "no hash at all" and "hash without a prefix" mean the default.
    if hash_string is None or ":" not in hash_string:
        return default
    # Everything before the first colon is the algorithm name.
    return hash_string.split(":")[0].lower()


def hash_matches(fname, known_hash, strict=False, source=None):
    """
    Check if the hash of a file matches a known hash.

    If the *known_hash* is None, will always return True.

    Converts hashes to lowercase before comparison to avoid system specific
    mismatches between hashes in the registry and computed hashes.

    Parameters
    ----------
    fname : str or PathLike
        The path to the file.
    known_hash : str or None
        The known hash. Optionally, prepend ``alg:`` to the hash to specify the
        hashing algorithm. Default is SHA256.
    strict : bool
        If True, will raise a :class:`ValueError` if the hash does not match
        informing the user that the file may be corrupted.
    source : str
        The source of the downloaded file (name or URL, for example). Will be
        used in the error message if *strict* is True. Has no other use than
        reporting to the user where the file came from in case of hash
        mismatch. If None, will default to *fname*.

    Returns
    -------
    is_same : bool
        True if the hash matches, False otherwise.

    Raises
    ------
    ValueError
        If *strict* is True and the hashes do not match, or if the algorithm
        named in *known_hash* is not available (raised by :func:`file_hash`).

    """
    if known_hash is None:
        return True
    algorithm = hash_algorithm(known_hash)
    new_hash = file_hash(fname, alg=algorithm)
    # The digest is whatever follows the last colon (the whole string when no
    # algorithm prefix is given).
    expected = known_hash.split(":")[-1]
    matches = new_hash.lower() == expected.lower()
    if strict and not matches:
        if source is None:
            source = str(fname)
        # NOTE(review): the message mentions that the download was deleted, but
        # this function does not remove anything itself -- presumably the
        # caller does; confirm before relying on it.
        raise ValueError(
            f"{algorithm.upper()} hash of downloaded file ({source}) does not match"
            f" the known hash: expected {known_hash} but got {new_hash}. Deleted"
            " download for safety. The downloaded file may have been corrupted or"
            " the known hash may be outdated."
        )
    return matches


def make_registry(directory, output, recursive=True):
    """
    Make a registry of files and hashes for the given directory.

    This is helpful if you have many files in your test dataset as it keeps you
    from needing to manually update the registry.

    The registry is a UTF-8 text file with one ``<file name> <hash>`` pair per
    line, sorted by file name. Hashes are computed with the default algorithm
    of :func:`file_hash`.

    Parameters
    ----------
    directory : str
        Directory of the test data to put in the registry. All file names in
        the registry will be relative to this directory.
    output : str
        Name of the output registry file.
    recursive : bool
        If True, will recursively look for files in subdirectories of
        *directory*.

    """
    directory = Path(directory)
    pattern = "**/*" if recursive else "*"

    files = sorted(
        str(path.relative_to(directory))
        for path in directory.glob(pattern)
        if path.is_file()
    )

    # Hash everything before opening the output: opening in "w" mode truncates
    # the file, which would change the hash of a registry file that already
    # exists inside *directory*.
    hashes = [file_hash(str(directory / fname)) for fname in files]

    with open(output, "w", encoding="utf-8") as outfile:
        for fname, fhash in zip(files, hashes):
            # Only use Unix separators for the registry so that we don't go
            # insane dealing with file paths.
            unix_name = fname.replace("\\", "/")
            outfile.write(f"{unix_name} {fhash}\n")