* You should have received a copy of the GNU Affero Public License *
* along with Edward. If not, see <http://www.gnu.org/licenses/>. *
* *
-* Copyright (C) 2014-2015 Andrew Engelbrecht (AGPLv3+) *
+* Copyright (C) 2014-2021 Andrew Engelbrecht (AGPLv3+) *
* Copyright (C) 2014 Josh Drake (AGPLv3+) *
* Copyright (C) 2014 Lisa Marie Maginnis (AGPLv3+) *
* Copyright (C) 2009-2015 Tails developers <tails@boum.org> ( GPLv3+) *
import io
import os
import sys
+import gpg
import enum
-import gpgme
import smtplib
import importlib
test_auto_reply(email_bytes)
- gpgme_ctx = get_gpg_context(edward_config.gnupghome,
- edward_config.sign_with_key)
+ ctx = gpg.Context(home_dir=edward_config.gnupghome)
+ signing_keys = list(ctx.get_key(edward_config.sign_with_key))
+ ctx.signers = signing_keys
# do this twice so sigs can be verified with newly imported keys
- parse_pgp_mime(email_bytes, gpgme_ctx)
- email_struct = parse_pgp_mime(email_bytes, gpgme_ctx)
+ parse_pgp_mime(email_bytes, ctx)
+ email_struct = parse_pgp_mime(email_bytes, ctx)
email_to, email_reply_to, email_subject = email_to_reply_to_subject(email_bytes)
lang, reply_from = import_lang_pick_address(email_to, edward_config.hostname)
replyinfo_obj.replies = lang.replies
prepare_for_reply(email_struct, replyinfo_obj)
- get_key_from_fp(replyinfo_obj, gpgme_ctx)
+ get_key_from_fp(replyinfo_obj, ctx)
reply_plaintext = write_reply(replyinfo_obj)
reply_mime = generate_encrypted_mime(reply_plaintext, email_reply_to, reply_from, \
email_subject, replyinfo_obj.encrypt_to_key,
- gpgme_ctx)
+ ctx)
if print_reply_only == True:
print(reply_mime)
os.environ['GNUPGHOME'] = gnupghome
- gpgme_ctx = gpgme.Context()
- gpgme_ctx.armor = True
+ ctx = gpg.Context()
+ ctx.armor = True
try:
- sign_with_key = gpgme_ctx.get_key(sign_with_key_fp)
- except gpgme.GpgmeError:
+ sign_with_key = ctx.get_key(sign_with_key_fp)
+ except gpg.errors.GPGMEError:
error("unable to load signing key. is the gnupghome "
+ "and signing key properly set in the edward_config.py?")
exit(1)
- gpgme_ctx.signers = [sign_with_key]
+ ctx.signers = [sign_with_key]
- return gpgme_ctx
+ return ctx
-def parse_pgp_mime (email_bytes, gpgme_ctx):
+def parse_pgp_mime (email_bytes, ctx):
"""Parses the email for mime payloads and decrypts/verfies signatures.
This function creates a representation of a mime or plaintext email with
Args:
email_bytes: an email message in byte string format
- gpgme_ctx: a gpgme context
+ ctx: a gpgme context
Returns:
A message as an instance of EddyMsg
eddymsg_obj = parse_mime(email_struct)
split_payloads(eddymsg_obj)
- gpg_on_payloads(eddymsg_obj, gpgme_ctx)
+ gpg_on_payloads(eddymsg_obj, ctx)
return eddymsg_obj
eddymsg_obj.payload_pieces = new_pieces_list
-def gpg_on_payloads (eddymsg_obj, gpgme_ctx, prev_parts=[]):
+def gpg_on_payloads (eddymsg_obj, ctx, prev_parts=[]):
"""Performs GPG operations on the GPG parts of the message
This function decrypts text, verifies signatures, and imports public keys
Args:
eddymsg_obj: an EddyMsg object with its payload_pieces split into GPG
and non-GPG sections by split_payloads()
- gpgme_ctx: a gpgme context
+ ctx: a gpgme context
prev_parts: a list of mime parts that occur before the eddymsg_obj
part, under the same multi-part mime part. This is used for
if eddymsg_obj.multipart == True:
prev_parts=[]
for sub in eddymsg_obj.subparts:
- gpg_on_payloads (sub, gpgme_ctx, prev_parts)
+ gpg_on_payloads (sub, ctx, prev_parts)
prev_parts += [sub]
return
elif piece.piece_type == TxtType.message:
piece.gpg_data = GPGData()
- (plaintext_b, sigs, sigkey_missing, key_cannot_encrypt) = decrypt_block(piece.string, gpgme_ctx)
+ (plaintext_b, sigs, sigkey_missing, key_cannot_encrypt) = decrypt_block(piece.string, ctx)
piece.gpg_data.sigkey_missing = sigkey_missing
piece.gpg_data.key_cannot_encrypt = key_cannot_encrypt
piece.gpg_data.decrypted = True
piece.gpg_data.sigs = sigs
# recurse!
- piece.gpg_data.plainobj = parse_pgp_mime(plaintext_b, gpgme_ctx)
+ piece.gpg_data.plainobj = parse_pgp_mime(plaintext_b, ctx)
continue
# if not encrypted, check to see if this is an armored signature.
- (plaintext_b, sigs, sigkey_missing, key_cannot_encrypt) = verify_sig_message(piece.string, gpgme_ctx)
+ (plaintext_b, sigs, sigkey_missing, key_cannot_encrypt) = verify_sig_message(piece.string, ctx)
piece.gpg_data.sigkey_missing = sigkey_missing
piece.gpg_data.key_cannot_encrypt = key_cannot_encrypt
piece.piece_type = TxtType.signature
piece.gpg_data.sigs = sigs
# recurse!
- piece.gpg_data.plainobj = parse_pgp_mime(plaintext_b, gpgme_ctx)
+ piece.gpg_data.plainobj = parse_pgp_mime(plaintext_b, ctx)
elif piece.piece_type == TxtType.pubkey:
piece.gpg_data = GPGData()
- (key_fps, key_cannot_encrypt) = add_gpg_key(piece.string, gpgme_ctx)
+ (key_fps, key_cannot_encrypt) = add_gpg_key(piece.string, ctx)
piece.gpg_data.key_cannot_encrypt = key_cannot_encrypt
piece.gpg_data = GPGData()
for prev in prev_parts:
- (sig_fps, sigkey_missing, key_cannot_encrypt) = verify_detached_signature(piece.string, prev.payload_bytes, gpgme_ctx)
+ (sig_fps, sigkey_missing, key_cannot_encrypt) = verify_detached_signature(piece.string, prev.payload_bytes, ctx)
piece.gpg_data.sigkey_missing = sigkey_missing
piece.gpg_data.key_cannot_encrypt = key_cannot_encrypt
replyinfo_obj.msg_to_quote += piece.string
-def get_key_from_fp (replyinfo_obj, gpgme_ctx):
+def get_key_from_fp (replyinfo_obj, ctx):
"""Obtains a public key object from a key fingerprint
If the .target_key is not set, then we use .fallback_target_key, if
Args:
replyinfo_obj: ReplyInfo instance
- gpgme_ctx: the gpgme context
+ ctx: the gpgme context
Return:
Nothing
for key in (replyinfo_obj.target_key, replyinfo_obj.fallback_target_key):
if key != None:
try:
- encrypt_to_key = gpgme_ctx.get_key(key)
+ encrypt_to_key = ctx.get_key(key)
- except gpgme.GpgmeError:
+ except gpg.errors.GPGMEError:
continue
if is_key_usable(encrypt_to_key):
return reply_plain
-def add_gpg_key (key_block, gpgme_ctx):
+def add_gpg_key (key_block, ctx):
"""Adds a GPG pubkey to the local keystore
This adds keys received through email into the key store so they can be
Args:
key_block: the string form of the ascii-armored public key block
- gpgme_ctx: the gpgme context
+ ctx: the gpgme context
Returns:
the fingerprint(s) of the imported key(s) which can be used for
fp = io.BytesIO(key_block.encode('ascii'))
try:
- result = gpgme_ctx.import_(fp)
+ result = ctx.import_(fp)
imports = result.imports
- except gpgme.GpgmeError:
+ except gpg.errors.GPGMEError:
imports = []
key_fingerprints = []
fingerprint = import_res[0]
try:
- key_obj = gpgme_ctx.get_key(fingerprint)
+ key_obj = ctx.get_key(fingerprint)
except:
key_obj = None
return (key_fingerprints, key_cannot_encrypt)
-def verify_sig_message (msg_block, gpgme_ctx):
+def verify_sig_message (msg_block, ctx):
"""Verifies the signature of a signed, ascii-armored block of text.
It encodes the string into ascii, since binary GPG files are currently
Args:
msg_block: a GPG Message block in string form. It may be encrypted or
not. If it is encrypted, it will return empty results.
- gpgme_ctx: the gpgme context
+ ctx: the gpgme context
Returns:
A tuple containing the plaintext bytes of the signed part, the list of
block_b = io.BytesIO(msg_block.encode('ascii'))
plain_b = io.BytesIO()
+ print(foo)
try:
- sigs = gpgme_ctx.verify(block_b, None, plain_b)
- except gpgme.GpgmeError:
+ sigs = ctx.verify(msg_block)
+ except gpg.errors.GPGMEError as e:
+ print(e)
return ("",[],False,False)
plaintext_b = plain_b.getvalue()
- (fingerprints, sigkey_missing, key_cannot_encrypt) = get_signature_fp(sigs, gpgme_ctx)
+ (fingerprints, sigkey_missing, key_cannot_encrypt) = get_signature_fp(sigs, ctx)
return (plaintext_b, fingerprints, sigkey_missing, key_cannot_encrypt)
-def verify_detached_signature (detached_sig, plaintext_bytes, gpgme_ctx):
+def verify_detached_signature (detached_sig, plaintext_bytes, ctx):
"""Verifies the signature of a detached signature.
This requires the signature part and the signed part as separate arguments.
Args:
detached_sig: the signature part of the detached signature
plaintext_bytes: the byte form of the message being signed.
- gpgme_ctx: the gpgme context
+ ctx: the gpgme context
Returns:
A tuple containging a list of encryption capable signing fingerprints
plaintext_fp = io.BytesIO(plaintext_bytes)
try:
- sigs = gpgme_ctx.verify(detached_sig_fp, plaintext_fp, None)
- except gpgme.GpgmeError:
+ sigs = ctx.verify(detached_sig_fp, plaintext_fp, None)
+ except gpg.errors.GPGMEError:
return ([],False,False)
- (fingerprints, sigkey_missing, key_cannot_encrypt) = get_signature_fp(sigs, gpgme_ctx)
+ (fingerprints, sigkey_missing, key_cannot_encrypt) = get_signature_fp(sigs, ctx)
return (fingerprints, sigkey_missing, key_cannot_encrypt)
-def decrypt_block (msg_block, gpgme_ctx):
+def decrypt_block (msg_block, ctx):
"""Decrypts a block of GPG text and verifies any included sigatures.
Some encypted messages have embeded signatures, so those are verified too.
Args:
msg_block: the encrypted(/signed) text
- gpgme_ctx: the gpgme context
+ ctx: the gpgme context
Returns:
A tuple containing plaintext bytes, encryption-capable signatures (if
signature keys are incapable of encryption.
"""
- block_b = io.BytesIO(msg_block.encode('ascii'))
- plain_b = io.BytesIO()
+ block_b = msg_block.encode('ascii')
try:
- sigs = gpgme_ctx.decrypt_verify(block_b, plain_b)
- except gpgme.GpgmeError:
+ plaintext_b, target_keys, verify_result = ctx.decrypt(block_b)
+ except gpg.errors.GPGMEError:
return ("",[],False,False)
- plaintext_b = plain_b.getvalue()
-
- (fingerprints, sigkey_missing, key_cannot_encrypt) = get_signature_fp(sigs, gpgme_ctx)
+ print(verify_result)
+ #(fingerprints, sigkey_missing, key_cannot_encrypt) = get_signature_fp(sigs, ctx)
+ fingerprints = verify_result.signatures[0].fpr
return (plaintext_b, fingerprints, sigkey_missing, key_cannot_encrypt)
-def get_signature_fp (sigs, gpgme_ctx):
+def get_signature_fp (sigs, ctx):
"""Selects valid signatures from output of gpgme signature verifying functions
get_signature_fp returns a list of valid signature fingerprints if those
Args:
sigs: a signature verification result object list
- gpgme_ctx: a gpgme context
+ ctx: a gpgme context
Returns:
fingerprints: a list of fingerprints
fingerprints = []
for sig in sigs:
- if (sig.summary == 0) or (sig.summary & gpgme.SIGSUM_VALID != 0) or (sig.summary & gpgme.SIGSUM_GREEN != 0):
+ if (sig.summary == 0) or (sig.summary & gpg.SIGSUM_VALID != 0) or (sig.summary & gpg.SIGSUM_GREEN != 0):
try:
- key_obj = gpgme_ctx.get_key(sig.fpr)
+ key_obj = ctx.get_key(sig.fpr)
except:
if fingerprints == []:
sigkey_missing = True
key_cannot_encrypt = True
elif fingerprints == []:
- if (sig.summary & gpgme.SIGSUM_KEY_MISSING != 0):
+ if (sig.summary & gpg.SIGSUM_KEY_MISSING != 0):
sigkey_missing = True
return (fingerprints, sigkey_missing, key_cannot_encrypt)
def generate_encrypted_mime (plaintext, email_to, email_from, email_subject,
- encrypt_to_key, gpgme_ctx):
+ encrypt_to_key, ctx):
"""This function creates the mime email reply. It can encrypt the email.
If the encrypt_key is included, then the email is encrypted and signed.
email_subject: the subject to use in reply
encrypt_to_key: the key object to use for encrypting the email. (or
None)
- gpgme_ctx: the gpgme context
+ ctx: the gpgme context
Returns
A string version of the mime message, possibly encrypted and signed.
encrypted_text = encrypt_sign_message(plaintext_mime.as_string(),
encrypt_to_key,
- gpgme_ctx)
+ ctx)
control_mime = MIMEApplication("Version: 1",
_subtype='pgp-encrypted',
return quoted_message
-def encrypt_sign_message (plaintext, encrypt_to_key, gpgme_ctx):
+def encrypt_sign_message (plaintext, encrypt_to_key, ctx):
"""Encrypts and signs plaintext
This encrypts and signs a message.
Args:
plaintext: text to sign and ecrypt
encrypt_to_key: the key object to encrypt to
- gpgme_ctx: the gpgme context
+ ctx: the gpgme context
Returns:
An encrypted and signed string of text
plaintext_bytes = io.BytesIO(plaintext.encode('ascii'))
encrypted_bytes = io.BytesIO()
- gpgme_ctx.encrypt_sign([encrypt_to_key], gpgme.ENCRYPT_ALWAYS_TRUST,
+ ctx.encrypt_sign([encrypt_to_key], gpg.ENCRYPT_ALWAYS_TRUST,
plaintext_bytes, encrypted_bytes)
encrypted_txt = encrypted_bytes.getvalue().decode('ascii')