From 4836d08afce29b049cfa199aa64731925f89ec49 Mon Sep 17 00:00:00 2001 From: Arjan H Date: Sat, 2 Apr 2022 17:04:07 +0200 Subject: [PATCH] Update acme_tiny.py to latest version --- acme_tiny.py | 88 +++++++++++++++++++++++----------------------------- 1 file changed, 39 insertions(+), 49 deletions(-) diff --git a/acme_tiny.py b/acme_tiny.py index 7f5de06..f9d88f0 100644 --- a/acme_tiny.py +++ b/acme_tiny.py @@ -1,19 +1,19 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging try: from urllib.request import urlopen, Request # Python 3 -except ImportError: +except ImportError: # pragma: no cover from urllib2 import urlopen, Request # Python 2 -DEFAULT_CA = "http://LABCA_FQDN" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD -DEFAULT_DIRECTORY_URL = "http://LABCA_FQDN/directory" +DEFAULT_CA = "https://LABCA_FQDN" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD +DEFAULT_DIRECTORY_URL = "https://LABCA_FQDN/directory" LOGGER = logging.getLogger(__name__) LOGGER.addHandler(logging.StreamHandler()) LOGGER.setLevel(logging.INFO) -def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None): +def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None): directory, acct_headers, alg, jwk = None, None, None, None # global variables # helper functions - base64 encode for jose spec @@ -33,13 +33,14 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check try: resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"})) resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers - resp_data = json.loads(resp_data) # try to parse json results - except ValueError: - pass # ignore json parsing errors except IOError as e: resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) code, headers = getattr(e, "code", None), {} - if depth < 100 and code == 400 and json.loads(resp_data)['type'] == "urn:ietf:params:acme:error:badNonce": + try: + resp_data = json.loads(resp_data) # try to parse json results + except ValueError: + pass # ignore json parsing errors + if depth < 100 and code == 400 and resp_data['type'] == "urn:ietf:params:acme:error:badNonce": raise IndexError(resp_data) # allow 100 retrys for bad nonces if code not in [200, 201, 204]: raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data)) @@ -47,7 +48,7 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check # helper function - make signed requests def _send_signed_request(url, payload, err_msg, depth=0): - payload64 = _b64(json.dumps(payload).encode('utf8')) + payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8')) new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce'] protected = {"url": url, "alg": alg, "nonce": new_nonce} protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']}) @@ -62,22 +63,21 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check # helper function - poll until complete def _poll_until_not(url, pending_statuses, err_msg): - while True: - result, _, _ = _do_request(url, err_msg=err_msg) - if result['status'] in pending_statuses: - time.sleep(2) - continue - return result + result, t0 = None, time.time() + while result is None or result['status'] in pending_statuses: + assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout + time.sleep(0 if result is None else 2) + result, _, _ = _send_signed_request(url, None, err_msg) + return result # parse account key to get public key log.info("Parsing account key...") out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="OpenSSL Error") - pub_pattern = r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)" + pub_pattern = r"modulus:[\s]+?00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)" pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE|re.DOTALL).groups() pub_exp = "{0:x}".format(int(pub_exp)) pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp - alg = "RS256" - jwk = { + alg, jwk = "RS256", { "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), "kty": "RSA", "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), @@ -92,12 +92,12 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", out.decode('utf8')) if common_name is not None: domains.add(common_name.group(1)) - subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL) + subject_alt_names = re.search(r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL) if subject_alt_names is not None: for san in subject_alt_names.group(1).split(", "): if san.startswith("DNS:"): domains.add(san[4:]) - log.info("Found domains: {0}".format(", ".join(domains))) + log.info(u"Found domains: {0}".format(", ".join(domains))) # get the ACME directory of urls log.info("Getting directory...") @@ -107,9 +107,9 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check # create account, update contact details (if any), and set the global key identifier log.info("Registering account...") - reg_payload = {"termsOfServiceAgreed": True} + reg_payload = {"termsOfServiceAgreed": True} if contact is None else {"termsOfServiceAgreed": True, "contact": contact} account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering") - log.info("Registered!" if code == 201 else "Already registered!") + log.info("{0} Account ID: {1}".format("Registered!" if code == 201 else "Already registered!", acct_headers['Location'])) if contact is not None: account, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details") log.info("Updated contact details:\n{0}".format("\n".join(account['contact']))) @@ -122,8 +122,13 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check # get the authorizations that need to be completed for auth_url in order['authorizations']: - authorization, _, _ = _do_request(auth_url, err_msg="Error getting challenges") + authorization, _, _ = _send_signed_request(auth_url, None, "Error getting challenges") domain = authorization['identifier']['value'] + + # skip if already valid + if authorization['status'] == "valid": + log.info("Already verified: {0}, skipping...".format(domain)) + continue log.info("Verifying {0}...".format(domain)) # find the http-01 challenge and write the challenge file @@ -136,10 +141,9 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check # check that the file is in place try: - wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) - assert(disable_check or _do_request(wellknown_url)[0] == keyauthorization) + wellknown_url = "http://{0}{1}/.well-known/acme-challenge/{2}".format(domain, "" if check_port is None else ":{0}".format(check_port), token) + assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization) except (AssertionError, ValueError) as e: - os.remove(wellknown_path) raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e)) # say the challenge is done @@ -147,6 +151,7 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check authorization = _poll_until_not(auth_url, ["pending"], "Error checking challenge status for {0}".format(domain)) if authorization['status'] != "valid": raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization)) + os.remove(wellknown_path) log.info("{0} verified!".format(domain)) # finalize the order with the csr @@ -160,7 +165,7 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check raise ValueError("Order failed: {0}".format(order)) # download the certificate - certificate_pem, _, _ = _do_request(order['certificate'], err_msg="Certificate download failed") + certificate_pem, _, _ = _send_signed_request(order['certificate'], None, "Certificate download failed") log.info("Certificate signed!") return certificate_pem @@ -168,15 +173,11 @@ def main(argv=None): parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent("""\ - This script automates the process of getting a signed TLS certificate from Let's Encrypt using - the ACME protocol. It will need to be run on your server and have access to your private - account key, so PLEASE READ THROUGH IT! It's only ~200 lines, so it won't take long. + This script automates the process of getting a signed TLS certificate from Let's Encrypt using the ACME protocol. + It will need to be run on your server and have access to your private account key, so PLEASE READ THROUGH IT! + It's only ~200 lines, so it won't take long. - Example Usage: - python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt - - Example Crontab Renewal (once per month): - 0 0 1 * * python /path/to/acme_tiny.py --account-key /path/to/account.key --csr /path/to/domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > /path/to/signed_chain.crt 2>> /var/log/acme_tiny.log + Example Usage: python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt """) ) parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key") @@ -187,23 +188,12 @@ def main(argv=None): parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt") parser.add_argument("--ca", default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!") parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:aaa@bbb.com) for your account-key") + parser.add_argument("--check-port", metavar="PORT", default=None, help="what port to use when self-checking the challenge file, default is port 80") args = parser.parse_args(argv) LOGGER.setLevel(args.quiet or LOGGER.level) - signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact) + signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port) sys.stdout.write(signed_crt) if __name__ == "__main__": # pragma: no cover main(sys.argv[1:]) - - - -## echo "172.17.0.1 tessie.hakwerk.local" >> /etc/hosts -## openssl genrsa 4096 > account.key -## openssl genrsa 4096 > domain.key -## openssl req -new -sha256 -key domain.key -subj "/" -reqexts SAN -config <(cat /etc/ssl/openssl.cnf <(printf "[SAN]\nsubjectAltName=DNS:tessie.hakwerk.local")) > domain.csr -## python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /home/labca/nginx_data/static/.well-known/acme-challenge/ > domain_chain.crt -## cp domain_chain.crt ~/boulder/test/ - -## docker exec -ti boulder_boulder_1 /bin/bash -## bin/checkocsp test/domain_chain.crt