Source code for airmailer.backend.aws

#!/usr/bin/env python
# -*- coding: utf-8 -*-


from typing import List, Dict, Any
import boto3

import botocore
from botocore.vendored.requests.packages.urllib3.exceptions import ResponseError
from botocore.exceptions import BotoCoreError, ClientError

from ..logging import logger
from ..message import sanitize_address, EmailMessage
from .base import BaseEmailBackend


[docs]class SESEmailBackend(BaseEmailBackend): """ Send mails using the AWS SES API. """ def __init__( self, fail_silently: bool = False, aws_access_key_id: str = None, aws_secret_access_key: str = None, aws_region_name: str = None, configuration_set_name: str = None, aws_region_endpoint: str = None, aws_config=None, ses_from_arn: str = None, ses_source_arn: str = None, ses_return_path_arn: str = None, ses_tags: Dict[str, str] = None, **kwargs ): """ Creates a client for the AWS SES API. Keyword Arguments: fail_silently: If ``True``, don't raise execeptions on client errors aws_access_key_id: the ``AWS_ACCESS_KEY_ID``, defaults to read from environment aws_secret_access_key: the ``AWS_SECRET_ACCESS_KEY``, defaults to read from environment aws_region_name: the name of the AWS region to use, defaults to read from environment configuration_set_name: the name of the SES Configuration Set to use aws_region_endpoint: the URL for the SES endpoint for the region aws_config: a properly constructed :py:class:`botocore.config.Config` object ses_from_arn: the ``FromArn`` when using cross-account identities ses_source_arn: the ``SourceArn`` when using cross-account identities ses_return_path_arn: the ``ReturnPathArn`` when using cross-account identities ses_tags: a dictionary of tags to apply set as ``X-SES-MESSAGE-TAGS`` on each message sent """ super().__init__(fail_silently=fail_silently) self.aws_access_key_id = aws_access_key_id self.aws_secret_access_key = aws_secret_access_key self.aws_region_name = aws_region_name self.aws_region_endpoint = aws_region_endpoint self.aws_config: botocore.client.Config = aws_config self.ses_source_arn = ses_source_arn self.ses_from_arn = ses_from_arn self.ses_return_path_arn = ses_return_path_arn self.configuration_set_name = configuration_set_name self.ses_tags = ses_tags self.connection = None
[docs] def open(self) -> bool: """ Opens a connection to the AWS SES API. Returns: ``True`` if the connection was opened successfully, ``False`` otherwise. """ if self.connection: return False try: self.connection = boto3.client( "ses", aws_access_key_id=self.aws_access_key_id, aws_secret_access_key=self.aws_secret_access_key, region_name=self.aws_region_name, endpoint_url=self.aws_region_endpoint, config=self.aws_config ) except (ClientError, BotoCoreError): if not self.fail_silently: raise return False return True
[docs] def close(self) -> None: """ Close the connection to the AWS SES API. """ self.connection = None
[docs] def send_messages(self, email_messages: List[EmailMessage]) -> int: """ Sends one or more messages returns the number of email messages sent. Args: email_messages: A list of emails to send Raises: botocore.exceptions.ClientError: AWS SES had an issue botocore.exceptions.BotoCoreError: AWS SES had an issue Returns: The number of messages sent """ if not email_messages: return 0 new_conn_created = self.open() if not self.connection: return 0 sent_message_count = 0 for email_message in email_messages: if self._send(email_message): sent_message_count += 1 if new_conn_created: self.close() return sent_message_count
def _send(self, email_message: EmailMessage) -> bool: """ Sends an individual message. If the message was submitted successfully to the AWS SES API, set * ``email_message.extra_headers['status']`` to 200 * ``email_message.extra_headers['message_id']`` to the ``MessageId`` * ``email_message.extra_headers['request_id']`` to the ``RequestId`` of the AWS API call response If the message was not submitted successfully to the AWS SES API, set * ``email_message.extra_headers['status']`` to HTTP status of the response * ``email_message.extra_headers['reason']`` to "Reason" given for the error in the response * ``email_message.extra_headers['error_code']`` to error code of the response * ``email_message.extra_headers['error_message']`` to error message from the response * ``email_message.extra_headers['body']`` to body from the response * ``email_message.extra_headers['request_id']`` to the ``RequestId`` of the AWS API call response Args: email_message: An email to send Raises: botocore.exceptions.ClientError: AWS SES had an issue botocore.exceptions.BotoCoreError: AWS SES had an issue Returns: ``True`` if the message was sent, ``False`` otherwise """ if not email_message.recipients(): return False encoding = email_message.encoding from_email = sanitize_address(email_message.from_email, email_message.encoding) recipients = [sanitize_address(addr, encoding) for addr in email_message.recipients()] message = email_message.message() try: kwargs: Dict[str, Any] = { "Source": from_email, "Destinations": recipients, "RawMessage": {"Data": message.as_bytes(linesep="\r\n")}, } if self.configuration_set_name is not None: kwargs["ConfigurationSetName"] = self.configuration_set_name if self.ses_source_arn: kwargs['SourceArn'] = self.ses_source_arn if self.ses_from_arn: kwargs['FromArn'] = self.ses_from_arn if self.ses_return_path_arn: kwargs['ReturnPathArn'] = self.ses_return_path_arn if self.ses_tags is not None: kwargs["Tags"] = [ {"Name": key, "Value": value} for key, value in self.ses_tags.items() ] response = self.connection.send_raw_email(**kwargs) # type: ignore except ResponseError as err: # Store failure information so to post process it if required error_keys = ['status', 'reason', 'body', 'request_id', 'error_code', 'error_message'] for key in error_keys: email_message.extra_headers[key] = getattr(err, key, None) if not self.fail_silently: raise if self.configuration_set_name: logger.debug( "airmailer.ses.send.success from='{}' recipients='{}' request_id='{}' " "ses-configuration-set='{}' status='{}' error_code='{}' error_message='{}'".format( email_message.from_email, ", ".join(email_message.recipients()), email_message.extra_headers['request_id'], self.configuration_set_name, email_message.extra_headers['status'], email_message.extra_headers['error_code'], email_message.extra_headers['error_message'], ) ) return False email_message.extra_headers['status'] = 200 email_message.extra_headers['message_id'] = response['MessageId'] email_message.extra_headers['request_id'] = response['ResponseMetadata']['RequestId'] logger.debug( "airmailer.ses.send.success from='{}' recipients='{}' message_id='{}' request_id='{}' " "ses-configuration-set='{}'".format( email_message.from_email, ", ".join(email_message.recipients()), email_message.extra_headers['message_id'], email_message.extra_headers['request_id'], self.configuration_set_name ) ) return True