Update acme_tiny.py to latest version

This commit is contained in:
Arjan H
2022-04-02 17:04:07 +02:00
parent bcabb49c70
commit 4836d08afc

View File

@@ -1,19 +1,19 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
try:
from urllib.request import urlopen, Request # Python 3
except ImportError:
except ImportError: # pragma: no cover
from urllib2 import urlopen, Request # Python 2
DEFAULT_CA = "http://LABCA_FQDN" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD
DEFAULT_DIRECTORY_URL = "http://LABCA_FQDN/directory"
DEFAULT_CA = "https://LABCA_FQDN" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD
DEFAULT_DIRECTORY_URL = "https://LABCA_FQDN/directory"
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None):
def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None):
directory, acct_headers, alg, jwk = None, None, None, None # global variables
# helper functions - base64 encode for jose spec
@@ -33,13 +33,14 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
try:
resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"}))
resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers
resp_data = json.loads(resp_data) # try to parse json results
except ValueError:
pass # ignore json parsing errors
except IOError as e:
resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
code, headers = getattr(e, "code", None), {}
if depth < 100 and code == 400 and json.loads(resp_data)['type'] == "urn:ietf:params:acme:error:badNonce":
try:
resp_data = json.loads(resp_data) # try to parse json results
except ValueError:
pass # ignore json parsing errors
if depth < 100 and code == 400 and resp_data['type'] == "urn:ietf:params:acme:error:badNonce":
raise IndexError(resp_data) # allow 100 retrys for bad nonces
if code not in [200, 201, 204]:
raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data))
@@ -47,7 +48,7 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
# helper function - make signed requests
def _send_signed_request(url, payload, err_msg, depth=0):
payload64 = _b64(json.dumps(payload).encode('utf8'))
payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8'))
new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce']
protected = {"url": url, "alg": alg, "nonce": new_nonce}
protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']})
@@ -62,22 +63,21 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
# helper function - poll until complete
def _poll_until_not(url, pending_statuses, err_msg):
while True:
result, _, _ = _do_request(url, err_msg=err_msg)
if result['status'] in pending_statuses:
time.sleep(2)
continue
return result
result, t0 = None, time.time()
while result is None or result['status'] in pending_statuses:
assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout
time.sleep(0 if result is None else 2)
result, _, _ = _send_signed_request(url, None, err_msg)
return result
# parse account key to get public key
log.info("Parsing account key...")
out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="OpenSSL Error")
pub_pattern = r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)"
pub_pattern = r"modulus:[\s]+?00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)"
pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE|re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
alg = "RS256"
jwk = {
alg, jwk = "RS256", {
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"kty": "RSA",
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
@@ -92,12 +92,12 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", out.decode('utf8'))
if common_name is not None:
domains.add(common_name.group(1))
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.startswith("DNS:"):
domains.add(san[4:])
log.info("Found domains: {0}".format(", ".join(domains)))
log.info(u"Found domains: {0}".format(", ".join(domains)))
# get the ACME directory of urls
log.info("Getting directory...")
@@ -107,9 +107,9 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
# create account, update contact details (if any), and set the global key identifier
log.info("Registering account...")
reg_payload = {"termsOfServiceAgreed": True}
reg_payload = {"termsOfServiceAgreed": True} if contact is None else {"termsOfServiceAgreed": True, "contact": contact}
account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering")
log.info("Registered!" if code == 201 else "Already registered!")
log.info("{0} Account ID: {1}".format("Registered!" if code == 201 else "Already registered!", acct_headers['Location']))
if contact is not None:
account, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details")
log.info("Updated contact details:\n{0}".format("\n".join(account['contact'])))
@@ -122,8 +122,13 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
# get the authorizations that need to be completed
for auth_url in order['authorizations']:
authorization, _, _ = _do_request(auth_url, err_msg="Error getting challenges")
authorization, _, _ = _send_signed_request(auth_url, None, "Error getting challenges")
domain = authorization['identifier']['value']
# skip if already valid
if authorization['status'] == "valid":
log.info("Already verified: {0}, skipping...".format(domain))
continue
log.info("Verifying {0}...".format(domain))
# find the http-01 challenge and write the challenge file
@@ -136,10 +141,9 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
# check that the file is in place
try:
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
assert(disable_check or _do_request(wellknown_url)[0] == keyauthorization)
wellknown_url = "http://{0}{1}/.well-known/acme-challenge/{2}".format(domain, "" if check_port is None else ":{0}".format(check_port), token)
assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization)
except (AssertionError, ValueError) as e:
os.remove(wellknown_path)
raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e))
# say the challenge is done
@@ -147,6 +151,7 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
authorization = _poll_until_not(auth_url, ["pending"], "Error checking challenge status for {0}".format(domain))
if authorization['status'] != "valid":
raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization))
os.remove(wellknown_path)
log.info("{0} verified!".format(domain))
# finalize the order with the csr
@@ -160,7 +165,7 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check
raise ValueError("Order failed: {0}".format(order))
# download the certificate
certificate_pem, _, _ = _do_request(order['certificate'], err_msg="Certificate download failed")
certificate_pem, _, _ = _send_signed_request(order['certificate'], None, "Certificate download failed")
log.info("Certificate signed!")
return certificate_pem
@@ -168,15 +173,11 @@ def main(argv=None):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
This script automates the process of getting a signed TLS certificate from Let's Encrypt using
the ACME protocol. It will need to be run on your server and have access to your private
account key, so PLEASE READ THROUGH IT! It's only ~200 lines, so it won't take long.
This script automates the process of getting a signed TLS certificate from Let's Encrypt using the ACME protocol.
It will need to be run on your server and have access to your private account key, so PLEASE READ THROUGH IT!
It's only ~200 lines, so it won't take long.
Example Usage:
python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt
Example Crontab Renewal (once per month):
0 0 1 * * python /path/to/acme_tiny.py --account-key /path/to/account.key --csr /path/to/domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > /path/to/signed_chain.crt 2>> /var/log/acme_tiny.log
Example Usage: python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt
""")
)
parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key")
@@ -187,23 +188,12 @@ def main(argv=None):
parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt")
parser.add_argument("--ca", default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!")
parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:aaa@bbb.com) for your account-key")
parser.add_argument("--check-port", metavar="PORT", default=None, help="what port to use when self-checking the challenge file, default is port 80")
args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact)
signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port)
sys.stdout.write(signed_crt)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])
## echo "172.17.0.1 tessie.hakwerk.local" >> /etc/hosts
## openssl genrsa 4096 > account.key
## openssl genrsa 4096 > domain.key
## openssl req -new -sha256 -key domain.key -subj "/" -reqexts SAN -config <(cat /etc/ssl/openssl.cnf <(printf "[SAN]\nsubjectAltName=DNS:tessie.hakwerk.local")) > domain.csr
## python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /home/labca/nginx_data/static/.well-known/acme-challenge/ > domain_chain.crt
## cp domain_chain.crt ~/boulder/test/
## docker exec -ti boulder_boulder_1 /bin/bash
## bin/checkocsp test/domain_chain.crt