Skip to content

Commit

Permalink
Merge pull request #402 from multiflexi/email_collector
Browse files Browse the repository at this point in the history
Email collector improvements
  • Loading branch information
Progress1 authored Nov 4, 2024
2 parents b9924b0 + b06f0e8 commit 83ddf0f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 55 deletions.
136 changes: 85 additions & 51 deletions src/collectors/collectors/email_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class EmailCollector(BaseCollector):
description = "Collector for gathering data from emails"

parameters = [
Parameter(0, "EMAIL_SERVER_TYPE", "Email server type", "Server type parameter means IMAP or POP3 email server", ParameterType.STRING),
Parameter(0, "EMAIL_SERVER_TYPE", "Email server type", "IMAP or POP3 protocol", ParameterType.STRING),
Parameter(0, "EMAIL_SERVER_HOSTNAME", "Email server hostname", "Hostname of email server", ParameterType.STRING),
Parameter(0, "EMAIL_SERVER_PORT", "Email server port", "Port of email server", ParameterType.NUMBER),
Parameter(0, "EMAIL_USERNAME", "Username", "Username of email account", ParameterType.STRING),
Expand All @@ -49,6 +49,8 @@ def collect(self, source):
Parameters:
source -- Source object.
"""
BaseCollector.update_last_attempt(source)
self.collector_source = f"{self.name} '{source.name}':"
news_items = []
email_server_type = source.parameter_values["EMAIL_SERVER_TYPE"]
email_server_hostname = source.parameter_values["EMAIL_SERVER_HOSTNAME"]
Expand All @@ -58,7 +60,8 @@ def collect(self, source):
proxy_server = source.parameter_values["PROXY_SERVER"]

def proxy_tunnel():
server = f"{email_server_type.lower()}.{email_server_hostname.lower()}"
logger.debug(f"{self.collector_source} Establishing proxy tunnel")
server = f"{email_server_hostname.lower()}"
port = email_server_port

server_proxy = proxy_server.rsplit(":", 1)[0]
Expand All @@ -72,68 +75,97 @@ def proxy_tunnel():
s.send(str.encode(con))
s.recv(4096)

def get_data():
def process_email(email_message):
email_string = email_message.as_string()
if len(email_string) > 3000:
email_string = f"{email_string[:3000]}\n..."
logger.debug(f"{self.collector_source} Processing email: {email_string}")
review = ""
content = ""
url = ""
address = ""
link = ""
key = ""
value = ""
news_item = None

date_tuple = email.utils.parsedate_tz(email_message["Date"])
local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
published = f'{str(local_date.strftime("%a, %d %b %Y %H:%M:%S"))}'

author = str(email.header.make_header(email.header.decode_header(email_message["From"])))
title = str(email.header.make_header(email.header.decode_header(email_message["Subject"])))
logger.debug(f"{self.collector_source} Processing email: {title}")
author = str(email.header.make_header(email.header.decode_header(email_message["From"])))
address = email.utils.parseaddr(email_message["From"])[1]
message_id = str(email.header.make_header(email.header.decode_header(email_message["Message-ID"])))
date_tuple = email.utils.parsedate_tz(email_message["Date"])
published = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)).strftime("%d.%m.%Y - %H:%M")

for part in email_message.walk():
if part.get_content_type() == "text/plain":
content = part.get_payload(decode=True)
review = content[:500].decode("utf-8")
content = content.decode("utf-8")

for_hash = author + title + message_id

news_item = NewsItemData(
uuid.uuid4(),
hashlib.sha256(for_hash.encode()).hexdigest(),
title,
review,
url,
link,
published,
author,
datetime.datetime.now(),
content,
source.id,
attributes,
)

if part.get_content_maintype() == "multipart":
pass
if part.get("Content-Disposition") is None:
pass

file_name = part.get_filename()

if file_name:

charset = part.get_content_charset()
logger.debug(f"{self.collector_source} Detected encoding of email '{title}': {charset}")
text_data = part.get_payload(decode=True)
if charset is None:
charset = "utf-8"
content = text_data.decode(charset)
review = content[:500]

for_hash = author + title + message_id

news_item = NewsItemData(
uuid.uuid4(),
hashlib.sha256(for_hash.encode()).hexdigest(),
title,
review,
address,
link,
published,
author,
datetime.datetime.now(),
content,
source.id,
attributes,
)
break

if news_item:
for part in email_message.walk():
file_name = part.get_filename()
binary_mime_type = part.get_content_type()
binary_value = part.get_payload()

news_attribute = NewsItemAttribute(uuid.uuid4(), key, value, binary_mime_type, binary_value)
news_item.attributes.append(news_attribute)
if binary_mime_type == "message/rfc822":
logger.debug(f"{self.collector_source} Found an attached email")
attached = part.get_payload()
if isinstance(attached, list):
attached_email = attached[0]
else:
attached_email = attached
# Process .eml file as an email
process_email(attached_email)

elif binary_mime_type == "application/pkcs7-signature" or binary_mime_type == "application/x-pkcs7-signature":
logger.debug(f"{self.collector_source} Found a X.509 signature attachment")
# Skip signature attachments
continue

elif binary_mime_type == "application/pgp-signature":
logger.debug(f"{self.collector_source} Found a PGP signature attachment")
binary_value = part.get_payload()
# Skip signature attachments
continue

elif file_name:
# Handle other binary attachments
logger.debug(f"{self.collector_source} Found an attachment '{file_name}' with MIME type '{binary_mime_type}'")
binary_value = part.get_payload()
if binary_value:
news_attribute = NewsItemAttribute(binary_mime_type, file_name, binary_mime_type, binary_value)
news_item.attributes.append(news_attribute)
else:
logger.error(f"{self.collector_source} Attachment is empty or could not be decoded: {file_name}")

news_items.append(news_item)

if email_server_type.lower() == "imap":
logger.debug(f"{self.collector_source} Fetching emails using IMAP")
try:
if proxy_server:
proxy_tunnel()

connection = imaplib.IMAP4_SSL(f"{email_server_type.lower()}.{email_server_hostname.lower()}", email_server_port)
connection = imaplib.IMAP4_SSL(email_server_hostname.lower(), email_server_port)
connection.login(email_username, email_password)
connection.select("inbox")

Expand All @@ -148,18 +180,20 @@ def get_data():
raw_email_string = raw_email.decode("utf-8")
email_message = email.message_from_string(raw_email_string, policy=policy.default)

get_data()
process_email(email_message)

connection.close()
connection.logout()
except Exception as error:
logger.exception(f"{self.collector_source} Fetch emails using IMAP failed: {error}")
logger.exception(f"{self.collector_source} Failed to fetch emails using IMAP: {error}")

elif email_server_type.lower() == "pop3":
logger.debug(f"{self.collector_source} Fetching emails using POP3")
try:
if proxy_server:
proxy_tunnel()

connection = poplib.POP3_SSL(f"{email_server_type.lower()}.{email_server_hostname.lower()}", email_server_port)
connection = poplib.POP3_SSL(email_server_hostname.lower(), email_server_port)
connection.user(email_username)
connection.pass_(email_password)

Expand All @@ -171,11 +205,11 @@ def get_data():
raw_email = b"\n".join(connection.retr(i + 1)[1])
email_message = email.message_from_bytes(raw_email)

get_data()
process_email(email_message)

connection.quit()
except Exception as error:
logger.exception(f"{self.collector_source} Fetch emails using POP3 failed: {error}")
logger.exception(f"{self.collector_source} Failed to fetch emails using POP3: {error}")
else:
logger.error(f"{self.collector_source} Email server connection type is not supported: {email_server_type}")

Expand Down
10 changes: 7 additions & 3 deletions src/collectors/collectors/web_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,8 +788,12 @@ def __process_title_page_articles(self, browser, title_page_handle, index_url):
news_item = self.__process_article_page(index_url, browser)
if news_item:
logger.debug(f"{self.collector_source} ... Title : {news_item.title}")
logger.debug(f"{self.collector_source} ... Review : {news_item.review.replace('\r', '').replace('\n', ' ').strip()[:100]}")
logger.debug(f"{self.collector_source} ... Content : {news_item.content.replace('\r', '').replace('\n', ' ').strip()[:100]}")
logger.debug(
f"{self.collector_source} ... Review : {news_item.review.replace('\r', '').replace('\n', ' ').strip()[:100]}"
)
logger.debug(
f"{self.collector_source} ... Content : {news_item.content.replace('\r', '').replace('\n', ' ').strip()[:100]}"
)
logger.debug(f"{self.collector_source} ... Published: {news_item.published}")
self.news_items.append(news_item)
else:
Expand Down Expand Up @@ -871,6 +875,6 @@ def __process_article_page(self, index_url, browser):
key = "Additional_ID"
binary_mime_type = ""
binary_value = ""
attribute = NewsItemAttribute(uuid.uuid4(), key, value, binary_mime_type, binary_value)
attribute = NewsItemAttribute(key, value, binary_mime_type, binary_value)
news_item.attributes.append(attribute)
return news_item
5 changes: 4 additions & 1 deletion src/publishers/publishers/email_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ def publish(self, publisher_input):
logger.info(f"Encrypting email with file {encrypt}")
envelope.encryption(key=open(encrypt))

logger.debug(f"=== COMPOSED FOLLOWING EMAIL ===\n{envelope}")
email_string = str(envelope)
if len(email_string) > 3000:
email_string = f"{email_string[:3000]}\n..."
logger.debug(f"=== COMPOSED FOLLOWING EMAIL ===\n{email_string}")

envelope.smtp(smtp)
try:
Expand Down

0 comments on commit 83ddf0f

Please sign in to comment.