Source code for floatcsep.utils.accessors

import git
import requests
import hashlib
import os
import sys
import shutil

# Base URL (query endpoint) of the FDSN event web service hosted at
# service.iris.edu; query parameters are expected to be appended after "?".
HOST_CATALOG = "https://service.iris.edu/fdsnws/event/1/query?"
# NOTE(review): presumably a request timeout in seconds for catalog queries;
# it is not referenced by the functions visible in this file -- confirm usage.
TIMEOUT = 180


def from_zenodo(record_id, folder, force=False):
    """
    Download the files of a Zenodo record into a local folder.

    A file is downloaded when it does not exist locally, when its local
    checksum differs from the one published by Zenodo, or when ``force`` is
    set. Every downloaded file is verified against the published checksum.

    Args:
        record_id: identifier of the Zenodo record
        folder: directory where the record files will be stored; it is
            created if it does not exist
        force: force download even if file exists and checksum passes

    Returns:
        None

    Raises:
        requests.HTTPError: if the Zenodo API answers with an error status
        SystemExit: (exit code -1) if a downloaded file does not match its
            published checksum
    """
    # Query the record metadata once; fail early on HTTP errors instead of
    # hitting an opaque KeyError on a JSON error payload.
    response = requests.get(f"https://zenodo.org/api/records/{record_id}", timeout=3)
    response.raise_for_status()
    entries = response.json()["files"]

    for entry in entries:
        fname = entry["key"]
        checksum = entry["checksum"]
        url = entry["links"]["self"]
        full_path = os.path.join(folder, fname)

        if os.path.exists(full_path):
            value, digest = check_hash(full_path, checksum)
            if value != digest:
                print(f"Checksum is different: re-downloading {fname} from Zenodo...")
            elif force:
                print(f"Re-downloading {fname} from Zenodo...")
            else:
                # Already verified just above: no need to hash the file again.
                print(f"Found file {fname}. Checksum OK.")
                continue
        else:
            print(f"Downloading {fname} from Zenodo...")

        # Make sure the destination directory exists before writing into it.
        os.makedirs(os.path.dirname(full_path) or ".", exist_ok=True)
        download_file(url, full_path)

        # Verify what was just written to disk.
        value, digest = check_hash(full_path, checksum)
        if value != digest:
            print("Error: Checksum does not match")
            sys.exit(-1)
def from_git(url, path, branch=None, depth=1, **kwargs):
    """
    Obtain a git repository at ``path``, shallow-cloning it when needed.

    If ``path`` already holds a valid git repository, that repository is
    returned untouched. Otherwise the repository at ``url`` is cloned into
    ``path`` and the ``.git`` directory of the fresh clone is deleted, so
    only the working tree remains on disk.

    Args:
        url (str): url of the repository
        path (str): path/folder where to clone the repo
        branch (str): branch to clone; ``None`` is forwarded unchanged to
            ``Repo.clone_from``
        depth (int): depth history of commits
        **kwargs: keyword args passed to Repo.clone_from

    Returns:
        the GitPython ``Repo`` object (existing or freshly cloned)
    """
    # The explicit ``depth`` argument always overrides a "depth" in kwargs.
    kwargs["depth"] = depth
    git.refresh()

    try:
        return git.Repo(path)
    except (git.NoSuchPathError, git.InvalidGitRepositoryError):
        # Nothing usable at ``path``: fall through and clone.
        pass

    cloned = git.Repo.clone_from(url, path, branch=branch, **kwargs)

    # Drop the git metadata of the new clone, keeping only the checked-out files.
    metadata_dir = os.path.join(path, ".git")
    if os.path.isdir(metadata_dir):
        shutil.rmtree(metadata_dir)

    return cloned
def download_file(url: str, filename: str) -> None:
    """
    Download a file (e.g. from Zenodo) while printing a progress bar.

    The content is streamed to disk in 1 kB blocks. When the total size is
    known (from the ``Content-Length`` header of the GET response or,
    failing that, of a HEAD request) a progress bar is written to stdout.

    Args:
        url (str): the url where the file is located
        filename (str): path of the local file to write (overwritten if it
            exists)

    Raises:
        requests.HTTPError: if the server answers with an error status; in
            that case the local file is left untouched
    """
    progress_bar_length = 72
    block_size = 1024
    timeout = 3

    def _content_length(headers) -> int:
        # A missing or malformed header simply means "size unknown".
        try:
            return int(headers.get("Content-Length") or 0)
        except (TypeError, ValueError):
            return 0

    # The context manager guarantees the streamed connection is released.
    with requests.get(url, timeout=timeout, stream=True) as r:
        # Do not write an HTTP error page to disk as if it were the data.
        r.raise_for_status()

        total_size = _content_length(r.headers)
        if not total_size:
            # The size is only used for the progress bar, so a failing HEAD
            # probe must not abort the download.
            try:
                with requests.head(url, timeout=timeout) as h:
                    total_size = _content_length(h.headers)
            except requests.RequestException:
                total_size = 0

        if total_size:
            print(f"Downloading file with size of {total_size / block_size:.3f} kB")
        else:
            print("Downloading file with unknown size")

        download_size = 0
        with open(filename, "wb") as f:
            for data in r.iter_content(chunk_size=block_size):
                download_size += len(data)
                f.write(data)
                if total_size:
                    # Decoded content can be larger than Content-Length
                    # (compressed transfer), so cap the bar at 100%.
                    fraction = min(download_size / total_size, 1.0)
                    progress = int(progress_bar_length * fraction)
                    sys.stdout.write(
                        "\r[{}{}] {:.1f}%".format(
                            "█" * progress,
                            "." * (progress_bar_length - progress),
                            100 * fraction,
                        )
                    )
                    sys.stdout.flush()
        sys.stdout.write("\n")
def check_hash(filename, checksum):
    """
    Check whether the hash of a local file matches a published checksum.

    Args:
        filename: path of the local file to hash
        checksum: expected checksum, formatted as ``"<algorithm>:<hexdigest>"``
            (e.g. ``"md5:0123..."``). A bare hex digest without the algorithm
            prefix is also accepted and assumed to be md5.

    Returns:
        tuple: ``(value, digest)`` where ``value`` is the expected hex digest
        (lower-cased) and ``digest`` is the hex digest computed from the
        file, or the string ``"invalid"`` if the file does not exist. The
        file is valid when both elements are equal.

    Raises:
        ValueError: if the algorithm is not supported by ``hashlib``
    """
    algorithm, separator, value = checksum.partition(":")
    if not separator:
        # No "<algorithm>:" prefix: treat the whole string as the digest.
        # NOTE(review): md5 is assumed as the default algorithm -- confirm
        # against the checksum provider.
        algorithm, value = "md5", checksum
    algorithm = algorithm.strip()
    # hexdigest() is always lowercase; normalise so the comparison is fair.
    value = value.strip().lower()

    if not os.path.exists(filename):
        return value, "invalid"

    h = hashlib.new(algorithm)
    with open(filename, "rb") as f:
        # Hash in chunks so large files are never loaded fully in memory.
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)

    return value, h.hexdigest()