#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""*********************************************************************
* Edward is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Affero Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* Edward is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Affero Public License for more details. *
* *
* You should have received a copy of the GNU Affero Public License *
* along with Edward. If not, see <http://www.gnu.org/licenses/>. *
* *
* Copyright (C) 2014-2015 Andrew Engelbrecht (AGPLv3+) *
* Copyright (C) 2014 Josh Drake (AGPLv3+) *
* Copyright (C) 2014 Lisa Marie Maginnis (AGPLv3+) *
* Copyright (C) 2009-2015 Tails developers ( GPLv3+) *
* Copyright (C) 2009 W. Trevor King ( GPLv2+) *
* *
* Special thanks to Josh Drake for writing the original edward bot! :) *
* *
************************************************************************
Code sourced from these projects:
* http://agpl.fsf.org/emailselfdefense.fsf.org/edward/CURRENT/edward.tar.gz
* https://git-tails.immerda.ch/whisperback/tree/whisperBack/encryption.py?h=feature/python3
* http://www.physics.drexel.edu/~wking/code/python/send_pgp_mime
"""
import sys
import gpgme
import re
import io
import os
import importlib
import email.parser
import email.message
import email.encoders
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.nonmultipart import MIMENonMultipart
import edward_config
# Abbreviated names of the reply languages available to edward.
langs = [
    "de",
    "el",
    "en",
    "fr",
    "ja",
    "pt-br",
    "ro",
    "ru",
    "tr",
]

# Pairs of (piece type name, re.search() pattern) used to find GPG data for
# edward to process. The list is ordered: patterns are applied one after the
# other, in the order given here.
match_types = [
    (
        'clearsign',
        '-----BEGIN PGP SIGNED MESSAGE-----.*?-----BEGIN PGP SIGNATURE-----.*?-----END PGP SIGNATURE-----',
    ),
    (
        'message',
        '-----BEGIN PGP MESSAGE-----.*?-----END PGP MESSAGE-----',
    ),
    (
        'pubkey',
        '-----BEGIN PGP PUBLIC KEY BLOCK-----.*?-----END PGP PUBLIC KEY BLOCK-----',
    ),
    (
        'detachedsig',
        '-----BEGIN PGP SIGNATURE-----.*?-----END PGP SIGNATURE-----',
    ),
]
class EddyMsg (object):
    """
    The EddyMsg class represents relevant parts of a mime message.

    The represented message can be single-part or multi-part.

    'multipart' is set to True if there are multiple mime parts.

    'subparts' points to a list of mime sub-parts if it is a multi-part
    message. Otherwise it points to an empty list.

    'payload_bytes' contains the raw mime-decoded bytes that haven't been
    decoded into a character set.

    'payload_pieces' is a list of objects containing strings that, when strung
    together, form the fully-decoded string representation of the mime part.

    The 'charset' describes the character set of payload_bytes.

    The 'filename', 'content_type' and 'description_list' come from the mime
    part parameters.
    """

    def __init__(self):
        # The attributes are created per instance: the two lists used to be
        # class attributes, so an in-place change (append, +=) on one message
        # would have leaked into every other message.
        self.multipart = False
        self.subparts = []

        self.payload_bytes = None
        self.payload_pieces = []

        self.charset = None
        self.filename = None
        self.content_type = None
        self.description_list = None
class PayloadPiece (object):
    """
    A PayloadPiece is either a complete mime part or one section of it.

    Pieces are kept in lists; each EddyMsg instance points to such a list.

    'piece_type' is a string naming the kind of content held in 'string':
    for example "pubkey" for a public key block, or "message" for encrypted
    data (an armored signature is also labelled "message" until it is known
    to be a signature). The names derive from the header and footer lines of
    the ascii-armored gpg blocks.

    'string' is the text itself: non-GPG text, an encrypted block, a
    signature, or a public key.

    'gpg_data' points to the GPGData instance, if any, that was created from
    the contents of 'string'.
    """

    # All three fields start out unset; they are filled in on the instance.
    piece_type = string = gpg_data = None
class GPGData (object):
    """
    GPGData holds info from decryption, sig. verification, and/or pub. keys.

    Instances of this class contain decrypted information, signature
    fingerprints and/or fingerprints of processed and imported public keys.

    'decrypted' is set to True if 'plainobj' was created from encrypted data.

    'plainobj' points to the decrypted data, or to the signed part of a GPG
    signature. It is intended to be an instance of the EddyMsg class.

    'sigs' is a list of fingerprints of keys used to sign the data in plainobj.

    'keys' is a list of fingerprints of keys obtained in public key blocks.
    """

    def __init__(self):
        # Per-instance attributes: 'sigs' and 'keys' used to be class-level
        # lists, which every instance shared until it was reassigned.
        self.decrypted = False

        self.plainobj = None
        self.sigs = []
        self.keys = []
class ReplyInfo (object):
    """
    ReplyInfo collects the facts edward needs in order to write its reply.

    An instance records whether parts of the incoming message could be
    decrypted or verified, whether a public key was received, which key
    fingerprints are candidates for encrypting the reply, and which text (if
    any) should be quoted back to the sender.

    'replies' points to one of the dictionaries of translated replies.

    'target_key' is the fingerprint of a key used to sign encrypted data.
    This is the preferred key, if it is set and available.

    'fallback_target_key' is the fingerprint of a key used to sign
    unencrypted data; alternatively it may be a public key attached to the
    message.

    'msg_to_quote' is the part of the message that edward should quote in his
    reply. It is meant to stay empty when there was no encrypted and signed
    part, so that edward does not become a service for decrypting other
    people's messages to edward.

    'success_decrypt' is set to True if edward could decrypt part of the
    message.

    'failed_decrypt' is set to True if edward failed to decrypt part of the
    message.

    'public_key_received' is set to True if edward successfully imported a
    public key.

    'no_public_key' is set to True if edward doesn't have a key to encrypt to
    when replying to the user.

    'sig_success' is set to True if edward could, to some extent, verify the
    signature of a signed part of the message.

    'sig_failure' is set to True if edward failed, to some extent, to verify
    the signature of a signed part of the message.
    """

    # reply texts and encryption key candidates
    replies = target_key = fallback_target_key = None

    # text quoted in the reply
    msg_to_quote = ""

    # outcome flags, all initially unset
    success_decrypt = failed_decrypt = False
    public_key_received = no_public_key = False
    sig_success = sig_failure = False
def main ():
    """
    Entry point for edward, a GPG reply bot.

    Edward answers GPG-encrypted and signed mail. The answer is encrypted and
    signed when a usable public key of the sender is found.

    Args:
        None

    Returns:
        None

    Pre:
        A mime or plaintext email is available on standard input. Parts of
        it may be encrypted. The To: address selects the reply language: if
        it contains, for example, "edward-ja" the reply is written in
        Japanese. English is the default.

    Post:
        The reply email is printed to standard output. Its content depends
        on what was found in the incoming mail (encrypted parts, signatures,
        public keys) and on the language implied by the To: address.
    """

    handle_args()

    ctx = get_gpg_context(edward_config.gnupghome,
                          edward_config.sign_with_key)

    # read and analyse the incoming mail
    raw_email = sys.stdin.read()
    parsed_email = parse_pgp_mime(raw_email, ctx)
    recipient, sender, subject = email_to_from_subject(raw_email)

    # collect what the reply has to say, in the requested language
    reply_info = ReplyInfo()
    reply_info.replies = import_lang(recipient).replies
    prepare_for_reply(parsed_email, reply_info)

    # must run before write_reply(): it updates the key-related flags
    reply_key = get_key_from_fp(reply_info, ctx)
    reply_text = write_reply(reply_info)

    print(generate_encrypted_mime(reply_text, sender, subject, reply_key,
                                  ctx))
def get_gpg_context (gnupghome, sign_with_key_fp):
    """
    Create the gpgme context that edward uses for all GPG operations.

    Args:
        gnupghome: path of the GnuPG home directory; exported to the
            process environment as GNUPGHOME.
        sign_with_key_fp: fingerprint of the key edward signs replies with.

    Returns:
        A gpgme context with ascii armor enabled and the signing key set.

    Post:
        os.environ['GNUPGHOME'] is set. If the signing key cannot be loaded,
        an error is written and the program exits with status 1.
    """

    os.environ['GNUPGHOME'] = gnupghome

    gpgme_ctx = gpgme.Context()
    gpgme_ctx.armor = True

    try:
        sign_with_key = gpgme_ctx.get_key(sign_with_key_fp)
    except Exception:
        # 'Exception' rather than a bare except, so that KeyboardInterrupt
        # and SystemExit are not reported as a configuration problem.
        error("unable to load signing key. is the gnupghome "
                + "and signing key properly set in the edward_config.py?")
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. under "python -S").
        sys.exit(1)

    gpgme_ctx.signers = [sign_with_key]

    return gpgme_ctx
def parse_pgp_mime (email_text, gpgme_ctx):
    """
    Parse an email and process the GPG data found in it.

    Args:
        email_text: the email as a string
        gpgme_ctx: the gpgme context handed on to gpg_on_payloads()

    Returns:
        The EddyMsg tree built by parse_mime(), after split_payloads() and
        gpg_on_payloads() have been applied to it.
    """

    parser = email.parser.Parser()
    message_tree = parse_mime(parser.parsestr(email_text))

    # first cut the payloads into pieces, then run gpg on the pieces
    split_payloads(message_tree)
    gpg_on_payloads(message_tree, gpgme_ctx)

    return message_tree
def parse_mime(msg_struct):
    """
    Translate a parsed email message into a tree of EddyMsg objects.

    Args:
        msg_struct: a message object as returned by the email parser

    Returns:
        For a multipart message, an EddyMsg with 'multipart' set and one
        recursively built entry in 'subparts' per mime part. Otherwise the
        EddyMsg returned by get_subpart_data().
    """

    # leaf part: its data is extracted directly
    if not msg_struct.is_multipart():
        return get_subpart_data(msg_struct)

    container = EddyMsg()
    container.multipart = True
    container.subparts = [parse_mime(part)
                          for part in msg_struct.get_payload()]

    return container
def scan_and_split (payload_piece, match_type, pattern):
    """
    Split a text piece around every occurrence of a GPG block pattern.

    Args:
        payload_piece: the PayloadPiece to examine
        match_type: the piece_type given to each matching section
        pattern: the regular expression that identifies a matching section

    Returns:
        A list of pieces. If payload_piece is not of type "text", or nothing
        matches, the list holds payload_piece itself. Otherwise it holds, for
        each match, the text before it (possibly empty, with the original
        piece_type) followed by the match (with piece_type match_type), and
        finally the text after the last match.
    """

    # don't try to re-split pieces containing gpg data
    if payload_piece.piece_type != "text":
        return [payload_piece]

    def new_piece (piece_type, string):
        piece = PayloadPiece()
        piece.string = string
        piece.piece_type = piece_type
        return piece

    text = payload_piece.string
    flags = re.DOTALL | re.MULTILINE

    # Walking the non-overlapping matches from left to right finds the same
    # sections as searching for "<beginning><match><rest>" and recursing on
    # the rest, without the recursion depth growing with the number of
    # blocks in the mail.
    pieces = []
    cursor = 0
    for found in re.finditer(pattern, text, flags=flags):
        pieces.append(new_piece(payload_piece.piece_type,
                                text[cursor:found.start()]))
        pieces.append(new_piece(match_type, found.group(0)))
        cursor = found.end()

    if not pieces:
        return [payload_piece]

    pieces.append(new_piece(payload_piece.piece_type, text[cursor:]))

    return pieces
def get_subpart_data (part):
    """
    Build an EddyMsg from a single (non-multipart) mime part.

    Args:
        part: the message part, as handed over by parse_mime()

    Returns:
        An EddyMsg holding the part's charset, decoded payload bytes,
        filename, content type and content description. If the payload can
        be decoded to text, 'payload_pieces' holds one "text" piece with it.
    """

    obj = EddyMsg()

    obj.charset = part.get_content_charset()
    obj.payload_bytes = part.get_payload(decode=True)

    obj.filename = part.get_filename()
    obj.content_type = part.get_content_type()
    obj.description_list = part['content-description']

    # your guess is as good as a-myy-ee-ine...
    if obj.charset is None:
        obj.charset = 'utf-8'

    if obj.payload_bytes is not None:
        try:
            text = obj.payload_bytes.decode(obj.charset)
        except (UnicodeDecodeError, LookupError):
            # Undecodable bytes, or a charset name python doesn't know (the
            # name comes straight from the mail): leave the part without
            # text pieces instead of crashing.
            pass
        else:
            payload = PayloadPiece()
            payload.string = text
            payload.piece_type = 'text'

            obj.payload_pieces = [payload]

    return obj
def do_to_eddys_pieces (function_to_do, eddymsg_obj, data):
    """
    Call a function on every non-multipart part of an EddyMsg tree.

    Args:
        function_to_do: called as function_to_do(part, data) for each
            non-multipart part, in depth-first order
        eddymsg_obj: the root of the tree (or a single part)
        data: passed unchanged to every call

    Returns:
        None
    """

    if eddymsg_obj.multipart:
        for sub in eddymsg_obj.subparts:
            do_to_eddys_pieces(function_to_do, sub, data)
    else:
        function_to_do(eddymsg_obj, data)
def split_payloads (eddymsg_obj):
    """
    Split the payloads of a message tree into text and GPG data pieces.

    Each entry of match_types is applied in turn to every non-multipart part
    of the tree.

    Args:
        eddymsg_obj: the EddyMsg tree whose payload pieces are split in place

    Returns:
        None
    """

    for name_and_pattern in match_types:
        do_to_eddys_pieces(split_payload_pieces, eddymsg_obj,
                           name_and_pattern)
def split_payload_pieces (eddymsg_obj, match_type):
    """
    Re-split the payload pieces of one mime part for one kind of GPG block.

    Args:
        eddymsg_obj: the part whose 'payload_pieces' list is replaced
        match_type: a (piece type name, pattern) pair

    Returns:
        None
    """

    name, regex = match_type

    # every piece is replaced by whatever scan_and_split() makes of it
    eddymsg_obj.payload_pieces = [
        fragment
        for original in eddymsg_obj.payload_pieces
        for fragment in scan_and_split(original, name, regex)
    ]
def gpg_on_payloads (eddymsg_obj, gpgme_ctx, prev_parts=None):
    """
    Run the fitting GPG operation on every payload piece of a message tree.

    Encrypted blocks are decrypted, signatures are verified and public keys
    are imported. The outcome is stored in the 'gpg_data' attribute of the
    piece. Decrypted or signed content is itself parsed as an email, so
    nested GPG data is handled too.

    Args:
        eddymsg_obj: the EddyMsg tree (or part) to process in place
        gpgme_ctx: the gpgme context passed to the GPG helpers
        prev_parts: the sibling parts that precede eddymsg_obj in its
            multipart container; a detached signature is checked against
            them. Defaults to no parts.

    Returns:
        None
    """

    if eddymsg_obj.multipart:
        # Every container starts its own list of earlier siblings; the
        # prev_parts of the container itself is not inherited.
        earlier_siblings = []

        for sub in eddymsg_obj.subparts:
            gpg_on_payloads(sub, gpgme_ctx, earlier_siblings)
            earlier_siblings.append(sub)

        return

    # None instead of a mutable [] default, which would be one list shared
    # by all calls.
    if prev_parts is None:
        prev_parts = []

    for piece in eddymsg_obj.payload_pieces:

        if piece.piece_type == "text":
            # don't transform the plaintext.
            pass

        elif piece.piece_type == "message":
            (plaintext, sigs) = decrypt_block(piece.string, gpgme_ctx)

            if plaintext:
                piece.gpg_data = GPGData()
                piece.gpg_data.decrypted = True
                piece.gpg_data.sigs = sigs
                # recurse!
                piece.gpg_data.plainobj = parse_pgp_mime(plaintext, gpgme_ctx)
                continue

            # if not encrypted, check to see if this is an armored signature.
            (plaintext, sigs) = verify_sig_message(piece.string, gpgme_ctx)

            if plaintext:
                piece.piece_type = "signature"
                piece.gpg_data = GPGData()
                piece.gpg_data.sigs = sigs
                # recurse!
                piece.gpg_data.plainobj = parse_pgp_mime(plaintext, gpgme_ctx)

        elif piece.piece_type == "pubkey":
            key_fps = add_gpg_key(piece.string, gpgme_ctx)

            if key_fps:
                piece.gpg_data = GPGData()
                piece.gpg_data.keys = key_fps

        elif piece.piece_type == "clearsign":
            (plaintext, sig_fps) = verify_clear_signature(piece.string, gpgme_ctx)

            if sig_fps:
                piece.gpg_data = GPGData()
                piece.gpg_data.sigs = sig_fps
                piece.gpg_data.plainobj = parse_pgp_mime(plaintext, gpgme_ctx)

        elif piece.piece_type == "detachedsig":
            # the signature belongs to the first earlier sibling it verifies
            for prev in prev_parts:
                sig_fps = verify_detached_signature(piece.string,
                                                    prev.payload_bytes,
                                                    gpgme_ctx)

                if sig_fps:
                    piece.gpg_data = GPGData()
                    piece.gpg_data.sigs = sig_fps
                    piece.gpg_data.plainobj = prev
                    break
def prepare_for_reply (eddymsg_obj, replyinfo_obj):
    """
    Record in replyinfo_obj what the GPG processing of a message found.

    Args:
        eddymsg_obj: the processed EddyMsg tree
        replyinfo_obj: the ReplyInfo instance that is updated in place

    Returns:
        None
    """

    # each non-multipart part is examined by prepare_for_reply_pieces()
    per_part_handler = prepare_for_reply_pieces
    do_to_eddys_pieces(per_part_handler, eddymsg_obj, replyinfo_obj)
def prepare_for_reply_pieces (eddymsg_obj, replyinfo_obj):
    """
    Update replyinfo_obj for each payload piece of one mime part.

    Args:
        eddymsg_obj: the (non-multipart) part whose pieces are examined
        replyinfo_obj: the ReplyInfo instance that is updated in place

    Returns:
        None
    """

    signature_kinds = ("clearsign", "detachedsig", "signature")

    for piece in eddymsg_obj.payload_pieces:
        kind = piece.piece_type

        if kind == "text":
            # don't quote the plaintext part.
            continue

        if kind == "message":
            prepare_for_reply_message(piece, replyinfo_obj)
        elif kind == "pubkey":
            prepare_for_reply_pubkey(piece, replyinfo_obj)
        elif kind in signature_kinds:
            prepare_for_reply_sig(piece, replyinfo_obj)
def prepare_for_reply_message (piece, replyinfo_obj):
    """
    Update replyinfo_obj for a piece that holds an encrypted message.

    Args:
        piece: a PayloadPiece of type "message"
        replyinfo_obj: the ReplyInfo instance that is updated in place

    Returns:
        None

    Post:
        'failed_decrypt' is set if the piece has no gpg_data. Otherwise
        'success_decrypt' is set and, unless a target key was already found,
        the target key and the text to quote are taken from this piece and
        the decrypted content is examined as well.
    """

    if piece.gpg_data is None:
        replyinfo_obj.failed_decrypt = True
        return

    replyinfo_obj.success_decrypt = True

    # we already have a key (and a message)
    if replyinfo_obj.target_key is not None:
        return

    if piece.gpg_data.sigs:
        replyinfo_obj.target_key = piece.gpg_data.sigs[0]
        get_signed_part = False
    else:
        # only include a signed message in the reply.
        get_signed_part = True

    replyinfo_obj.msg_to_quote = flatten_decrypted_payloads(piece.gpg_data.plainobj, get_signed_part)

    # to catch public keys in encrypted blocks
    prepare_for_reply(piece.gpg_data.plainobj, replyinfo_obj)
def prepare_for_reply_pubkey (piece, replyinfo_obj):
    """
    Update replyinfo_obj for a piece that holds a public key block.

    Args:
        piece: a PayloadPiece of type "pubkey"
        replyinfo_obj: the ReplyInfo instance that is updated in place

    Returns:
        None

    Post:
        'no_public_key' is set if no key was imported from the piece.
        Otherwise 'public_key_received' is set and the first imported key
        becomes the fallback target key, unless one is already set.
    """

    if piece.gpg_data is None or not piece.gpg_data.keys:
        replyinfo_obj.no_public_key = True
        return

    replyinfo_obj.public_key_received = True

    if replyinfo_obj.fallback_target_key is None:
        replyinfo_obj.fallback_target_key = piece.gpg_data.keys[0]
def prepare_for_reply_sig (piece, replyinfo_obj):
    """
    Update replyinfo_obj for a piece that holds a signature.

    Args:
        piece: a PayloadPiece of type "clearsign", "detachedsig" or
            "signature"
        replyinfo_obj: the ReplyInfo instance that is updated in place

    Returns:
        None

    Post:
        'sig_failure' is set if no signature fingerprint was obtained from
        the piece. Otherwise 'sig_success' is set and the first fingerprint
        becomes the fallback target key, unless one is already set.
    """

    if piece.gpg_data is None or not piece.gpg_data.sigs:
        replyinfo_obj.sig_failure = True
        return

    replyinfo_obj.sig_success = True

    if replyinfo_obj.fallback_target_key is None:
        replyinfo_obj.fallback_target_key = piece.gpg_data.sigs[0]
def flatten_decrypted_payloads (eddymsg_obj, get_signed_part):
    """
    Concatenate the text found in a message tree into one string.

    Args:
        eddymsg_obj: the EddyMsg tree to flatten; may be None
        get_signed_part: if True, the content of signed (but not of
            decrypted) pieces is included as well

    Returns:
        The text of all "text" pieces, in order, with the flattened content
        of signed pieces inserted where they occur if get_signed_part is
        True. An empty string if eddymsg_obj is None.
    """

    if eddymsg_obj is None:
        return ""

    # recurse on multi-part mime
    if eddymsg_obj.multipart:
        return "".join(flatten_decrypted_payloads(sub, get_signed_part)
                       for sub in eddymsg_obj.subparts)

    signature_kinds = ("clearsign", "detachedsig", "signature")
    chunks = []

    for piece in eddymsg_obj.payload_pieces:
        if piece.piece_type == "text":
            chunks.append(piece.string)

        if not get_signed_part or piece.gpg_data is None:
            continue

        # don't include nested encryption
        signed_message = (piece.piece_type == "message"
                          and not piece.gpg_data.decrypted)

        # FIXME: the key used to sign this message needs to be the one that is used for the encrypted reply.
        if signed_message or piece.piece_type in signature_kinds:
            chunks.append(flatten_decrypted_payloads(piece.gpg_data.plainobj,
                                                     get_signed_part))

    return "".join(chunks)
def get_key_from_fp (replyinfo_obj, gpgme_ctx):
    """
    Fetch the key that the reply should be encrypted to.

    Args:
        replyinfo_obj: the ReplyInfo instance holding the candidate
            fingerprints; updated in place
        gpgme_ctx: the gpgme context used to look the key up

    Returns:
        The key object for the chosen fingerprint, or None if there is no
        candidate or the key can't be loaded.

    Post:
        The fallback fingerprint is promoted to 'target_key' if no target
        key was set. If no key could be loaded, both fingerprints are
        cleared, 'no_public_key' is set and 'public_key_received' is unset.
    """

    if replyinfo_obj.target_key is None:
        replyinfo_obj.target_key = replyinfo_obj.fallback_target_key

    if replyinfo_obj.target_key is not None:
        try:
            return gpgme_ctx.get_key(replyinfo_obj.target_key)
        except Exception:
            # Best effort: a key that can't be loaded is treated like a
            # missing key. ('Exception' rather than a bare except, so that
            # KeyboardInterrupt and SystemExit still propagate.)
            pass

    # no available key to use
    replyinfo_obj.target_key = None
    replyinfo_obj.fallback_target_key = None

    replyinfo_obj.no_public_key = True
    replyinfo_obj.public_key_received = False

    return None
def write_reply (replyinfo_obj):
    """
    Compose the plaintext of edward's reply.

    Args:
        replyinfo_obj: the ReplyInfo instance holding the translated reply
            texts ('replies') and the outcome flags

    Returns:
        The reply text: an optional part about decryption (with the quoted
        message, if a key to encrypt to is available), an optional part
        about signatures, an optional part about public keys, and edward's
        signature. All parts but the first are preceded by a blank line.
    """

    replies = replyinfo_obj.replies
    sections = []

    if replyinfo_obj.success_decrypt:
        sections.append(replies['success_decrypt'])

        # the message is only quoted if the reply can be encrypted
        if not replyinfo_obj.no_public_key:
            sections.append(email_quote_text(replyinfo_obj.msg_to_quote))

    elif replyinfo_obj.failed_decrypt:
        sections.append(replies['failed_decrypt'])

    if replyinfo_obj.sig_success:
        sections.append("\n\n" + replies['sig_success'])

    elif replyinfo_obj.sig_failure:
        sections.append("\n\n" + replies['sig_failure'])

    if replyinfo_obj.public_key_received:
        sections.append("\n\n" + replies['public_key_received'])

    elif replyinfo_obj.no_public_key:
        sections.append("\n\n" + replies['no_public_key'])

    sections.append("\n\n" + replies['signature'])

    return "".join(sections)
def add_gpg_key (key_block, gpgme_ctx):
    """
    Import an armored public key block into the GPG store.

    Args:
        key_block: the armored key block, as a string
        gpgme_ctx: the gpgme context used for the import

    Returns:
        The list of fingerprints reported by the import. The list is empty
        if nothing was imported, or if the block could not be processed.
    """

    try:
        fp = io.BytesIO(key_block.encode('ascii'))
        result = gpgme_ctx.import_(fp)
    except Exception:
        # The block comes from an untrusted mail: non-ascii characters or
        # data that gpg rejects must not crash edward. No key was imported.
        return []

    # NOTE(review): every entry of result.imports is counted, whatever its
    # status fields say -- confirm whether failed imports show up here.
    key_fingerprints = [import_[0] for import_ in result.imports]

    for fingerprint in key_fingerprints:
        debug("added gpg key: " + fingerprint)

    return key_fingerprints
def verify_sig_message (msg_block, gpgme_ctx):
    """
    Verify an armored signed message that carries its own content.

    Args:
        msg_block: the armored block, as a string
        gpgme_ctx: the gpgme context used for the verification

    Returns:
        A tuple (plaintext, fingerprints): the signed content as a string
        and the fingerprints of the signatures found. ("", []) if the block
        could not be processed.
    """

    plain_b = io.BytesIO()

    try:
        # the encoding step is inside the try: a block with non-ascii
        # characters is simply not a valid armored block
        block_b = io.BytesIO(msg_block.encode('ascii'))
        sigs = gpgme_ctx.verify(block_b, None, plain_b)
    except Exception:
        return ("", [])

    # undecodable bytes are replaced rather than crashing on mail content
    plaintext = plain_b.getvalue().decode('utf-8', errors='replace')

    # NOTE(review): every signature returned is counted, whatever its
    # status -- confirm whether invalid signatures should be filtered out.
    fingerprints = [sig.fpr for sig in sigs]

    return (plaintext, fingerprints)
def verify_clear_signature (sig_block, gpgme_ctx):
    """
    Verify a clear-signed block of text.

    Args:
        sig_block: the clear-signed block (text and signature), as a string
        gpgme_ctx: the gpgme context used for the verification

    Returns:
        A tuple (plaintext, fingerprints): the signed text and the
        fingerprints of the signatures found. ("", []) if the block could
        not be processed.
    """

    # FIXME: this might require the un-decoded bytes
    # or the correct re-encoding with the charset of the mime part.
    msg_fp = io.BytesIO(sig_block.encode('utf-8'))
    ptxt_fp = io.BytesIO()

    try:
        result = gpgme_ctx.verify(msg_fp, None, ptxt_fp)
    except Exception:
        # a malformed block must not crash edward; report it the same way
        # the other verification helpers do
        return ("", [])

    # FIXME: this might require using the charset of the mime part.
    # Until then undecodable bytes are replaced instead of raising.
    plaintext = ptxt_fp.getvalue().decode('utf-8', errors='replace')

    # NOTE(review): every signature returned is counted, whatever its
    # status -- confirm whether invalid signatures should be filtered out.
    sig_fingerprints = [res_.fpr for res_ in result]

    return plaintext, sig_fingerprints
def verify_detached_signature (detached_sig, plaintext_bytes, gpgme_ctx):
    """
    Verify a detached signature against the data it is supposed to sign.

    Args:
        detached_sig: the armored signature, as a string
        plaintext_bytes: the candidate signed data, as bytes
        gpgme_ctx: the gpgme context used for the verification

    Returns:
        The list of fingerprints of the signatures found. The list is empty
        if the signature could not be processed.
    """

    try:
        detached_sig_fp = io.BytesIO(detached_sig.encode('ascii'))
        plaintext_fp = io.BytesIO(plaintext_bytes)
        result = gpgme_ctx.verify(detached_sig_fp, plaintext_fp, None)
    except Exception:
        # The caller tries the signature against several parts; a part that
        # can't be checked counts as "not signed by this".
        return []

    # NOTE(review): every signature returned is counted, whatever its
    # status -- confirm whether invalid signatures should be filtered out.
    return [res_.fpr for res_ in result]
def decrypt_block (msg_block, gpgme_ctx):
    """
    Decrypt an armored message block and collect its signatures.

    Args:
        msg_block: the armored block, as a string
        gpgme_ctx: the gpgme context used for decryption and verification

    Returns:
        A tuple (plaintext, fingerprints): the decrypted text and the
        fingerprints of the signatures found on it. ("", []) if the block
        could not be decrypted.
    """

    plain_b = io.BytesIO()

    try:
        # the encoding step is inside the try: a block with non-ascii
        # characters is simply not a valid armored block
        block_b = io.BytesIO(msg_block.encode('ascii'))
        sigs = gpgme_ctx.decrypt_verify(block_b, plain_b)
    except Exception:
        return ("", [])

    # undecodable bytes are replaced rather than crashing on mail content
    plaintext = plain_b.getvalue().decode('utf-8', errors='replace')

    # NOTE(review): every signature returned is counted, whatever its
    # status -- confirm whether invalid signatures should be filtered out.
    fingerprints = [sig.fpr for sig in sigs]

    return (plaintext, fingerprints)
def choose_reply_encryption_key (gpgme_ctx, fingerprints):
    """
    Pick the first key from a list of fingerprints that can encrypt.

    Args:
        gpgme_ctx: the gpgme context used to look the keys up
        fingerprints: the candidate fingerprints, in order of preference

    Returns:
        The first key that could be loaded and whose 'can_encrypt' is set,
        or None if there is no such key.
    """

    for fp in fingerprints:
        try:
            key = gpgme_ctx.get_key(fp)
        except Exception:
            # Key not available: try the next candidate. ('Exception' rather
            # than a bare except, so KeyboardInterrupt still propagates.)
            continue

        if key.can_encrypt:
            return key

    return None
def email_to_from_subject (email_text):
    """
    Read the To:, From: and Subject: headers of an email.

    Args:
        email_text: the email as a string

    Returns:
        A tuple (to, from, subject) with the values of the three headers. A
        header that is missing from the email is reported as None.
    """

    parsed = email.parser.Parser().parsestr(email_text)

    return tuple(parsed[header] for header in ('To', 'From', 'Subject'))
def import_lang(email_to):
    """
    Import the module with the reply texts for the requested language.

    Args:
        email_to: the value of the To: header of the incoming mail; may be
            None

    Returns:
        The "lang.<name>" module of the first entry of langs for which
        "edward-<name>" occurs in email_to (with "-" in the name replaced by
        "_"), or the "lang.en" module if there is no such entry.
    """

    if email_to is not None:
        for lang in langs:
            if "edward-" + lang in email_to:
                # a separate name: the loop variable is not overwritten
                module_name = "lang." + lang.replace('-', '_')
                return importlib.import_module(module_name)

    return importlib.import_module("lang.en")
def generate_encrypted_mime (plaintext, email_from, email_subject, encrypt_to_key,
                    gpgme_ctx):
    """
    Build the reply email, encrypted and signed if a key is available.

    Args:
        plaintext: the text of the reply
        email_from: the sender of the incoming mail; used as To: of the reply
        email_subject: the subject of the incoming mail; reused for the reply
        encrypt_to_key: the key to encrypt the reply to, or None to send the
            reply unencrypted
        gpgme_ctx: the gpgme context passed to encrypt_sign_message()

    Returns:
        The complete reply email as a string: a multipart/encrypted message
        if encrypt_to_key is given, otherwise a text/plain message.
    """

    # Imported explicitly: the file's import block does not import
    # email.charset itself, it was only reachable as a side effect of the
    # other email imports.
    from email.charset import QP, Charset

    # quoted printable encoding lets most ascii characters look normal
    # before the mime message is decoded.
    char_set = Charset("utf-8")
    char_set.body_encoding = QP

    # MIMEText doesn't allow setting the text encoding
    # so we use MIMENonMultipart.
    plaintext_mime = MIMENonMultipart('text', 'plain')
    plaintext_mime.set_payload(plaintext, charset=char_set)

    if encrypt_to_key is not None:

        encrypted_text = encrypt_sign_message(plaintext_mime.as_string(),
                                              encrypt_to_key,
                                              gpgme_ctx)

        # first part: the version identification
        control_mime = MIMEApplication("Version: 1",
                                       _subtype='pgp-encrypted',
                                       _encoder=email.encoders.encode_7or8bit)
        control_mime['Content-Description'] = 'PGP/MIME version identification'
        control_mime.set_charset('us-ascii')

        # second part: the encrypted data
        encoded_mime = MIMEApplication(encrypted_text,
                                       _subtype='octet-stream; name="encrypted.asc"',
                                       _encoder=email.encoders.encode_7or8bit)
        encoded_mime['Content-Description'] = 'OpenPGP encrypted message'
        encoded_mime['Content-Disposition'] = 'inline; filename="encrypted.asc"'
        encoded_mime.set_charset('us-ascii')

        message_mime = MIMEMultipart(_subtype="encrypted", protocol="application/pgp-encrypted")
        message_mime.attach(control_mime)
        message_mime.attach(encoded_mime)
        message_mime['Content-Disposition'] = 'inline'

    else:
        message_mime = plaintext_mime

    message_mime['To'] = email_from
    message_mime['Subject'] = email_subject

    return message_mime.as_string()
def email_quote_text (text):
    """
    Quote a text the way email replies do.

    Args:
        text: the text to quote

    Returns:
        The text with "> " inserted at the start of the string and directly
        after every newline character (so also after a trailing one).
    """

    quote_mark = "> "

    return quote_mark + text.replace("\n", "\n" + quote_mark)
def encrypt_sign_message (plaintext, encrypt_to_key, gpgme_ctx):
    """
    Encrypt a text to one key and sign it.

    Args:
        plaintext: the text to encrypt; it has to be ascii-encodable
        encrypt_to_key: the key to encrypt to
        gpgme_ctx: the gpgme context used to encrypt and sign

    Returns:
        The output of the encryption, decoded as an ascii string.
    """

    source = io.BytesIO(plaintext.encode('ascii'))
    sink = io.BytesIO()

    recipients = [encrypt_to_key]
    gpgme_ctx.encrypt_sign(recipients, gpgme.ENCRYPT_ALWAYS_TRUST,
                           source, sink)

    return sink.getvalue().decode('ascii')
def error (error_msg):
    """
    Write an error message to standard error.

    Args:
        error_msg: the message; it is converted with str()

    Returns:
        None

    Pre:
        The global 'progname' is set; the message is prefixed with it.
    """

    line = progname + ": " + "%s\n" % (str(error_msg),)
    sys.stderr.write(line)
def debug (debug_msg):
    """
    Write a debug message to standard error if debugging is enabled.

    Args:
        debug_msg: the message, handed to error()

    Returns:
        None
    """

    # the comparison with True is kept as it is: only a setting equal to
    # True switches debugging on
    debugging_enabled = (edward_config.debug == True)

    if not debugging_enabled:
        return

    error(debug_msg)
def handle_args ():
    """
    Set the global 'progname' and reject command line arguments.

    Args:
        None

    Returns:
        None

    Post:
        'progname' holds sys.argv[0]. If the file is run as a script and
        was given any arguments, a message is printed to standard error and
        the program exits with status 1.
    """

    # Set unconditionally: error() needs progname, also when this file is
    # imported rather than executed.
    global progname
    progname = sys.argv[0]

    # the arguments of a program that merely imports this file are not ours
    # to judge
    if __name__ == "__main__" and len(sys.argv) > 1:
        print(progname + ": error, this program doesn't " \
                "need any arguments.", file=sys.stderr)
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. under "python -S").
        sys.exit(1)
# Run the bot: read an email from standard input and print the reply.
# NOTE(review): this call does not appear to be guarded by
# `if __name__ == "__main__":`, so importing this file would run the bot as
# well -- confirm that this is intended.
main()