cmc-sales/cmc-django/cmcsales/cmc/management/vault.py

589 lines
31 KiB
Python
Raw Normal View History

import os
import re
import subprocess
import uuid
import shutil
from datetime import datetime
from email.parser import HeaderParser
from email.utils import parsedate_to_datetime, getaddresses
from email.header import decode_header
from django.core.management.base import BaseCommand
from django.conf import settings
from django.db import transaction
from django.utils import timezone
import cmc
# Assuming models are in 'your_app'. Adjust the import path as needed.
from cmc.models import (
Enquiry, Contact, Invoice, PurchaseOrder, User, Email, EmailRecipient,
Job, EmailAttachment
)
# --- Configuration ---
# Consider moving these to Django settings (settings.py) for better practice
RIPMIME_PATH = getattr(settings, 'VAULT_RIPMIME_PATH', '/usr/bin/ripmime') # Default to /usr/bin/ripmime
EMAIL_DIR = getattr(settings, 'VAULT_EMAIL_DIR', '/var/www/emails') # Attachment storage base
VAULT_DIR = getattr(settings, 'VAULT_NEW_DIR', '/var/www/vaultmsgs/new') # Incoming emails
PROCESSED_DIR = getattr(settings, 'VAULT_PROCESSED_DIR', '/var/www/vaultmsgs/cur') # Processed emails
# --- Regex Patterns for Identifiers (adjust if needed) ---
# Use Python's raw strings (r"...") for regex
ENQUIRY_REGEX = re.compile(r"CMC\d+([NVQWSOT]|ACT|NT)E\d+-\d+")
INVOICE_REGEX = re.compile(r"CMCIN\d+")
PO_REGEX = re.compile(r"CMCPO\d+")
JOB_REGEX = re.compile(r"(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)\d+(N|V|W|S|T|NT|ACT|Q|O)J\d+")
class Command(BaseCommand):
help = 'Processes email files from the vault directory, extracts info, saves to DB, and archives.'
def _make_map(self, model_class, key_field, value_field='id'):
"""Helper to create a dictionary map from model data."""
# Use .values() for efficiency
qs = model_class.objects.all().values(value_field, key_field)
# Handle potential lowercase requirement (like for user emails)
is_user_email = model_class == User and key_field == 'email'
return {
(item[key_field].lower() if is_user_email and item[key_field] else item[key_field]): item[value_field]
for item in qs if item[key_field] # Ensure key is not None or empty
}
def _decode_mime_header(self, header_value):
"""Decodes MIME encoded headers (like Subject) into a readable string."""
if not header_value:
return ""
try:
parts = decode_header(str(header_value))
decoded_parts = []
for part, encoding in parts:
if isinstance(part, bytes):
# Use errors='ignore' or 'replace' for robustness
decoded_parts.append(part.decode(encoding or 'utf-8', errors='replace'))
else:
decoded_parts.append(part)
return "".join(decoded_parts)
except Exception as e:
self.stderr.write(f"Warning: Could not decode header '{header_value}': {e}")
# Return original or placeholder if decoding fails
return str(header_value) if isinstance(header_value, str) else repr(header_value)
def _get_recipients(self, headers):
"""Extracts To, From, Cc recipients from parsed headers."""
recipients = {'to': [], 'from': [], 'cc': []}
address_fields = {'to': 'To', 'from': 'From', 'cc': 'Cc'}
for key, header_name in address_fields.items():
header_val = headers.get(header_name)
if header_val:
# getaddresses handles parsing "Name <email@example.com>" formats
for _, email_addr in getaddresses([str(header_val)]):
if email_addr and '@' in email_addr: # Basic validation
recipients[key].append(email_addr.lower()) # Normalize to lowercase
return recipients
def _check_valid_identifier(self, subject, identifier_map, regex):
"""Finds identifiers in the subject using regex and returns corresponding DB IDs."""
found_ids = set() # Use a set to avoid duplicates
if not subject:
return list(found_ids)
# Find all matches in the subject
matches = regex.findall(subject)
for match in matches:
# If regex uses capturing groups, match might be a tuple
identifier = match[0] if isinstance(match, tuple) else match
if identifier in identifier_map:
found_ids.add(identifier_map[identifier])
return list(found_ids)
def _get_attachment_directory(self, base_dir, dt_object):
"""Ensures the YYYY/MM directory exists and returns the relative path."""
if not dt_object: # Handle cases where date parsing failed
dt_object = timezone.now() # Fallback to current time
# Use YYYY/MM structure which is common and sorts better
relative_path = dt_object.strftime('%Y/%m')
full_path = os.path.join(base_dir, relative_path)
try:
# exist_ok=True prevents error if directory already exists
os.makedirs(full_path, exist_ok=True)
return relative_path
except OSError as e:
self.stderr.write(f"ERROR: Could not create directory {full_path}: {e}")
# Fallback to base directory if creation fails? Or raise error?
# Raising error might be safer to avoid scattering files.
raise # Re-raise the exception
def _fetch_body_attachments(self, email_filename, vault_path, attachment_base_dir, relative_path):
"""Uses ripmime to extract attachments and identifies the most likely body."""
attachments_data = []
email_file_path = os.path.join(vault_path, email_filename)
output_dir = os.path.join(attachment_base_dir, relative_path)
# Generate a unique prefix for this email's attachments to prevent collisions
# Note: ripmime's --prefix is different; we prepend manually after extraction if needed.
# The original script seemed to rename *after* ripmime using UUID. Let's replicate that.
# ripmime extracts to output_dir. We parse its output, then rename the files.
command = [
RIPMIME_PATH,
'-i', email_file_path,
'-d', output_dir,
'--stdout', # Get output list on stdout
'--no-nameless', # Ignore parts without filenames (less useful often)
'--paranoid', # Use safe filenames
'-v', # Verbose needed for content type
'--verbose-contenttype',
'--recursion-max', '5' # Limit recursion depth (original had 30)
]
try:
self.stdout.write(f"Running ripmime: {' '.join(command)}")
# Use check=True to raise CalledProcessError on failure
# Capture output, decode from bytes
result = subprocess.run(command, capture_output=True, text=True, check=True, encoding='utf-8', errors='replace')
output_lines = result.stdout.splitlines()
self.stdout.write(f"ripmime output:\n{result.stdout}") # Log output for debugging
except FileNotFoundError:
self.stderr.write(f"ERROR: ripmime command not found at {RIPMIME_PATH}. Please check path.")
return [] # Cannot proceed without ripmime
except subprocess.CalledProcessError as e:
self.stderr.write(f"ERROR: ripmime failed for {email_filename} with status {e.returncode}.")
self.stderr.write(f"ripmime stderr:\n{e.stderr}")
self.stderr.write(f"ripmime stdout:\n{e.stdout}")
return [] # Failed to extract
except Exception as e:
self.stderr.write(f"ERROR: Unexpected error running ripmime for {email_filename}: {e}")
return []
# --- Process ripmime output ---
# Example ripmime verbose output line (might vary slightly):
# Extracted 'text_plain_plain_1.txt' (text/plain) Size: 1234
# Or with --verbose-contenttype:
# file=text_plain_plain_1.txt content-type=text/plain size=12345
# Let's parse the second format if available, otherwise adapt
raw_attachments = {} # Store temp data before renaming and size check
for line in output_lines:
line = line.strip()
# Try parsing key=value format first
parts = {p.split('=', 1)[0]: p.split('=', 1)[1] for p in line.split() if '=' in p}
if 'file' in parts and 'content-type' in parts:
original_filename = parts['file'].strip("'\"") # Remove potential quotes
mime_type = parts['content-type'].strip("'\"")
# Size might be present or need to be obtained via os.path.getsize
raw_attachments[original_filename] = {'type': mime_type, 'size': None} # Size TBD
else:
# Fallback parsing (less reliable, adjust based on actual ripmime output)
match = re.match(r"Extracted\s+'?([^']+)'?\s+\(([^)]+)\)", line)
if match:
original_filename = match.group(1)
mime_type = match.group(2)
raw_attachments[original_filename] = {'type': mime_type, 'size': None} # Size TBD
# --- Rename files with UUID, get size, and determine body ---
processed_attachments = []
file_uuid = uuid.uuid4()
biggest_html_idx = -1
biggest_html_size = -1
biggest_plain_idx = -1
biggest_plain_size = -1
idx = 0
for original_filename, data in raw_attachments.items():
old_path = os.path.join(output_dir, original_filename)
# Sanitize filename slightly (replace problematic chars) before adding UUID
safe_part = re.sub(r'[^\w\.\-]', '_', original_filename)
new_filename = f"{file_uuid}-{safe_part}"
new_path = os.path.join(output_dir, new_filename)
relative_new_path = os.path.join(relative_path, new_filename) # Path to store in DB
try:
if os.path.exists(old_path):
shutil.move(old_path, new_path) # Use shutil.move for cross-filesystem safety
size = os.path.getsize(new_path)
data['size'] = size
data['new_name'] = relative_new_path # Store the path relative to EMAIL_DIR
data['original_filename'] = original_filename # Keep original name for DB
data['is_message_body'] = False # Default
processed_attachments.append(data)
# Track largest text/html and text/plain
if data['type'].lower() == 'text/html':
if size > biggest_html_size:
biggest_html_size = size
biggest_html_idx = idx
elif data['type'].lower() == 'text/plain':
if size > biggest_plain_size:
biggest_plain_size = size
biggest_plain_idx = idx
idx += 1
else:
self.stderr.write(f"Warning: Ripped file '{original_filename}' not found at '{old_path}'. Skipping.")
except OSError as e:
self.stderr.write(f"ERROR: Could not rename/get size for '{original_filename}': {e}")
except Exception as e: # Catch any other unexpected errors
self.stderr.write(f"ERROR: Unexpected error processing attachment '{original_filename}': {e}")
# Mark the likely message body
if biggest_html_idx != -1:
processed_attachments[biggest_html_idx]['is_message_body'] = True
elif biggest_plain_idx != -1: # Fallback to plain text if no HTML
processed_attachments[biggest_plain_idx]['is_message_body'] = True
else:
# If neither found, maybe log a warning? Depends on requirements.
self.stdout.write(f"Warning: No clear text/html or text/plain body found for {email_filename}")
# Prepare final list for DB insertion
final_attachments = [
{
'name': att['new_name'], # Path relative to EMAIL_DIR
'filename': att['original_filename'],
'type': att['type'],
'size': att['size'],
'is_message_body': att['is_message_body']
}
for att in processed_attachments if att.get('size') is not None # Ensure size was obtained
]
return final_attachments
def _move_processed_file(self, email_filename):
"""Moves the processed email file from vault to processed directory."""
source_path = os.path.join(VAULT_DIR, email_filename)
# Mimic the original ":S" suffix if needed, otherwise just move
# The purpose of ":S" isn't clear, maybe 'S' for 'Seen' or 'Success'? Let's omit it for simplicity unless required.
# target_filename = f"{email_filename}:S"
target_filename = email_filename
target_path = os.path.join(PROCESSED_DIR, target_filename)
try:
# Ensure target directory exists
os.makedirs(PROCESSED_DIR, exist_ok=True)
shutil.move(source_path, target_path)
self.stdout.write(f"Moved '{source_path}' to '{target_path}'")
return True
except OSError as e:
self.stderr.write(f"ERROR: Unable to move '{source_path}' to '{target_path}': {e}")
# Decide what to do here: leave the file? Try again later?
# For now, just report error and continue.
return False
except Exception as e:
self.stderr.write(f"ERROR: Unexpected error moving '{source_path}': {e}")
return False
# --- Main Handler ---
def handle(self, *args, **options):
self.stdout.write("Starting Vault email processing...")
# --- Pre-load data into maps for efficiency ---
try:
self.stdout.write("Loading data maps...")
# Use specific fields needed for maps
enquiry_map = self._make_map(Enquiry, 'title')
invoice_map = self._make_map(Invoice, 'title')
po_map = self._make_map(PurchaseOrder, 'title')
job_map = self._make_map(Job, 'title')
# Load User emails into a map {email.lower(): user_id}
user_map = self._make_map(User, 'email')
self.stdout.write(f"Loaded {len(enquiry_map)} enquiries, {len(invoice_map)} invoices, "
f"{len(po_map)} POs, {len(job_map)} jobs, {len(user_map)} users.")
except Exception as e:
self.stderr.write(f"FATAL: Could not load initial data maps: {e}")
return # Cannot proceed without maps
# --- Ensure directories exist ---
for dir_path in [EMAIL_DIR, VAULT_DIR, PROCESSED_DIR]:
if not os.path.isdir(dir_path):
try:
self.stdout.write(f"Creating required directory: {dir_path}")
os.makedirs(dir_path, exist_ok=True)
except OSError as e:
self.stderr.write(f"FATAL: Could not create required directory {dir_path}: {e}")
return # Cannot proceed
# --- Process new emails ---
try:
new_email_files = [f for f in os.listdir(VAULT_DIR)
if os.path.isfile(os.path.join(VAULT_DIR, f)) and not f.startswith('.')]
except OSError as e:
self.stderr.write(f"FATAL: Could not list files in vault directory {VAULT_DIR}: {e}")
return # Cannot proceed
self.stdout.write(f"Found {len(new_email_files)} new email files to process.")
processed_count = 0
skipped_count = 0
error_count = 0
for email_filename in new_email_files:
self.stdout.write(f"\n--- Handling '{email_filename}' ---")
file_path = os.path.join(VAULT_DIR, email_filename)
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
# Read content - original used str_replace("\r", ""), reading in text mode handles line endings
content = f.read()
except FileNotFoundError:
self.stderr.write(f"Warning: File '{email_filename}' disappeared before processing. Skipping.")
skipped_count += 1
continue # Already gone, or was never a file
except OSError as e:
self.stderr.write(f"ERROR: Could not read file '{email_filename}': {e}. Skipping.")
error_count += 1
# Decide whether to move the unreadable file
# self._move_processed_file(email_filename) # Maybe move to an error dir?
continue
except Exception as e: # Catch other potential read errors
self.stderr.write(f"ERROR: Unexpected error reading file '{email_filename}': {e}. Skipping.")
error_count += 1
continue
if not content:
self.stdout.write("No content found. Moving and skipping.")
self._move_processed_file(email_filename)
skipped_count += 1
continue
# --- Parse Headers ---
try:
parser = HeaderParser()
headers = parser.parsestr(content)
except Exception as e:
self.stderr.write(f"ERROR: Could not parse headers for '{email_filename}': {e}. Skipping.")
error_count += 1
self._move_processed_file(email_filename) # Move problematic file
continue
# --- Extract Core Info ---
subject_decoded = self._decode_mime_header(headers.get('Subject'))
date_str = headers.get('Date')
email_datetime = None
if date_str:
try:
# parsedate_to_datetime handles timezone info better
dt = parsedate_to_datetime(date_str)
# Convert to timezone-aware datetime if naive, using settings.TIME_ZONE
if timezone.is_naive(dt):
email_datetime = timezone.make_aware(dt, timezone.get_default_timezone())
else:
# Convert to default timezone for consistency if needed, or store as is
email_datetime = dt.astimezone(timezone.get_default_timezone())
except (TypeError, ValueError, Exception) as e: # Catch various parsing errors
self.stderr.write(f"Warning: Could not parse date '{date_str}' for {email_filename}: {e}. Using current time.")
email_datetime = timezone.now() # Fallback
else:
self.stdout.write(f"Warning: No date header found for {email_filename}. Using current time.")
email_datetime = timezone.now() # Fallback
recipients = self._get_recipients(headers)
all_recipient_emails = set(recipients['to'] + recipients['cc'] + recipients['from'])
# --- Determine if email should be saved ---
save_this = False
from_known_user = False
known_user_involved = False # Tracks if any To/From/Cc is known
# Check if any recipient (To, From, Cc) is a known user
for email_addr in all_recipient_emails:
if email_addr in user_map:
known_user_involved = True
if email_addr in recipients['from']:
from_known_user = True
# No need to break, we need to check all for recipient linking later
# break # Optimization: if one known user is found, we might save
if not subject_decoded:
self.stdout.write("No subject found. Moving and skipping.")
self._move_processed_file(email_filename)
skipped_count += 1
continue
# Check for identifiers in the subject
found_enquiry_ids = self._check_valid_identifier(subject_decoded, enquiry_map, ENQUIRY_REGEX)
found_invoice_ids = self._check_valid_identifier(subject_decoded, invoice_map, INVOICE_REGEX)
found_po_ids = self._check_valid_identifier(subject_decoded, po_map, PO_REGEX)
found_job_ids = self._check_valid_identifier(subject_decoded, job_map, JOB_REGEX)
found_any_identifier = bool(found_enquiry_ids or found_invoice_ids or found_po_ids or found_job_ids)
# Decision logic (matches original script): Save if From/To/Cc is known OR if identifier found
# The original script's logic was a bit tangled with `saveThis`. Let's simplify:
# We save if *any* known user is involved OR if *any* identifier is found.
should_save_email = known_user_involved or found_any_identifier
if not should_save_email:
self.stdout.write(f"Email does not involve known users and has no known identifiers in subject '{subject_decoded}'. Moving and skipping.")
self._move_processed_file(email_filename)
skipped_count += 1
continue
# --- Prepare to Save Email (Inside a Transaction) ---
self.stdout.write("Saving email and related data...")
try:
with transaction.atomic(): # Ensure all DB operations succeed or fail together
# --- Get/Create User IDs for all recipients ---
recipient_user_ids = {'to': [], 'from': [], 'cc': []}
sender_user_id = None
for recipient_type, email_list in recipients.items():
for email_addr in email_list:
user_id = user_map.get(email_addr)
if not user_id:
# User not found, create a new one
self.stdout.write(f"Creating new user for email: {email_addr}")
# Use get_or_create to handle potential race conditions if run concurrently (unlikely here)
# Or just create if duplicates are handled by unique constraint
try:
new_user, created = User.objects.get_or_create(
email=email_addr,
defaults={'type': 'contact', 'by_vault': True}
)
user_id = new_user.id
if created:
self.stdout.write(f"New user '{email_addr}' created with ID: {user_id}")
user_map[email_addr] = user_id # Update map for subsequent lookups in this run
else:
self.stdout.write(f"User '{email_addr}' already existed (ID: {user_id}), linking.")
# Ensure it's in the map if somehow missed initial load
user_map[email_addr] = user_id
except Exception as db_err:
# Log detailed error, but try to continue if possible without this user
self.stderr.write(f"ERROR: Failed to create or get user for '{email_addr}': {db_err}. This recipient may be skipped.")
continue # Skip adding this recipient if user creation failed critically
# Store the user ID for linking later
if user_id: # Only store if we successfully got/created the user
recipient_user_ids[recipient_type].append(user_id)
if recipient_type == 'from':
# Assuming only one 'from' address is primary sender
if sender_user_id is None:
sender_user_id = user_id
else:
self.stdout.write(f"Warning: Multiple 'From' addresses found for {email_filename}. Using first: ID {sender_user_id}")
if sender_user_id is None:
# This case should be rare if we save based on known users/identifiers,
# but handle it defensively. Maybe create a placeholder 'unknown' user?
self.stderr.write(f"ERROR: Could not determine sender User ID for {email_filename}. Skipping save.")
# Don't move the file yet, as the transaction will roll back.
# We need to explicitly move it outside the transaction block if skipping here.
raise ValueError("Sender User ID could not be determined.") # Raise to trigger rollback
# --- Create Email Record ---
new_email_obj = Email.objects.create(
user_id=sender_user_id, # Link to the sender User object
subject=subject_decoded[:500], # Truncate if subject is longer than field allows
udate=email_datetime,
filename=email_filename # Store original vault filename
)
self.stdout.write(f"Created Email object with ID: {new_email_obj.id}")
# --- Create EmailRecipient Records ---
recipients_to_create = []
unique_recipients = set() # Track (user_id, type) pairs to avoid duplicates
for recipient_type, user_id_list in recipient_user_ids.items():
# Map recipient_type ('to', 'cc', 'from') to model choices if needed
model_recipient_type = recipient_type # Assuming model choices match keys
for user_id in user_id_list:
if (user_id, model_recipient_type) not in unique_recipients:
recipients_to_create.append(
EmailRecipient(email=new_email_obj, user_id=user_id, type=model_recipient_type)
)
unique_recipients.add((user_id, model_recipient_type))
if recipients_to_create:
EmailRecipient.objects.bulk_create(recipients_to_create)
self.stdout.write(f"Created {len(recipients_to_create)} EmailRecipient links.")
# --- Process and Save Attachments ---
attachment_dir_relative = self._get_attachment_directory(EMAIL_DIR, email_datetime)
attachments = self._fetch_body_attachments(
email_filename,
VAULT_DIR,
EMAIL_DIR,
attachment_dir_relative
)
attachments_to_create = []
if attachments: # Check if list is not empty
for att_data in attachments:
attachments_to_create.append(
EmailAttachment(
email=new_email_obj,
name=att_data['name'][:500], # Relative path, check length
filename=att_data['filename'][:255], # Original name, check length
type=att_data['type'][:100], # Mime type, check length
size=att_data['size'],
is_message_body=att_data['is_message_body']
)
)
if attachments_to_create:
EmailAttachment.objects.bulk_create(attachments_to_create)
self.stdout.write(f"Created {len(attachments_to_create)} EmailAttachment records.")
else:
self.stdout.write("No attachments found or processed by ripmime.")
# --- Link to Found Identifiers (ManyToMany) ---
if found_enquiry_ids:
new_email_obj.enquiries.add(*found_enquiry_ids)
self.stdout.write(f"Linked email to Enquiries: {found_enquiry_ids}")
if found_invoice_ids:
new_email_obj.invoices.add(*found_invoice_ids)
self.stdout.write(f"Linked email to Invoices: {found_invoice_ids}")
if found_po_ids:
new_email_obj.purchase_orders.add(*found_po_ids)
self.stdout.write(f"Linked email to Purchase Orders: {found_po_ids}")
if found_job_ids:
new_email_obj.jobs.add(*found_job_ids)
self.stdout.write(f"Linked email to Jobs: {found_job_ids}")
# If transaction completes, move the file
if self._move_processed_file(email_filename):
processed_count += 1
else:
# DB changes are saved, but file move failed. Log this clearly.
self.stderr.write(f"CRITICAL WARNING: DB record created for {email_filename} (Email ID: {new_email_obj.id}), but FAILED TO MOVE file.")
error_count += 1 # Count as error due to incomplete processing
except ValueError as ve: # Catch specific error for sender determination
self.stderr.write(f"Skipping save for {email_filename} due to error: {ve}")
# Move the file even if saving failed due to missing sender
self._move_processed_file(email_filename)
error_count += 1
except Exception as e:
# Catch any other error during DB operations or attachment processing
self.stderr.write(f"ERROR: Failed to save email data for '{email_filename}' during transaction: {e}")
# Transaction automatically rolls back on exception
# Do NOT move the file, as processing failed. It will be retried next run.
error_count += 1
# Consider logging the full traceback for debugging
import traceback
self.stderr.write(traceback.format_exc())
self.stdout.write("\n--- Processing Summary ---")
self.stdout.write(f"Successfully processed and saved: {processed_count}")
self.stdout.write(f"Skipped (no subject/content/not relevant): {skipped_count}")
self.stdout.write(f"Errors during processing (file may remain in '{VAULT_DIR}'): {error_count}")
self.stdout.write("Vault email processing finished.")