import time
import git
import requests
import hashlib
import os
import shutil
[docs]
def _zenodo_destination(folder, key):
    """Return the local path for record file ``key``, refusing paths outside ``folder``."""
    root = os.path.abspath(folder)
    destination = os.path.join(folder, key)
    resolved = os.path.abspath(destination)
    try:
        contained = resolved != root and os.path.commonpath([root, resolved]) == root
    except ValueError:
        # os.path.commonpath raises when the paths cannot share a prefix
        # (e.g. different drives on Windows) -- that is also "outside".
        contained = False
    if not contained:
        raise RuntimeError(
            f"Zenodo record file key {key!r} resolves outside the download folder: {folder}"
        )
    return destination


def from_zenodo(record_id, folder, force=False, keys=None):
    """Download the files of a Zenodo record into ``folder`` and verify them.

    The record metadata is requested from the Zenodo REST API. Responses with
    status 429, 500, 502, 503 or 504 are retried (up to 5 attempts in total)
    with an exponential back-off capped at 30 seconds, extended to the value
    of a numeric ``Retry-After`` header when that is larger. Each file listed
    in the record is then downloaded, unless a local copy with a matching
    checksum already exists and ``force`` is false.

    Args:
        record_id: Zenodo record identifier, interpolated into the API URL.
        folder: Destination directory; created if it does not exist.
        force: If true, re-download files even when the local copy's
            checksum already matches.
        keys: Optional iterable of file keys to restrict the download to.
            Every requested key must exist in the record.

    Raises:
        RuntimeError: If Zenodo answers 403 with an HTML / "unusual traffic"
            page, if the request has not succeeded after all attempts, if the
            response is not valid JSON or lacks a ``files`` list, or if a
            file key would resolve outside ``folder``.
        FileNotFoundError: If any of ``keys`` is absent from the record.
        Exception: If a file's checksum does not match after downloading.
            Other HTTP error statuses propagate from
            ``Response.raise_for_status()``.
    """
    record_url = f"https://zenodo.org/api/records/{record_id}"
    max_tries = 5
    os.makedirs(folder, exist_ok=True)

    for attempt in range(1, max_tries + 1):
        r = requests.get(record_url, timeout=30, headers={"User-Agent": "floatcsep"})

        if r.status_code == 200:
            break

        if r.status_code == 403:
            text = (r.text or "").lower()
            if "unusual traffic" in text or "<html" in text:
                snippet = (r.text or "")[:400].replace("\n", "\\n")
                raise RuntimeError(
                    "Zenodo returned HTTP 403 and appears to be blocking this network/IP due "
                    "to unusual traffic.\n"
                    f"URL: {record_url}\n"
                    f"Response snippet: {snippet}"
                )
            r.raise_for_status()

        if r.status_code in (429, 500, 502, 503, 504):
            # Only wait when another attempt will follow; sleeping after the
            # last attempt would merely delay the failure below.
            if attempt < max_tries:
                wait = min(2 ** (attempt - 1), 30)
                ra = r.headers.get("Retry-After")
                if ra:
                    try:
                        wait = max(wait, int(ra))
                    except ValueError:
                        # Retry-After may also be an HTTP date; keep the back-off.
                        pass
                time.sleep(wait)
            continue

        r.raise_for_status()
    else:
        raise RuntimeError(f"Zenodo API request failed after {max_tries} attempts: {record_url}")

    try:
        data = r.json()
    except Exception as e:
        snippet = (r.text or "")[:400].replace("\n", "\\n")
        raise RuntimeError(
            "Zenodo API did not return valid JSON.\n"
            f"URL: {record_url}\n"
            f"Content-Type: {r.headers.get('Content-Type')!r}\n"
            f"Snippet: {snippet}"
        ) from e

    # A JSON payload that is not an object (e.g. a list) has no "files" entry.
    files = data.get("files", []) if isinstance(data, dict) else None
    if not isinstance(files, list):
        raise RuntimeError(f"Zenodo record JSON missing expected 'files' list: {record_url}")

    if keys is not None:
        wanted = set(keys)
        files = [f for f in files if f.get("key") in wanted]
        missing = wanted - {f.get("key") for f in files}
        if missing:
            raise FileNotFoundError(
                f"Zenodo record {record_id} does not contain required file(s): {sorted(missing)}"
            )

    # Extract and validate every entry before the first download so that a
    # malformed record fails without leaving a half-populated folder.
    targets = [(f["key"], f["checksum"], f["links"]["self"]) for f in files]
    destinations = [_zenodo_destination(folder, fname) for fname, _, _ in targets]

    for (fname, checksum, url), full_path in zip(targets, destinations):
        if os.path.exists(full_path):
            value, digest = check_hash(full_path, checksum)
            if value != digest:
                print(f"Checksum differs, re-downloading {fname} ...")
            elif force:
                print(f"Re-downloading {fname} ...")
            else:
                print(f"Found {fname}. Checksum OK.")
                # Already verified just above; no need to hash the file again.
                continue
        else:
            print(f"Downloading {fname} ...")

        download_file(url, full_path)
        value, digest = check_hash(full_path, checksum)
        if value != digest:
            raise Exception("Error: Checksum does not match")
[docs]
def from_git(url, path, branch=None, depth=1, force=False, **kwargs):
    """Clone a git repository into ``path`` and strip its ``.git`` directory.

    Args:
        url: Repository location handed to ``git.Repo.clone_from``.
        path: Target directory. It is created if missing and must be empty
            unless ``force`` is set.
        branch: Forwarded to ``git.Repo.clone_from`` as ``branch``.
        depth: Forwarded to ``git.Repo.clone_from`` as ``depth``.
        force: If true, an existing ``path`` is removed before cloning.
        **kwargs: Extra keyword arguments forwarded to
            ``git.Repo.clone_from``.

    Returns:
        The object returned by ``git.Repo.clone_from``. Note that the
        ``.git`` directory under ``path`` has been deleted by the time it is
        returned.

    Raises:
        ValueError: If ``path`` exists, is not empty and ``force`` is false.
    """
    clone_options = {**kwargs, "depth": depth}
    git.refresh()

    target_exists = os.path.exists(path)
    if target_exists and force:
        shutil.rmtree(path)
    elif target_exists and os.listdir(path):
        raise ValueError(f"Cannot clone into non-empty directory: {path}")
    os.makedirs(path, exist_ok=True)

    cloned = git.Repo.clone_from(url, path, branch=branch, **clone_options)

    # Only the checked-out files are kept; the repository metadata is dropped.
    metadata_dir = os.path.join(path, ".git")
    if os.path.isdir(metadata_dir):
        shutil.rmtree(metadata_dir)

    return cloned
[docs]
def download_file(url: str, filename: str) -> None:
    """Stream the content at ``url`` into the local file ``filename``.

    Parent directories of ``filename`` are created when missing. The file
    name (and its size in MB when the server reports a usable
    ``Content-Length``) is printed before the transfer, and a completion
    message afterwards.

    Args:
        url: Address to download from.
        filename: Path of the file to write; an existing file is overwritten.

    Raises:
        Whatever ``requests.get`` / ``Response.raise_for_status()`` raise for
        connection problems or HTTP error statuses, and ``OSError`` if the
        file cannot be written.
    """
    os.makedirs(os.path.dirname(filename) or ".", exist_ok=True)

    r = requests.get(url, timeout=30, stream=True, headers={"User-Agent": "floatcsep"})
    try:
        r.raise_for_status()

        cl = r.headers.get("Content-Length") or r.headers.get("content-length")
        try:
            total_size = int(cl) if cl else 0
        except ValueError:
            # Unparseable header: fall back to printing the name only.
            total_size = 0

        base = os.path.basename(filename)
        if total_size:
            print(f"{base} ({total_size / (1024 * 1024):.2f} MB)")
        else:
            print(f"{base}")

        with open(filename, "wb") as f:
            for data in r.iter_content(chunk_size=1024 * 64):
                if data:
                    f.write(data)
    finally:
        # A streamed response holds its connection until the body is fully
        # read or the response is closed; close it on every exit path so an
        # HTTP error or a failed write does not leak the connection.
        r.close()

    print(f"Complete: {base}")
[docs]
def check_hash(filename, checksum):
    """Compute a file's digest for comparison against an expected checksum.

    Args:
        filename: Path of the file to hash.
        checksum: String of the form ``"<algorithm>:<hexdigest>"``; the
            algorithm name is passed to ``hashlib.new``.

    Returns:
        A ``(expected, actual)`` tuple: the hex digest taken from
        ``checksum`` and the hex digest computed from the file. ``actual``
        is the string ``"invalid"`` when ``filename`` does not exist.
    """
    algorithm, expected = checksum.split(":")
    if not os.path.exists(filename):
        return expected, "invalid"

    hasher = hashlib.new(algorithm)
    with open(filename, "rb") as stream:
        # Read in fixed-size blocks until read() returns empty bytes (EOF).
        for block in iter(lambda: stream.read(4096), b""):
            hasher.update(block)

    return expected, hasher.hexdigest()