1301 lines
34 KiB
Go
1301 lines
34 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "github.com/go-sql-driver/mysql"
|
|
"github.com/google/uuid"
|
|
"github.com/gorilla/mux"
|
|
"github.com/jhillyerd/enmime"
|
|
"golang.org/x/oauth2"
|
|
"golang.org/x/oauth2/google"
|
|
"google.golang.org/api/gmail/v1"
|
|
"google.golang.org/api/option"
|
|
)
|
|
|
|
// Config holds the settings that main fills in from command-line flags.
type Config struct {
	// Mode selects what the program does; main accepts "index", "serve", or "local".
	Mode string

	// EmailDir is the email storage directory, VaultDir the directory of
	// incoming vault messages, and ProcessedDir the directory for processed
	// messages.
	EmailDir, VaultDir, ProcessedDir string

	// MySQL connection parameters used to build the DSN.
	DBHost, DBUser, DBPassword, DBName string

	// Port is the TCP port the HTTP server listens on in serve mode.
	Port string

	// CredentialsFile and TokenFile are the Gmail OAuth2 client-secret and
	// cached-token paths; GmailQuery is the search query used in index mode.
	CredentialsFile, TokenFile, GmailQuery string
}
|
|
|
|
type VaultService struct {
|
|
db *sql.DB
|
|
config Config
|
|
gmailService *gmail.Service
|
|
indexer *GmailIndexer
|
|
processor *EmailProcessor
|
|
server *HTTPServer
|
|
}
|
|
|
|
type EmailProcessor struct {
|
|
db *sql.DB
|
|
config Config
|
|
enquiryMap map[string]int
|
|
invoiceMap map[string]int
|
|
poMap map[string]int
|
|
userMap map[string]int
|
|
jobMap map[string]int
|
|
}
|
|
|
|
type GmailIndexer struct {
|
|
db *sql.DB
|
|
gmailService *gmail.Service
|
|
processor *EmailProcessor
|
|
}
|
|
|
|
type HTTPServer struct {
|
|
db *sql.DB
|
|
gmailService *gmail.Service
|
|
processor *EmailProcessor
|
|
}
|
|
|
|
type EmailMetadata struct {
|
|
Subject string
|
|
From []string
|
|
To []string
|
|
CC []string
|
|
Date time.Time
|
|
GmailMessageID string
|
|
GmailThreadID string
|
|
AttachmentCount int
|
|
Attachments []AttachmentMeta
|
|
}
|
|
|
|
// AttachmentMeta describes a single attachment without its content.
type AttachmentMeta struct {
	// Filename and ContentType as reported for the attachment.
	Filename, ContentType string

	// Size of the attachment. NOTE(review): presumably bytes — confirm.
	Size int

	// GmailAttachmentID identifies the attachment in the Gmail API.
	GmailAttachmentID string
}
|
|
|
|
func main() {
|
|
var config Config
|
|
flag.StringVar(&config.Mode, "mode", "serve", "Mode: index, serve, or local")
|
|
flag.StringVar(&config.EmailDir, "emaildir", "/var/www/emails", "Email storage directory")
|
|
flag.StringVar(&config.VaultDir, "vaultdir", "/var/www/vaultmsgs/new", "Vault messages directory")
|
|
flag.StringVar(&config.ProcessedDir, "processeddir", "/var/www/vaultmsgs/cur", "Processed messages directory")
|
|
flag.StringVar(&config.DBHost, "dbhost", "127.0.0.1", "Database host")
|
|
flag.StringVar(&config.DBUser, "dbuser", "cmc", "Database user")
|
|
flag.StringVar(&config.DBPassword, "dbpass", "xVRQI&cA?7AU=hqJ!%au", "Database password")
|
|
flag.StringVar(&config.DBName, "dbname", "cmc", "Database name")
|
|
flag.StringVar(&config.Port, "port", "8080", "HTTP server port")
|
|
flag.StringVar(&config.CredentialsFile, "credentials", "credentials.json", "Gmail credentials file")
|
|
flag.StringVar(&config.TokenFile, "token", "token.json", "Gmail token file")
|
|
flag.StringVar(&config.GmailQuery, "gmail-query", "is:unread", "Gmail search query")
|
|
flag.Parse()
|
|
|
|
// Connect to database
|
|
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?parseTime=true",
|
|
config.DBUser, config.DBPassword, config.DBHost, config.DBName)
|
|
|
|
db, err := sql.Open("mysql", dsn)
|
|
if err != nil {
|
|
log.Fatal("Failed to connect to database:", err)
|
|
}
|
|
defer db.Close()
|
|
|
|
// Create processor
|
|
processor := &EmailProcessor{
|
|
db: db,
|
|
config: config,
|
|
}
|
|
|
|
if err := processor.loadMaps(); err != nil {
|
|
log.Fatal("Failed to load maps:", err)
|
|
}
|
|
|
|
// Create vault service
|
|
service := &VaultService{
|
|
db: db,
|
|
config: config,
|
|
processor: processor,
|
|
}
|
|
|
|
switch config.Mode {
|
|
case "index":
|
|
// Initialize Gmail service
|
|
gmailService, err := getGmailService(config.CredentialsFile, config.TokenFile)
|
|
if err != nil {
|
|
log.Fatal("Failed to get Gmail service:", err)
|
|
}
|
|
service.gmailService = gmailService
|
|
|
|
// Create and run indexer
|
|
service.indexer = &GmailIndexer{
|
|
db: db,
|
|
gmailService: gmailService,
|
|
processor: processor,
|
|
}
|
|
|
|
if err := service.indexer.IndexEmails(config.GmailQuery); err != nil {
|
|
log.Fatal("Failed to index emails:", err)
|
|
}
|
|
|
|
case "serve":
|
|
// Initialize Gmail service
|
|
gmailService, err := getGmailService(config.CredentialsFile, config.TokenFile)
|
|
if err != nil {
|
|
log.Fatal("Failed to get Gmail service:", err)
|
|
}
|
|
service.gmailService = gmailService
|
|
|
|
// Create and start HTTP server
|
|
service.server = &HTTPServer{
|
|
db: db,
|
|
gmailService: gmailService,
|
|
processor: processor,
|
|
}
|
|
|
|
log.Printf("Starting HTTP server on port %s", config.Port)
|
|
service.server.Start(config.Port)
|
|
|
|
case "local":
|
|
// Original file-based processing
|
|
if err := processor.processEmails(); err != nil {
|
|
log.Fatal("Failed to process emails:", err)
|
|
}
|
|
|
|
default:
|
|
log.Fatal("Invalid mode. Use: index, serve, or local")
|
|
}
|
|
}
|
|
|
|
// Gmail OAuth2 functions
|
|
func getGmailService(credentialsFile, tokenFile string) (*gmail.Service, error) {
|
|
ctx := context.Background()
|
|
|
|
b, err := ioutil.ReadFile(credentialsFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to read client secret file: %v", err)
|
|
}
|
|
|
|
config, err := google.ConfigFromJSON(b, gmail.GmailReadonlyScope)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to parse client secret file to config: %v", err)
|
|
}
|
|
|
|
client := getClient(config, tokenFile)
|
|
srv, err := gmail.NewService(ctx, option.WithHTTPClient(client))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to retrieve Gmail client: %v", err)
|
|
}
|
|
|
|
return srv, nil
|
|
}
|
|
|
|
func getClient(config *oauth2.Config, tokFile string) *http.Client {
|
|
tok, err := tokenFromFile(tokFile)
|
|
if err != nil {
|
|
tok = getTokenFromWeb(config)
|
|
saveToken(tokFile, tok)
|
|
}
|
|
return config.Client(context.Background(), tok)
|
|
}
|
|
|
|
func getTokenFromWeb(config *oauth2.Config) *oauth2.Token {
|
|
authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
|
|
fmt.Printf("Go to the following link in your browser then type the authorization code: \n%v\n", authURL)
|
|
|
|
var authCode string
|
|
if _, err := fmt.Scan(&authCode); err != nil {
|
|
log.Fatalf("Unable to read authorization code: %v", err)
|
|
}
|
|
|
|
tok, err := config.Exchange(context.TODO(), authCode)
|
|
if err != nil {
|
|
log.Fatalf("Unable to retrieve token from web: %v", err)
|
|
}
|
|
return tok
|
|
}
|
|
|
|
func tokenFromFile(file string) (*oauth2.Token, error) {
|
|
f, err := os.Open(file)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer f.Close()
|
|
tok := &oauth2.Token{}
|
|
err = json.NewDecoder(f).Decode(tok)
|
|
return tok, err
|
|
}
|
|
|
|
func saveToken(path string, token *oauth2.Token) {
|
|
fmt.Printf("Saving credential file to: %s\n", path)
|
|
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
|
|
if err != nil {
|
|
log.Fatalf("Unable to cache oauth token: %v", err)
|
|
}
|
|
defer f.Close()
|
|
json.NewEncoder(f).Encode(token)
|
|
}
|
|
|
|
// GmailIndexer methods
|
|
func (g *GmailIndexer) IndexEmails(query string) error {
|
|
user := "me"
|
|
var pageToken string
|
|
|
|
for {
|
|
// List messages with query
|
|
call := g.gmailService.Users.Messages.List(user).Q(query).MaxResults(500)
|
|
if pageToken != "" {
|
|
call = call.PageToken(pageToken)
|
|
}
|
|
|
|
response, err := call.Do()
|
|
if err != nil {
|
|
return fmt.Errorf("unable to retrieve messages: %v", err)
|
|
}
|
|
|
|
// Process each message
|
|
for _, msg := range response.Messages {
|
|
if err := g.indexMessage(msg.Id); err != nil {
|
|
log.Printf("Error indexing message %s: %v", msg.Id, err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Check for more pages
|
|
pageToken = response.NextPageToken
|
|
if pageToken == "" {
|
|
break
|
|
}
|
|
|
|
log.Printf("Processed %d messages, continuing with next page...", len(response.Messages))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (g *GmailIndexer) indexMessage(messageID string) error {
|
|
// Get message metadata only
|
|
message, err := g.gmailService.Users.Messages.Get("me", messageID).
|
|
Format("METADATA").
|
|
MetadataHeaders("From", "To", "Cc", "Subject", "Date").
|
|
Do()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Extract headers
|
|
headers := make(map[string]string)
|
|
for _, header := range message.Payload.Headers {
|
|
headers[header.Name] = header.Value
|
|
}
|
|
|
|
// Parse email addresses
|
|
toRecipients := g.processor.parseEnmimeAddresses(headers["To"])
|
|
fromRecipients := g.processor.parseEnmimeAddresses(headers["From"])
|
|
ccRecipients := g.processor.parseEnmimeAddresses(headers["Cc"])
|
|
|
|
// Check if we should save this email
|
|
saveThis := false
|
|
fromKnownUser := false
|
|
|
|
for _, email := range toRecipients {
|
|
if g.processor.userExists(email) {
|
|
saveThis = true
|
|
}
|
|
}
|
|
|
|
for _, email := range fromRecipients {
|
|
if g.processor.userExists(email) {
|
|
saveThis = true
|
|
fromKnownUser = true
|
|
}
|
|
}
|
|
|
|
for _, email := range ccRecipients {
|
|
if g.processor.userExists(email) {
|
|
saveThis = true
|
|
}
|
|
}
|
|
|
|
subject := headers["Subject"]
|
|
if subject == "" {
|
|
return nil // Skip emails without subject
|
|
}
|
|
|
|
// Check for identifiers in subject
|
|
foundEnquiries := g.processor.checkIdentifier(subject, g.processor.enquiryMap, "enquiry")
|
|
foundInvoices := g.processor.checkIdentifier(subject, g.processor.invoiceMap, "invoice")
|
|
foundPOs := g.processor.checkIdentifier(subject, g.processor.poMap, "purchaseorder")
|
|
foundJobs := g.processor.checkIdentifier(subject, g.processor.jobMap, "job")
|
|
|
|
foundIdent := len(foundEnquiries) > 0 || len(foundInvoices) > 0 ||
|
|
len(foundPOs) > 0 || len(foundJobs) > 0
|
|
|
|
if fromKnownUser || saveThis || foundIdent {
|
|
// Parse date
|
|
unixTime := time.Now().Unix()
|
|
if dateStr := headers["Date"]; dateStr != "" {
|
|
if t, err := time.Parse(time.RFC1123Z, dateStr); err == nil {
|
|
unixTime = t.Unix()
|
|
}
|
|
}
|
|
|
|
// Get recipient user IDs
|
|
recipientIDs := make(map[string][]int)
|
|
recipientIDs["to"] = g.processor.getUserIDs(toRecipients)
|
|
recipientIDs["from"] = g.processor.getUserIDs(fromRecipients)
|
|
recipientIDs["cc"] = g.processor.getUserIDs(ccRecipients)
|
|
|
|
if len(recipientIDs["from"]) == 0 {
|
|
return nil // Skip if no from recipient
|
|
}
|
|
|
|
// Marshal headers for storage
|
|
headerJSON, _ := json.Marshal(headers)
|
|
|
|
// Count attachments (from message metadata)
|
|
attachmentCount := 0
|
|
if message.Payload != nil {
|
|
attachmentCount = countAttachments(message.Payload)
|
|
}
|
|
|
|
fmt.Printf("Indexing message: %s - Subject: %s\n", messageID, subject)
|
|
|
|
// Save to database
|
|
if err := g.saveEmailMetadata(messageID, message.ThreadId, subject, string(headerJSON),
|
|
unixTime, recipientIDs, attachmentCount, message.Payload,
|
|
foundEnquiries, foundInvoices, foundPOs, foundJobs); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func countAttachments(payload *gmail.MessagePart) int {
|
|
count := 0
|
|
if payload.Body != nil && payload.Body.AttachmentId != "" {
|
|
count++
|
|
}
|
|
for _, part := range payload.Parts {
|
|
count += countAttachments(part)
|
|
}
|
|
return count
|
|
}
|
|
|
|
func (g *GmailIndexer) saveEmailMetadata(gmailMessageID, threadID, subject, headers string,
|
|
unixTime int64, recipientIDs map[string][]int, attachmentCount int,
|
|
payload *gmail.MessagePart, foundEnquiries, foundInvoices, foundPOs, foundJobs []int) error {
|
|
|
|
tx, err := g.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// Insert email
|
|
result, err := tx.Exec(
|
|
`INSERT INTO emails (user_id, udate, created, subject, gmail_message_id,
|
|
gmail_thread_id, raw_headers, is_downloaded, email_attachment_count)
|
|
VALUES (?, ?, NOW(), ?, ?, ?, ?, FALSE, ?)`,
|
|
recipientIDs["from"][0], unixTime, subject, gmailMessageID,
|
|
threadID, headers, attachmentCount)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
emailID, err := result.LastInsertId()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert recipients
|
|
for recipType, userIDs := range recipientIDs {
|
|
for _, userID := range userIDs {
|
|
if recipType == "from" {
|
|
continue // From is already stored in emails.user_id
|
|
}
|
|
_, err = tx.Exec(
|
|
"INSERT INTO email_recipients (email_id, user_id, type) VALUES (?, ?, ?)",
|
|
emailID, userID, recipType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Index attachment metadata
|
|
if payload != nil {
|
|
if err := g.indexAttachments(tx, emailID, gmailMessageID, payload); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Insert associations
|
|
for _, jobID := range foundJobs {
|
|
_, err = tx.Exec("INSERT INTO emails_jobs (email_id, job_id) VALUES (?, ?)", emailID, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, poID := range foundPOs {
|
|
_, err = tx.Exec("INSERT INTO emails_purchase_orders (email_id, purchase_order_id) VALUES (?, ?)", emailID, poID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, enqID := range foundEnquiries {
|
|
_, err = tx.Exec("INSERT INTO emails_enquiries (email_id, enquiry_id) VALUES (?, ?)", emailID, enqID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, invID := range foundInvoices {
|
|
_, err = tx.Exec("INSERT INTO emails_invoices (email_id, invoice_id) VALUES (?, ?)", emailID, invID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (g *GmailIndexer) indexAttachments(tx *sql.Tx, emailID int64, gmailMessageID string, part *gmail.MessagePart) error {
|
|
// Check if this part is an attachment
|
|
if part.Body != nil && part.Body.AttachmentId != "" {
|
|
filename := part.Filename
|
|
if filename == "" {
|
|
filename = "attachment"
|
|
}
|
|
|
|
_, err := tx.Exec(
|
|
`INSERT INTO email_attachments
|
|
(email_id, gmail_attachment_id, gmail_message_id, type, size, filename, is_message_body, created)
|
|
VALUES (?, ?, ?, ?, ?, ?, 0, NOW())`,
|
|
emailID, part.Body.AttachmentId, gmailMessageID, part.MimeType, part.Body.Size, filename)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Process sub-parts
|
|
for _, subPart := range part.Parts {
|
|
if err := g.indexAttachments(tx, emailID, gmailMessageID, subPart); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// HTTPServer methods
|
|
func (s *HTTPServer) Start(port string) {
|
|
router := mux.NewRouter()
|
|
|
|
// API routes
|
|
router.HandleFunc("/api/emails", s.ListEmails).Methods("GET")
|
|
router.HandleFunc("/api/emails/{id:[0-9]+}", s.GetEmail).Methods("GET")
|
|
router.HandleFunc("/api/emails/{id:[0-9]+}/content", s.StreamEmailContent).Methods("GET")
|
|
router.HandleFunc("/api/emails/{id:[0-9]+}/attachments", s.ListAttachments).Methods("GET")
|
|
router.HandleFunc("/api/emails/{id:[0-9]+}/attachments/{attachmentId:[0-9]+}", s.StreamAttachment).Methods("GET")
|
|
router.HandleFunc("/api/emails/{id:[0-9]+}/raw", s.StreamRawEmail).Methods("GET")
|
|
|
|
log.Fatal(http.ListenAndServe(":"+port, router))
|
|
}
|
|
|
|
func (s *HTTPServer) ListEmails(w http.ResponseWriter, r *http.Request) {
|
|
// TODO: Add pagination
|
|
rows, err := s.db.Query(`
|
|
SELECT id, subject, user_id, created, gmail_message_id, email_attachment_count
|
|
FROM emails
|
|
ORDER BY created DESC
|
|
LIMIT 100`)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
var emails []map[string]interface{}
|
|
for rows.Next() {
|
|
var id, userID, attachmentCount int
|
|
var subject, gmailMessageID string
|
|
var created time.Time
|
|
|
|
if err := rows.Scan(&id, &subject, &userID, &created, &gmailMessageID, &attachmentCount); err != nil {
|
|
continue
|
|
}
|
|
|
|
emails = append(emails, map[string]interface{}{
|
|
"id": id,
|
|
"subject": subject,
|
|
"user_id": userID,
|
|
"created": created,
|
|
"gmail_message_id": gmailMessageID,
|
|
"attachment_count": attachmentCount,
|
|
})
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(emails)
|
|
}
|
|
|
|
func (s *HTTPServer) GetEmail(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
emailID := vars["id"]
|
|
|
|
var gmailMessageID, subject, rawHeaders string
|
|
var created time.Time
|
|
|
|
err := s.db.QueryRow(`
|
|
SELECT gmail_message_id, subject, created, raw_headers
|
|
FROM emails WHERE id = ?`, emailID).
|
|
Scan(&gmailMessageID, &subject, &created, &rawHeaders)
|
|
|
|
if err != nil {
|
|
http.Error(w, "Email not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
response := map[string]interface{}{
|
|
"id": emailID,
|
|
"gmail_message_id": gmailMessageID,
|
|
"subject": subject,
|
|
"created": created,
|
|
"headers": rawHeaders,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
func (s *HTTPServer) StreamEmailContent(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
emailID := vars["id"]
|
|
|
|
// Get Gmail message ID from database
|
|
var gmailMessageID string
|
|
err := s.db.QueryRow("SELECT gmail_message_id FROM emails WHERE id = ?", emailID).
|
|
Scan(&gmailMessageID)
|
|
if err != nil {
|
|
http.Error(w, "Email not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// Fetch from Gmail
|
|
message, err := s.gmailService.Users.Messages.Get("me", gmailMessageID).
|
|
Format("RAW").Do()
|
|
if err != nil {
|
|
http.Error(w, "Failed to fetch email from Gmail", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Decode message
|
|
rawEmail, err := base64.URLEncoding.DecodeString(message.Raw)
|
|
if err != nil {
|
|
http.Error(w, "Failed to decode email", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Parse with enmime
|
|
env, err := enmime.ReadEnvelope(bytes.NewReader(rawEmail))
|
|
if err != nil {
|
|
http.Error(w, "Failed to parse email", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Stream HTML or Text directly to client
|
|
if env.HTML != "" {
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.Write([]byte(env.HTML))
|
|
} else if env.Text != "" {
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
w.Write([]byte(env.Text))
|
|
} else {
|
|
http.Error(w, "No content found in email", http.StatusNotFound)
|
|
}
|
|
}
|
|
|
|
func (s *HTTPServer) ListAttachments(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
emailID := vars["id"]
|
|
|
|
rows, err := s.db.Query(`
|
|
SELECT id, filename, type, size, gmail_attachment_id
|
|
FROM email_attachments
|
|
WHERE email_id = ?`, emailID)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
var attachments []map[string]interface{}
|
|
for rows.Next() {
|
|
var id, size int
|
|
var filename, contentType, gmailAttachmentID string
|
|
|
|
if err := rows.Scan(&id, &filename, &contentType, &size, &gmailAttachmentID); err != nil {
|
|
continue
|
|
}
|
|
|
|
attachments = append(attachments, map[string]interface{}{
|
|
"id": id,
|
|
"filename": filename,
|
|
"content_type": contentType,
|
|
"size": size,
|
|
"gmail_attachment_id": gmailAttachmentID,
|
|
})
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(attachments)
|
|
}
|
|
|
|
func (s *HTTPServer) StreamAttachment(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
attachmentID := vars["attachmentId"]
|
|
|
|
// Get attachment info from database
|
|
var gmailMessageID, gmailAttachmentID, filename, contentType string
|
|
err := s.db.QueryRow(`
|
|
SELECT gmail_message_id, gmail_attachment_id, filename, type
|
|
FROM email_attachments
|
|
WHERE id = ?`, attachmentID).
|
|
Scan(&gmailMessageID, &gmailAttachmentID, &filename, &contentType)
|
|
|
|
if err != nil {
|
|
http.Error(w, "Attachment not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// Fetch from Gmail
|
|
attachment, err := s.gmailService.Users.Messages.Attachments.
|
|
Get("me", gmailMessageID, gmailAttachmentID).Do()
|
|
if err != nil {
|
|
http.Error(w, "Failed to fetch attachment from Gmail", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Decode base64
|
|
data, err := base64.URLEncoding.DecodeString(attachment.Data)
|
|
if err != nil {
|
|
http.Error(w, "Failed to decode attachment", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Set headers and stream
|
|
w.Header().Set("Content-Type", contentType)
|
|
w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=\"%s\"", filename))
|
|
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(data)))
|
|
w.Write(data)
|
|
}
|
|
|
|
func (s *HTTPServer) StreamRawEmail(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
emailID := vars["id"]
|
|
|
|
// Get Gmail message ID
|
|
var gmailMessageID string
|
|
err := s.db.QueryRow("SELECT gmail_message_id FROM emails WHERE id = ?", emailID).
|
|
Scan(&gmailMessageID)
|
|
if err != nil {
|
|
http.Error(w, "Email not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// Fetch from Gmail
|
|
message, err := s.gmailService.Users.Messages.Get("me", gmailMessageID).
|
|
Format("RAW").Do()
|
|
if err != nil {
|
|
http.Error(w, "Failed to fetch email from Gmail", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Decode and stream
|
|
rawEmail, err := base64.URLEncoding.DecodeString(message.Raw)
|
|
if err != nil {
|
|
http.Error(w, "Failed to decode email", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "message/rfc822")
|
|
w.Write(rawEmail)
|
|
}
|
|
|
|
// Original EmailProcessor methods (kept for local mode and shared logic)
|
|
func (p *EmailProcessor) loadMaps() error {
|
|
p.enquiryMap = make(map[string]int)
|
|
p.invoiceMap = make(map[string]int)
|
|
p.poMap = make(map[string]int)
|
|
p.userMap = make(map[string]int)
|
|
p.jobMap = make(map[string]int)
|
|
|
|
// Load enquiries
|
|
rows, err := p.db.Query("SELECT id, title FROM enquiries")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var id int
|
|
var title string
|
|
if err := rows.Scan(&id, &title); err != nil {
|
|
return err
|
|
}
|
|
p.enquiryMap[title] = id
|
|
}
|
|
|
|
// Load invoices
|
|
rows, err = p.db.Query("SELECT id, title FROM invoices")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var id int
|
|
var title string
|
|
if err := rows.Scan(&id, &title); err != nil {
|
|
return err
|
|
}
|
|
p.invoiceMap[title] = id
|
|
}
|
|
|
|
// Load purchase orders
|
|
rows, err = p.db.Query("SELECT id, title FROM purchase_orders")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var id int
|
|
var title string
|
|
if err := rows.Scan(&id, &title); err != nil {
|
|
return err
|
|
}
|
|
p.poMap[title] = id
|
|
}
|
|
|
|
// Load users
|
|
rows, err = p.db.Query("SELECT id, email FROM users")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var id int
|
|
var email string
|
|
if err := rows.Scan(&id, &email); err != nil {
|
|
return err
|
|
}
|
|
p.userMap[strings.ToLower(email)] = id
|
|
}
|
|
|
|
// Load jobs
|
|
rows, err = p.db.Query("SELECT id, title FROM jobs")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var id int
|
|
var title string
|
|
if err := rows.Scan(&id, &title); err != nil {
|
|
return err
|
|
}
|
|
p.jobMap[title] = id
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *EmailProcessor) processEmails() error {
|
|
files, err := ioutil.ReadDir(p.config.VaultDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, file := range files {
|
|
if file.IsDir() || file.Name() == "." || file.Name() == ".." {
|
|
continue
|
|
}
|
|
|
|
fmt.Printf("Handling %s\n", file.Name())
|
|
if err := p.processEmail(file.Name()); err != nil {
|
|
log.Printf("Error processing %s: %v\n", file.Name(), err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *EmailProcessor) processEmail(filename string) error {
|
|
emailPath := filepath.Join(p.config.VaultDir, filename)
|
|
content, err := ioutil.ReadFile(emailPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(content) == 0 {
|
|
fmt.Println("No content found. Ignoring this email")
|
|
return p.moveEmail(filename)
|
|
}
|
|
|
|
// Parse email with enmime
|
|
env, err := enmime.ReadEnvelope(bytes.NewReader(content))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get recipients
|
|
toRecipients := p.parseEnmimeAddresses(env.GetHeader("To"))
|
|
fromRecipients := p.parseEnmimeAddresses(env.GetHeader("From"))
|
|
ccRecipients := p.parseEnmimeAddresses(env.GetHeader("Cc"))
|
|
|
|
// Check if we should save this email
|
|
saveThis := false
|
|
fromKnownUser := false
|
|
|
|
for _, email := range toRecipients {
|
|
if p.userExists(email) {
|
|
saveThis = true
|
|
}
|
|
}
|
|
|
|
for _, email := range fromRecipients {
|
|
if p.userExists(email) {
|
|
saveThis = true
|
|
fromKnownUser = true
|
|
}
|
|
}
|
|
|
|
for _, email := range ccRecipients {
|
|
if p.userExists(email) {
|
|
saveThis = true
|
|
}
|
|
}
|
|
|
|
subject := env.GetHeader("Subject")
|
|
if subject == "" {
|
|
fmt.Println("No subject found. Ignoring this email")
|
|
return p.moveEmail(filename)
|
|
}
|
|
|
|
// Check for identifiers in subject
|
|
foundEnquiries := p.checkIdentifier(subject, p.enquiryMap, "enquiry")
|
|
foundInvoices := p.checkIdentifier(subject, p.invoiceMap, "invoice")
|
|
foundPOs := p.checkIdentifier(subject, p.poMap, "purchaseorder")
|
|
foundJobs := p.checkIdentifier(subject, p.jobMap, "job")
|
|
|
|
foundIdent := len(foundEnquiries) > 0 || len(foundInvoices) > 0 ||
|
|
len(foundPOs) > 0 || len(foundJobs) > 0
|
|
|
|
if fromKnownUser || saveThis || foundIdent {
|
|
// Process and save the email
|
|
unixTime := time.Now().Unix()
|
|
if date, err := env.Date(); err == nil {
|
|
unixTime = date.Unix()
|
|
}
|
|
|
|
// Get recipient user IDs
|
|
recipientIDs := make(map[string][]int)
|
|
recipientIDs["to"] = p.getUserIDs(toRecipients)
|
|
recipientIDs["from"] = p.getUserIDs(fromRecipients)
|
|
recipientIDs["cc"] = p.getUserIDs(ccRecipients)
|
|
|
|
if len(recipientIDs["from"]) == 0 {
|
|
fmt.Println("Email has no From Recipient ID. Ignoring this email")
|
|
return p.moveEmail(filename)
|
|
}
|
|
|
|
fmt.Println("---------START MESSAGE -----------------")
|
|
fmt.Printf("Subject: %s\n", subject)
|
|
|
|
// Extract attachments using enmime
|
|
relativePath := p.getAttachmentDirectory(unixTime)
|
|
attachments := p.extractEnmimeAttachments(env, relativePath)
|
|
|
|
// Save email to database
|
|
if err := p.saveEmail(filename, subject, unixTime, recipientIDs, attachments,
|
|
foundEnquiries, foundInvoices, foundPOs, foundJobs); err != nil {
|
|
return err
|
|
}
|
|
|
|
fmt.Println("--------END MESSAGE ------")
|
|
} else {
|
|
fmt.Printf("Email will not be saved. Subject: %s\n", subject)
|
|
}
|
|
|
|
return p.moveEmail(filename)
|
|
}
|
|
|
|
func (p *EmailProcessor) parseEnmimeAddresses(header string) []string {
|
|
var emails []string
|
|
if header == "" {
|
|
return emails
|
|
}
|
|
|
|
addresses, err := enmime.ParseAddressList(header)
|
|
if err != nil {
|
|
return emails
|
|
}
|
|
|
|
for _, addr := range addresses {
|
|
emails = append(emails, strings.ToLower(addr.Address))
|
|
}
|
|
|
|
return emails
|
|
}
|
|
|
|
func (p *EmailProcessor) userExists(email string) bool {
|
|
_, exists := p.userMap[strings.ToLower(email)]
|
|
return exists
|
|
}
|
|
|
|
func (p *EmailProcessor) getUserIDs(emails []string) []int {
|
|
var ids []int
|
|
for _, email := range emails {
|
|
if id, exists := p.userMap[strings.ToLower(email)]; exists {
|
|
ids = append(ids, id)
|
|
} else {
|
|
// Create new user
|
|
newID := p.createUser(email)
|
|
if newID > 0 {
|
|
ids = append(ids, newID)
|
|
p.userMap[strings.ToLower(email)] = newID
|
|
}
|
|
}
|
|
}
|
|
return ids
|
|
}
|
|
|
|
func (p *EmailProcessor) createUser(email string) int {
|
|
fmt.Printf("Making a new User for: '%s'\n", email)
|
|
|
|
result, err := p.db.Exec(
|
|
`INSERT INTO users (principle_id, customer_id, type, access_level, username, password,
|
|
first_name, last_name, email, job_title, phone, mobile, fax, phone_extension,
|
|
direct_phone, notes, by_vault, blacklisted, enabled, primary_contact)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
0, 0, "contact", "none", "", "", "", "", strings.ToLower(email), "", "", "", "", "", "", "", 1, 0, 1, 0)
|
|
|
|
if err != nil {
|
|
fmt.Printf("Serious Error: Unable to create user for email '%s': %v\n", email, err)
|
|
return 0
|
|
}
|
|
|
|
id, err := result.LastInsertId()
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
|
|
fmt.Printf("New User '%s' Added with ID: %d\n", email, id)
|
|
return int(id)
|
|
}
|
|
|
|
func (p *EmailProcessor) checkIdentifier(subject string, identMap map[string]int, identType string) []int {
|
|
var results []int
|
|
var re *regexp.Regexp
|
|
|
|
switch identType {
|
|
case "enquiry":
|
|
re = regexp.MustCompile(`CMC\d+([NVQWSOT]|ACT|NT)E\d+-\d+`)
|
|
case "invoice":
|
|
re = regexp.MustCompile(`CMCIN\d+`)
|
|
case "purchaseorder":
|
|
re = regexp.MustCompile(`CMCPO\d+`)
|
|
case "job":
|
|
re = regexp.MustCompile(`(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)\d+(N|V|W|S|T|NT|ACT|Q|O)J\d+`)
|
|
}
|
|
|
|
if re != nil {
|
|
matches := re.FindAllString(subject, -1)
|
|
for _, match := range matches {
|
|
if id, exists := identMap[match]; exists {
|
|
results = append(results, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
return results
|
|
}
|
|
|
|
func (p *EmailProcessor) getAttachmentDirectory(unixTime int64) string {
|
|
t := time.Unix(unixTime, 0)
|
|
monthYear := t.Format("01-2006")
|
|
path := filepath.Join(p.config.EmailDir, monthYear)
|
|
|
|
if err := os.MkdirAll(path, 0755); err != nil {
|
|
log.Printf("Failed to create directory %s: %v", path, err)
|
|
}
|
|
|
|
return monthYear
|
|
}
|
|
|
|
func (p *EmailProcessor) extractEnmimeAttachments(env *enmime.Envelope, relativePath string) []Attachment {
|
|
var attachments []Attachment
|
|
outputDir := filepath.Join(p.config.EmailDir, relativePath)
|
|
uuid := uuid.New().String()
|
|
|
|
// Ensure output directory exists
|
|
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
|
log.Printf("Failed to create output directory: %v", err)
|
|
return attachments
|
|
}
|
|
|
|
biggestHTMLSize := int64(0)
|
|
biggestHTMLIdx := -1
|
|
biggestPlainSize := int64(0)
|
|
biggestPlainIdx := -1
|
|
|
|
// Process HTML part if exists
|
|
if env.HTML != "" {
|
|
htmlData := []byte(env.HTML)
|
|
fileName := "texthtml"
|
|
newFileName := uuid + "-" + fileName
|
|
filePath := filepath.Join(outputDir, newFileName)
|
|
|
|
if err := ioutil.WriteFile(filePath, htmlData, 0644); err == nil {
|
|
att := Attachment{
|
|
Type: "text/html",
|
|
Name: filepath.Join(relativePath, newFileName),
|
|
Filename: fileName,
|
|
Size: int64(len(htmlData)),
|
|
IsMessageBody: 0,
|
|
}
|
|
attachments = append(attachments, att)
|
|
biggestHTMLIdx = len(attachments) - 1
|
|
biggestHTMLSize = int64(len(htmlData))
|
|
}
|
|
}
|
|
|
|
// Process plain text part if exists
|
|
if env.Text != "" {
|
|
textData := []byte(env.Text)
|
|
fileName := "textplain"
|
|
newFileName := uuid + "-" + fileName
|
|
filePath := filepath.Join(outputDir, newFileName)
|
|
|
|
if err := ioutil.WriteFile(filePath, textData, 0644); err == nil {
|
|
att := Attachment{
|
|
Type: "text/plain",
|
|
Name: filepath.Join(relativePath, newFileName),
|
|
Filename: fileName,
|
|
Size: int64(len(textData)),
|
|
IsMessageBody: 0,
|
|
}
|
|
attachments = append(attachments, att)
|
|
biggestPlainIdx = len(attachments) - 1
|
|
biggestPlainSize = int64(len(textData))
|
|
}
|
|
}
|
|
|
|
// Process file attachments
|
|
for _, part := range env.Attachments {
|
|
fileName := part.FileName
|
|
if fileName == "" {
|
|
fileName = "attachment"
|
|
}
|
|
|
|
newFileName := uuid + "-" + fileName
|
|
filePath := filepath.Join(outputDir, newFileName)
|
|
|
|
if err := ioutil.WriteFile(filePath, part.Content, 0644); err != nil {
|
|
log.Printf("Failed to save attachment %s: %v", fileName, err)
|
|
continue
|
|
}
|
|
|
|
att := Attachment{
|
|
Type: part.ContentType,
|
|
Name: filepath.Join(relativePath, newFileName),
|
|
Filename: fileName,
|
|
Size: int64(len(part.Content)),
|
|
IsMessageBody: 0,
|
|
}
|
|
|
|
attachments = append(attachments, att)
|
|
idx := len(attachments) - 1
|
|
|
|
// Track largest HTML and plain text attachments
|
|
if strings.HasPrefix(part.ContentType, "text/html") && int64(len(part.Content)) > biggestHTMLSize {
|
|
biggestHTMLSize = int64(len(part.Content))
|
|
biggestHTMLIdx = idx
|
|
} else if strings.HasPrefix(part.ContentType, "text/plain") && int64(len(part.Content)) > biggestPlainSize {
|
|
biggestPlainSize = int64(len(part.Content))
|
|
biggestPlainIdx = idx
|
|
}
|
|
}
|
|
|
|
// Process inline parts
|
|
for _, part := range env.Inlines {
|
|
fileName := part.FileName
|
|
if fileName == "" {
|
|
fileName = "inline"
|
|
}
|
|
|
|
newFileName := uuid + "-" + fileName
|
|
filePath := filepath.Join(outputDir, newFileName)
|
|
|
|
if err := ioutil.WriteFile(filePath, part.Content, 0644); err != nil {
|
|
log.Printf("Failed to save inline part %s: %v", fileName, err)
|
|
continue
|
|
}
|
|
|
|
att := Attachment{
|
|
Type: part.ContentType,
|
|
Name: filepath.Join(relativePath, newFileName),
|
|
Filename: fileName,
|
|
Size: int64(len(part.Content)),
|
|
IsMessageBody: 0,
|
|
}
|
|
|
|
attachments = append(attachments, att)
|
|
idx := len(attachments) - 1
|
|
|
|
// Track largest HTML and plain text attachments
|
|
if strings.HasPrefix(part.ContentType, "text/html") && int64(len(part.Content)) > biggestHTMLSize {
|
|
biggestHTMLSize = int64(len(part.Content))
|
|
biggestHTMLIdx = idx
|
|
} else if strings.HasPrefix(part.ContentType, "text/plain") && int64(len(part.Content)) > biggestPlainSize {
|
|
biggestPlainSize = int64(len(part.Content))
|
|
biggestPlainIdx = idx
|
|
}
|
|
}
|
|
|
|
// Mark the message body
|
|
if biggestHTMLIdx >= 0 {
|
|
attachments[biggestHTMLIdx].IsMessageBody = 1
|
|
} else if biggestPlainIdx >= 0 {
|
|
attachments[biggestPlainIdx].IsMessageBody = 1
|
|
}
|
|
|
|
return attachments
|
|
}
|
|
|
|
func (p *EmailProcessor) saveEmail(filename, subject string, unixTime int64,
|
|
recipientIDs map[string][]int, attachments []Attachment,
|
|
foundEnquiries, foundInvoices, foundPOs, foundJobs []int) error {
|
|
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// Insert email
|
|
result, err := tx.Exec(
|
|
"INSERT INTO emails (user_id, udate, created, subject) VALUES (?, ?, NOW(), ?)",
|
|
recipientIDs["from"][0], unixTime, subject)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
emailID, err := result.LastInsertId()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert recipients
|
|
for recipType, userIDs := range recipientIDs {
|
|
for _, userID := range userIDs {
|
|
if recipType == "from" {
|
|
continue // From is already stored in emails.user_id
|
|
}
|
|
_, err = tx.Exec(
|
|
"INSERT INTO email_recipients (email_id, user_id, type) VALUES (?, ?, ?)",
|
|
emailID, userID, recipType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Insert attachments
|
|
for _, att := range attachments {
|
|
_, err = tx.Exec(
|
|
"INSERT INTO email_attachments (email_id, name, type, size, filename, is_message_body, created) VALUES (?, ?, ?, ?, ?, ?, NOW())",
|
|
emailID, att.Name, att.Type, att.Size, att.Filename, att.IsMessageBody)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Insert associations
|
|
for _, jobID := range foundJobs {
|
|
_, err = tx.Exec("INSERT INTO emails_jobs (email_id, job_id) VALUES (?, ?)", emailID, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, poID := range foundPOs {
|
|
_, err = tx.Exec("INSERT INTO emails_purchase_orders (email_id, purchase_order_id) VALUES (?, ?)", emailID, poID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, enqID := range foundEnquiries {
|
|
_, err = tx.Exec("INSERT INTO emails_enquiries (email_id, enquiry_id) VALUES (?, ?)", emailID, enqID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, invID := range foundInvoices {
|
|
_, err = tx.Exec("INSERT INTO emails_invoices (email_id, invoice_id) VALUES (?, ?)", emailID, invID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
fmt.Println("Success. We made an email")
|
|
return nil
|
|
}
|
|
|
|
func (p *EmailProcessor) moveEmail(filename string) error {
|
|
oldPath := filepath.Join(p.config.VaultDir, filename)
|
|
newPath := filepath.Join(p.config.ProcessedDir, filename+":S")
|
|
|
|
if err := os.Rename(oldPath, newPath); err != nil {
|
|
fmt.Printf("Unable to move %s to %s: %v\n", oldPath, newPath, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Attachment describes one file stored on disk for an email: either one of
// the message bodies or a MIME part that was extracted from the message.
type Attachment struct {
	// Type is the content type of the stored data, e.g. "text/html".
	// Name is the stored file's path relative to the email directory.
	// Filename is the name the part carried in the message, or the
	// placeholder that was used when it had none.
	Type, Name, Filename string

	// Size is the length of the stored data in bytes.
	Size int64

	// IsMessageBody is 1 for the record chosen as the message body and 0
	// for every other record.
	IsMessageBody int
}
|