Level 3: Technical Implementation

Level 3: Technical Implementation

Database Schema

-- Import jobs tracking
CREATE TABLE import_jobs (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL REFERENCES tenants(id),
  workspace_id UUID REFERENCES workspaces(id),

  -- File information
  file_name VARCHAR(255),
  file_size BIGINT,
  file_url TEXT,  -- S3 URL
  file_type VARCHAR(50),  -- csv, xlsx, json

  -- Import configuration
  field_mapping JSONB,
  import_options JSONB,  -- duplicate strategy, tags, etc.

  -- Status
  status VARCHAR(50),  -- pending, processing, completed, failed, rolled_back

  -- Results
  total_rows INTEGER,
  imported_count INTEGER DEFAULT 0,
  updated_count INTEGER DEFAULT 0,
  created_count INTEGER DEFAULT 0,
  skipped_count INTEGER DEFAULT 0,
  error_count INTEGER DEFAULT 0,

  -- Error details
  errors JSONB,  -- Array of row errors

  -- Progress tracking
  processed_rows INTEGER DEFAULT 0,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,

  -- User tracking
  created_by UUID REFERENCES users(id),
  created_at TIMESTAMP DEFAULT NOW(),

  -- Rollback support
  can_rollback BOOLEAN DEFAULT TRUE,
  rolled_back_at TIMESTAMP
);

CREATE INDEX idx_import_jobs_tenant ON import_jobs(tenant_id);
CREATE INDEX idx_import_jobs_status ON import_jobs(status);
CREATE INDEX idx_import_jobs_created ON import_jobs(created_at);

-- Import events (for rollback)
CREATE TABLE import_events (
  id UUID PRIMARY KEY,
  import_job_id UUID NOT NULL REFERENCES import_jobs(id),
  contact_id UUID NOT NULL REFERENCES contacts(id),

  -- Event type
  event_type VARCHAR(50),  -- created, updated, skipped

  -- Snapshot of old data (for rollback)
  previous_data JSONB,
  new_data JSONB,

  created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_import_events_job ON import_events(import_job_id);
CREATE INDEX idx_import_events_contact ON import_events(contact_id);

-- Export jobs
CREATE TABLE export_jobs (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL REFERENCES tenants(id),

  -- Export configuration
  export_type VARCHAR(50),  -- full, filtered, segment
  filters JSONB,
  selected_fields JSONB,

  -- Output
  file_format VARCHAR(50),  -- csv, xlsx, json
  file_url TEXT,  -- S3 URL
  file_size BIGINT,

  -- Status
  status VARCHAR(50),  -- pending, processing, completed, failed

  -- Results
  total_contacts INTEGER,

  -- Progress
  processed_contacts INTEGER DEFAULT 0,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  expires_at TIMESTAMP,  -- Auto-delete after 7 days

  created_by UUID REFERENCES users(id),
  created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_export_jobs_tenant ON export_jobs(tenant_id);
CREATE INDEX idx_export_jobs_status ON export_jobs(status);

Import Service

interface ImportOptions {
  fileName: string;
  fileUrl: string;
  fileType: 'csv' | 'xlsx' | 'json';
  fieldMapping: Record<string, string>;
  duplicateStrategy: 'skip' | 'update' | 'create';
  tags?: string[];
  segmentName?: string;
  validateEmail?: boolean;
  validatePhone?: boolean;
}

interface ImportResult {
  jobId: string;
  totalRows: number;
  imported: number;
  created: number;
  updated: number;
  skipped: number;
  errors: ImportError[];
}

interface ImportError {
  row: number;
  field?: string;
  value?: any;
  error: string;
}

class ImportService {
  async startImport(
    tenantId: string,
    options: ImportOptions
  ): Promise<ImportResult> {
    // Create import job
    const job = await db.importJobs.create({
      tenantId,
      fileName: options.fileName,
      fileUrl: options.fileUrl,
      fileType: options.fileType,
      fieldMapping: options.fieldMapping,
      importOptions: {
        duplicateStrategy: options.duplicateStrategy,
        tags: options.tags,
        segmentName: options.segmentName,
      },
      status: 'pending',
    });

    // Queue for processing
    await importQueue.add('process-import', {
      jobId: job.id,
      options,
    });

    return {
      jobId: job.id,
      totalRows: 0,
      imported: 0,
      created: 0,
      updated: 0,
      skipped: 0,
      errors: [],
    };
  }

  async processImport(jobId: string, options: ImportOptions): Promise<void> {
    const job = await db.importJobs.findById(jobId);

    try {
      // Update status
      await db.importJobs.update(jobId, {
        status: 'processing',
        startedAt: new Date(),
      });

      // Download file from S3
      const fileContent = await s3.getObject(options.fileUrl);

      // Parse file
      const rows = await this.parseFile(fileContent, options.fileType);

      await db.importJobs.update(jobId, {
        totalRows: rows.length,
      });

      // Process rows
      const results = await this.processRows(
        jobId,
        rows,
        options
      );

      // Create segment if specified
      if (options.segmentName) {
        await this.createSegment(
          job.tenantId,
          options.segmentName,
          results.contactIds
        );
      }

      // Update job with results
      await db.importJobs.update(jobId, {
        status: 'completed',
        completedAt: new Date(),
        importedCount: results.imported,
        createdCount: results.created,
        updatedCount: results.updated,
        skippedCount: results.skipped,
        errorCount: results.errors.length,
        errors: results.errors,
      });

      // Send notification
      await this.notifyImportComplete(job, results);

    } catch (error) {
      await db.importJobs.update(jobId, {
        status: 'failed',
        completedAt: new Date(),
        errors: [{ row: 0, error: error.message }],
      });

      throw error;
    }
  }

  private async parseFile(
    content: Buffer,
    fileType: string
  ): Promise<any[]> {
    switch (fileType) {
      case 'csv':
        return await this.parseCSV(content);
      case 'xlsx':
        return await this.parseExcel(content);
      case 'json':
        return JSON.parse(content.toString());
      default:
        throw new Error(`Unsupported file type: ${fileType}`);
    }
  }

  private async parseCSV(content: Buffer): Promise<any[]> {
    return new Promise((resolve, reject) => {
      const results: any[] = [];

      const stream = Readable.from(content);

      stream
        .pipe(csv())
        .on('data', (row) => results.push(row))
        .on('end', () => resolve(results))
        .on('error', reject);
    });
  }

  private async parseExcel(content: Buffer): Promise<any[]> {
    const workbook = XLSX.read(content);
    const sheetName = workbook.SheetNames[0];
    const sheet = workbook.Sheets[sheetName];

    return XLSX.utils.sheet_to_json(sheet);
  }

  private async processRows(
    jobId: string,
    rows: any[],
    options: ImportOptions
  ): Promise<{
    imported: number;
    created: number;
    updated: number;
    skipped: number;
    errors: ImportError[];
    contactIds: string[];
  }> {
    let imported = 0;
    let created = 0;
    let updated = 0;
    let skipped = 0;
    const errors: ImportError[] = [];
    const contactIds: string[] = [];

    for (let i = 0; i < rows.length; i++) {
      const row = rows[i];

      try {
        // Map fields
        const mappedData = this.mapFields(row, options.fieldMapping);

        // Validate
        const validation = await this.validateContact(mappedData, options);

        if (!validation.valid) {
          errors.push({
            row: i + 1,
            error: validation.error,
          });
          skipped++;
          continue;
        }

        // Check for duplicates
        const existing = await db.contacts.findByEmail(
          mappedData.email
        );

        if (existing) {
          if (options.duplicateStrategy === 'skip') {
            skipped++;
            continue;
          } else if (options.duplicateStrategy === 'update') {
            // Store previous data for rollback
            await db.importEvents.create({
              importJobId: jobId,
              contactId: existing.id,
              eventType: 'updated',
              previousData: existing,
              newData: mappedData,
            });

            // Update contact
            await db.contacts.update(existing.id, mappedData);

            updated++;
            imported++;
            contactIds.push(existing.id);
          }
        } else {
          // Create new contact
          const contact = await db.contacts.create(mappedData);

          await db.importEvents.create({
            importJobId: jobId,
            contactId: contact.id,
            eventType: 'created',
            newData: mappedData,
          });

          created++;
          imported++;
          contactIds.push(contact.id);
        }

        // Add tags
        if (options.tags && options.tags.length > 0) {
          const contact = existing || await db.contacts.findByEmail(mappedData.email);
          await this.addTags(contact.id, options.tags);
        }

        // Update progress
        await db.importJobs.update(jobId, {
          processedRows: i + 1,
        });

      } catch (error) {
        errors.push({
          row: i + 1,
          error: error.message,
        });
        skipped++;
      }
    }

    return {
      imported,
      created,
      updated,
      skipped,
      errors,
      contactIds,
    };
  }

  private mapFields(
    row: any,
    mapping: Record<string, string>
  ): any {
    const mapped: any = {};

    for (const [csvField, dbField] of Object.entries(mapping)) {
      if (row[csvField] !== undefined) {
        mapped[dbField] = row[csvField];
      }
    }

    return mapped;
  }

  private async validateContact(
    data: any,
    options: ImportOptions
  ): Promise<{ valid: boolean; error?: string }> {
    // Email required
    if (!data.email) {
      return { valid: false, error: 'Email is required' };
    }

    // Email format
    if (options.validateEmail) {
      const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
      if (!emailRegex.test(data.email)) {
        return { valid: false, error: 'Invalid email format' };
      }
    }

    // Phone format
    if (options.validatePhone && data.phone) {
      const phoneRegex = /^[\+]?[(]?[0-9]{3}[)]?[-\s\.]?[0-9]{3}[-\s\.]?[0-9]{4,6}$/;
      if (!phoneRegex.test(data.phone)) {
        return { valid: false, error: 'Invalid phone format' };
      }
    }

    return { valid: true };
  }

  async rollbackImport(jobId: string): Promise<void> {
    const job = await db.importJobs.findById(jobId);

    if (!job.canRollback) {
      throw new Error('Import cannot be rolled back');
    }

    // Get all events
    const events = await db.importEvents.findAll({
      where: { importJobId: jobId },
    });

    for (const event of events) {
      if (event.eventType === 'created') {
        // Delete contact
        await db.contacts.delete(event.contactId);
      } else if (event.eventType === 'updated') {
        // Restore previous data
        await db.contacts.update(
          event.contactId,
          event.previousData
        );
      }
    }

    // Update job
    await db.importJobs.update(jobId, {
      status: 'rolled_back',
      rolledBackAt: new Date(),
    });
  }
}

Export Service

interface ExportOptions {
  format: 'csv' | 'xlsx' | 'json';
  filters?: any;
  segmentId?: string;
  selectedFields?: string[];
  includeCustomFields?: boolean;
  includeEngagementMetrics?: boolean;
}

class ExportService {
  async startExport(
    tenantId: string,
    options: ExportOptions
  ): Promise<{ jobId: string }> {
    const job = await db.exportJobs.create({
      tenantId,
      exportType: options.segmentId ? 'segment' : options.filters ? 'filtered' : 'full',
      filters: options.filters,
      selectedFields: options.selectedFields,
      fileFormat: options.format,
      status: 'pending',
      expiresAt: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000),  // 7 days
    });

    // Queue for processing
    await exportQueue.add('process-export', {
      jobId: job.id,
      options,
    });

    return { jobId: job.id };
  }

  async processExport(jobId: string, options: ExportOptions): Promise<void> {
    try {
      await db.exportJobs.update(jobId, {
        status: 'processing',
        startedAt: new Date(),
      });

      // Fetch contacts
      const contacts = await this.fetchContacts(options);

      await db.exportJobs.update(jobId, {
        totalContacts: contacts.length,
      });

      // Generate file
      const fileContent = await this.generateFile(contacts, options);

      // Upload to S3
      const fileUrl = await s3.putObject({
        bucket: 'exports',
        key: `${jobId}.${options.format}`,
        body: fileContent,
        expiresIn: 7 * 24 * 60 * 60,  // 7 days
      });

      await db.exportJobs.update(jobId, {
        status: 'completed',
        completedAt: new Date(),
        fileUrl,
        fileSize: fileContent.length,
      });

    } catch (error) {
      await db.exportJobs.update(jobId, {
        status: 'failed',
        completedAt: new Date(),
      });
      throw error;
    }
  }

  private async fetchContacts(options: ExportOptions): Promise<any[]> {
    let query: any = {};

    if (options.segmentId) {
      query.include = [{
        model: db.segmentContacts,
        where: { segmentId: options.segmentId },
      }];
    }

    if (options.filters) {
      query.where = options.filters;
    }

    return await db.contacts.findAll(query);
  }

  private async generateFile(
    contacts: any[],
    options: ExportOptions
  ): Promise<Buffer> {
    switch (options.format) {
      case 'csv':
        return this.generateCSV(contacts, options);
      case 'xlsx':
        return this.generateExcel(contacts, options);
      case 'json':
        return Buffer.from(JSON.stringify(contacts, null, 2));
      default:
        throw new Error(`Unsupported format: ${options.format}`);
    }
  }

  private async generateCSV(
    contacts: any[],
    options: ExportOptions
  ): Promise<Buffer> {
    const fields = options.selectedFields || [
      'email',
      'firstName',
      'lastName',
      'company',
      'phone',
    ];

    const csvWriter = createObjectCsvStringifier({
      header: fields.map(f => ({ id: f, title: f })),
    });

    const csv = csvWriter.getHeaderString() + csvWriter.stringifyRecords(contacts);

    return Buffer.from(csv);
  }

  private async generateExcel(
    contacts: any[],
    options: ExportOptions
  ): Promise<Buffer> {
    const workbook = XLSX.utils.book_new();
    const worksheet = XLSX.utils.json_to_sheet(contacts);

    XLSX.utils.book_append_sheet(workbook, worksheet, 'Contacts');

    return XLSX.write(workbook, { type: 'buffer', bookType: 'xlsx' });
  }
}

Background Jobs

// Process import jobs
importQueue.process('process-import', async (job) => {
  const { jobId, options } = job.data;
  const service = new ImportService();

  await service.processImport(jobId, options);
});

// Process export jobs
exportQueue.process('process-export', async (job) => {
  const { jobId, options } = job.data;
  const service = new ExportService();

  await service.processExport(jobId, options);
});

// Clean up old exports
cron.schedule('0 4 * * *', async () => {  // 4 AM daily
  const expiredExports = await db.exportJobs.findAll({
    where: {
      expiresAt: { [Op.lt]: new Date() },
      status: 'completed',
    },
  });

  for (const exportJob of expiredExports) {
    // Delete from S3
    if (exportJob.fileUrl) {
      await s3.deleteObject(exportJob.fileUrl);
    }

    // Delete job
    await db.exportJobs.delete(exportJob.id);
  }
});

API Endpoints

// Start import
app.post('/api/contacts/import', authenticate, async (req, res) => {
  const { fileName, fileUrl, fieldMapping, options } = req.body;

  const service = new ImportService();
  const result = await service.startImport(req.user.tenantId, {
    fileName,
    fileUrl,
    fileType: 'csv',
    fieldMapping,
    ...options,
  });

  return res.json(result);
});

// Get import status
app.get('/api/contacts/import/:jobId', authenticate, async (req, res) => {
  const job = await db.importJobs.findById(req.params.jobId);

  if (job.tenantId !== req.user.tenantId) {
    return res.status(403).json({ error: 'Forbidden' });
  }

  return res.json(job);
});

// Rollback import
app.post('/api/contacts/import/:jobId/rollback', authenticate, async (req, res) => {
  const service = new ImportService();
  await service.rollbackImport(req.params.jobId);

  return res.json({ success: true });
});

// Start export
app.post('/api/contacts/export', authenticate, async (req, res) => {
  const service = new ExportService();
  const result = await service.startExport(req.user.tenantId, req.body);

  return res.json(result);
});

// Get export status
app.get('/api/contacts/export/:jobId', authenticate, async (req, res) => {
  const job = await db.exportJobs.findById(req.params.jobId);

  if (job.tenantId !== req.user.tenantId) {
    return res.status(403).json({ error: 'Forbidden' });
  }

  return res.json(job);
});

// Download export
app.get('/api/contacts/export/:jobId/download', authenticate, async (req, res) => {
  const job = await db.exportJobs.findById(req.params.jobId);

  if (job.tenantId !== req.user.tenantId) {
    return res.status(403).json({ error: 'Forbidden' });
  }

  if (job.status !== 'completed') {
    return res.status(400).json({ error: 'Export not ready' });
  }

  // Generate signed URL for S3 download
  const signedUrl = await s3.getSignedUrl(job.fileUrl, 300);  // 5 minutes

  return res.json({ downloadUrl: signedUrl });
});



Last Updated: November 25, 2025 Status: Planned - High Priority (Level 2) Target Release: Q1 2026 Owner: Leads Team