#! /usr/bin/env python3 # -*- coding: utf-8 -*- """********************************************************************* * Edward is free software: you can redistribute it and/or modify * * it under the terms of the GNU Affero Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * Edward is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU Affero Public License for more details. * * * * You should have received a copy of the GNU Affero Public License * * along with Edward. If not, see . * * * * Copyright (C) 2014-2015 Andrew Engelbrecht (AGPLv3+) * * Copyright (C) 2014 Josh Drake (AGPLv3+) * * Copyright (C) 2014 Lisa Marie Maginnis (AGPLv3+) * * Copyright (C) 2009-2015 Tails developers ( GPLv3+) * * Copyright (C) 2009 W. Trevor King ( GPLv2+) * * * * Special thanks to Josh Drake for writing the original edward bot! :) * * * ************************************************************************ Code sourced from these projects: * http://agpl.fsf.org/emailselfdefense.fsf.org/edward/CURRENT/edward.tar.gz * https://git-tails.immerda.ch/whisperback/tree/whisperBack/encryption.py?h=feature/python3 * http://www.physics.drexel.edu/~wking/code/python/send_pgp_mime """ import sys import gpgme import re import io import os import importlib import email.parser import email.message import email.encoders from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication from email.mime.nonmultipart import MIMENonMultipart import edward_config langs = ["an", "de", "el", "en", "fr", "ja", "pt-br", "ro", "ru", "tr"] match_types = [('clearsign', '-----BEGIN PGP SIGNED MESSAGE-----.*?-----BEGIN PGP SIGNATURE-----.*?-----END PGP SIGNATURE-----'), ('message', '-----BEGIN PGP MESSAGE-----.*?-----END PGP MESSAGE-----'), ('pubkey', '-----BEGIN PGP PUBLIC KEY BLOCK-----.*?-----END PGP PUBLIC KEY BLOCK-----'), ('detachedsig', '-----BEGIN PGP SIGNATURE-----.*?-----END PGP SIGNATURE-----')] class EddyMsg (object): def __init__(self): self.multipart = False self.subparts = [] self.charset = None self.payload_bytes = None self.payload_pieces = [] self.filename = None self.content_type = None self.description_list = None class PayloadPiece (object): def __init__(self): self.piece_type = None self.string = None self.gpg_data = None class GPGData (object): def __init__(self): self.decrypted = False self.plainobj = None self.sigs = [] self.keys = [] class ReplyInfo (object): def __init__(self): self.replies = None self.target_key = None self.fallback_target_key = None self.msg_to_quote = "" self.success_decrypt = False self.failed_decrypt = False self.public_key_received = False self.no_public_key = False self.sig_success = False self.sig_failure = False def main (): handle_args() gpgme_ctx = get_gpg_context(edward_config.gnupghome, edward_config.sign_with_key) email_text = sys.stdin.read() email_struct = parse_pgp_mime(email_text, gpgme_ctx) email_to, email_from, email_subject = email_to_from_subject(email_text) lang = import_lang(email_to) replyinfo_obj = ReplyInfo() replyinfo_obj.replies = lang.replies prepare_for_reply(email_struct, replyinfo_obj) encrypt_to_key = get_key_from_fp(replyinfo_obj, gpgme_ctx) reply_plaintext = write_reply(replyinfo_obj) # TODO error handling for missing keys and setting .no_public_key reply_mime = generate_encrypted_mime(reply_plaintext, email_from, \ email_subject, encrypt_to_key, gpgme_ctx) print(reply_mime) def get_gpg_context (gnupghome, sign_with_key_fp): os.environ['GNUPGHOME'] = gnupghome gpgme_ctx = gpgme.Context() gpgme_ctx.armor = True try: sign_with_key = gpgme_ctx.get_key(sign_with_key_fp) except: error("unable to load signing key. is the gnupghome " + "and signing key properly set in the edward_config.py?") exit(1) gpgme_ctx.signers = [sign_with_key] return gpgme_ctx def parse_pgp_mime (email_text, gpgme_ctx): email_struct = email.parser.Parser().parsestr(email_text) eddymsg_obj = parse_mime(email_struct) split_payloads(eddymsg_obj) gpg_on_payloads(eddymsg_obj, gpgme_ctx) return eddymsg_obj def parse_mime(msg_struct): eddymsg_obj = EddyMsg() if msg_struct.is_multipart() == True: payloads = msg_struct.get_payload() eddymsg_obj.multipart = True eddymsg_obj.subparts = list(map(parse_mime, payloads)) else: eddymsg_obj = get_subpart_data(msg_struct) return eddymsg_obj def scan_and_split (payload_piece, match_type, pattern): # don't try to re-split pieces containing gpg data if payload_piece.piece_type != "text": return [payload_piece] flags = re.DOTALL | re.MULTILINE matches = re.search("(?P.*?)(?P" + pattern + ")(?P.*)", payload_piece.string, flags=flags) if matches == None: pieces = [payload_piece] else: beginning = PayloadPiece() beginning.string = matches.group('beginning') beginning.piece_type = payload_piece.piece_type match = PayloadPiece() match.string = matches.group('match') match.piece_type = match_type rest = PayloadPiece() rest.string = matches.group('rest') rest.piece_type = payload_piece.piece_type more_pieces = scan_and_split(rest, match_type, pattern) pieces = [beginning, match ] + more_pieces return pieces def get_subpart_data (part): obj = EddyMsg() obj.charset = part.get_content_charset() obj.payload_bytes = part.get_payload(decode=True) obj.filename = part.get_filename() obj.content_type = part.get_content_type() obj.description_list = part['content-description'] # your guess is as good as a-myy-ee-ine... if obj.charset == None: obj.charset = 'utf-8' if obj.payload_bytes != None: try: payload = PayloadPiece() payload.string = obj.payload_bytes.decode(obj.charset) payload.piece_type = 'text' obj.payload_pieces = [payload] except UnicodeDecodeError: pass return obj def do_to_eddys_pieces (function_to_do, eddymsg_obj, data): if eddymsg_obj.multipart == True: for sub in eddymsg_obj.subparts: do_to_eddys_pieces(function_to_do, sub, data) else: function_to_do(eddymsg_obj, data) def split_payloads (eddymsg_obj): for match_type in match_types: do_to_eddys_pieces(split_payload_pieces, eddymsg_obj, match_type) def split_payload_pieces (eddymsg_obj, match_type): (match_name, pattern) = match_type new_pieces_list = [] for piece in eddymsg_obj.payload_pieces: new_pieces_list += scan_and_split(piece, match_name, pattern) eddymsg_obj.payload_pieces = new_pieces_list def gpg_on_payloads (eddymsg_obj, gpgme_ctx, prev_parts=[]): if eddymsg_obj.multipart == True: prev_parts=[] for sub in eddymsg_obj.subparts: gpg_on_payloads (sub, gpgme_ctx, prev_parts) prev_parts += [sub] return for piece in eddymsg_obj.payload_pieces: if piece.piece_type == "text": # don't transform the plaintext. pass elif piece.piece_type == "message": (plaintext, sigs) = decrypt_block(piece.string, gpgme_ctx) if plaintext: piece.gpg_data = GPGData() piece.gpg_data.decrypted = True piece.gpg_data.sigs = sigs # recurse! piece.gpg_data.plainobj = parse_pgp_mime(plaintext, gpgme_ctx) elif piece.piece_type == "pubkey": key_fps = add_gpg_key(piece.string, gpgme_ctx) if key_fps != []: piece.gpg_data = GPGData() piece.gpg_data.keys = key_fps elif piece.piece_type == "clearsign": (plaintext, sig_fps) = verify_clear_signature(piece.string, gpgme_ctx) if sig_fps != []: piece.gpg_data = GPGData() piece.gpg_data.sigs = sig_fps piece.gpg_data.plainobj = parse_pgp_mime(plaintext, gpgme_ctx) elif piece.piece_type == "detachedsig": for prev in prev_parts: payload_bytes = prev.payload_bytes sig_fps = verify_detached_signature(piece.string, payload_bytes, gpgme_ctx) if sig_fps != []: piece.gpg_data = GPGData() piece.gpg_data.sigs = sig_fps piece.gpg_data.plainobj = prev break else: pass def prepare_for_reply (eddymsg_obj, replyinfo_obj): do_to_eddys_pieces(prepare_for_reply_pieces, eddymsg_obj, replyinfo_obj) def prepare_for_reply_pieces (eddymsg_obj, replyinfo_obj): for piece in eddymsg_obj.payload_pieces: if piece.piece_type == "text": # don't quote the plaintext part. pass elif piece.piece_type == "message": if piece.gpg_data == None: replyinfo_obj.failed_decrypt = True else: replyinfo_obj.success_decrypt = True if replyinfo_obj.target_key != None: continue if piece.gpg_data.sigs != []: replyinfo_obj.target_key = piece.gpg_data.sigs[0] replyinfo_obj.msg_to_quote = flatten_payloads(piece.gpg_data.plainobj) # to catch public keys in encrypted blocks prepare_for_reply(piece.gpg_data.plainobj, replyinfo_obj) elif piece.piece_type == "pubkey": if piece.gpg_data == None: replyinfo_obj.no_public_key = True else: replyinfo_obj.public_key_received = True elif (piece.piece_type == "clearsign") \ or (piece.piece_type == "detachedsig"): if piece.gpg_data == None: replyinfo_obj.sig_failure = True else: replyinfo_obj.sig_success = True if replyinfo_obj.target_key == None: replyinfo_obj.fallback_target_key = piece.gpg_data.sigs[0] def flatten_payloads (eddymsg_obj): flat_string = "" if eddymsg_obj == None: return "" if eddymsg_obj.multipart == True: for sub in eddymsg_obj.subparts: flat_string += flatten_payloads (sub) return flat_string # todo: don't include nested decrypted messages. for piece in eddymsg_obj.payload_pieces: if piece.piece_type == "text": flat_string += piece.string elif piece.piece_type == "message": flat_string += flatten_payloads(piece.plainobj) elif ((piece.piece_type == "clearsign") \ or (piece.piece_type == "detachedsig")) \ and (piece.gpg_data != None): flat_string += flatten_payloads (piece.gpg_data.plainobj) return flat_string def get_key_from_fp (replyinfo_obj, gpgme_ctx): if replyinfo_obj.target_key == None: replyinfo_obj.target_key = replyinfo_obj.fallback_target_key if replyinfo_obj.target_key != None: try: encrypt_to_key = gpgme_ctx.get_key(replyinfo_obj.target_key) return encrypt_to_key except: pass # no available key to use replyinfo_obj.target_key = None replyinfo_obj.fallback_target_key = None replyinfo_obj.no_public_key = True replyinfo_obj.public_key_received = False return None def write_reply (replyinfo_obj): reply_plain = "" if replyinfo_obj.success_decrypt == True: reply_plain += replyinfo_obj.replies['success_decrypt'] if replyinfo_obj.no_public_key == False: quoted_text = email_quote_text(replyinfo_obj.msg_to_quote) reply_plain += quoted_text elif replyinfo_obj.failed_decrypt == True: reply_plain += replyinfo_obj.replies['failed_decrypt'] if replyinfo_obj.sig_success == True: reply_plain += "\n\n" reply_plain += replyinfo_obj.replies['sig_success'] elif replyinfo_obj.sig_failure == True: reply_plain += "\n\n" reply_plain += replyinfo_obj.replies['sig_failure'] if replyinfo_obj.public_key_received == True: reply_plain += "\n\n" reply_plain += replyinfo_obj.replies['public_key_received'] elif replyinfo_obj.no_public_key == True: reply_plain += "\n\n" reply_plain += replyinfo_obj.replies['no_public_key'] reply_plain += "\n\n" reply_plain += replyinfo_obj.replies['signature'] return reply_plain def add_gpg_key (key_block, gpgme_ctx): fp = io.BytesIO(key_block.encode('ascii')) result = gpgme_ctx.import_(fp) imports = result.imports key_fingerprints = [] if imports != []: for import_ in imports: fingerprint = import_[0] key_fingerprints += [fingerprint] debug("added gpg key: " + fingerprint) return key_fingerprints def verify_clear_signature (sig_block, gpgme_ctx): # FIXME: this might require the un-decoded bytes # or the correct re-encoding with the carset of the mime part. msg_fp = io.BytesIO(sig_block.encode('utf-8')) ptxt_fp = io.BytesIO() result = gpgme_ctx.verify(msg_fp, None, ptxt_fp) # FIXME: this might require using the charset of the mime part. plaintext = ptxt_fp.getvalue().decode('utf-8') sig_fingerprints = [] for res_ in result: sig_fingerprints += [res_.fpr] return plaintext, sig_fingerprints def verify_detached_signature (detached_sig, plaintext_bytes, gpgme_ctx): detached_sig_fp = io.BytesIO(detached_sig.encode('ascii')) plaintext_fp = io.BytesIO(plaintext_bytes) ptxt_fp = io.BytesIO() result = gpgme_ctx.verify(detached_sig_fp, plaintext_fp, None) sig_fingerprints = [] for res_ in result: sig_fingerprints += [res_.fpr] return sig_fingerprints def decrypt_block (msg_block, gpgme_ctx): block_b = io.BytesIO(msg_block.encode('ascii')) plain_b = io.BytesIO() try: sigs = gpgme_ctx.decrypt_verify(block_b, plain_b) except: return ("",[]) plaintext = plain_b.getvalue().decode('utf-8') fingerprints = [] for sig in sigs: fingerprints += [sig.fpr] return (plaintext, fingerprints) def choose_reply_encryption_key (gpgme_ctx, fingerprints): reply_key = None for fp in fingerprints: try: key = gpgme_ctx.get_key(fp) if (key.can_encrypt == True): reply_key = key break except: continue return reply_key def email_to_from_subject (email_text): email_struct = email.parser.Parser().parsestr(email_text) email_to = email_struct['To'] email_from = email_struct['From'] email_subject = email_struct['Subject'] return email_to, email_from, email_subject def import_lang(email_to): if email_to != None: for lang in langs: if "edward-" + lang in email_to: lang = "lang." + re.sub('-', '_', lang) language = importlib.import_module(lang) return language return importlib.import_module("lang.en") def generate_encrypted_mime (plaintext, email_from, email_subject, encrypt_to_key, gpgme_ctx): # quoted printable encoding lets most ascii characters look normal # before the decrypted mime message is decoded. char_set = email.charset.Charset("utf-8") char_set.body_encoding = email.charset.QP # MIMEText doesn't allow setting the text encoding # so we use MIMENonMultipart. plaintext_mime = MIMENonMultipart('text', 'plain') plaintext_mime.set_payload(plaintext, charset=char_set) if (encrypt_to_key != None): encrypted_text = encrypt_sign_message(plaintext_mime.as_string(), encrypt_to_key, gpgme_ctx) control_mime = MIMEApplication("Version: 1", _subtype='pgp-encrypted', _encoder=email.encoders.encode_7or8bit) control_mime['Content-Description'] = 'PGP/MIME version identification' control_mime.set_charset('us-ascii') encoded_mime = MIMEApplication(encrypted_text, _subtype='octet-stream; name="encrypted.asc"', _encoder=email.encoders.encode_7or8bit) encoded_mime['Content-Description'] = 'OpenPGP encrypted message' encoded_mime['Content-Disposition'] = 'inline; filename="encrypted.asc"' encoded_mime.set_charset('us-ascii') message_mime = MIMEMultipart(_subtype="encrypted", protocol="application/pgp-encrypted") message_mime.attach(control_mime) message_mime.attach(encoded_mime) message_mime['Content-Disposition'] = 'inline' else: message_mime = plaintext_mime message_mime['To'] = email_from message_mime['Subject'] = email_subject reply = message_mime.as_string() return reply def email_quote_text (text): quoted_message = re.sub(r'^', r'> ', text, flags=re.MULTILINE) return quoted_message def encrypt_sign_message (plaintext, encrypt_to_key, gpgme_ctx): plaintext_bytes = io.BytesIO(plaintext.encode('ascii')) encrypted_bytes = io.BytesIO() gpgme_ctx.encrypt_sign([encrypt_to_key], gpgme.ENCRYPT_ALWAYS_TRUST, plaintext_bytes, encrypted_bytes) encrypted_txt = encrypted_bytes.getvalue().decode('ascii') return encrypted_txt def error (error_msg): sys.stderr.write(progname + ": " + str(error_msg) + "\n") def debug (debug_msg): if edward_config.debug == True: error(debug_msg) def handle_args (): if __name__ == "__main__": global progname progname = sys.argv[0] if len(sys.argv) > 1: print(progname + ": error, this program doesn't " \ "need any arguments.", file=sys.stderr) exit(1) main()