# Copyright (c) 2018 The Pooch Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
#
# This code is part of the Fatiando a Terra project (https://www.fatiando.org)
#
"""
Calculating and checking file hashes.
"""
import hashlib
import functools
from pathlib import Path

# From the docs: https://docs.python.org/3/library/hashlib.html#hashlib.new
#   The named constructors are much faster than new() and should be
#   preferred.
# Need to fallback on new() for some algorithms.
ALGORITHMS_AVAILABLE = {
    alg: getattr(hashlib, alg, functools.partial(hashlib.new, alg))
    for alg in hashlib.algorithms_available
}

try:
    import xxhash

    # xxhash doesn't have a list of available algorithms yet.
    # https://github.com/ifduyue/python-xxhash/issues/48
    ALGORITHMS_AVAILABLE.update(
        {
            alg: getattr(xxhash, alg, None)
            for alg in ["xxh128", "xxh64", "xxh32", "xxh3_128", "xxh3_64"]
        }
    )
    # The xxh3 algorithms are only available for version>=2.0. Set to None and
    # remove to ensure backwards compatibility.
    ALGORITHMS_AVAILABLE = {
        alg: func for alg, func in ALGORITHMS_AVAILABLE.items() if func is not None
    }
except ImportError:
    pass


def file_hash(fname, alg="sha256"):
    """
    Calculate the hash of a given file.

    Useful for checking if a file has changed or been corrupted.

    Parameters
    ----------
    fname : str or PathLike
        The name of the file.
    alg : str
        The type of the hashing algorithm. Must be one of the keys of
        ``ALGORITHMS_AVAILABLE``.

    Returns
    -------
    hash : str
        The hash of the file.

    Raises
    ------
    ValueError
        If *alg* is not one of the keys of ``ALGORITHMS_AVAILABLE``.

    Examples
    --------

    >>> fname = "test-file-for-hash.txt"
    >>> with open(fname, "w") as f:
    ...     __ = f.write("content of the file")
    >>> print(file_hash(fname))
    0fc74468e6a9a829f103d069aeb2bb4f8646bad58bf146bb0e3379b759ec4a00
    >>> import os
    >>> os.remove(fname)

    """
    if alg not in ALGORITHMS_AVAILABLE:
        raise ValueError(
            f"Algorithm '{alg}' not available to the pooch library. "
            "Only the following algorithms are available "
            f"{list(ALGORITHMS_AVAILABLE.keys())}."
        )
    # Calculate the hash in chunks to avoid overloading the memory
    chunksize = 65536
    hasher = ALGORITHMS_AVAILABLE[alg]()
    with open(fname, "rb") as fin:
        # iter(callable, sentinel) keeps reading until read() returns b"",
        # which is what a binary file returns at end-of-file.
        for chunk in iter(functools.partial(fin.read, chunksize), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def hash_algorithm(hash_string):
    """
    Parse the name of the hash method from the hash string.

    The hash string should have the following form ``algorithm:hash``, where
    algorithm can be the name of any algorithm known to :mod:`hashlib`.

    If the algorithm is omitted or the hash string is None, will default to
    ``"sha256"``.

    Parameters
    ----------
    hash_string : str
        The hash string with optional algorithm prepended.

    Returns
    -------
    hash_algorithm : str
        The name of the algorithm (always in lowercase).

    Examples
    --------

    >>> print(hash_algorithm("qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    sha256
    >>> print(hash_algorithm("md5:qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    md5
    >>> print(hash_algorithm("sha256:qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    sha256
    >>> print(hash_algorithm("SHA256:qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    sha256
    >>> print(hash_algorithm("xxh3_64:qouuwhwd2j192y1lb1iwgowdj2898wd2d9"))
    xxh3_64
    >>> print(hash_algorithm(None))
    sha256

    """
    default = "sha256"
    if hash_string is None:
        algorithm = default
    elif ":" not in hash_string:
        algorithm = default
    else:
        # Everything before the first colon is the algorithm name.
        algorithm = hash_string.split(":")[0]
    return algorithm.lower()


def hash_matches(fname, known_hash, strict=False, source=None):
    """
    Check if the hash of a file matches a known hash.

    If the *known_hash* is None, will always return True.

    Converts hashes to lowercase before comparison to avoid system specific
    mismatches between hashes in the registry and computed hashes.

    Parameters
    ----------
    fname : str or PathLike
        The path to the file.
    known_hash : str
        The known hash. Optionally, prepend ``alg:`` to the hash to specify the
        hashing algorithm. Default is SHA256.
    strict : bool
        If True, will raise a :class:`ValueError` if the hash does not match
        informing the user that the file may be corrupted.
    source : str
        The source of the downloaded file (name or URL, for example). Will be
        used in the error message if *strict* is True. Has no other use other
        than reporting to the user where the file came from in case of hash
        mismatch. If None, will default to *fname*.

    Returns
    -------
    is_same : bool
        True if the hash matches, False otherwise.

    Raises
    ------
    ValueError
        If *strict* is True and the hashes do not match, or if the algorithm
        named in *known_hash* is not available.

    """
    if known_hash is None:
        return True
    algorithm = hash_algorithm(known_hash)
    new_hash = file_hash(fname, alg=algorithm)
    # The digest is whatever follows the last colon (the whole string when no
    # algorithm prefix is present).
    matches = new_hash.lower() == known_hash.split(":")[-1].lower()
    if strict and not matches:
        if source is None:
            source = str(fname)
        raise ValueError(
            f"{algorithm.upper()} hash of downloaded file ({source}) does not match"
            f" the known hash: expected {known_hash} but got {new_hash}. Deleted"
            " download for safety. The downloaded file may have been corrupted or"
            " the known hash may be outdated."
        )
    return matches


def make_registry(directory, output, recursive=True):
    """
    Make a registry of files and hashes for the given directory.

    This is helpful if you have many files in your test dataset as it keeps you
    from needing to manually update the registry.

    Each line of the registry has the form ``<file name> <hash>``. File names
    are sorted and hashes are computed with the default algorithm of
    :func:`file_hash`.

    Parameters
    ----------
    directory : str
        Directory of the test data to put in the registry. All file names in
        the registry will be relative to this directory.
    output : str
        Name of the output registry file.
    recursive : bool
        If True, will recursively look for files in subdirectories of
        *directory*.

    """
    directory = Path(directory)
    pattern = "**/*" if recursive else "*"

    # Keep the relative paths as Path objects so that the separators can be
    # converted reliably below. Sorting on ``str`` gives the same order as
    # sorting the native path strings.
    files = sorted(
        (
            path.relative_to(directory)
            for path in directory.glob(pattern)
            if path.is_file()
        ),
        key=str,
    )

    # All hashes are computed before the output file is opened (opening it for
    # writing truncates it, which matters if it lives inside *directory*).
    hashes = [file_hash(directory / fname) for fname in files]

    with open(output, "w", encoding="utf-8") as outfile:
        for fname, fhash in zip(files, hashes):
            # Only use Unix separators for the registry so that we don't go
            # insane dealing with file paths. ``as_posix`` converts only real
            # path separators, so a literal backslash in a POSIX file name is
            # left untouched.
            outfile.write(f"{fname.as_posix()} {fhash}\n")