Source code for autocrypt.bingpg

# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:expandtab

""" BinGPG is a "gpg" or "gpg2" command line wrapper which
implements all operations we need for Autocrypt usage.
It is not meant as a general wrapper outside Autocrypt
contexts.
"""

from __future__ import print_function, unicode_literals
import logging
from distutils.version import LooseVersion as V
import os
import sys
from subprocess import Popen, PIPE
from contextlib import contextmanager
from base64 import b64encode
import tempfile
import re
iswin32 = sys.platform == "win32" or (getattr(os, '_name', False) == 'nt')


def b64encode_u(x):
    res = b64encode(x)
    if isinstance(res, bytes):
        res = res.decode("ascii")
    return res


def cached_property(f):
    # returns a property definition which lazily computes and
    # caches the result of calling f.  The property also allows
    # setting the value (before or after read).
    def get(self):
        propcache = self.__dict__.setdefault("_property_cache", {})
        key = f.__name__
        try:
            return propcache[key]
        except KeyError:
            x = self._property_cache[key] = f(self)
            return x

    def set(self, val):
        propcache = self.__dict__.setdefault("_property_cache", {})
        propcache[f.__name__] = val
    return property(get, set)


class InvocationFailure(Exception):
    def __init__(self, ret, cmd, out, err, extrainfo=None):
        self.ret = ret
        self.cmd = cmd
        self.out = out
        self.err = err
        self.extrainfo = extrainfo

    def __str__(self):
        lines = ["GPG Command '%s' retcode=%d" % (self.cmd, self.ret)]
        for name, olines in [("stdout:", self.out), ("stderr:", self.err)]:
            lines.append(name)
            for line in olines.splitlines():
                lines.append("  " + line)
        if self.extrainfo:
            lines.append(self.extrainfo)
        return "\n".join(lines)


[docs]class BinGPG(object): """ basic wrapper for gpg command line invocations. """ InvocationFailure = InvocationFailure
[docs] def __init__(self, homedir=None, gpgpath="gpg"): self.homedir = homedir p = find_executable(gpgpath) if p is None: raise ValueError("could not find binary for {!r}".format(gpgpath)) self.gpgpath = p self._ensure_init()
def __str__(self): return "BinGPG(gpgpath={gpgpath!r}, homedir={homedir!r})".format( gpgpath=self.gpgpath, homedir=self.homedir) @cached_property def isgpg2(self, min_version=V("2.0")): return V(self.get_version()) >= min_version def _ensure_init(self): if self.homedir is None: return if not os.path.exists(self.homedir): # we create the dir if the basedir exists, otherwise we fail os.mkdir(self.homedir) os.chmod(self.homedir, 0o700) # fix bad defaults for certain gpg2 versions if V("2.0") <= V(self.get_version()) < V("2.1.12"): p = os.path.join(self.homedir, "gpg-agent.conf") if not os.path.exists(p): with open(p, "w") as f: f.write("allow-loopback-pinentry\n") def killagent(self): if self.isgpg2: args = [find_executable("gpg-connect-agent"), "--no-autostart"] args += self._homedirflags + ["KILLAGENT"] popen = Popen(args) popen.wait() @contextmanager def temp_written_file(self, data): with tempfile.NamedTemporaryFile(delete=False) as f: f.write(data) try: yield f.name finally: os.remove(f.name) @property def _homedirflags(self): return ["--homedir", self.homedir] if self.homedir else [] @cached_property def _nopassphrase(self): return ((["--pinentry-mode=loopback"] if self.isgpg2 else []) + ["--passphrase", "''"]) def _gpg_out(self, argv, input=None, strict=False, encoding="utf8"): return self._gpg_outerr(argv, input=input, strict=strict, encoding=encoding)[0] def _gpg_outerr(self, argv, input=None, strict=False, encoding="utf8"): """ return stdout and stderr output of invoking gpg with the specified parameters. If the invocation leads to a non-zero exit status an InvocationFailure exception is thrown. It is also thrown if strict is True and there was non-empty stderr output. stderr output will always be returned as a text type (utf8-decoded) while stdout output is returned decoded if encoding is set (default is "utf8"). If you want binary stdout output specify encoding=None. """ args = [self.gpgpath, "--batch"] + self._homedirflags # make sure we use unicode for all provided arguments def ensure_unicode(x): return x.decode("utf8") if isinstance(x, bytes) else x args.extend(map(ensure_unicode, argv)) # open the process with a C locale, pipe everything env = os.environ.copy() env["LANG"] = "C" popen = Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE, env=env) # some debugging info G = os.environ.get("GNUPGHOME") extra = "" if not G else ("GNUPGHOME=" + G + " ") logging.debug("$ %s%s", extra, " ".join(args)) out, err = popen.communicate(input=input) ret = popen.wait() if ret == 130: raise KeyboardInterrupt("detected in gpg invocation") err = err.decode("utf8") if encoding: out = out.decode(encoding) if ret != 0 or (strict and err): raise self.InvocationFailure(ret, " ".join(args), out=out, err=err) return out, err @cached_property def _version_info(self): return self._gpg_out(['--version']) def get_version(self): vline = self._version_info.split('\n', 1)[0] return vline.split(' ')[2] def supports_eddsa(self): for l in self._version_info.split('\n'): if l.startswith('Pubkey:'): return 'eddsa' in map( lambda x: x.strip().lower(), l.split(':', 1)[1].split(',')) return False def gen_secret_key(self, emailadr): spec = "\n".join([ "Key-Type: RSA", "Key-Length: 2048", "Key-Usage: sign", "Subkey-Type: RSA", "Subkey-Length: 2048", "Subkey-Usage: encrypt", # "Name-Real: " + uid, "Name-Email: " + emailadr, "Expire-Date: 0", "%commit" ]).encode("utf8") with self.temp_written_file(spec) as fn: try: out, err = self._gpg_outerr(self._nopassphrase + ["--gen-key", fn]) except InvocationFailure as e: e.extrainfo = open(fn).read() raise keyhandle = self._find_keyhandle(err) logging.debug("created secret key: %s", keyhandle) return keyhandle def list_secret_keyinfos(self, keyhandle=None): args = ["--skip-verify", "--with-colons", "--list-secret-keys"] if keyhandle is not None: args.append(keyhandle) return self._parse_list(args, ("sec", "ssb")) def list_public_keyinfos(self, keyhandle=None): args = ["--skip-verify", "--with-colons", "--list-public-keys"] if keyhandle is not None: args.append(keyhandle) return self._parse_list(args, ("pub", "sub")) def _parse_list(self, args, types): out = self._gpg_out(args) keyinfos = [] last_main_type_keyinfo = None for line in out.splitlines(): parts = line.split(":") if parts[0] in types: keyinfos.append( KeyInfo(type=parts[3], bits=int(parts[2]), uid=parts[9], id=parts[4], date_created=parts[5])) if parts[0] == types[0]: last_main_type_keyinfo = keyinfos[-1] elif parts[0] == "uid": last_main_type_keyinfo.uids.append(parts[9]) return keyinfos def _find_keyhandle(self, string, _pattern=re.compile("key (?:ID )?([0-9A-F]+)")): m = _pattern.search(string) assert m and len(m.groups()) == 1, string x = m.groups()[0] # now search the fingerprint if we only have a shortid if len(x) <= 8: # keyid has 8 hex bytes keyinfos = self.list_public_keyinfos(x) for k in keyinfos: if k.match(x): return k.id raise ValueError("could not find fingerprint %r in %r" % (x, keyinfos)) # note that this might be a 16-char fingerprint or a 40-char one (gpg-2.1.18) return x def list_secret_key_packets(self, keyhandle): return self.list_packets(self.get_secret_keydata(keyhandle)) def list_public_key_packets(self, keyhandle): return self.list_packets(self.get_public_keydata(keyhandle)) def list_packets(self, keydata): out = self._gpg_out(["--list-packets"], input=keydata) # build up a list of (pkgname, pkgvalue, lines) tuples packets = [] lines = [] last_package_type = None for rawline in out.splitlines(): line = rawline.strip() c = line[0:1] if c == "#": continue if c == ":": i = line[1:].find(c) if i != -1: ptype = line[1: i + 1] pvalue = line[i + 2:].strip() if last_package_type is not None: packets.append(last_package_type + (lines,)) lines = [] last_package_type = (ptype, pvalue) else: assert last_package_type, line lines.append(line) else: packets.append(last_package_type + (lines,)) return packets def get_public_keydata(self, keyhandle, armor=False, b64=False): args = ["-a"] if armor else [] args.extend(["--export-options=export-minimal", "--export", str(keyhandle)]) out = self._gpg_out(args, strict=True, encoding=None) return out if not b64 else b64encode_u(out) def get_secret_keydata(self, keyhandle, armor=False): args = ["-a"] if armor else [] args.extend(self._nopassphrase + ["--export-options=export-minimal", "--export-secret-key", keyhandle]) return self._gpg_out(args, strict=True, encoding=None) def encrypt(self, data, recipients): recs = [] for r in recipients: recs.extend(["--recipient", r]) return self._gpg_out(recs + ["--encrypt", "--always-trust"], input=data, encoding=None) def sign(self, data, keyhandle): args = self._nopassphrase + ["--detach-sign", "-u", keyhandle] return self._gpg_out(args, input=data, encoding=None) def verify(self, data, signature): with self.temp_written_file(signature) as sig_fn: out, err = self._gpg_outerr(["--verify", sig_fn, "-"], input=data) return self._find_keyhandle(err) def decrypt(self, enc_data): args = self._nopassphrase + ["--with-colons", "--decrypt"] out, err = self._gpg_outerr(args, input=enc_data, encoding=None) lines = err.splitlines() keyinfos = [] while lines: line1 = lines.pop(0) m = re.match("gpg.*with (\d+)-bit (\w+).*" "ID (\w+).*created (.*)", line1) if m: bits, keytype, id, date = m.groups() line2 = lines.pop(0) if line2.startswith(" "): uid = line2.strip().strip('"') keyinfos.append(KeyInfo(keytype, bits, id, uid, date)) return out, keyinfos def import_keydata(self, keydata): out, err = self._gpg_outerr(["--skip-verify", "--import"], input=keydata) return self._find_keyhandle(err)
class KeyInfo: def __init__(self, type, bits, id, uid, date_created): self.type = type self.bits = int(bits) self.id = id self.uids = [uid] if uid else [] self.date_created = date_created def match(self, other_id): i = min(len(other_id), len(self.id)) return self.id[-i:] == other_id[-i:] def __str__(self): return "KeyInfo(id={id!r}, uids={uids!r}, bits={bits}, type={type})".format( **self.__dict__) __repr__ = __str__
[docs]def find_executable(name): """ return a path object found by looking at the systems underlying PATH specification. If an executable cannot be found, None is returned. copied and adapted from py.path.local.sysfind. """ if os.path.isabs(name): return name if os.path.isfile(name) else None else: if iswin32: paths = os.environ['Path'].split(';') if '' not in paths and '.' not in paths: paths.append('.') try: systemroot = os.environ['SYSTEMROOT'] except KeyError: pass else: paths = [re.sub('%SystemRoot%', systemroot, path) for path in paths] else: paths = os.environ['PATH'].split(':') tryadd = [] if iswin32: tryadd += os.environ['PATHEXT'].split(os.pathsep) tryadd.append("") for x in paths: for addext in tryadd: p = os.path.join(x, name) + addext try: if os.path.isfile(p): return p except Exception: pass return None