Agent CRM

by Tyrell

A complete CRM with no UI—just natural language. Track contacts, deals, interactions, and tasks through conversation. Use when: (1) adding/finding contacts, (2) creating or updating deals and pipeline, (3) logging calls/emails/meetings, (4) managing follow-up tasks, (5) generating pipeline reports or charts, (6) parsing emails/notes into CRM data. Supports: pipeline tracking, deal stages, activity logging, task management, alerts for stale contacts, visual charts, CSV/JSON export, database backup/restore.

1.0.1
$ npx skills add https://github.com/ianpcook/agent-crm

Files

SKILL.md (main)
11.7 KB
---
name: Agent CRM
description: >
  A complete CRM with no UI—just natural language. Track contacts, deals, interactions,
  and tasks through conversation. Use when: (1) adding/finding contacts, (2) creating or 
  updating deals and pipeline, (3) logging calls/emails/meetings, (4) managing follow-up 
  tasks, (5) generating pipeline reports or charts, (6) parsing emails/notes into CRM data.
  Supports: pipeline tracking, deal stages, activity logging, task management, alerts for 
  stale contacts, visual charts, CSV/JSON export, database backup/restore.
version: 1.0.1
author: Tyrell
category: productivity
agents:
  - claude-code
  - openclaw
---

# Agent CRM

A CRM with no UI. Just you, a database, and natural language.

## Overview

The Agent CRM replaces traditional CRM software with a conversational interface. Data lives in SQLite; you are the interface. Every action is audited with conversation context.

## Scripts

| Script | Purpose |
|--------|---------|
| `crm.py` | Core CRUD operations |
| `crm-ingest.py` | Parse unstructured text → structured CRM actions |
| `crm-digest.py` | Generate daily digest / pipeline summary |
| `crm-notify.py` | Check for alerts (overdue tasks, stale contacts, closing deals) |
| `crm-webhook.py` | HTTP server for form/lead ingestion |
| `crm-report.py` | Pipeline analytics, activity reports, win/loss analysis |
| `crm-chart.py` | Generate visual charts (auto-bootstraps matplotlib) |
| `crm-export.py` | Export data to CSV/JSON |
| `crm-backup.py` | Backup/restore database |

---

## CLI: `crm`

All scripts are in the `scripts/` directory. The database auto-initializes on first use.
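The data paths can be relocated via environment variables read by the scripts (defaults shown):

```bash
export CRM_DB=~/.local/share/agent-crm/crm.db
export CRM_BACKUP_DIR=~/.local/share/agent-crm/backups
export CRM_EXPORT_DIR=~/.local/share/agent-crm/exports
export CRM_CHARTS_DIR=~/.local/share/agent-crm/charts
```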

### Contacts

```bash
# Add a contact
crm add-contact "Sarah Chen" --email sarah@replicate.com --company Replicate --role CTO --source "AI meetup"

# Find contacts
crm find-contact "sarah"
crm find-contact "replicate"

# List contacts
crm list-contacts
crm list-contacts --recent

# Update contact
crm update-contact "Sarah Chen" --phone "415-555-1234" --notes "Interested in API integration"

# Delete (use with caution)
crm delete-contact <id> --reason "Duplicate entry"
```

### Deals

```bash
# Add a deal
crm add-deal "Replicate API License" --value 50000 --contact "Sarah Chen" --stage qualified --expected-close "next month"

# List deals
crm list-deals
crm list-deals --stage proposal

# Update deal
crm update-deal "Replicate" --stage negotiation --probability 70

# Pipeline summary
crm pipeline
```

### Interactions

```bash
# Log an interaction
crm log call "Discussed pricing, she'll review with team" --contact "Sarah Chen" --direction outbound

crm log email "Sent proposal PDF" --contact "Sarah" --direction outbound

crm log meeting "Demo of API features, very positive" --contact "Sarah" --date yesterday
```

### Tasks

```bash
# Add task
crm add-task "Follow up on proposal" --contact "Sarah Chen" --due "next tuesday" --priority high

# List tasks
crm list-tasks --pending
crm list-tasks --overdue

# Complete task
crm complete-task "Follow up on proposal"
```

### Queries

```bash
# Pipeline stats
crm stats

# Raw SQL (SELECT only)
crm query "SELECT name, company FROM contacts WHERE company LIKE '%tech%'"
```
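
Since the schema links deals to contacts (`deals.contact_id` references `contacts.id`; see `schema.sql`), joins work in read-only queries too:

```bash
crm query "SELECT d.title, d.value, c.name
           FROM deals d LEFT JOIN contacts c ON d.contact_id = c.id
           WHERE d.stage = 'negotiation'"
```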

## Audit Trail

Every write operation is logged to `audit_log` table with:
- What changed (old → new values)
- Why (use `--reason` flag)
- When

View audit history:
```bash
crm query "SELECT * FROM audit_log ORDER BY created_at DESC LIMIT 10"
```
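
To trace a single record, filter on `table_name` and `record_id` (both indexed in `schema.sql`), substituting a real id for `<deal-id>`:

```bash
crm query "SELECT action, old_values, new_values, reason, created_at
           FROM audit_log
           WHERE table_name = 'deals' AND record_id = '<deal-id>'
           ORDER BY created_at"
```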

## Data Location

- **Database:** `~/.local/share/agent-crm/crm.db`
- **Schema:** `skills/agent-crm/schema.sql`

## Stages

Valid deal stages (in order):
1. `lead` — Initial contact
2. `qualified` — Confirmed interest/budget
3. `proposal` — Sent proposal/quote
4. `negotiation` — Active negotiation
5. `won` — Closed won ✅
6. `lost` — Closed lost ❌

## Interaction Types

- `email` — Email correspondence
- `call` — Phone/video call
- `meeting` — In-person or scheduled meeting
- `note` — Internal note (no contact involved; example below)
- `linkedin` — LinkedIn message/interaction
- `text` — SMS/iMessage/WhatsApp
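
Per the type description above, a `note` can be logged without `--contact` (a sketch; the summary text is illustrative):

```bash
crm log note "Competitor launched usage-based pricing; relevant to open negotiations"
```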

---

## CLI: `crm-ingest`

Parses unstructured text (emails, meeting notes, call summaries) and extracts structured data.

```bash
# From stdin
echo "Met Sarah Chen at the AI meetup. She's CTO at Replicate, interested in API." | crm-ingest

# From file
crm-ingest --file meeting-notes.txt

# Force type detection
crm-ingest --type email --file forwarded-email.txt
```

**Output:** JSON with extracted entities and suggested actions:
- Contact names, emails, phones, companies
- Interaction type and direction
- Deal signals (stage hints, positive/negative indicators)
- Monetary amounts
- Potential tasks
- Suggested CRM actions for your review

**Workflow:**
1. User pastes text or forwards email
2. Run `crm-ingest` to extract entities
3. Review the suggested actions
4. Execute the ones that make sense via `crm` commands (sketched below)
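
A minimal sketch of that loop; the contact name and note text are made up for illustration:

```bash
# 1. Extract entities and suggested actions from pasted notes
crm-ingest --file meeting-notes.txt > suggested.json

# 2. After reviewing suggested.json, execute the actions that hold up
crm add-contact "Jamie Park" --company Northwind --source "intro call"
crm log call "Intro call; wants a demo next week" --contact "Jamie Park" --direction inbound
crm add-task "Schedule demo" --contact "Jamie Park" --due "next week"
```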

---

## CLI: `crm-digest`

Generates a daily summary of CRM activity.

```bash
# Human-readable digest
crm-digest

# JSON output
crm-digest --json

# Custom time range
crm-digest --lookback 7 --lookahead 14
```

**Includes:**
- Recent activity (new contacts, deals, interactions)
- Pipeline summary by stage
- Tasks due today / overdue
- Deals closing soon
- Contacts needing follow-up (14+ days inactive)
- Won deals this month

**For daily briefings:** Schedule via cron or include in morning heartbeat.
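
For instance, a crontab entry along these lines (schedule and log path are placeholders) delivers the digest each weekday morning:

```bash
# Run the digest at 8am on weekdays and keep a log
0 8 * * 1-5 crm-digest >> ~/.local/share/agent-crm/digest.log 2>&1
```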

---

## Confirmation Flow

**ALWAYS confirm before:**

| Action | Threshold |
|--------|-----------|
| Create deal | value > $10,000 |
| Update deal | value > $10,000 |
| Change stage | → `won` or `lost` |
| Delete any record | Always |

**Flow pattern:**
```
User: "Mark the Replicate deal as won"

You: "⚠️ Confirm: Mark 'Replicate API License' ($50,000) as WON?
      This will close the deal and log the win.
      Reply 'yes' to confirm."

User: "yes"

You: [run: crm update-deal "Replicate" --stage won --reason "User confirmed close"]
     "Done. Deal closed at $50K. πŸŽ‰
      Want me to create a follow-up task for invoicing?"
```

**Never auto-execute high-stakes actions.** Even if the user sounds certain, confirm first.

---

## Tips

1. **Be conversational.** User says "I just talked to Sarah" → you log the interaction
2. **Infer intelligently.** "Add Mike from Acme" → create contact with company=Acme
3. **Create follow-ups.** After logging a call, offer to create a task
4. **Summarize.** "What's my pipeline?" → run `crm pipeline` and present nicely
5. **Link things.** Deals to contacts, tasks to deals, interactions to everything
6. **Use ingest for bulk.** User pastes meeting notes → run through `crm-ingest` → execute sensible actions
7. **Daily digest.** Run `crm-digest` during morning heartbeat if CRM has data
8. **Check alerts.** Run `crm-notify` during heartbeat to catch overdue items
9. **Proactive follow-ups.** When `crm-notify` shows stale contacts, suggest reaching out

---

## CLI: `crm-notify`

Checks for items needing attention. Run from heartbeat or on-demand.

```bash
# All alerts
crm-notify

# JSON output
crm-notify --json

# Specific alert types
crm-notify --type overdue_task
crm-notify --type stale_contact

# Custom thresholds
crm-notify --stale-days 7 --closing-days 14 --stuck-days 30
```

**Alert types:**
- `overdue_task` — Tasks past due date
- `task_due_today` — Tasks due today
- `deal_closing_soon` — Deals with expected close within N days
- `stale_contact` — Contacts with open deals but no interaction in N days
- `deal_stuck` — Deals unchanged for N days

**Heartbeat integration:** Add to HEARTBEAT.md check cycle.

---

## CLI: `crm-webhook`

HTTP server for ingesting leads from external forms (Typeform, Tally, etc.).

```bash
# Start server
crm-webhook --port 8901

# Endpoints:
# POST /lead    — Create contact from form submission
# POST /contact — Alias for /lead
# GET  /health  — Health check
```

**Supported formats:**
- Typeform webhooks
- Tally webhooks
- Generic JSON with standard field names (name, email, phone, company)

**Example curl:**
```bash
curl -X POST http://localhost:8901/lead \
  -H "Content-Type: application/json" \
  -d '{"name": "Alex Rivera", "email": "alex@datastack.io", "company": "DataStack"}'
```

**Log file:** `~/.local/share/agent-crm/webhook.log`

---

## CLI: `crm-report`

Analytics and pipeline reports.

```bash
# Pipeline summary with forecast
crm-report pipeline

# Activity report (last 30 days)
crm-report activity --days 30

# Win/loss analysis
crm-report winloss --days 90

# JSON output
crm-report pipeline --json
```

**Pipeline report includes:**
- Deals by stage with weighted values (weighting sketched below)
- Forecast by expected close month
- Top 10 deals by value
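
Here, weighted value is deal value scaled by probability, defaulting to 50% when unset (the same formula `crm-digest.py` uses). You can reproduce it with a raw query:

```bash
crm query "SELECT stage, SUM(value * COALESCE(probability, 50) / 100.0) AS weighted
           FROM deals WHERE stage NOT IN ('won', 'lost') GROUP BY stage"
```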

**Activity report includes:**
- Interactions by type
- Tasks created vs completed
- Deal stage movements

**Win/loss report includes:**
- Win rate percentage
- Average deal value (won vs lost)
- Average sales cycle length

---

## CLI: `crm-chart`

Generate visual charts from CRM data. Auto-bootstraps its own venv with matplotlib on first run.

```bash
crm-chart pipeline    # Deal value by stage
crm-chart forecast    # Expected closes by month
crm-chart activity    # Interactions over time
crm-chart winloss     # Won vs lost by month
crm-chart summary     # Full dashboard
```

**Options:**
```bash
crm-chart forecast --months 6     # Forecast range
crm-chart activity --days 30      # Activity lookback
crm-chart pipeline --output /tmp/chart.png  # Custom output
```

**Output:** JSON with path to PNG:
```json
{"status": "success", "chart": "pipeline", "path": "/Users/.../.local/share/agent-crm/charts/pipeline_20260208.png"}
```

**Sending to user:** Run the chart, then use `message` tool with `filePath` to send the PNG.

**Example flow:**
```
User: "Show me the pipeline"
You: [run crm-chart pipeline]
     [send image via message tool with filePath]
```

---

## CLI: `crm-export`

Export CRM data to CSV or JSON.

```bash
crm-export contacts              # Export contacts (JSON)
crm-export deals --format csv    # Export deals (CSV)
crm-export all                   # Full database export
crm-export tasks --output /tmp   # Custom output dir
```

**Export types:** `contacts`, `deals`, `interactions`, `tasks`, `all`

**Output:** Files saved to `~/.local/share/agent-crm/exports/`

---

## CLI: `crm-backup`

Database backup and restore.

```bash
# Create backup
crm-backup backup
crm-backup backup --note "Before big import"

# List backups
crm-backup list

# Restore (requires --confirm)
crm-backup restore                     # Restore latest
crm-backup restore /path/to/backup.db --confirm

# Prune old backups
crm-backup prune --keep 5
```

**Safety:** Restore always creates a safety backup first.

**Output:** Backups saved to `~/.local/share/agent-crm/backups/`

---

## Heartbeat Integration

For proactive CRM monitoring, add to `HEARTBEAT.md`:

```markdown
## CRM Check (every 4 hours)

If 4+ hours since last CRM check:

1. Run `crm-notify` to check for alerts
2. If high-priority alerts exist, message Tyrell
3. Update `lastCrmCheck` in `memory/heartbeat-state.json`
```
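
Step 3 might look like this with `jq` (the file and key name come from the snippet above; adjust to your setup):

```bash
ts=$(date -u +%Y-%m-%dT%H:%M:%SZ)
jq --arg t "$ts" '.lastCrmCheck = $t' memory/heartbeat-state.json > /tmp/hb.json \
  && mv /tmp/hb.json memory/heartbeat-state.json
```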

---

## Example Conversation

```
User: Met Alex Rivera at the startup mixer. She's founder of DataStack, 
      looking for AI consulting. Could be a $30K engagement.

You: Created:
  • Contact: Alex Rivera (Founder @ DataStack)
    Source: startup mixer
  • Deal: "DataStack AI Consulting" — $30,000 (lead stage)
  • Interaction: Met at startup mixer, interested in AI consulting
  
  Want me to set a follow-up task?

User: Yeah, email her next week

You: Task created: "Email Alex Rivera" due Feb 15 (next Saturday)
```
schema.sql
3.6 KB
-- Agent CRM Schema
-- SQLite version (single-player)

-- Enable foreign keys
PRAGMA foreign_keys = ON;

-- Contacts
CREATE TABLE IF NOT EXISTS contacts (
    id          TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
    name        TEXT NOT NULL,
    email       TEXT,
    phone       TEXT,
    company     TEXT,
    role        TEXT,
    source      TEXT,
    tags        TEXT,  -- JSON array
    notes       TEXT,
    created_at  TEXT DEFAULT (datetime('now')),
    updated_at  TEXT DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_contacts_name ON contacts(name);
CREATE INDEX IF NOT EXISTS idx_contacts_company ON contacts(company);
CREATE INDEX IF NOT EXISTS idx_contacts_email ON contacts(email);

-- Companies (denormalized from contacts, but useful for rollups)
CREATE TABLE IF NOT EXISTS companies (
    id          TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
    name        TEXT NOT NULL UNIQUE,
    domain      TEXT,
    industry    TEXT,
    size        TEXT,
    notes       TEXT,
    created_at  TEXT DEFAULT (datetime('now')),
    updated_at  TEXT DEFAULT (datetime('now'))
);

-- Deals
CREATE TABLE IF NOT EXISTS deals (
    id              TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
    contact_id      TEXT REFERENCES contacts(id),
    company_id      TEXT REFERENCES companies(id),
    title           TEXT NOT NULL,
    value           REAL,
    currency        TEXT DEFAULT 'USD',
    stage           TEXT NOT NULL DEFAULT 'lead',
    probability     INTEGER,
    expected_close  TEXT,
    notes           TEXT,
    created_at      TEXT DEFAULT (datetime('now')),
    updated_at      TEXT DEFAULT (datetime('now')),
    closed_at       TEXT
);

CREATE INDEX IF NOT EXISTS idx_deals_stage ON deals(stage);
CREATE INDEX IF NOT EXISTS idx_deals_contact ON deals(contact_id);

-- Interactions
CREATE TABLE IF NOT EXISTS interactions (
    id          TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
    contact_id  TEXT REFERENCES contacts(id),
    deal_id     TEXT REFERENCES deals(id),
    type        TEXT NOT NULL,  -- email, call, meeting, note, linkedin, text
    direction   TEXT,           -- inbound, outbound
    summary     TEXT NOT NULL,
    raw_content TEXT,
    occurred_at TEXT NOT NULL,
    logged_at   TEXT DEFAULT (datetime('now')),
    logged_by   TEXT DEFAULT 'agent'
);

CREATE INDEX IF NOT EXISTS idx_interactions_contact ON interactions(contact_id);
CREATE INDEX IF NOT EXISTS idx_interactions_occurred ON interactions(occurred_at);

-- Tasks
CREATE TABLE IF NOT EXISTS tasks (
    id           TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
    contact_id   TEXT REFERENCES contacts(id),
    deal_id      TEXT REFERENCES deals(id),
    title        TEXT NOT NULL,
    due_at       TEXT,
    completed_at TEXT,
    priority     TEXT DEFAULT 'normal',
    created_at   TEXT DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_tasks_due ON tasks(due_at);
CREATE INDEX IF NOT EXISTS idx_tasks_completed ON tasks(completed_at);

-- Audit Log
CREATE TABLE IF NOT EXISTS audit_log (
    id               TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
    table_name       TEXT NOT NULL,
    record_id        TEXT NOT NULL,
    action           TEXT NOT NULL,  -- INSERT, UPDATE, DELETE
    old_values       TEXT,           -- JSON
    new_values       TEXT,           -- JSON
    reason           TEXT,
    conversation_ref TEXT,
    created_at       TEXT DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_audit_record ON audit_log(table_name, record_id);
CREATE INDEX IF NOT EXISTS idx_audit_created ON audit_log(created_at);
crm-backup.py
6.7 KB
#!/usr/bin/env python3
"""
CRM Backup/Restore - Database backup and restore operations

Commands:
- backup: Create a timestamped backup of the database
- restore: Restore from a backup file
- list: List available backups
- prune: Remove old backups (keep N most recent)
"""

import argparse
import json
import os
import shutil
import sqlite3
from datetime import datetime
from pathlib import Path

DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
BACKUP_DIR = os.environ.get('CRM_BACKUP_DIR', os.path.expanduser('~/.local/share/agent-crm/backups'))

def ensure_backup_dir():
    """Create backup directory if needed."""
    Path(BACKUP_DIR).mkdir(parents=True, exist_ok=True)

def get_backup_files() -> list[dict]:
    """Get list of backup files with metadata."""
    ensure_backup_dir()
    backups = []
    for f in Path(BACKUP_DIR).glob('crm_backup_*.db'):
        stat = f.stat()
        # Parse timestamp from filename
        try:
            ts_str = f.stem.replace('crm_backup_', '')
            ts = datetime.strptime(ts_str, '%Y%m%d_%H%M%S')
        except ValueError:  # filename doesn't match the timestamp pattern
            ts = datetime.fromtimestamp(stat.st_mtime)
        
        backups.append({
            'path': str(f),
            'filename': f.name,
            'size_bytes': stat.st_size,
            'size_human': format_size(stat.st_size),
            'created_at': ts.isoformat(),
            'age_days': (datetime.now() - ts).days
        })
    
    return sorted(backups, key=lambda x: x['created_at'], reverse=True)

def format_size(size: float) -> str:
    """Format a byte count as a human-readable size."""
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size < 1024:
            return f'{size:.1f} {unit}'
        size /= 1024
    return f'{size:.1f} TB'

def backup_database(note: str = None) -> dict:
    """Create a backup of the database."""
    if not Path(DB_PATH).exists():
        return {'error': 'Database not found', 'path': DB_PATH}
    
    ensure_backup_dir()
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = os.path.join(BACKUP_DIR, f'crm_backup_{timestamp}.db')
    
    # Use SQLite backup API for consistency
    source = sqlite3.connect(DB_PATH)
    dest = sqlite3.connect(backup_path)
    source.backup(dest)
    source.close()
    dest.close()
    
    # Get stats
    stat = Path(backup_path).stat()
    
    result = {
        'status': 'success',
        'path': backup_path,
        'size': format_size(stat.st_size),
        'timestamp': timestamp
    }
    
    if note:
        # Save note alongside backup
        note_path = backup_path + '.note'
        with open(note_path, 'w') as f:
            f.write(note)
        result['note'] = note
    
    return result

def restore_database(backup_path: str, confirm: bool = False) -> dict:
    """Restore database from backup."""
    if not Path(backup_path).exists():
        return {'error': 'Backup file not found', 'path': backup_path}
    
    if not confirm:
        return {
            'status': 'confirmation_required',
            'message': 'This will overwrite the current database. Pass --confirm to proceed.',
            'backup_path': backup_path,
            'current_db': DB_PATH
        }
    
    # Create a safety backup first (stays None if there is no current DB)
    safety_backup = None
    if Path(DB_PATH).exists():
        safety_backup = backup_database('Pre-restore safety backup')
        if 'error' in safety_backup:
            return {'error': 'Failed to create safety backup', 'details': safety_backup}
    
    # Restore
    shutil.copy2(backup_path, DB_PATH)
    
    # Verify
    try:
        conn = sqlite3.connect(DB_PATH)
        conn.execute("SELECT COUNT(*) FROM contacts")
        conn.close()
    except Exception as e:
        return {'error': 'Restored database appears corrupt', 'details': str(e)}
    
    return {
        'status': 'success',
        'message': 'Database restored successfully',
        'restored_from': backup_path,
        'safety_backup': safety_backup.get('path') if safety_backup else None
    }

def list_backups() -> dict:
    """List all available backups."""
    backups = get_backup_files()
    
    # Load notes
    for b in backups:
        note_path = b['path'] + '.note'
        if Path(note_path).exists():
            b['note'] = Path(note_path).read_text().strip()
    
    return {
        'count': len(backups),
        'backups': backups,
        'backup_dir': BACKUP_DIR
    }

def prune_backups(keep: int = 10) -> dict:
    """Remove old backups, keeping N most recent."""
    backups = get_backup_files()
    
    if len(backups) <= keep:
        return {
            'status': 'success',
            'message': f'No pruning needed. {len(backups)} backups exist, keeping {keep}.',
            'removed': 0
        }
    
    to_remove = backups[keep:]
    removed = []
    
    for b in to_remove:
        try:
            Path(b['path']).unlink()
            # Also remove note if exists
            note_path = b['path'] + '.note'
            if Path(note_path).exists():
                Path(note_path).unlink()
            removed.append(b['filename'])
        except Exception:
            pass  # skip files that cannot be removed
    
    return {
        'status': 'success',
        'kept': keep,
        'removed': len(removed),
        'removed_files': removed
    }

def main():
    parser = argparse.ArgumentParser(description='CRM backup and restore')
    subparsers = parser.add_subparsers(dest='command', required=True)
    
    # backup
    p = subparsers.add_parser('backup', help='Create a backup')
    p.add_argument('--note', '-n', help='Note to attach to backup')
    
    # restore
    p = subparsers.add_parser('restore', help='Restore from backup')
    p.add_argument('path', nargs='?', help='Backup file path (uses latest if not specified)')
    p.add_argument('--confirm', action='store_true', help='Confirm overwrite')
    
    # list
    p = subparsers.add_parser('list', help='List backups')
    
    # prune
    p = subparsers.add_parser('prune', help='Remove old backups')
    p.add_argument('--keep', '-k', type=int, default=10, help='Number of backups to keep')
    
    args = parser.parse_args()
    
    if args.command == 'backup':
        result = backup_database(args.note)
    elif args.command == 'restore':
        path = args.path
        if not path:
            # Use latest backup
            backups = get_backup_files()
            if not backups:
                result = {'error': 'No backups found'}
            else:
                path = backups[0]['path']
                result = restore_database(path, args.confirm)
        else:
            result = restore_database(path, args.confirm)
    elif args.command == 'list':
        result = list_backups()
    elif args.command == 'prune':
        result = prune_backups(args.keep)
    
    print(json.dumps(result, indent=2))

if __name__ == '__main__':
    main()
crm-chart.py
18.6 KB
#!/usr/bin/env python3
"""
CRM Charts - Generate visual reports from CRM data

Charts:
- pipeline: Deal value by stage (horizontal bar)
- forecast: Expected closes by month (grouped bars)
- activity: Interactions over time (stacked bars)
- winloss: Won vs lost value by month (grouped bars)
- summary: Multi-panel dashboard

Output: PNG image suitable for sending via chat
"""

import argparse
import json
import os
import sqlite3
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path

SKILL_DIR = Path(__file__).parent.parent  # scripts/ sits one level below the skill root
VENV_DIR = SKILL_DIR / '.venv'
VENV_PYTHON = VENV_DIR / 'bin' / 'python'

def ensure_venv():
    """Ensure venv exists with matplotlib, re-exec if needed."""
    # If we're already in the venv, continue
    if sys.prefix != sys.base_prefix:
        return True
    
    # Check if venv exists
    if not VENV_PYTHON.exists():
        print(json.dumps({'status': 'installing', 'message': 'Setting up chart dependencies...'}))
        subprocess.run([sys.executable, '-m', 'venv', str(VENV_DIR)], check=True)
        subprocess.run([str(VENV_PYTHON), '-m', 'pip', 'install', '--quiet', 'matplotlib'], check=True)
    
    # Re-exec with venv python
    os.execv(str(VENV_PYTHON), [str(VENV_PYTHON)] + sys.argv)

# Ensure we're in venv before importing matplotlib
ensure_venv()

import matplotlib
matplotlib.use('Agg')  # Non-interactive backend; charts render straight to PNG
import matplotlib.pyplot as plt

DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
OUTPUT_DIR = os.environ.get('CRM_CHARTS_DIR', os.path.expanduser('~/.local/share/agent-crm/charts'))

# Color scheme
COLORS = {
    'lead': '#94a3b8',       # slate
    'qualified': '#60a5fa',  # blue
    'proposal': '#a78bfa',   # purple
    'negotiation': '#fbbf24', # amber
    'won': '#34d399',        # green
    'lost': '#f87171',       # red
    'primary': '#3b82f6',    # blue
    'secondary': '#8b5cf6',  # violet
    'accent': '#06b6d4',     # cyan
}

STAGE_ORDER = ['lead', 'qualified', 'proposal', 'negotiation', 'won', 'lost']

def get_db() -> sqlite3.Connection:
    """Get database connection."""
    if not Path(DB_PATH).exists():
        return None
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def ensure_output_dir():
    """Create output directory if needed."""
    Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

def format_currency(value):
    """Format as currency."""
    if value >= 1_000_000:
        return f'${value/1_000_000:.1f}M'
    elif value >= 1_000:
        return f'${value/1_000:.0f}K'
    else:
        return f'${value:.0f}'

def chart_pipeline(output_path: str = None) -> str:
    """Generate pipeline chart - deals by stage."""
    conn = get_db()
    if not conn:
        return None
    
    rows = conn.execute("""
        SELECT stage, COUNT(*) as count, COALESCE(SUM(value), 0) as total_value
        FROM deals
        WHERE stage NOT IN ('won', 'lost')
        GROUP BY stage
    """).fetchall()
    conn.close()
    
    if not rows:
        return None
    
    # Organize by stage order
    data = {r['stage']: {'count': r['count'], 'value': r['total_value']} for r in rows}
    stages = [s for s in STAGE_ORDER if s in data]
    values = [data[s]['value'] for s in stages]
    counts = [data[s]['count'] for s in stages]
    colors = [COLORS.get(s, COLORS['primary']) for s in stages]
    
    # Create chart
    fig, ax = plt.subplots(figsize=(10, 5))
    
    y_pos = range(len(stages))
    bars = ax.barh(y_pos, values, color=colors, height=0.6)
    
    # Labels
    ax.set_yticks(y_pos)
    ax.set_yticklabels([s.title() for s in stages], fontsize=12)
    ax.invert_yaxis()
    
    # Value labels on bars
    for i, (bar, value, count) in enumerate(zip(bars, values, counts)):
        width = bar.get_width()
        label = f'{format_currency(value)} ({count} deal{"s" if count != 1 else ""})'
        if width > max(values) * 0.3:
            ax.text(width - max(values) * 0.02, bar.get_y() + bar.get_height()/2,
                   label, ha='right', va='center', color='white', fontweight='bold', fontsize=11)
        else:
            ax.text(width + max(values) * 0.02, bar.get_y() + bar.get_height()/2,
                   label, ha='left', va='center', color='#1f2937', fontsize=11)
    
    # Styling
    ax.set_xlabel('Deal Value', fontsize=12)
    ax.set_title('Pipeline by Stage', fontsize=16, fontweight='bold', pad=20)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.set_xlim(0, max(values) * 1.3 if values else 1)
    
    # Total annotation
    total = sum(values)
    ax.annotate(f'Total Pipeline: {format_currency(total)}',
                xy=(0.98, 0.02), xycoords='axes fraction',
                ha='right', va='bottom', fontsize=12, fontweight='bold',
                color=COLORS['primary'])
    
    plt.tight_layout()
    
    # Save
    ensure_output_dir()
    if not output_path:
        output_path = os.path.join(OUTPUT_DIR, f'pipeline_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
    plt.close()
    
    return output_path

def chart_forecast(months: int = 6, output_path: str = None) -> str:
    """Generate forecast chart - expected closes by month."""
    conn = get_db()
    if not conn:
        return None
    
    rows = conn.execute("""
        SELECT 
            strftime('%Y-%m', expected_close) as month,
            COUNT(*) as count,
            COALESCE(SUM(value), 0) as total_value,
            COALESCE(SUM(value * COALESCE(probability, 50) / 100.0), 0) as weighted_value
        FROM deals
        WHERE stage NOT IN ('won', 'lost')
        AND expected_close IS NOT NULL
        AND expected_close >= date('now')
        GROUP BY month
        ORDER BY month
        LIMIT ?
    """, (months,)).fetchall()
    conn.close()
    
    if not rows:
        return None
    
    months_list = [r['month'] for r in rows]
    values = [r['total_value'] for r in rows]
    weighted = [r['weighted_value'] for r in rows]
    counts = [r['count'] for r in rows]
    
    # Create chart
    fig, ax = plt.subplots(figsize=(10, 5))
    
    x = range(len(months_list))
    width = 0.35
    
    bars1 = ax.bar([i - width/2 for i in x], values, width, label='Total Value', color=COLORS['primary'], alpha=0.8)
    bars2 = ax.bar([i + width/2 for i in x], weighted, width, label='Weighted Value', color=COLORS['secondary'], alpha=0.8)
    
    # Labels
    ax.set_xticks(x)
    ax.set_xticklabels([datetime.strptime(m, '%Y-%m').strftime('%b %Y') for m in months_list], fontsize=11)
    
    # Value labels
    for bar, val, count in zip(bars1, values, counts):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(values) * 0.02,
               f'{count}', ha='center', va='bottom', fontsize=10, color='#6b7280')
    
    # Styling
    ax.set_ylabel('Deal Value', fontsize=12)
    ax.set_title('Forecast: Expected Closes by Month', fontsize=16, fontweight='bold', pad=20)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.legend(loc='upper right')
    
    # Format y-axis as currency
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: format_currency(x)))
    
    plt.tight_layout()
    
    # Save
    ensure_output_dir()
    if not output_path:
        output_path = os.path.join(OUTPUT_DIR, f'forecast_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
    plt.close()
    
    return output_path

def chart_activity(days: int = 30, output_path: str = None) -> str:
    """Generate activity chart - interactions over time."""
    conn = get_db()
    if not conn:
        return None
    
    since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
    
    rows = conn.execute("""
        SELECT 
            date(occurred_at) as day,
            type,
            COUNT(*) as count
        FROM interactions
        WHERE occurred_at >= ?
        GROUP BY day, type
        ORDER BY day
    """, (since,)).fetchall()
    conn.close()
    
    if not rows:
        return None
    
    # Organize data by day and type
    from collections import defaultdict
    daily = defaultdict(lambda: defaultdict(int))
    types = set()
    for r in rows:
        daily[r['day']][r['type']] = r['count']
        types.add(r['type'])
    
    days_list = sorted(daily.keys())
    types = sorted(types)
    
    # Create chart
    fig, ax = plt.subplots(figsize=(12, 5))
    
    type_colors = {
        'email': COLORS['primary'],
        'call': COLORS['secondary'],
        'meeting': COLORS['accent'],
        'note': '#94a3b8',
        'linkedin': '#0077b5',
        'text': '#25d366',
    }
    
    bottom = [0] * len(days_list)
    for t in types:
        values = [daily[d][t] for d in days_list]
        color = type_colors.get(t, '#6b7280')
        ax.bar(range(len(days_list)), values, bottom=bottom, label=t.title(), color=color, alpha=0.8)
        bottom = [b + v for b, v in zip(bottom, values)]
    
    # Labels
    # Show every Nth label to avoid crowding
    step = max(1, len(days_list) // 10)
    ax.set_xticks(range(0, len(days_list), step))
    ax.set_xticklabels([datetime.strptime(days_list[i], '%Y-%m-%d').strftime('%m/%d') 
                        for i in range(0, len(days_list), step)], fontsize=10)
    
    # Styling
    ax.set_ylabel('Interactions', fontsize=12)
    ax.set_title(f'Activity: Last {days} Days', fontsize=16, fontweight='bold', pad=20)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.legend(loc='upper left', ncol=len(types))
    
    plt.tight_layout()
    
    # Save
    ensure_output_dir()
    if not output_path:
        output_path = os.path.join(OUTPUT_DIR, f'activity_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
    plt.close()
    
    return output_path

def chart_winloss(days: int = 90, output_path: str = None) -> str:
    """Generate win/loss chart."""
    conn = get_db()
    if not conn:
        return None
    
    since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
    
    rows = conn.execute("""
        SELECT 
            strftime('%Y-%m', closed_at) as month,
            stage,
            COUNT(*) as count,
            COALESCE(SUM(value), 0) as total_value
        FROM deals
        WHERE stage IN ('won', 'lost')
        AND closed_at >= ?
        GROUP BY month, stage
        ORDER BY month
    """, (since,)).fetchall()
    conn.close()
    
    if not rows:
        return None
    
    # Organize data
    from collections import defaultdict
    monthly = defaultdict(lambda: {'won': 0, 'lost': 0, 'won_count': 0, 'lost_count': 0})
    for r in rows:
        if r['stage'] == 'won':
            monthly[r['month']]['won'] = r['total_value']
            monthly[r['month']]['won_count'] = r['count']
        else:
            monthly[r['month']]['lost'] = r['total_value']
            monthly[r['month']]['lost_count'] = r['count']
    
    months_list = sorted(monthly.keys())
    won_values = [monthly[m]['won'] for m in months_list]
    lost_values = [monthly[m]['lost'] for m in months_list]
    
    # Create chart
    fig, ax = plt.subplots(figsize=(10, 5))
    
    x = range(len(months_list))
    width = 0.35
    
    bars1 = ax.bar([i - width/2 for i in x], won_values, width, label='Won', color=COLORS['won'])
    bars2 = ax.bar([i + width/2 for i in x], lost_values, width, label='Lost', color=COLORS['lost'])
    
    # Labels
    ax.set_xticks(x)
    ax.set_xticklabels([datetime.strptime(m, '%Y-%m').strftime('%b %Y') for m in months_list], fontsize=11)
    
    # Styling
    ax.set_ylabel('Deal Value', fontsize=12)
    ax.set_title('Win/Loss Analysis', fontsize=16, fontweight='bold', pad=20)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.legend(loc='upper right')
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: format_currency(x)))
    
    # Win rate annotation
    total_won = sum(won_values)
    total_lost = sum(lost_values)
    if total_won + total_lost > 0:
        win_rate = total_won / (total_won + total_lost) * 100
        ax.annotate(f'Win Rate: {win_rate:.0f}%',
                    xy=(0.02, 0.98), xycoords='axes fraction',
                    ha='left', va='top', fontsize=12, fontweight='bold',
                    color=COLORS['won'])
    
    plt.tight_layout()
    
    # Save
    ensure_output_dir()
    if not output_path:
        output_path = os.path.join(OUTPUT_DIR, f'winloss_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
    plt.close()
    
    return output_path

def chart_summary(output_path: str = None) -> str:
    """Generate summary dashboard with multiple metrics."""
    conn = get_db()
    if not conn:
        return None
    
    # Get all metrics
    stats = {}
    
    # Pipeline by stage
    rows = conn.execute("""
        SELECT stage, COUNT(*) as count, COALESCE(SUM(value), 0) as value
        FROM deals WHERE stage NOT IN ('won', 'lost')
        GROUP BY stage
    """).fetchall()
    stats['pipeline'] = {r['stage']: {'count': r['count'], 'value': r['value']} for r in rows}
    
    # Won this month
    month_start = datetime.now().replace(day=1).strftime('%Y-%m-%d')
    row = conn.execute("""
        SELECT COUNT(*) as count, COALESCE(SUM(value), 0) as value
        FROM deals WHERE stage = 'won' AND closed_at >= ?
    """, (month_start,)).fetchone()
    stats['won_month'] = {'count': row['count'], 'value': row['value']}
    
    # Tasks
    row = conn.execute("""
        SELECT 
            COUNT(*) FILTER (WHERE completed_at IS NULL) as pending,
            COUNT(*) FILTER (WHERE completed_at IS NULL AND due_at < datetime('now')) as overdue
        FROM tasks
    """).fetchone()
    stats['tasks'] = {'pending': row['pending'], 'overdue': row['overdue']}
    
    # Contacts
    row = conn.execute("SELECT COUNT(*) as count FROM contacts").fetchone()
    stats['contacts'] = row['count']
    
    conn.close()
    
    # Create dashboard
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    
    # Pipeline pie chart
    ax = axes[0, 0]
    if stats['pipeline']:
        stages = [s for s in STAGE_ORDER if s in stats['pipeline']]
        values = [stats['pipeline'][s]['value'] for s in stages]
        colors = [COLORS.get(s, '#6b7280') for s in stages]
        
        wedges, texts, autotexts = ax.pie(values, labels=[s.title() for s in stages], 
                                          colors=colors, autopct='%1.0f%%',
                                          pctdistance=0.75)
        ax.set_title('Pipeline Distribution', fontsize=14, fontweight='bold')
    else:
        ax.text(0.5, 0.5, 'No pipeline data', ha='center', va='center', fontsize=14)
        ax.set_title('Pipeline Distribution', fontsize=14, fontweight='bold')
    
    # Key metrics
    ax = axes[0, 1]
    ax.axis('off')
    
    total_pipeline = sum(d['value'] for d in stats['pipeline'].values())
    metrics_text = f"""
    📊 Key Metrics
    
    Pipeline Value: {format_currency(total_pipeline)}
    Open Deals: {sum(d['count'] for d in stats['pipeline'].values())}
    
    Won This Month: {format_currency(stats['won_month']['value'])}
    ({stats['won_month']['count']} deals)
    
    Contacts: {stats['contacts']}
    
    Tasks Pending: {stats['tasks']['pending']}
    Tasks Overdue: {stats['tasks']['overdue']}
    """
    ax.text(0.1, 0.9, metrics_text, transform=ax.transAxes, fontsize=13,
            verticalalignment='top', fontfamily='monospace',
            bbox=dict(boxstyle='round', facecolor='#f1f5f9', alpha=0.8))
    
    # Stage breakdown bar
    ax = axes[1, 0]
    if stats['pipeline']:
        stages = [s for s in STAGE_ORDER if s in stats['pipeline']]
        values = [stats['pipeline'][s]['value'] for s in stages]
        counts = [stats['pipeline'][s]['count'] for s in stages]
        colors = [COLORS.get(s, '#6b7280') for s in stages]
        
        bars = ax.barh(range(len(stages)), values, color=colors)
        ax.set_yticks(range(len(stages)))
        ax.set_yticklabels([s.title() for s in stages])
        ax.invert_yaxis()
        ax.set_title('Value by Stage', fontsize=14, fontweight='bold')
        ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: format_currency(x)))
        
        for bar, count in zip(bars, counts):
            ax.text(bar.get_width() + max(values) * 0.02, bar.get_y() + bar.get_height()/2,
                   f'{count}', va='center', fontsize=10)
    
    # Tasks status
    ax = axes[1, 1]
    task_labels = ['Pending', 'Overdue']
    task_values = [stats['tasks']['pending'] - stats['tasks']['overdue'], stats['tasks']['overdue']]
    task_colors = [COLORS['primary'], COLORS['lost']]
    
    if sum(task_values) > 0:
        ax.pie(task_values, labels=task_labels, colors=task_colors, autopct='%1.0f%%',
               startangle=90)
    else:
        ax.text(0.5, 0.5, 'No tasks', ha='center', va='center', fontsize=14)
    ax.set_title('Task Status', fontsize=14, fontweight='bold')
    
    plt.suptitle(f'CRM Dashboard — {datetime.now().strftime("%B %d, %Y")}', 
                 fontsize=18, fontweight='bold', y=1.02)
    plt.tight_layout()
    
    # Save
    ensure_output_dir()
    if not output_path:
        output_path = os.path.join(OUTPUT_DIR, f'summary_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
    plt.close()
    
    return output_path

def main():
    parser = argparse.ArgumentParser(description='Generate CRM charts')
    parser.add_argument('chart', choices=['pipeline', 'forecast', 'activity', 'winloss', 'summary'],
                       help='Chart type to generate')
    parser.add_argument('--output', '-o', help='Output file path')
    parser.add_argument('--days', '-d', type=int, default=30, help='Days to analyze (activity/winloss)')
    parser.add_argument('--months', '-m', type=int, default=6, help='Months to forecast')
    
    args = parser.parse_args()
    
    chart_funcs = {
        'pipeline': lambda: chart_pipeline(args.output),
        'forecast': lambda: chart_forecast(args.months, args.output),
        'activity': lambda: chart_activity(args.days, args.output),
        'winloss': lambda: chart_winloss(args.days, args.output),
        'summary': lambda: chart_summary(args.output),
    }
    
    output_path = chart_funcs[args.chart]()
    
    if output_path:
        print(json.dumps({'status': 'success', 'chart': args.chart, 'path': output_path}))
    else:
        print(json.dumps({'status': 'error', 'message': 'No data to chart or database not found'}))
        sys.exit(1)

if __name__ == '__main__':
    main()
crm-digest.py
10.0 KB
#!/usr/bin/env python3
"""
CRM Daily Digest - Summary of CRM activity and upcoming items

Generates a daily briefing with:
- Yesterday's activity
- Pipeline summary  
- Tasks due today/overdue
- Contacts needing follow-up
- Deals closing soon
"""

import argparse
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path

DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))

def get_db() -> sqlite3.Connection:
    """Get database connection."""
    if not Path(DB_PATH).exists():
        return None
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def generate_digest(lookback_days: int = 1, lookahead_days: int = 7) -> dict:
    """Generate the daily digest."""
    conn = get_db()
    if not conn:
        return {'error': 'Database not found', 'path': DB_PATH}
    
    now = datetime.now()
    yesterday = (now - timedelta(days=lookback_days)).isoformat()
    week_ahead = (now + timedelta(days=lookahead_days)).isoformat()
    today_start = now.replace(hour=0, minute=0, second=0).isoformat()
    today_end = now.replace(hour=23, minute=59, second=59).isoformat()
    
    digest = {
        'generated_at': now.isoformat(),
        'period': {
            'lookback_days': lookback_days,
            'lookahead_days': lookahead_days
        }
    }
    
    # Recent activity
    activity = {}
    
    # New contacts
    rows = conn.execute("""
        SELECT COUNT(*) as count FROM contacts WHERE created_at >= ?
    """, (yesterday,)).fetchone()
    activity['new_contacts'] = rows['count']
    
    # New deals
    rows = conn.execute("""
        SELECT COUNT(*) as count, SUM(value) as total_value 
        FROM deals WHERE created_at >= ?
    """, (yesterday,)).fetchone()
    activity['new_deals'] = rows['count']
    activity['new_deal_value'] = rows['total_value'] or 0
    
    # Interactions logged
    rows = conn.execute("""
        SELECT type, COUNT(*) as count FROM interactions 
        WHERE logged_at >= ?
        GROUP BY type
    """, (yesterday,)).fetchall()
    activity['interactions'] = {r['type']: r['count'] for r in rows}
    activity['total_interactions'] = sum(r['count'] for r in rows)
    
    # Tasks completed
    rows = conn.execute("""
        SELECT COUNT(*) as count FROM tasks WHERE completed_at >= ?
    """, (yesterday,)).fetchone()
    activity['tasks_completed'] = rows['count']
    
    # Deal stage changes
    rows = conn.execute("""
        SELECT new_values, old_values FROM audit_log 
        WHERE table_name = 'deals' AND action = 'UPDATE' AND created_at >= ?
    """, (yesterday,)).fetchall()
    stage_changes = []
    for r in rows:
        try:
            old = json.loads(r['old_values']) if r['old_values'] else {}
            new = json.loads(r['new_values']) if r['new_values'] else {}
            if 'stage' in new and old.get('stage') != new.get('stage'):
                stage_changes.append({'from': old.get('stage'), 'to': new['stage']})
        except (json.JSONDecodeError, TypeError):
            pass
    activity['deal_stage_changes'] = stage_changes
    
    digest['recent_activity'] = activity
    
    # Pipeline summary
    rows = conn.execute("""
        SELECT stage, COUNT(*) as count, SUM(value) as total_value
        FROM deals WHERE stage NOT IN ('won', 'lost')
        GROUP BY stage
        ORDER BY CASE stage 
            WHEN 'lead' THEN 1
            WHEN 'qualified' THEN 2
            WHEN 'proposal' THEN 3
            WHEN 'negotiation' THEN 4
        END
    """).fetchall()
    
    pipeline = {
        'stages': [dict(r) for r in rows],
        'total_deals': sum(r['count'] for r in rows),
        'total_value': sum(r['total_value'] or 0 for r in rows)
    }
    
    # Weighted pipeline
    weighted = conn.execute("""
        SELECT SUM(value * COALESCE(probability, 50) / 100.0) as weighted
        FROM deals WHERE stage NOT IN ('won', 'lost')
    """).fetchone()
    pipeline['weighted_value'] = weighted['weighted'] or 0
    
    digest['pipeline'] = pipeline
    
    # Tasks due today
    rows = conn.execute("""
        SELECT t.*, c.name as contact_name FROM tasks t
        LEFT JOIN contacts c ON t.contact_id = c.id
        WHERE t.completed_at IS NULL 
        AND t.due_at >= ? AND t.due_at <= ?
        ORDER BY t.priority DESC, t.due_at ASC
    """, (today_start, today_end)).fetchall()
    digest['tasks_due_today'] = [dict(r) for r in rows]
    
    # Overdue tasks
    rows = conn.execute("""
        SELECT t.*, c.name as contact_name FROM tasks t
        LEFT JOIN contacts c ON t.contact_id = c.id
        WHERE t.completed_at IS NULL AND t.due_at < ?
        ORDER BY t.due_at ASC
        LIMIT 10
    """, (today_start,)).fetchall()
    digest['overdue_tasks'] = [dict(r) for r in rows]
    
    # Deals closing soon
    rows = conn.execute("""
        SELECT d.*, c.name as contact_name FROM deals d
        LEFT JOIN contacts c ON d.contact_id = c.id
        WHERE d.stage NOT IN ('won', 'lost')
        AND d.expected_close <= ?
        ORDER BY d.expected_close ASC
        LIMIT 5
    """, (week_ahead,)).fetchall()
    digest['deals_closing_soon'] = [dict(r) for r in rows]
    
    # Contacts needing follow-up (no interaction in 14+ days)
    stale_date = (now - timedelta(days=14)).isoformat()
    rows = conn.execute("""
        SELECT c.*, MAX(i.occurred_at) as last_interaction
        FROM contacts c
        LEFT JOIN interactions i ON c.id = i.contact_id
        GROUP BY c.id
        HAVING last_interaction < ? OR last_interaction IS NULL
        ORDER BY last_interaction ASC
        LIMIT 10
    """, (stale_date,)).fetchall()
    digest['needs_followup'] = [dict(r) for r in rows]
    
    # Won deals this month
    month_start = now.replace(day=1, hour=0, minute=0, second=0).isoformat()
    rows = conn.execute("""
        SELECT SUM(value) as total, COUNT(*) as count FROM deals
        WHERE stage = 'won' AND closed_at >= ?
    """, (month_start,)).fetchone()
    digest['won_this_month'] = {
        'count': rows['count'] or 0,
        'value': rows['total'] or 0
    }
    
    conn.close()
    return digest

def format_digest_text(digest: dict) -> str:
    """Format digest as human-readable text."""
    if 'error' in digest:
        return f"❌ {digest['error']}"
    
    lines = []
    lines.append(f"πŸ“Š **CRM Digest** β€” {datetime.now().strftime('%B %d, %Y')}")
    lines.append("")
    
    # Recent activity
    act = digest['recent_activity']
    if any([act['new_contacts'], act['total_interactions'], act['tasks_completed']]):
        lines.append("**Recent Activity:**")
        if act['new_contacts']:
            lines.append(f"β€’ {act['new_contacts']} new contact(s)")
        if act['new_deals']:
            lines.append(f"β€’ {act['new_deals']} new deal(s) (${act['new_deal_value']:,.0f})")
        if act['total_interactions']:
            types = ', '.join(f"{v} {k}" for k, v in act['interactions'].items())
            lines.append(f"β€’ {act['total_interactions']} interaction(s) logged ({types})")
        if act['tasks_completed']:
            lines.append(f"β€’ {act['tasks_completed']} task(s) completed")
        if act['deal_stage_changes']:
            for change in act['deal_stage_changes']:
                lines.append(f"β€’ Deal moved: {change['from']} β†’ {change['to']}")
        lines.append("")
    
    # Pipeline
    pipe = digest['pipeline']
    if pipe['total_deals']:
        lines.append("**Pipeline:**")
        for stage in pipe['stages']:
            val = f"${stage['total_value']:,.0f}" if stage['total_value'] else "$0"
            lines.append(f"β€’ {stage['stage'].title()}: {stage['count']} deal(s) ({val})")
        lines.append(f"β€’ **Total:** {pipe['total_deals']} deals, ${pipe['total_value']:,.0f}")
        lines.append(f"β€’ **Weighted:** ${pipe['weighted_value']:,.0f}")
        lines.append("")
    
    # Tasks
    if digest['overdue_tasks']:
        lines.append("**⚠️ Overdue Tasks:**")
        for task in digest['overdue_tasks'][:5]:
            contact = f" ({task['contact_name']})" if task['contact_name'] else ""
            lines.append(f"β€’ {task['title']}{contact}")
        lines.append("")
    
    if digest['tasks_due_today']:
        lines.append("**Today's Tasks:**")
        for task in digest['tasks_due_today']:
            contact = f" ({task['contact_name']})" if task['contact_name'] else ""
            lines.append(f"β€’ {task['title']}{contact}")
        lines.append("")
    
    # Deals closing soon
    if digest['deals_closing_soon']:
        lines.append("**Deals Closing Soon:**")
        for deal in digest['deals_closing_soon']:
            val = f"${deal['value']:,.0f}" if deal['value'] else "TBD"
            contact = f" - {deal['contact_name']}" if deal['contact_name'] else ""
            lines.append(f"β€’ {deal['title']} ({val}){contact} β€” {deal['expected_close']}")
        lines.append("")
    
    # Follow-ups needed
    if digest['needs_followup']:
        lines.append("**Needs Follow-up (14+ days):**")
        for contact in digest['needs_followup'][:5]:
            company = f" @ {contact['company']}" if contact['company'] else ""
            lines.append(f"β€’ {contact['name']}{company}")
        lines.append("")
    
    # Won this month
    won = digest['won_this_month']
    if won['count']:
        lines.append(f"**Won This Month:** {won['count']} deal(s) β€” ${won['value']:,.0f} πŸŽ‰")
    
    if len(lines) <= 2:
        lines.append("No activity to report. Database may be empty.")
    
    return '\n'.join(lines)

def main():
    parser = argparse.ArgumentParser(description='Generate CRM daily digest')
    parser.add_argument('--json', action='store_true', help='Output as JSON')
    parser.add_argument('--lookback', '-l', type=int, default=1, help='Days to look back')
    parser.add_argument('--lookahead', '-a', type=int, default=7, help='Days to look ahead')
    
    args = parser.parse_args()
    
    digest = generate_digest(args.lookback, args.lookahead)
    
    if args.json:
        print(json.dumps(digest, indent=2))
    else:
        print(format_digest_text(digest))

if __name__ == '__main__':
    main()
crm-export.py
6.8 KB
#!/usr/bin/env python3
"""
CRM Export - Export data to CSV/JSON formats

Exports:
- contacts: All contacts
- deals: All deals with contact info
- interactions: All interactions
- tasks: All tasks
- all: Complete database dump
"""

import argparse
import csv
import json
import os
import sqlite3
from datetime import datetime
from pathlib import Path

DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
EXPORT_DIR = os.environ.get('CRM_EXPORT_DIR', os.path.expanduser('~/.local/share/agent-crm/exports'))

def get_db() -> sqlite3.Connection:
    """Get database connection."""
    if not Path(DB_PATH).exists():
        return None
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def ensure_export_dir():
    """Create export directory if needed."""
    Path(EXPORT_DIR).mkdir(parents=True, exist_ok=True)

def export_table(conn, table: str, query: str = None) -> list[dict]:
    """Export a table to list of dicts."""
    if query is None:
        query = f"SELECT * FROM {table}"
    rows = conn.execute(query).fetchall()
    return [dict(r) for r in rows]

def to_csv(data: list[dict], filepath: str):
    """Write data to CSV."""
    if not data:
        return
    with open(filepath, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)

def to_json(data: list[dict], filepath: str):
    """Write data to JSON."""
    with open(filepath, 'w') as f:
        json.dump(data, f, indent=2, default=str)

def export_contacts(conn, format: str, output_dir: str) -> str:
    """Export contacts."""
    data = export_table(conn, 'contacts')
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    if format == 'csv':
        filepath = os.path.join(output_dir, f'contacts_{timestamp}.csv')
        to_csv(data, filepath)
    else:
        filepath = os.path.join(output_dir, f'contacts_{timestamp}.json')
        to_json(data, filepath)
    
    return filepath

def export_deals(conn, format: str, output_dir: str) -> str:
    """Export deals with contact info."""
    query = """
        SELECT d.*, c.name as contact_name, c.email as contact_email, c.company as contact_company
        FROM deals d
        LEFT JOIN contacts c ON d.contact_id = c.id
        ORDER BY d.created_at DESC
    """
    data = export_table(conn, 'deals', query)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    if format == 'csv':
        filepath = os.path.join(output_dir, f'deals_{timestamp}.csv')
        to_csv(data, filepath)
    else:
        filepath = os.path.join(output_dir, f'deals_{timestamp}.json')
        to_json(data, filepath)
    
    return filepath

def export_interactions(conn, format: str, output_dir: str) -> str:
    """Export interactions with contact info."""
    query = """
        SELECT i.*, c.name as contact_name, c.company as contact_company
        FROM interactions i
        LEFT JOIN contacts c ON i.contact_id = c.id
        ORDER BY i.occurred_at DESC
    """
    data = export_table(conn, 'interactions', query)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    if format == 'csv':
        filepath = os.path.join(output_dir, f'interactions_{timestamp}.csv')
        to_csv(data, filepath)
    else:
        filepath = os.path.join(output_dir, f'interactions_{timestamp}.json')
        to_json(data, filepath)
    
    return filepath

def export_tasks(conn, format: str, output_dir: str) -> str:
    """Export tasks with contact info."""
    query = """
        SELECT t.*, c.name as contact_name, d.title as deal_title
        FROM tasks t
        LEFT JOIN contacts c ON t.contact_id = c.id
        LEFT JOIN deals d ON t.deal_id = d.id
        ORDER BY t.due_at ASC
    """
    data = export_table(conn, 'tasks', query)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    if format == 'csv':
        filepath = os.path.join(output_dir, f'tasks_{timestamp}.csv')
        to_csv(data, filepath)
    else:
        filepath = os.path.join(output_dir, f'tasks_{timestamp}.json')
        to_json(data, filepath)
    
    return filepath

def export_all(conn, format: str, output_dir: str) -> dict:
    """Export everything."""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    if format == 'json':
        # Single JSON file with all data
        data = {
            'exported_at': datetime.now().isoformat(),
            'contacts': export_table(conn, 'contacts'),
            'deals': export_table(conn, 'deals', """
                SELECT d.*, c.name as contact_name 
                FROM deals d LEFT JOIN contacts c ON d.contact_id = c.id
            """),
            'interactions': export_table(conn, 'interactions', """
                SELECT i.*, c.name as contact_name 
                FROM interactions i LEFT JOIN contacts c ON i.contact_id = c.id
            """),
            'tasks': export_table(conn, 'tasks', """
                SELECT t.*, c.name as contact_name 
                FROM tasks t LEFT JOIN contacts c ON t.contact_id = c.id
            """),
            'audit_log': export_table(conn, 'audit_log')
        }
        filepath = os.path.join(output_dir, f'crm_export_{timestamp}.json')
        to_json(data, filepath)
        return {'format': 'json', 'path': filepath}
    else:
        # Multiple CSV files
        files = []
        files.append(export_contacts(conn, 'csv', output_dir))
        files.append(export_deals(conn, 'csv', output_dir))
        files.append(export_interactions(conn, 'csv', output_dir))
        files.append(export_tasks(conn, 'csv', output_dir))
        return {'format': 'csv', 'files': files}

def main():
    parser = argparse.ArgumentParser(description='Export CRM data')
    parser.add_argument('what', choices=['contacts', 'deals', 'interactions', 'tasks', 'all'],
                       help='What to export')
    parser.add_argument('--format', '-f', choices=['csv', 'json'], default='json',
                       help='Export format')
    parser.add_argument('--output', '-o', help='Output directory')
    
    args = parser.parse_args()
    
    conn = get_db()
    if not conn:
        print(json.dumps({'error': 'Database not found', 'path': DB_PATH}))
        return
    
    output_dir = args.output or EXPORT_DIR
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    exporters = {
        'contacts': export_contacts,
        'deals': export_deals,
        'interactions': export_interactions,
        'tasks': export_tasks,
        'all': export_all,
    }
    
    result = exporters[args.what](conn, args.format, output_dir)
    conn.close()
    
    if isinstance(result, dict):
        print(json.dumps({'status': 'success', **result}))
    else:
        print(json.dumps({'status': 'success', 'path': result}))

if __name__ == '__main__':
    main()
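A couple of illustrative invocations (paths are examples; the scripts live in `scripts/`):

```bash
# Everything into a single timestamped JSON file in the default export directory
python3 scripts/crm-export.py all --format json

# Just deals (with joined contact columns) as CSV, into a custom directory
python3 scripts/crm-export.py deals --format csv --output ~/crm-exports
```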
crm-ingest.py
16.4 KB
#!/usr/bin/env python3
"""
CRM Ingest - Parse unstructured text into CRM actions

Takes emails, meeting notes, voice transcripts and extracts:
- New contacts
- Interactions to log
- Deal updates
- Tasks to create

Outputs a JSON plan for agent review before execution.
"""

import argparse
import json
import re
import sys
from datetime import datetime
from pathlib import Path

def extract_emails(text: str) -> list[str]:
    """Extract email addresses from text."""
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    return list(set(re.findall(pattern, text)))

def extract_phones(text: str) -> list[str]:
    """Extract phone numbers from text."""
    patterns = [
        r'\+?1?[-.\s]?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}',
        r'\+[0-9]{1,3}[-.\s]?[0-9]{6,14}'
    ]
    phones = []
    for pattern in patterns:
        phones.extend(re.findall(pattern, text))
    return list(set(phones))

def extract_money(text: str) -> list[dict]:
    """Extract monetary amounts."""
    patterns = [
        (r'\$([0-9,]+(?:\.[0-9]{1,2})?)\s*(?:k|K)', lambda m: float(m.group(1).replace(',', '')) * 1000),
        (r'\$([0-9,]+(?:\.[0-9]{1,2})?)\s*(?:m|M)', lambda m: float(m.group(1).replace(',', '')) * 1000000),
        # Lookaheads stop the plain pattern from double-counting "$50k" / "$1.5M" amounts
        (r'\$([0-9,]+(?:\.[0-9]{1,2})?)(?!\s*[kKmM])(?![\d.,])', lambda m: float(m.group(1).replace(',', ''))),
        # Lookbehind stops "$100 USD" from matching both this and the $ patterns
        (r'(?<![$\d,.])([0-9,]+(?:\.[0-9]{1,2})?)\s*(?:dollars|USD)', lambda m: float(m.group(1).replace(',', ''))),
    ]
    amounts = []
    for pattern, converter in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            amounts.append({
                'raw': match.group(0),
                'value': converter(match),
                'currency': 'USD'
            })
    return amounts
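
# Illustrative: extract_money("budget is around $50k") returns
# [{'raw': '$50k', 'value': 50000.0, 'currency': 'USD'}]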

def extract_dates(text: str) -> list[str]:
    """Extract date references."""
    patterns = [
        r'(?:next|this)\s+(?:monday|tuesday|wednesday|thursday|friday|saturday|sunday)',
        r'(?:next|this)\s+week',
        r'(?:next|this)\s+month',
        r'(?:in\s+)?[0-9]+\s+(?:day|week|month)s?(?:\s+from\s+now)?',
        r'(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+[0-9]{1,2}(?:st|nd|rd|th)?(?:\s*,?\s*[0-9]{4})?',
        r'[0-9]{1,2}/[0-9]{1,2}(?:/[0-9]{2,4})?',
        r'tomorrow',
        r'today',
        r'end of (?:week|month|quarter|year)',
    ]
    dates = []
    for pattern in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            dates.append(match.group(0).strip())
    return list(set(dates))

def extract_names(text: str) -> list[dict]:
    """Extract potential person names with context."""
    names = []
    seen = set()
    
    # Common false positives to filter out
    stop_words = {
        'I', 'We', 'They', 'He', 'She', 'The', 'This', 'That', 'What', 'When', 
        'Where', 'How', 'If', 'But', 'And', 'For', 'Just', 'Also', 'Very', 
        'Really', 'Thanks', 'Thank', 'Hello', 'Hi', 'Hey', 'Best', 'Regards',
        'Sincerely', 'Cheers', 'Dear', 'To', 'From', 'Subject', 'Re', 'Fwd'
    }
    
    def add_name(name: str, role: str = None, company: str = None):
        name = name.strip()
        if not name or name in stop_words or len(name) <= 2:
            return
        # Filter out single words that are likely not names
        if ' ' not in name and len(name) < 4:
            return
        
        # If we've seen this name, update with new info (role/company)
        if name in seen:
            for entry in names:
                if entry['name'] == name:
                    if role and 'role' not in entry:
                        entry['role'] = role
                    if company and 'company' not in entry:
                        entry['company'] = company
            return
        
        entry = {'name': name}
        if role:
            entry['role'] = role
        if company:
            entry['company'] = company
        names.append(entry)
        seen.add(name)
    
    # Pattern 1: Name followed by "Title, Company" on next line (most specific first)
    # "Alex Rivera\nFounder, DataStack"
    title_company_pattern = r'^([A-Z][a-z]+\s+[A-Z][a-z]+)\s*\n\s*(CEO|CTO|COO|CFO|VP|Director|Manager|Founder|Co-Founder|President|Head of [A-Za-z]+|[A-Z][a-z]+ Engineer|[A-Z][a-z]+ Manager)[,\s]+([A-Z][A-Za-z]+(?:\s+[A-Z][A-Za-z]+)?)'
    for match in re.finditer(title_company_pattern, text, re.MULTILINE):
        name = match.group(1)
        role = match.group(2).strip()
        company = match.group(3).strip()
        add_name(name, role=role, company=company)
    
    # Pattern 1b: Name followed by just title on next line (no company)
    title_pattern = r'^([A-Z][a-z]+\s+[A-Z][a-z]+)\s*\n\s*(CEO|CTO|COO|CFO|VP|Director|Manager|Founder|Co-Founder|President|Head of [A-Za-z]+)\s*$'
    for match in re.finditer(title_pattern, text, re.MULTILINE):
        name = match.group(1)
        role = match.group(2).strip()
        add_name(name, role=role)
    
    # Pattern 2: Email signature - name on line after closing
    # "Best,\nAlex Rivera" or "Thanks,\nJohn Smith"
    sig_pattern = r'(?:Best|Thanks|Regards|Cheers|Sincerely|Warmly|Yours)[,.]?\s*\n+([A-Z][a-z]+\s+[A-Z][a-z]+)'
    for match in re.finditer(sig_pattern, text, re.MULTILINE):
        add_name(match.group(1))
    
    # Pattern 3: "Name, Title at Company" or "Name, Title"
    inline_title = r'([A-Z][a-z]+\s+[A-Z][a-z]+)\s*,\s*(CEO|CTO|COO|CFO|VP|Director|Manager|Founder|Co-Founder|President|Head of [A-Za-z]+|[A-Z][a-z]+ Engineer|[A-Z][a-z]+ Manager)(?:\s+(?:at|@)\s+([A-Z][A-Za-z]+))?'
    for match in re.finditer(inline_title, text):
        name = match.group(1)
        role = match.group(2)
        company = match.group(3)  # None when the "at Company" tail is absent
        add_name(name, role=role, company=company)
    
    # Pattern 4: "talked to/met with/spoke with Name"
    action_pattern = r'(?:talked to|met with|spoke with|call with|meeting with|heard from|email from|message from)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)'
    for match in re.finditer(action_pattern, text, re.IGNORECASE):
        add_name(match.group(1))
    
    # Pattern 5: "Name from/at Company" - full two-word name
    company_pattern = r'([A-Z][a-z]+\s+[A-Z][a-z]+)\s+(?:from|at|with|@)\s+([A-Z][A-Za-z]+(?:\s+[A-Z][A-Za-z]+)?)'
    for match in re.finditer(company_pattern, text):
        name = match.group(1)
        company = match.group(2)
        add_name(name, company=company)
    
    # Pattern 5b: "FirstName from/at Company" - single name with company context
    company_pattern_single = r'\b([A-Z][a-z]+)\s+(?:from|at|with|@)\s+([A-Z][A-Za-z]+(?:\s+[A-Z][A-Za-z]+)?)'
    for match in re.finditer(company_pattern_single, text):
        name = match.group(1)
        company = match.group(2)
        if name not in stop_words and len(name) >= 4:
            add_name(name, company=company)
    
    # Pattern 6: Email "From:" header with name
    from_pattern = r'From:\s*(?:"?([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)"?\s*<|([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s+[<\[])'
    for match in re.finditer(from_pattern, text):
        name = match.group(1) or match.group(2)
        if name:
            add_name(name)
    
    # Pattern 7: Standalone two-word capitalized name on its own line (likely signature)
    standalone = r'^([A-Z][a-z]+\s+[A-Z][a-z]+)\s*$'
    lines = text.split('\n')
    for i, line in enumerate(lines):
        match = re.match(standalone, line.strip())
        if match:
            name = match.group(1)
            # Check if next line looks like a title or company
            if i + 1 < len(lines):
                next_line = lines[i + 1].strip()
                if re.match(r'^(CEO|CTO|COO|CFO|VP|Director|Founder|Manager|Engineer|Head|President)', next_line):
                    add_name(name, role=next_line.split(',')[0].strip())
                elif re.match(r'^[A-Z]', next_line) and len(next_line) < 50:
                    add_name(name, company=next_line.split(',')[0].strip())
                else:
                    add_name(name)
            else:
                add_name(name)
    
    return names
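
# Illustrative: a signature block like "Alex Rivera\nFounder, DataStack" yields
# [{'name': 'Alex Rivera', 'role': 'Founder', 'company': 'DataStack'}]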

def extract_companies(text: str) -> list[str]:
    """Extract company names."""
    # Look for patterns like "at Acme Corp" or "from TechCo"
    patterns = [
        r'(?:at|from|with|for)\s+([A-Z][a-zA-Z]+(?:\s+(?:Corp|Inc|LLC|Ltd|Co|Labs?|AI|Tech|Software|Systems|Solutions))?)',
        r'([A-Z][a-zA-Z]+(?:\.(?:com|io|ai|co)))',
    ]
    
    companies = []
    seen = set()
    # Common false positives
    stop_words = {'I', 'We', 'They', 'He', 'She', 'The', 'This', 'That', 'What', 'When', 'Where', 'How', 'If', 'But', 'And', 'For', 'Just', 'Also', 'Very', 'Really', 'Thanks', 'Thank', 'Hello', 'Hi', 'Hey'}
    
    for pattern in patterns:
        for match in re.finditer(pattern, text):
            company = match.group(1).strip()
            if company not in seen and company not in stop_words and len(company) > 2:
                companies.append(company)
                seen.add(company)
    
    return companies

def detect_interaction_type(text: str) -> str:
    """Detect the type of interaction from text."""
    text_lower = text.lower()
    
    if any(x in text_lower for x in ['subject:', 'from:', 'to:', 're:', 'fwd:']):
        return 'email'
    if any(x in text_lower for x in ['called', 'call with', 'phone call', 'spoke with', 'talked to']):
        return 'call'
    if any(x in text_lower for x in ['meeting', 'met with', 'conference', 'zoom', 'teams']):
        return 'meeting'
    if any(x in text_lower for x in ['linkedin', 'connected on']):
        return 'linkedin'
    if any(x in text_lower for x in ['texted', 'sms', 'imessage', 'whatsapp']):
        return 'text'
    
    return 'note'
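
# Illustrative: "Subject: intro call" maps to 'email', "called Sarah back" to 'call';
# text with no recognized cue falls through to 'note'.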

def detect_deal_signals(text: str) -> dict:
    """Detect signals about deals."""
    text_lower = text.lower()
    
    signals = {
        'stage_hints': [],
        'positive': [],
        'negative': [],
        'actions': []
    }
    
    # Stage hints
    if any(x in text_lower for x in ['interested', 'wants to learn more', 'requested demo']):
        signals['stage_hints'].append('qualified')
    if any(x in text_lower for x in ['sent proposal', 'sending quote', 'pricing']):
        signals['stage_hints'].append('proposal')
    if any(x in text_lower for x in ['negotiating', 'reviewing contract', 'legal review', 'redlines']):
        signals['stage_hints'].append('negotiation')
    if any(x in text_lower for x in ['signed', 'closed', 'won', 'agreed', 'confirmed']):
        signals['stage_hints'].append('won')
    if any(x in text_lower for x in ['passed', 'declined', 'lost', 'went with competitor', 'no budget']):
        signals['stage_hints'].append('lost')
    
    # Positive signals
    if any(x in text_lower for x in ['excited', 'very interested', 'love it', 'great fit', 'impressed']):
        signals['positive'].append('high_interest')
    if any(x in text_lower for x in ['decision maker', 'can approve', 'has budget']):
        signals['positive'].append('authority')
    
    # Negative signals  
    if any(x in text_lower for x in ['not a priority', 'maybe later', 'next quarter', 'no budget']):
        signals['negative'].append('timing_issue')
    if any(x in text_lower for x in ['need to check', 'run it by', 'get approval']):
        signals['negative'].append('no_authority')
    
    # Follow-up actions
    if any(x in text_lower for x in ['will send', 'sending', 'follow up', 'get back to']):
        signals['actions'].append('follow_up_needed')
    if any(x in text_lower for x in ['schedule', 'set up a', 'book a']):
        signals['actions'].append('meeting_to_schedule')
    
    return signals

def extract_tasks(text: str) -> list[dict]:
    """Extract potential tasks/action items."""
    tasks = []
    
    # Common action patterns
    patterns = [
        r'(?:need to|should|will|must|have to)\s+([^.!?\n]+)',
        r'(?:follow up|send|schedule|call|email|review|check)\s+([^.!?\n]+)',
        r'(?:action item|todo|task):\s*([^.!?\n]+)',
        r'(?:by|before|due)\s+((?:monday|tuesday|wednesday|thursday|friday|next week|tomorrow|[a-z]+ [0-9]+)[^.!?\n]*)',
    ]
    
    for pattern in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            task_text = match.group(1).strip()
            if len(task_text) > 10 and len(task_text) < 200:
                tasks.append({
                    'title': task_text[:100],
                    'source': match.group(0)
                })
    
    return tasks[:5]  # Limit to 5 tasks

def parse_email(text: str) -> dict:
    """Parse email-specific structure."""
    result = {
        'from': None,
        'to': None,
        'subject': None,
        'date': None,
        'body': text
    }
    
    lines = text.split('\n')
    body_start = 0
    
    for i, line in enumerate(lines):
        if line.lower().startswith('from:'):
            result['from'] = line[5:].strip()
            body_start = i + 1
        elif line.lower().startswith('to:'):
            result['to'] = line[3:].strip()
            body_start = i + 1
        elif line.lower().startswith('subject:'):
            result['subject'] = line[8:].strip()
            body_start = i + 1
        elif line.lower().startswith('date:'):
            result['date'] = line[5:].strip()
            body_start = i + 1
        elif line.strip() == '' and body_start > 0:
            result['body'] = '\n'.join(lines[i+1:])
            break
    
    return result

def ingest(text: str, source_type: str = 'auto') -> dict:
    """Main ingestion function - extract all structured data from text."""
    
    if source_type == 'auto':
        source_type = detect_interaction_type(text)
    
    # Parse email structure if applicable
    email_data = None
    if source_type == 'email':
        email_data = parse_email(text)
    
    # Extract all entities
    plan = {
        'source_type': source_type,
        'extracted_at': datetime.now().isoformat(),
        'raw_length': len(text),
        
        'contacts': {
            'names': extract_names(text),
            'emails': extract_emails(text),
            'phones': extract_phones(text),
            'companies': extract_companies(text)
        },
        
        'interaction': {
            'type': source_type,
            'direction': 'inbound' if email_data and email_data.get('to') else 'outbound',
            'summary': None,  # Agent should generate this
            'occurred_at': (email_data or {}).get('date') or 'today'
        },
        
        'deal_signals': detect_deal_signals(text),
        'money': extract_money(text),
        'dates': extract_dates(text),
        'potential_tasks': extract_tasks(text),
        
        'email_metadata': email_data,
        
        'suggested_actions': []
    }
    
    # Generate suggested actions
    if plan['contacts']['names']:
        for name_info in plan['contacts']['names']:
            plan['suggested_actions'].append({
                'action': 'create_or_update_contact',
                'data': name_info
            })
    
    if plan['money']:
        plan['suggested_actions'].append({
            'action': 'create_or_update_deal',
            'data': {
                'value': plan['money'][0]['value'],
                'signals': plan['deal_signals']
            }
        })
    
    if plan['deal_signals']['stage_hints']:
        plan['suggested_actions'].append({
            'action': 'update_deal_stage',
            'data': {
                'suggested_stage': plan['deal_signals']['stage_hints'][0]
            }
        })
    
    plan['suggested_actions'].append({
        'action': 'log_interaction',
        'data': {
            'type': source_type,
            'needs_summary': True
        }
    })
    
    for task in plan['potential_tasks'][:3]:
        plan['suggested_actions'].append({
            'action': 'create_task',
            'data': task
        })
    
    return plan

def main():
    parser = argparse.ArgumentParser(description='Parse unstructured text into CRM actions')
    parser.add_argument('--type', '-t', choices=['auto', 'email', 'call', 'meeting', 'note'],
                       default='auto', help='Source type')
    parser.add_argument('--file', '-f', help='Read from file instead of stdin')
    parser.add_argument('--text', help='Text to parse (alternative to stdin/file)')
    
    args = parser.parse_args()
    
    # Get input text
    if args.text:
        text = args.text
    elif args.file:
        text = Path(args.file).read_text()
    else:
        text = sys.stdin.read()
    
    if not text.strip():
        print(json.dumps({'error': 'No input text provided'}))
        sys.exit(1)
    
    result = ingest(text, args.type)
    print(json.dumps(result, indent=2))

if __name__ == '__main__':
    main()
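Illustrative usage (the input file path is made up; text can also arrive via stdin or `--text`):

```bash
# Parse a saved email into a JSON action plan for review
python3 scripts/crm-ingest.py --type email --file ~/inbox/lead-email.txt

# Auto-detect the source type from piped meeting notes
echo 'Met with Alex Rivera from DataStack, budget around $50k, follow up next tuesday' | python3 scripts/crm-ingest.py
```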
crm-notify.py
9.3 KB
#!/usr/bin/env python3
"""
CRM Notify - Check for items needing attention

Returns actionable alerts:
- Overdue tasks
- Tasks due today
- Deals closing soon (within N days)
- Stale contacts (no interaction in N days)
- Deals stuck in stage too long

Designed to be run from heartbeat/cron and output alerts for the agent to act on.
"""

import argparse
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))

def get_db() -> Optional[sqlite3.Connection]:
    """Get database connection, or None if the database file is missing."""
    if not Path(DB_PATH).exists():
        return None
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def check_alerts(
    stale_days: int = 14,
    closing_days: int = 7,
    stuck_days: int = 21
) -> dict:
    """Check for all alert conditions."""
    conn = get_db()
    if not conn:
        return {'error': 'Database not found', 'alerts': []}
    
    now = datetime.now()
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0).isoformat()
    today_end = now.replace(hour=23, minute=59, second=59, microsecond=999999).isoformat()
    
    alerts = []
    
    # Overdue tasks
    rows = conn.execute("""
        SELECT t.*, c.name as contact_name FROM tasks t
        LEFT JOIN contacts c ON t.contact_id = c.id
        WHERE t.completed_at IS NULL AND t.due_at < ?
        ORDER BY t.due_at ASC
    """, (today_start,)).fetchall()
    
    for row in rows:
        due = datetime.fromisoformat(row['due_at']) if row['due_at'] else None
        days_overdue = (now - due).days if due else 0
        alerts.append({
            'type': 'overdue_task',
            'priority': 'high' if days_overdue > 3 else 'medium',
            'task_id': row['id'],
            'title': row['title'],
            'contact': row['contact_name'],
            'due_at': row['due_at'],
            'days_overdue': days_overdue,
            'message': f"⚠️ Task overdue ({days_overdue}d): {row['title']}" + 
                      (f" ({row['contact_name']})" if row['contact_name'] else "")
        })
    
    # Tasks due today
    rows = conn.execute("""
        SELECT t.*, c.name as contact_name FROM tasks t
        LEFT JOIN contacts c ON t.contact_id = c.id
        WHERE t.completed_at IS NULL 
        AND t.due_at >= ? AND t.due_at <= ?
        ORDER BY t.priority DESC, t.due_at ASC
    """, (today_start, today_end)).fetchall()
    
    for row in rows:
        alerts.append({
            'type': 'task_due_today',
            'priority': row['priority'] or 'normal',
            'task_id': row['id'],
            'title': row['title'],
            'contact': row['contact_name'],
            'due_at': row['due_at'],
            'message': f"πŸ“‹ Due today: {row['title']}" +
                      (f" ({row['contact_name']})" if row['contact_name'] else "")
        })
    
    # Deals closing soon
    closing_threshold = (now + timedelta(days=closing_days)).isoformat()
    rows = conn.execute("""
        SELECT d.*, c.name as contact_name FROM deals d
        LEFT JOIN contacts c ON d.contact_id = c.id
        WHERE d.stage NOT IN ('won', 'lost')
        AND d.expected_close IS NOT NULL
        AND d.expected_close <= ?
        ORDER BY d.expected_close ASC
    """, (closing_threshold,)).fetchall()
    
    for row in rows:
        close_date = datetime.fromisoformat(row['expected_close']) if row['expected_close'] else None
        days_until = (close_date - now).days if close_date else 0
        value_str = f"${row['value']:,.0f}" if row['value'] else "TBD"
        alerts.append({
            'type': 'deal_closing_soon',
            'priority': 'high' if days_until <= 3 else 'medium',
            'deal_id': row['id'],
            'title': row['title'],
            'value': row['value'],
            'contact': row['contact_name'],
            'expected_close': row['expected_close'],
            'days_until_close': days_until,
            'message': f"πŸ’° Deal closing in {days_until}d: {row['title']} ({value_str})"
        })
    
    # Stale contacts (no interaction in N days, but have a deal)
    stale_threshold = (now - timedelta(days=stale_days)).isoformat()
    rows = conn.execute("""
        SELECT c.*, d.title as deal_title, d.value as deal_value, d.stage as deal_stage,
               MAX(i.occurred_at) as last_interaction
        FROM contacts c
        JOIN deals d ON c.id = d.contact_id AND d.stage NOT IN ('won', 'lost')
        LEFT JOIN interactions i ON c.id = i.contact_id
        GROUP BY c.id
        HAVING last_interaction < ? OR last_interaction IS NULL
        ORDER BY d.value DESC NULLS LAST
    """, (stale_threshold,)).fetchall()
    
    for row in rows:
        last = datetime.fromisoformat(row['last_interaction']) if row['last_interaction'] else None
        days_stale = (now - last).days if last else 999
        alerts.append({
            'type': 'stale_contact',
            'priority': 'medium',
            'contact_id': row['id'],
            'name': row['name'],
            'company': row['company'],
            'deal': row['deal_title'],
            'deal_value': row['deal_value'],
            'last_interaction': row['last_interaction'],
            'days_since_contact': days_stale,
            'message': f"πŸ‘‹ No contact in {days_stale}d: {row['name']}" +
                      (f" ({row['deal_title']})" if row['deal_title'] else "")
        })
    
    # Deals stuck in stage
    stuck_threshold = (now - timedelta(days=stuck_days)).isoformat()
    rows = conn.execute("""
        SELECT d.*, c.name as contact_name FROM deals d
        LEFT JOIN contacts c ON d.contact_id = c.id
        WHERE d.stage NOT IN ('won', 'lost', 'lead')
        AND d.updated_at < ?
        ORDER BY d.value DESC NULLS LAST
    """, (stuck_threshold,)).fetchall()
    
    for row in rows:
        updated = datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None
        days_stuck = (now - updated).days if updated else 0
        value_str = f"${row['value']:,.0f}" if row['value'] else "TBD"
        alerts.append({
            'type': 'deal_stuck',
            'priority': 'low',
            'deal_id': row['id'],
            'title': row['title'],
            'value': row['value'],
            'stage': row['stage'],
            'days_in_stage': days_stuck,
            'message': f"🐌 Deal stuck {days_stuck}d in {row['stage']}: {row['title']} ({value_str})"
        })
    
    conn.close()
    
    # Sort by priority
    priority_order = {'high': 0, 'medium': 1, 'normal': 2, 'low': 3}
    alerts.sort(key=lambda x: priority_order.get(x.get('priority', 'normal'), 2))
    
    return {
        'checked_at': now.isoformat(),
        'total_alerts': len(alerts),
        'by_type': {
            'overdue_tasks': len([a for a in alerts if a['type'] == 'overdue_task']),
            'tasks_due_today': len([a for a in alerts if a['type'] == 'task_due_today']),
            'deals_closing_soon': len([a for a in alerts if a['type'] == 'deal_closing_soon']),
            'stale_contacts': len([a for a in alerts if a['type'] == 'stale_contact']),
            'deals_stuck': len([a for a in alerts if a['type'] == 'deal_stuck'])
        },
        'alerts': alerts
    }

def format_alerts_text(result: dict) -> str:
    """Format alerts as human-readable text."""
    if 'error' in result:
        return f"❌ {result['error']}"
    
    if not result['alerts']:
        return "✅ No CRM alerts. All clear!"
    
    lines = []
    lines.append(f"🔔 **CRM Alerts** ({result['total_alerts']} items)")
    lines.append("")
    
    # Group by type
    current_type = None
    type_labels = {
        'overdue_task': '⚠️ Overdue Tasks',
        'task_due_today': '📋 Due Today',
        'deal_closing_soon': '💰 Deals Closing Soon',
        'stale_contact': '👋 Needs Follow-up',
        'deal_stuck': '🐌 Stuck Deals'
    }
    
    for alert in result['alerts']:
        if alert['type'] != current_type:
            if current_type is not None:
                lines.append("")
            current_type = alert['type']
            lines.append(f"**{type_labels.get(current_type, current_type)}:**")
        
        lines.append(f"β€’ {alert['message']}")
    
    return '\n'.join(lines)

def main():
    parser = argparse.ArgumentParser(description='Check CRM for items needing attention')
    parser.add_argument('--json', action='store_true', help='Output as JSON')
    parser.add_argument('--stale-days', type=int, default=14, help='Days before contact is stale')
    parser.add_argument('--closing-days', type=int, default=7, help='Days to look ahead for closing deals')
    parser.add_argument('--stuck-days', type=int, default=21, help='Days before deal is considered stuck')
    parser.add_argument('--type', '-t', choices=['overdue_task', 'task_due_today', 'deal_closing_soon', 'stale_contact', 'deal_stuck'],
                       help='Filter to specific alert type')
    
    args = parser.parse_args()
    
    result = check_alerts(args.stale_days, args.closing_days, args.stuck_days)
    
    # Filter by type if specified
    if args.type and 'alerts' in result:
        result['alerts'] = [a for a in result['alerts'] if a['type'] == args.type]
        result['total_alerts'] = len(result['alerts'])
    
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(format_alerts_text(result))

if __name__ == '__main__':
    main()
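A minimal sketch of pairing this with a heartbeat/cron job (the thresholds shown override the defaults):

```bash
# Human-readable digest
python3 scripts/crm-notify.py

# JSON for machine consumption, stale contacts only, 30-day threshold
python3 scripts/crm-notify.py --json --type stale_contact --stale-days 30
```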
crm-report.py
8.8 KB
#!/usr/bin/env python3
"""
CRM Reports - Pipeline and activity analytics

Generates reports:
- Pipeline by stage (with trends)
- Activity summary (interactions, tasks)
- Win/loss analysis
- Forecast based on expected close dates
"""

import argparse
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))

def get_db() -> Optional[sqlite3.Connection]:
    """Get database connection, or None if the database file is missing."""
    if not Path(DB_PATH).exists():
        return None
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def pipeline_report() -> dict:
    """Generate pipeline report."""
    conn = get_db()
    if not conn:
        return {'error': 'Database not found'}
    
    now = datetime.now()
    
    # Current pipeline by stage
    stages = conn.execute("""
        SELECT 
            stage,
            COUNT(*) as count,
            SUM(value) as total_value,
            AVG(value) as avg_value,
            SUM(value * COALESCE(probability, 50) / 100.0) as weighted_value
        FROM deals
        WHERE stage NOT IN ('won', 'lost')
        GROUP BY stage
        ORDER BY 
            CASE stage 
                WHEN 'lead' THEN 1
                WHEN 'qualified' THEN 2
                WHEN 'proposal' THEN 3
                WHEN 'negotiation' THEN 4
            END
    """).fetchall()
    
    pipeline = {
        'stages': [dict(s) for s in stages],
        'totals': {
            'deals': sum(s['count'] for s in stages),
            'value': sum(s['total_value'] or 0 for s in stages),
            'weighted': sum(s['weighted_value'] or 0 for s in stages)
        }
    }
    
    # Deals by expected close month
    forecast = conn.execute("""
        SELECT 
            strftime('%Y-%m', expected_close) as month,
            COUNT(*) as count,
            SUM(value) as total_value,
            SUM(value * COALESCE(probability, 50) / 100.0) as weighted_value
        FROM deals
        WHERE stage NOT IN ('won', 'lost') 
        AND expected_close IS NOT NULL
        AND expected_close >= date('now')
        GROUP BY month
        ORDER BY month
        LIMIT 6
    """).fetchall()
    
    pipeline['forecast'] = [dict(f) for f in forecast]
    
    # Top deals
    top_deals = conn.execute("""
        SELECT d.*, c.name as contact_name
        FROM deals d
        LEFT JOIN contacts c ON d.contact_id = c.id
        WHERE d.stage NOT IN ('won', 'lost')
        ORDER BY d.value DESC NULLS LAST
        LIMIT 10
    """).fetchall()
    
    pipeline['top_deals'] = [dict(d) for d in top_deals]
    
    conn.close()
    return pipeline

def activity_report(days: int = 30) -> dict:
    """Generate activity report for past N days."""
    conn = get_db()
    if not conn:
        return {'error': 'Database not found'}
    
    since = (datetime.now() - timedelta(days=days)).isoformat()
    
    # Interactions by type
    interactions = conn.execute("""
        SELECT type, direction, COUNT(*) as count
        FROM interactions
        WHERE logged_at >= ?
        GROUP BY type, direction
        ORDER BY count DESC
    """, (since,)).fetchall()
    
    # Tasks created vs completed
    tasks_created = conn.execute("""
        SELECT COUNT(*) as count FROM tasks WHERE created_at >= ?
    """, (since,)).fetchone()['count']
    
    tasks_completed = conn.execute("""
        SELECT COUNT(*) as count FROM tasks WHERE completed_at >= ?
    """, (since,)).fetchone()['count']
    
    # Contacts added
    contacts_added = conn.execute("""
        SELECT COUNT(*) as count FROM contacts WHERE created_at >= ?
    """, (since,)).fetchone()['count']
    
    # Deals created
    deals_created = conn.execute("""
        SELECT COUNT(*) as count, SUM(value) as total_value
        FROM deals WHERE created_at >= ?
    """, (since,)).fetchone()
    
    # Deal stage movements
    stage_changes = conn.execute("""
        SELECT 
            json_extract(old_values, '$.stage') as from_stage,
            json_extract(new_values, '$.stage') as to_stage,
            COUNT(*) as count
        FROM audit_log
        WHERE table_name = 'deals' 
        AND action = 'UPDATE'
        AND created_at >= ?
        AND json_extract(new_values, '$.stage') IS NOT NULL
        GROUP BY from_stage, to_stage
    """, (since,)).fetchall()
    
    conn.close()
    
    return {
        'period_days': days,
        'since': since,
        'interactions': [dict(i) for i in interactions],
        'total_interactions': sum(i['count'] for i in interactions),
        'tasks': {
            'created': tasks_created,
            'completed': tasks_completed,
            'completion_rate': tasks_completed / tasks_created if tasks_created else 0
        },
        'contacts_added': contacts_added,
        'deals': {
            'created': deals_created['count'],
            'total_value': deals_created['total_value'] or 0
        },
        'stage_movements': [dict(s) for s in stage_changes]
    }

def win_loss_report(days: int = 90) -> dict:
    """Analyze won vs lost deals."""
    conn = get_db()
    if not conn:
        return {'error': 'Database not found'}
    
    since = (datetime.now() - timedelta(days=days)).isoformat()
    
    # Won deals
    won = conn.execute("""
        SELECT COUNT(*) as count, SUM(value) as total_value, AVG(value) as avg_value
        FROM deals
        WHERE stage = 'won' AND closed_at >= ?
    """, (since,)).fetchone()
    
    # Lost deals
    lost = conn.execute("""
        SELECT COUNT(*) as count, SUM(value) as total_value, AVG(value) as avg_value
        FROM deals
        WHERE stage = 'lost' AND closed_at >= ?
    """, (since,)).fetchone()
    
    # Win rate
    total_closed = (won['count'] or 0) + (lost['count'] or 0)
    win_rate = (won['count'] or 0) / total_closed if total_closed else 0
    
    # Average deal cycle (created to closed)
    cycle = conn.execute("""
        SELECT AVG(julianday(closed_at) - julianday(created_at)) as avg_days
        FROM deals
        WHERE stage = 'won' AND closed_at >= ?
    """, (since,)).fetchone()
    
    conn.close()
    
    return {
        'period_days': days,
        'won': {
            'count': won['count'] or 0,
            'total_value': won['total_value'] or 0,
            'avg_value': won['avg_value'] or 0
        },
        'lost': {
            'count': lost['count'] or 0,
            'total_value': lost['total_value'] or 0,
            'avg_value': lost['avg_value'] or 0
        },
        'win_rate': win_rate,
        'avg_cycle_days': cycle['avg_days'] or 0
    }

def format_pipeline_text(report: dict) -> str:
    """Format pipeline report as text."""
    if 'error' in report:
        return f"❌ {report['error']}"
    
    lines = []
    lines.append("📊 **Pipeline Report**")
    lines.append("")
    
    # By stage
    lines.append("**By Stage:**")
    for stage in report['stages']:
        val = f"${stage['total_value']:,.0f}" if stage['total_value'] else "$0"
        weighted = f"${stage['weighted_value']:,.0f}" if stage['weighted_value'] else "$0"
        lines.append(f"β€’ {stage['stage'].title()}: {stage['count']} deals β€” {val} (weighted: {weighted})")
    
    totals = report['totals']
    lines.append(f"β€’ **Total:** {totals['deals']} deals β€” ${totals['value']:,.0f} (weighted: ${totals['weighted']:,.0f})")
    lines.append("")
    
    # Forecast
    if report['forecast']:
        lines.append("**Forecast by Month:**")
        for month in report['forecast']:
            lines.append(f"β€’ {month['month']}: {month['count']} deals β€” ${month['total_value']:,.0f}")
        lines.append("")
    
    # Top deals
    if report['top_deals']:
        lines.append("**Top Deals:**")
        for deal in report['top_deals'][:5]:
            val = f"${deal['value']:,.0f}" if deal['value'] else "TBD"
            contact = f" ({deal['contact_name']})" if deal['contact_name'] else ""
            lines.append(f"β€’ {deal['title']}{contact} β€” {val} [{deal['stage']}]")
    
    return '\n'.join(lines)

def main():
    parser = argparse.ArgumentParser(description='CRM analytics and reports')
    parser.add_argument('report', choices=['pipeline', 'activity', 'winloss'],
                       help='Report type')
    parser.add_argument('--json', action='store_true', help='Output as JSON')
    parser.add_argument('--days', '-d', type=int, default=30, help='Days to analyze')
    
    args = parser.parse_args()
    
    if args.report == 'pipeline':
        result = pipeline_report()
        if not args.json:
            print(format_pipeline_text(result))
            return
    elif args.report == 'activity':
        result = activity_report(args.days)
    elif args.report == 'winloss':
        result = win_loss_report(args.days)
    
    print(json.dumps(result, indent=2))

if __name__ == '__main__':
    main()
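Example invocations (`pipeline` prints formatted text unless `--json` is passed; `activity` and `winloss` always emit JSON):

```bash
python3 scripts/crm-report.py pipeline
python3 scripts/crm-report.py pipeline --json
python3 scripts/crm-report.py activity --days 7
python3 scripts/crm-report.py winloss --days 90
```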
crm-webhook.py
7.7 KB
#!/usr/bin/env python3
"""
CRM Webhook Server - Ingest leads from external forms

Accepts POST requests with contact/lead data and creates CRM entries.
Supports common form formats: Typeform, Tally, raw JSON.

Run: crm-webhook --port 8901
Then configure form webhooks to POST to http://localhost:8901/lead
"""

import argparse
import json
import os
import subprocess
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from urllib.parse import parse_qs

DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
CRM_SCRIPT = Path(__file__).parent / 'crm'
LOG_FILE = os.path.expanduser('~/.local/share/agent-crm/webhook.log')

def log(message: str):
    """Append to log file."""
    Path(LOG_FILE).parent.mkdir(parents=True, exist_ok=True)
    with open(LOG_FILE, 'a') as f:
        f.write(f"{datetime.now().isoformat()} {message}\n")

def parse_typeform(data: dict) -> dict:
    """Parse Typeform webhook payload."""
    answers = data.get('form_response', {}).get('answers', [])
    
    result = {'source': 'typeform'}
    
    for answer in answers:
        field_title = answer.get('field', {}).get('title', '').lower()
        
        # Try to map by field title
        if 'email' in field_title:
            result['email'] = answer.get('email')
        elif 'name' in field_title:
            result['name'] = answer.get('text')
        elif 'phone' in field_title:
            result['phone'] = answer.get('phone_number')
        elif 'company' in field_title:
            result['company'] = answer.get('text')
        elif 'message' in field_title or 'note' in field_title:
            result['notes'] = answer.get('text')
    
    # Fallback: treat the first short_text answer as the name
    if 'name' not in result:
        for answer in answers:
            if answer.get('type') == 'short_text' and answer.get('text'):
                result['name'] = answer.get('text')
                break
    
    return result

def parse_tally(data: dict) -> dict:
    """Parse Tally webhook payload."""
    fields = data.get('data', {}).get('fields', [])
    
    result = {'source': 'tally'}
    
    for field in fields:
        label = field.get('label', '').lower()
        value = field.get('value')
        
        if not value:
            continue
            
        if 'email' in label:
            result['email'] = value
        elif 'name' in label:
            result['name'] = value
        elif 'phone' in label:
            result['phone'] = value
        elif 'company' in label:
            result['company'] = value
        elif 'message' in label or 'note' in label:
            result['notes'] = value
    
    return result

def parse_generic(data: dict) -> dict:
    """Parse generic JSON payload."""
    result = {'source': 'webhook'}
    
    # Common field names
    field_map = {
        'name': ['name', 'full_name', 'fullName', 'contact_name', 'contactName'],
        'email': ['email', 'email_address', 'emailAddress', 'mail'],
        'phone': ['phone', 'phone_number', 'phoneNumber', 'tel', 'telephone'],
        'company': ['company', 'company_name', 'companyName', 'organization', 'org'],
        'notes': ['notes', 'message', 'comment', 'description', 'body'],
        'role': ['role', 'title', 'job_title', 'jobTitle', 'position'],
    }
    
    for target, sources in field_map.items():
        for source in sources:
            if source in data and data[source]:
                result[target] = data[source]
                break
    
    return result

def create_contact(data: dict) -> dict:
    """Create contact via CRM CLI."""
    if not data.get('name'):
        return {'error': 'Name is required'}
    
    cmd = [str(CRM_SCRIPT), 'add-contact', data['name']]
    
    if data.get('email'):
        cmd.extend(['--email', data['email']])
    if data.get('phone'):
        cmd.extend(['--phone', data['phone']])
    if data.get('company'):
        cmd.extend(['--company', data['company']])
    if data.get('role'):
        cmd.extend(['--role', data['role']])
    if data.get('source'):
        cmd.extend(['--source', data['source']])
    if data.get('notes'):
        cmd.extend(['--notes', data['notes']])
    
    cmd.extend(['--reason', 'Webhook ingest'])
    
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            return json.loads(result.stdout)
        else:
            return {'error': result.stderr or 'CLI error'}
    except Exception as e:
        return {'error': str(e)}

class WebhookHandler(BaseHTTPRequestHandler):
    """HTTP handler for webhook requests."""
    
    def log_message(self, format, *args):
        log(f"HTTP {format % args}")
    
    def _send_json(self, status: int, data: dict):
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(data).encode())
    
    def do_GET(self):
        """Health check endpoint."""
        if self.path == '/health':
            self._send_json(200, {'status': 'ok', 'service': 'crm-webhook'})
        else:
            self._send_json(404, {'error': 'Not found'})
    
    def do_POST(self):
        """Handle incoming webhooks."""
        content_length = int(self.headers.get('Content-Length', 0))
        body = self.rfile.read(content_length)
        
        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            # Try URL-encoded
            try:
                data = {k: v[0] for k, v in parse_qs(body.decode()).items()}
            except Exception:
                self._send_json(400, {'error': 'Invalid payload'})
                return
        
        log(f"Received webhook: {json.dumps(data)[:500]}")
        
        # Parse based on path or payload structure
        if self.path == '/lead' or self.path == '/contact':
            # Detect format
            if 'form_response' in data:
                parsed = parse_typeform(data)
            elif 'data' in data and 'fields' in data.get('data', {}):
                parsed = parse_tally(data)
            else:
                parsed = parse_generic(data)
            
            log(f"Parsed: {json.dumps(parsed)}")
            
            result = create_contact(parsed)
            
            if 'error' in result:
                log(f"Error: {result['error']}")
                self._send_json(400, result)
            else:
                log(f"Created: {result}")
                self._send_json(201, result)
        
        else:
            self._send_json(404, {'error': f'Unknown endpoint: {self.path}'})

def main():
    parser = argparse.ArgumentParser(description='CRM webhook server for form ingestion')
    parser.add_argument('--port', '-p', type=int, default=8901, help='Port to listen on')
    parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    
    args = parser.parse_args()
    
    server = HTTPServer((args.host, args.port), WebhookHandler)
    
    print(f"CRM Webhook server listening on {args.host}:{args.port}")
    print(f"Endpoints:")
    print(f"  POST /lead    - Create contact from form submission")
    print(f"  POST /contact - Create contact from form submission")
    print(f"  GET  /health  - Health check")
    print(f"")
    print(f"Log: {LOG_FILE}")
    print(f"Database: {DB_PATH}")
    
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
        server.shutdown()

if __name__ == '__main__':
    main()
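To smoke-test the server locally (the contact details below are made up):

```bash
# Terminal 1: start the server
python3 scripts/crm-webhook.py --port 8901

# Terminal 2: simulate a generic form POST
curl -s -X POST http://localhost:8901/lead \
  -H 'Content-Type: application/json' \
  -d '{"name": "Jamie Park", "email": "jamie@example.com", "company": "ExampleCo"}'
```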
crm.py
24.3 KB
#!/usr/bin/env python3
"""
Agent CRM CLI - Core CRUD operations
"""

import argparse
import json
import os
import sqlite3
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import re

# Database location
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
SCHEMA_PATH = Path(__file__).parent.parent / 'schema.sql'

def get_db() -> sqlite3.Connection:
    """Get database connection, initializing if needed."""
    db_path = Path(DB_PATH)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    
    needs_init = not db_path.exists()
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    
    if needs_init and SCHEMA_PATH.exists():
        conn.executescript(SCHEMA_PATH.read_text())
        conn.commit()
    
    return conn

def parse_date(s: str) -> Optional[str]:
    """Parse flexible date strings into ISO format."""
    if not s:
        return None
    
    s = s.lower().strip()
    today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    
    # Relative dates
    if s == 'today':
        return today.isoformat()
    if s == 'tomorrow':
        return (today + timedelta(days=1)).isoformat()
    if s == 'yesterday':
        return (today - timedelta(days=1)).isoformat()
    
    # "next week", "next month"
    if s.startswith('next '):
        unit = s[5:]
        if unit == 'week':
            return (today + timedelta(weeks=1)).isoformat()
        if unit == 'month':
            return (today + timedelta(days=30)).isoformat()
    
    # "in N days/weeks"
    match = re.match(r'(?:in )?(\d+) (day|week|month)s?', s)
    if match:
        n, unit = int(match.group(1)), match.group(2)
        if unit == 'day':
            return (today + timedelta(days=n)).isoformat()
        if unit == 'week':
            return (today + timedelta(weeks=n)).isoformat()
        if unit == 'month':
            return (today + timedelta(days=n*30)).isoformat()
    
    # Day names
    days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
    if s in days or (s.startswith('next ') and s[5:] in days):
        target_day = days.index(s.replace('next ', ''))
        current_day = today.weekday()
        delta = target_day - current_day
        if delta <= 0:
            delta += 7
        return (today + timedelta(days=delta)).isoformat()
    
    # Try parsing as date
    for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%m/%d', '%B %d', '%b %d']:
        try:
            dt = datetime.strptime(s, fmt)
            if dt.year == 1900:  # No year specified
                dt = dt.replace(year=today.year)
                if dt < today:
                    dt = dt.replace(year=today.year + 1)
            return dt.isoformat()
        except ValueError:
            continue
    
    return s  # Return as-is if unparseable
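
# Illustrative: parse_date('tomorrow') returns midnight tomorrow in ISO form,
# parse_date('in 2 weeks') adds 14 days, and unrecognized strings pass through unchanged.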

def audit_log(conn: sqlite3.Connection, table: str, record_id: str, action: str,
              old: dict = None, new: dict = None, reason: str = None, conv_ref: str = None):
    """Log an action to the audit table."""
    conn.execute("""
        INSERT INTO audit_log (table_name, record_id, action, old_values, new_values, reason, conversation_ref)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (table, record_id, action, 
          json.dumps(old) if old else None,
          json.dumps(new) if new else None,
          reason, conv_ref))

# ============ CONTACTS ============

def add_contact(args):
    """Add a new contact."""
    conn = get_db()
    
    tags = json.dumps(args.tags.split(',')) if args.tags else None
    
    cursor = conn.execute("""
        INSERT INTO contacts (name, email, phone, company, role, source, tags, notes)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (args.name, args.email, args.phone, args.company, args.role, args.source, tags, args.notes))
    
    record_id = cursor.lastrowid
    # Get the actual ID
    row = conn.execute("SELECT id FROM contacts WHERE rowid = ?", (record_id,)).fetchone()
    
    audit_log(conn, 'contacts', row['id'], 'INSERT', new={
        'name': args.name, 'email': args.email, 'company': args.company
    }, reason=args.reason)
    
    conn.commit()
    
    print(json.dumps({
        'status': 'created',
        'id': row['id'],
        'name': args.name,
        'company': args.company,
        'email': args.email
    }, indent=2))

def find_contact(args):
    """Find contacts by name, email, or company."""
    conn = get_db()
    
    query = args.query.lower()
    rows = conn.execute("""
        SELECT * FROM contacts
        WHERE lower(name) LIKE ? OR lower(email) LIKE ? OR lower(company) LIKE ?
        ORDER BY updated_at DESC
        LIMIT ?
    """, (f'%{query}%', f'%{query}%', f'%{query}%', args.limit)).fetchall()
    
    results = [dict(r) for r in rows]
    print(json.dumps(results, indent=2))

def list_contacts(args):
    """List all contacts."""
    conn = get_db()
    
    order = 'updated_at DESC' if args.recent else 'name ASC'
    rows = conn.execute(f"SELECT * FROM contacts ORDER BY {order} LIMIT ?", (args.limit,)).fetchall()
    
    results = [dict(r) for r in rows]
    print(json.dumps(results, indent=2))

def update_contact(args):
    """Update a contact."""
    conn = get_db()
    
    # Find the contact
    row = conn.execute("""
        SELECT * FROM contacts WHERE id = ? OR lower(name) LIKE ?
    """, (args.id, f'%{args.id.lower()}%')).fetchone()
    
    if not row:
        print(json.dumps({'error': f'Contact not found: {args.id}'}))
        sys.exit(1)
    
    old = dict(row)
    updates = {}
    
    for field in ['name', 'email', 'phone', 'company', 'role', 'source', 'notes']:
        val = getattr(args, field, None)
        if val is not None:
            updates[field] = val
    
    if args.tags:
        updates['tags'] = json.dumps(args.tags.split(','))
    
    if not updates:
        print(json.dumps({'error': 'No updates provided'}))
        sys.exit(1)
    
    set_clause = ', '.join(f"{k} = ?" for k in updates.keys())
    
    conn.execute(f"""
        UPDATE contacts SET {set_clause}, updated_at = ? WHERE id = ?
    """, (*updates.values(), datetime.now().isoformat(), row['id']))
    
    audit_log(conn, 'contacts', row['id'], 'UPDATE', old=old, new=updates, reason=args.reason)
    conn.commit()
    
    print(json.dumps({'status': 'updated', 'id': row['id'], 'changes': updates}, indent=2))

def delete_contact(args):
    """Delete a contact."""
    conn = get_db()
    
    row = conn.execute("SELECT * FROM contacts WHERE id = ?", (args.id,)).fetchone()
    if not row:
        print(json.dumps({'error': f'Contact not found: {args.id}'}))
        sys.exit(1)
    
    old = dict(row)
    conn.execute("DELETE FROM contacts WHERE id = ?", (args.id,))
    audit_log(conn, 'contacts', args.id, 'DELETE', old=old, reason=args.reason)
    conn.commit()
    
    print(json.dumps({'status': 'deleted', 'id': args.id, 'name': old['name']}, indent=2))

# ============ DEALS ============

def add_deal(args):
    """Add a new deal."""
    conn = get_db()
    
    # Find contact if specified
    contact_id = None
    if args.contact:
        row = conn.execute("""
            SELECT id FROM contacts WHERE id = ? OR lower(name) LIKE ?
        """, (args.contact, f'%{args.contact.lower()}%')).fetchone()
        if row:
            contact_id = row['id']
    
    expected_close = parse_date(args.expected_close) if args.expected_close else None
    
    cursor = conn.execute("""
        INSERT INTO deals (contact_id, title, value, currency, stage, probability, expected_close, notes)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (contact_id, args.title, args.value, args.currency or 'USD', 
          args.stage or 'lead', args.probability, expected_close, args.notes))
    
    row = conn.execute("SELECT id FROM deals WHERE rowid = ?", (cursor.lastrowid,)).fetchone()
    
    audit_log(conn, 'deals', row['id'], 'INSERT', new={
        'title': args.title, 'value': args.value, 'stage': args.stage or 'lead'
    }, reason=args.reason)
    
    conn.commit()
    
    print(json.dumps({
        'status': 'created',
        'id': row['id'],
        'title': args.title,
        'value': args.value,
        'stage': args.stage or 'lead'
    }, indent=2))

def list_deals(args):
    """List deals, optionally filtered by stage."""
    conn = get_db()
    
    if args.stage:
        rows = conn.execute("""
            SELECT d.*, c.name as contact_name 
            FROM deals d LEFT JOIN contacts c ON d.contact_id = c.id
            WHERE d.stage = ?
            ORDER BY d.value DESC
            LIMIT ?
        """, (args.stage, args.limit)).fetchall()
    else:
        rows = conn.execute("""
            SELECT d.*, c.name as contact_name 
            FROM deals d LEFT JOIN contacts c ON d.contact_id = c.id
            ORDER BY d.updated_at DESC
            LIMIT ?
        """, (args.limit,)).fetchall()
    
    results = [dict(r) for r in rows]
    print(json.dumps(results, indent=2))

def update_deal(args):
    """Update a deal."""
    conn = get_db()
    
    row = conn.execute("""
        SELECT * FROM deals WHERE id = ? OR lower(title) LIKE ?
    """, (args.id, f'%{args.id.lower()}%')).fetchone()
    
    if not row:
        print(json.dumps({'error': f'Deal not found: {args.id}'}))
        sys.exit(1)
    
    old = dict(row)
    updates = {}
    
    for field in ['title', 'value', 'currency', 'stage', 'probability', 'notes']:
        val = getattr(args, field, None)
        if val is not None:
            updates[field] = val
    
    if args.expected_close:
        updates['expected_close'] = parse_date(args.expected_close)
    
    if args.stage in ('won', 'lost'):
        updates['closed_at'] = datetime.now().isoformat()
    
    if not updates:
        print(json.dumps({'error': 'No updates provided'}))
        sys.exit(1)
    
    set_clause = ', '.join(f"{k} = ?" for k in updates.keys())
    
    conn.execute(f"""
        UPDATE deals SET {set_clause}, updated_at = ? WHERE id = ?
    """, (*updates.values(), datetime.now().isoformat(), row['id']))
    
    audit_log(conn, 'deals', row['id'], 'UPDATE', old=old, new=updates, reason=args.reason)
    conn.commit()
    
    print(json.dumps({'status': 'updated', 'id': row['id'], 'changes': updates}, indent=2))

def pipeline(args):
    """Show pipeline summary."""
    conn = get_db()
    
    rows = conn.execute("""
        SELECT stage, COUNT(*) as count, SUM(value) as total_value
        FROM deals
        WHERE stage NOT IN ('won', 'lost')
        GROUP BY stage
        ORDER BY 
            CASE stage 
                WHEN 'lead' THEN 1
                WHEN 'qualified' THEN 2
                WHEN 'proposal' THEN 3
                WHEN 'negotiation' THEN 4
            END
    """).fetchall()
    
    result = {
        'stages': [dict(r) for r in rows],
        'total_deals': sum(r['count'] for r in rows),
        'total_value': sum(r['total_value'] or 0 for r in rows)
    }
    
    # Weighted value (probability-adjusted; deals without a probability count at 50%)
    weighted = conn.execute("""
        SELECT SUM(value * COALESCE(probability, 50) / 100.0) as weighted
        FROM deals WHERE stage NOT IN ('won', 'lost')
    """).fetchone()
    result['weighted_value'] = weighted['weighted'] or 0
    
    print(json.dumps(result, indent=2))

# ============ INTERACTIONS ============

def log_interaction(args):
    """Log an interaction."""
    conn = get_db()
    
    # Find contact
    contact_id = None
    if args.contact:
        row = conn.execute("""
            SELECT id FROM contacts WHERE id = ? OR lower(name) LIKE ?
        """, (args.contact, f'%{args.contact.lower()}%')).fetchone()
        if row:
            contact_id = row['id']
    
    occurred = parse_date(args.date) if args.date else datetime.now().isoformat()
    
    # Note: --deal is stored verbatim as deal_id; unlike contacts, deal titles are not resolved
    cursor = conn.execute("""
        INSERT INTO interactions (contact_id, deal_id, type, direction, summary, raw_content, occurred_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (contact_id, args.deal, args.type, args.direction, args.summary, args.raw, occurred))
    
    row = conn.execute("SELECT id FROM interactions WHERE rowid = ?", (cursor.lastrowid,)).fetchone()
    
    audit_log(conn, 'interactions', row['id'], 'INSERT', new={
        'type': args.type, 'summary': args.summary[:100]
    }, reason=args.reason)
    
    conn.commit()
    
    print(json.dumps({
        'status': 'logged',
        'id': row['id'],
        'type': args.type,
        'contact_id': contact_id
    }, indent=2))

def list_interactions(args):
    """List interactions for a contact or recent."""
    conn = get_db()
    
    if args.contact:
        rows = conn.execute("""
            SELECT i.*, c.name as contact_name
            FROM interactions i
            LEFT JOIN contacts c ON i.contact_id = c.id
            WHERE i.contact_id = ? OR lower(c.name) LIKE ?
            ORDER BY i.occurred_at DESC
            LIMIT ?
        """, (args.contact, f'%{args.contact.lower()}%', args.limit)).fetchall()
    else:
        rows = conn.execute("""
            SELECT i.*, c.name as contact_name
            FROM interactions i
            LEFT JOIN contacts c ON i.contact_id = c.id
            ORDER BY i.occurred_at DESC
            LIMIT ?
        """, (args.limit,)).fetchall()
    
    results = [dict(r) for r in rows]
    print(json.dumps(results, indent=2))

# ============ TASKS ============

def add_task(args):
    """Add a task."""
    conn = get_db()
    
    contact_id = None
    if args.contact:
        row = conn.execute("""
            SELECT id FROM contacts WHERE id = ? OR lower(name) LIKE ?
        """, (args.contact, f'%{args.contact.lower()}%')).fetchone()
        if row:
            contact_id = row['id']
    
    due = parse_date(args.due) if args.due else None
    
    cursor = conn.execute("""
        INSERT INTO tasks (contact_id, deal_id, title, due_at, priority)
        VALUES (?, ?, ?, ?, ?)
    """, (contact_id, args.deal, args.title, due, args.priority or 'normal'))
    
    row = conn.execute("SELECT id FROM tasks WHERE rowid = ?", (cursor.lastrowid,)).fetchone()
    
    audit_log(conn, 'tasks', row['id'], 'INSERT', new={'title': args.title, 'due': due}, reason=args.reason)
    conn.commit()
    
    print(json.dumps({
        'status': 'created',
        'id': row['id'],
        'title': args.title,
        'due_at': due
    }, indent=2))

def list_tasks(args):
    """List tasks."""
    conn = get_db()
    
    if args.pending:
        # NULLS LAST keeps undated tasks at the bottom (requires SQLite 3.30+)
        rows = conn.execute("""
            SELECT t.*, c.name as contact_name
            FROM tasks t
            LEFT JOIN contacts c ON t.contact_id = c.id
            WHERE t.completed_at IS NULL
            ORDER BY t.due_at ASC NULLS LAST
            LIMIT ?
        """, (args.limit,)).fetchall()
    elif args.overdue:
        rows = conn.execute("""
            SELECT t.*, c.name as contact_name
            FROM tasks t
            LEFT JOIN contacts c ON t.contact_id = c.id
            WHERE t.completed_at IS NULL AND t.due_at < datetime('now')
            ORDER BY t.due_at ASC
            LIMIT ?
        """, (args.limit,)).fetchall()
    else:
        rows = conn.execute("""
            SELECT t.*, c.name as contact_name
            FROM tasks t
            LEFT JOIN contacts c ON t.contact_id = c.id
            ORDER BY t.created_at DESC
            LIMIT ?
        """, (args.limit,)).fetchall()
    
    results = [dict(r) for r in rows]
    print(json.dumps(results, indent=2))

def complete_task(args):
    """Complete a task."""
    conn = get_db()
    
    row = conn.execute("""
        SELECT * FROM tasks WHERE id = ? OR lower(title) LIKE ?
    """, (args.id, f'%{args.id.lower()}%')).fetchone()
    
    if not row:
        print(json.dumps({'error': f'Task not found: {args.id}'}))
        sys.exit(1)
    
    old = dict(row)
    now = datetime.now().isoformat()
    
    conn.execute("UPDATE tasks SET completed_at = ? WHERE id = ?", (now, row['id']))
    audit_log(conn, 'tasks', row['id'], 'UPDATE', old=old, new={'completed_at': now}, reason=args.reason)
    conn.commit()
    
    print(json.dumps({'status': 'completed', 'id': row['id'], 'title': row['title']}, indent=2))

# ============ QUERY ============

def query(args):
    """Run a raw SQL query (SELECT only)."""
    conn = get_db()
    
    sql = args.sql.strip()
    # Basic guard: require a SELECT prefix. sqlite3's execute() also rejects
    # multi-statement strings, so "SELECT 1; DROP TABLE ..." fails outright.
    if not sql.lower().startswith('select'):
        print(json.dumps({'error': 'Only SELECT queries allowed'}))
        sys.exit(1)
    
    try:
        rows = conn.execute(sql).fetchall()
        results = [dict(r) for r in rows]
        print(json.dumps(results, indent=2))
    except sqlite3.Error as e:
        print(json.dumps({'error': str(e)}))
        sys.exit(1)

def stats(args):
    """Show CRM statistics."""
    conn = get_db()
    
    result = {}
    
    result['contacts'] = conn.execute("SELECT COUNT(*) as count FROM contacts").fetchone()['count']
    result['deals'] = conn.execute("SELECT COUNT(*) as count FROM deals").fetchone()['count']
    result['open_deals'] = conn.execute(
        "SELECT COUNT(*) as count FROM deals WHERE stage NOT IN ('won', 'lost')"
    ).fetchone()['count']
    result['interactions'] = conn.execute("SELECT COUNT(*) as count FROM interactions").fetchone()['count']
    result['pending_tasks'] = conn.execute(
        "SELECT COUNT(*) as count FROM tasks WHERE completed_at IS NULL"
    ).fetchone()['count']
    result['overdue_tasks'] = conn.execute(
        "SELECT COUNT(*) as count FROM tasks WHERE completed_at IS NULL AND due_at < datetime('now')"
    ).fetchone()['count']
    
    # Pipeline value
    pipeline_row = conn.execute("""
        SELECT SUM(value) as total FROM deals WHERE stage NOT IN ('won', 'lost')
    """).fetchone()
    result['pipeline_value'] = pipeline_row['total'] or 0
    
    # Won this month
    won_row = conn.execute("""
        SELECT SUM(value) as total FROM deals 
        WHERE stage = 'won' AND closed_at >= date('now', 'start of month')
    """).fetchone()
    result['won_this_month'] = won_row['total'] or 0
    
    print(json.dumps(result, indent=2))

def init_db(args):
    """Initialize the database and show what was created."""
    from pathlib import Path
    
    db_path = Path(DB_PATH)
    already_exists = db_path.exists()
    
    # Get or create database
    conn = get_db()
    
    # Get table info
    tables = conn.execute("""
        SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'
        ORDER BY name
    """).fetchall()
    table_names = [t['name'] for t in tables]
    
    # Get counts
    counts = {}
    for table in table_names:
        # Table names come from sqlite_master above, not user input, so the f-string is safe
        count = conn.execute(f"SELECT COUNT(*) as c FROM {table}").fetchone()['c']
        counts[table] = count
    
    # Get database size
    db_size = db_path.stat().st_size if db_path.exists() else 0
    
    result = {
        'status': 'existing' if already_exists else 'created',
        'database': str(db_path),
        'size_bytes': db_size,
        'tables': table_names,
        'record_counts': counts,
        'paths': {
            'database': str(db_path),
            'backups': str(db_path.parent / 'backups'),
            'charts': str(db_path.parent / 'charts'),
            'exports': str(db_path.parent / 'exports'),
        }
    }
    
    if not already_exists:
        result['message'] = 'Database created and ready to use'
    else:
        total_records = sum(counts.values())
        result['message'] = f'Database exists with {total_records} total records'
    
    print(json.dumps(result, indent=2))

# ============ MAIN ============

def main():
    parser = argparse.ArgumentParser(description='Agent CRM CLI')
    subparsers = parser.add_subparsers(dest='command', required=True)
    
    # Common args
    def add_common(p):
        p.add_argument('--reason', help='Reason for this action (audit)')
    
    # Contact commands
    p = subparsers.add_parser('add-contact', help='Add a contact')
    p.add_argument('name', help='Contact name')
    p.add_argument('--email', '-e')
    p.add_argument('--phone', '-p')
    p.add_argument('--company', '-c')
    p.add_argument('--role', '-r')
    p.add_argument('--source', '-s')
    p.add_argument('--tags', '-t', help='Comma-separated tags')
    p.add_argument('--notes', '-n')
    add_common(p)
    p.set_defaults(func=add_contact)
    
    p = subparsers.add_parser('find-contact', help='Find contacts')
    p.add_argument('query', help='Search query')
    p.add_argument('--limit', '-l', type=int, default=10)
    p.set_defaults(func=find_contact)
    
    p = subparsers.add_parser('list-contacts', help='List contacts')
    p.add_argument('--limit', '-l', type=int, default=20)
    p.add_argument('--recent', '-r', action='store_true')
    p.set_defaults(func=list_contacts)
    
    p = subparsers.add_parser('update-contact', help='Update a contact')
    p.add_argument('id', help='Contact ID or name')
    p.add_argument('--name')
    p.add_argument('--email', '-e')
    p.add_argument('--phone', '-p')
    p.add_argument('--company', '-c')
    p.add_argument('--role', '-r')
    p.add_argument('--source', '-s')
    p.add_argument('--tags', '-t')
    p.add_argument('--notes', '-n')
    add_common(p)
    p.set_defaults(func=update_contact)
    
    p = subparsers.add_parser('delete-contact', help='Delete a contact')
    p.add_argument('id', help='Contact ID')
    add_common(p)
    p.set_defaults(func=delete_contact)
    
    # Deal commands
    p = subparsers.add_parser('add-deal', help='Add a deal')
    p.add_argument('title', help='Deal title')
    p.add_argument('--value', '-v', type=float)
    p.add_argument('--contact', '-c', help='Contact name or ID')
    p.add_argument('--stage', '-s', default='lead')
    p.add_argument('--probability', '-p', type=int)
    p.add_argument('--currency', default='USD')
    p.add_argument('--expected-close', '-e')
    p.add_argument('--notes', '-n')
    add_common(p)
    p.set_defaults(func=add_deal)
    
    p = subparsers.add_parser('list-deals', help='List deals')
    p.add_argument('--stage', '-s')
    p.add_argument('--limit', '-l', type=int, default=20)
    p.set_defaults(func=list_deals)
    
    p = subparsers.add_parser('update-deal', help='Update a deal')
    p.add_argument('id', help='Deal ID or title')
    p.add_argument('--title')
    p.add_argument('--value', '-v', type=float)
    p.add_argument('--stage', '-s')
    p.add_argument('--probability', '-p', type=int)
    p.add_argument('--currency')
    p.add_argument('--expected-close', '-e')
    p.add_argument('--notes', '-n')
    add_common(p)
    p.set_defaults(func=update_deal)
    
    p = subparsers.add_parser('pipeline', help='Show pipeline summary')
    p.set_defaults(func=pipeline)
    
    # Interaction commands
    p = subparsers.add_parser('log', help='Log an interaction')
    p.add_argument('type', choices=['email', 'call', 'meeting', 'note', 'linkedin', 'text'])
    p.add_argument('summary', help='What happened')
    p.add_argument('--contact', '-c')
    p.add_argument('--deal', '-d')
    p.add_argument('--direction', choices=['inbound', 'outbound'])
    p.add_argument('--date')
    p.add_argument('--raw', help='Raw content')
    add_common(p)
    p.set_defaults(func=log_interaction)
    
    p = subparsers.add_parser('list-interactions', help='List interactions')
    p.add_argument('--contact', '-c')
    p.add_argument('--limit', '-l', type=int, default=20)
    p.set_defaults(func=list_interactions)
    
    # Task commands
    p = subparsers.add_parser('add-task', help='Add a task')
    p.add_argument('title', help='Task title')
    p.add_argument('--contact', '-c')
    p.add_argument('--deal', '-d')
    p.add_argument('--due')
    p.add_argument('--priority', choices=['low', 'normal', 'high', 'urgent'])
    add_common(p)
    p.set_defaults(func=add_task)
    
    p = subparsers.add_parser('list-tasks', help='List tasks')
    p.add_argument('--pending', action='store_true')
    p.add_argument('--overdue', action='store_true')
    p.add_argument('--limit', '-l', type=int, default=20)
    p.set_defaults(func=list_tasks)
    
    p = subparsers.add_parser('complete-task', help='Complete a task')
    p.add_argument('id', help='Task ID or title')
    add_common(p)
    p.set_defaults(func=complete_task)
    
    # Query commands
    p = subparsers.add_parser('query', help='Run SQL query')
    p.add_argument('sql', help='SQL query (SELECT only)')
    p.set_defaults(func=query)
    
    p = subparsers.add_parser('stats', help='Show CRM statistics')
    p.set_defaults(func=stats)
    
    p = subparsers.add_parser('init', help='Initialize database')
    p.set_defaults(func=init_db)
    
    args = parser.parse_args()
    args.func(args)

if __name__ == '__main__':
    main()
skill.json
961 B
{
  "name": "agent-crm",
  "version": "1.0.1",
  "description": "A complete CRM with no UIβ€”just natural language. Track contacts, deals, interactions, and tasks through conversation.",
  "author": "Tyrell",
  "license": "MIT",
  "repository": "",
  "keywords": [
    "crm",
    "sales",
    "contacts",
    "deals",
    "pipeline",
    "tasks"
  ],
  "requirements": {
    "python": ">=3.10",
    "notes": "matplotlib auto-installs on first chart generation"
  },
  "scripts": [
    "scripts/crm.py",
    "scripts/crm-ingest.py",
    "scripts/crm-digest.py",
    "scripts/crm-notify.py",
    "scripts/crm-webhook.py",
    "scripts/crm-report.py",
    "scripts/crm-chart.py",
    "scripts/crm-export.py",
    "scripts/crm-backup.py"
  ],
  "data": {
    "database": "~/.local/share/agent-crm/crm.db",
    "backups": "~/.local/share/agent-crm/backups/",
    "charts": "~/.local/share/agent-crm/charts/",
    "exports": "~/.local/share/agent-crm/exports/"
  }
}
scenarios.md
7.8 KB
# Agent CRM — Scenario Tests

End-to-end user stories as validation. Each scenario is a complete user journey.
Run them against a fresh database to confirm that the system works as expected.

---

## Scenario 1: First Contact → Deal → Win

**Story:** Meet someone at an event, track them through to a closed deal.

**Steps:**

1. **Add contact from event**
   ```
   Input: "Met Sarah Chen at the AI meetup. She's CTO at Replicate, interested in our API. Email is sarah@replicate.com"
   
   Expected actions:
   - Create contact: Sarah Chen, CTO, Replicate, sarah@replicate.com, source: AI meetup
   - Log interaction: meeting, "Met at AI meetup, interested in API"
   
   Validate:
   - crm find-contact "sarah" returns 1 result with correct fields
   - crm list-interactions --contact "sarah" shows 1 meeting
   ```

2. **Create deal**
   ```
   Input: "Create a $50K deal for Replicate API integration"
   
   Expected actions:
   - Create deal: "Replicate API Integration", $50,000, stage=lead, contact=Sarah Chen
   
   Validate:
   - crm list-deals shows 1 deal at $50K
   - crm pipeline shows $50K in lead stage
   ```

3. **Log follow-up call**
   ```
   Input: "Had a call with Sarah. Very positive - she has budget approved. Moving to proposal stage."
   
   Expected actions:
   - Log interaction: call, outbound, summary of conversation
   - Update deal stage: lead → qualified (or proposal, based on interpretation)
   - Offer to create follow-up task
   
   Validate:
   - crm list-interactions --contact "sarah" shows 2 interactions
   - Deal stage updated
   ```

4. **Close the deal**
   ```
   Input: "Sarah signed the contract. Mark the deal as won."
   
   Expected behavior:
   - Agent asks for confirmation (>$10K threshold)
   
   Input: "yes"
   
   Expected actions:
   - Update deal stage: → won
   - Set closed_at timestamp
   
   Validate:
   - crm list-deals --stage won shows 1 deal
   - crm-report winloss shows $50K won
   ```

**Success criteria:** Full journey from contact to closed-won with audit trail.
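
For a manual pass without the agent layer, the journey maps onto the documented CLI roughly as follows (a sketch; titles, values, and reasons are illustrative):

```bash
# 1. Contact and first touch
crm add-contact "Sarah Chen" --email sarah@replicate.com --company Replicate \
  --role CTO --source "AI meetup"
crm log meeting "Met at AI meetup, interested in API" --contact "Sarah Chen"

# 2. Deal enters the pipeline at $50K
crm add-deal "Replicate API Integration" --value 50000 --contact "Sarah Chen" --stage lead

# 3. Positive call moves the stage forward (with an audit reason)
crm log call "Budget approved, reviewing with team" --contact "Sarah" --direction outbound
crm update-deal "Replicate" --stage proposal --reason "Budget approved on call"

# 4. Close as won; the agent should confirm first at this value
crm update-deal "Replicate" --stage won --reason "Contract signed, user confirmed"
crm list-deals --stage won
```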

---

## Scenario 2: Email Ingest

**Story:** Forward a sales email, have it parsed and logged.

**Steps:**

1. **Forward email content**
   ```
   Input: """
   From: alex@datastack.io
   Subject: Re: AI Consulting Inquiry
   Date: Feb 8, 2026
   
   Hi,
   
   Thanks for the call yesterday. We're definitely interested in moving forward 
   with the AI consulting engagement we discussed. Budget is around $30K for 
   the initial phase.
   
   Can you send over a proposal by end of week?
   
   Best,
   Alex Rivera
   Founder, DataStack
   """
   
   Expected actions:
   - Run through crm-ingest
   - Create contact: Alex Rivera, Founder, DataStack, alex@datastack.io
   - Log interaction: email, inbound
   - Detect deal signals: $30K, "proposal" stage hint
   - Create task: "Send proposal to Alex" due Friday
   
   Validate:
   - Contact exists with correct company/role
   - Interaction logged as email
   - Deal created or suggested at $30K
   ```

**Success criteria:** Unstructured email → structured CRM data with minimal manual input.
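
To validate the end state without running the ingest pipeline itself, the expected actions reduce to a few core CLI calls. A sketch that hand-codes what crm-ingest should extract (field values illustrative):

```bash
crm add-contact "Alex Rivera" --email alex@datastack.io --company DataStack \
  --role Founder --source "inbound email"
crm log email 'Interested in AI consulting engagement, ~$30K initial phase' \
  --contact "Alex" --direction inbound
crm add-deal "DataStack AI Consulting" --value 30000 --contact "Alex Rivera" --stage proposal
crm add-task "Send proposal to Alex" --contact "Alex Rivera" --due "friday" --priority high
```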

---

## Scenario 3: Pipeline Review

**Story:** Ask for pipeline status, get actionable summary.

**Setup:** Create 3-4 deals at different stages.

**Steps:**

1. **Query pipeline**
   ```
   Input: "What's my pipeline look like?"
   
   Expected output:
   - Deals grouped by stage with counts and values
   - Total pipeline value
   - Weighted pipeline value
   - Highlight any deals closing soon or needing attention
   ```

2. **Drill into specific stage**
   ```
   Input: "Show me deals in proposal stage"
   
   Expected output:
   - List of deals with contact, value, expected close
   ```

3. **Ask for forecast**
   ```
   Input: "What's closing this month?"
   
   Expected output:
   - Deals with expected_close in current month
   - Total value at risk
   ```

**Success criteria:** Natural language queries return useful, formatted pipeline data.
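
The forecast question also maps onto the read-only query escape hatch. A sketch, assuming expected_close is stored as an ISO date string by parse_date:

```bash
crm query "SELECT d.title, d.value, d.expected_close, c.name AS contact_name
           FROM deals d LEFT JOIN contacts c ON d.contact_id = c.id
           WHERE d.stage NOT IN ('won', 'lost')
             AND d.expected_close >= date('now', 'start of month')
             AND d.expected_close <  date('now', 'start of month', '+1 month')
           ORDER BY d.expected_close"
```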

---

## Scenario 4: Task Management

**Story:** Create, track, and complete follow-up tasks.

**Steps:**

1. **Create task with natural date**
   ```
   Input: "Remind me to follow up with Sarah next Tuesday"
   
   Expected actions:
   - Create task: "Follow up with Sarah", due next Tuesday, linked to Sarah contact
   
   Validate:
   - crm list-tasks --pending shows task with correct due date
   ```

2. **Check overdue**
   ```
   Setup: Create task due yesterday
   Input: "What's overdue?"
   
   Expected output:
   - List of overdue tasks with days overdue
   ```

3. **Complete task**
   ```
   Input: "Done with the Sarah follow-up"
   
   Expected actions:
   - Mark task completed
   
   Validate:
   - Task has completed_at set
   - crm list-tasks --pending no longer shows it
   ```

**Success criteria:** Task lifecycle with natural language and date parsing.
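
The same lifecycle, driven directly through the CLI for validation (titles and dates illustrative):

```bash
crm add-task "Follow up with Sarah" --contact "Sarah Chen" --due "next tuesday"
crm list-tasks --pending     # due_at should be populated and correct
crm list-tasks --overdue     # empty until a task slips past its due_at
crm complete-task "Follow up with Sarah" --reason "Spoke with Sarah, proposal reviewed"
```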

---

## Scenario 5: Stale Contact Alert

**Story:** System proactively alerts about contacts going cold.

**Setup:** 
- Create contact with deal 15 days ago
- No interactions logged since

**Steps:**

1. **Run notify check**
   ```
   Command: crm-notify --stale-days 14
   
   Expected output:
   - Alert for stale contact with open deal
   - Suggests follow-up
   ```

2. **Agent acts on alert**
   ```
   Input: (agent receives alert in heartbeat)
   
   Expected behavior:
   - Agent messages user about stale contact
   - Offers to create follow-up task or draft email
   ```

**Success criteria:** Proactive alerting catches contacts before they go cold.
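
When checking that the alert fires on time, the staleness condition can be approximated with a read-only query over the same schema (a sketch; the 14 days mirrors --stale-days above):

```bash
crm query "SELECT c.name, MAX(i.occurred_at) AS last_touch
           FROM contacts c
           JOIN deals d ON d.contact_id = c.id AND d.stage NOT IN ('won', 'lost')
           LEFT JOIN interactions i ON i.contact_id = c.id
           GROUP BY c.id
           HAVING last_touch IS NULL OR last_touch < datetime('now', '-14 days')"
```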

---

## Scenario 6: Confirmation Flow

**Story:** High-stakes actions require explicit confirmation.

**Steps:**

1. **Try to close large deal**
   ```
   Input: "Mark the $75K Acme deal as won"
   
   Expected behavior:
   - Agent does NOT immediately execute
   - Agent asks: "Confirm: Mark 'Acme Deal' ($75,000) as WON?"
   ```

2. **Confirm**
   ```
   Input: "yes"
   
   Expected actions:
   - Deal updated to won
   - Audit log shows confirmation
   ```

3. **Try to delete**
   ```
   Input: "Delete the old contact John Smith"
   
   Expected behavior:
   - Agent asks for confirmation before delete
   ```

**Success criteria:** No high-stakes action executes without explicit user confirmation.
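
Once the user confirms, the write should carry that confirmation into the audit trail via the --reason flag. A sketch of the post-confirmation calls (deal title and contact id illustrative):

```bash
# Only after an explicit "yes" from the user:
crm update-deal "Acme" --stage won --reason "User confirmed close in chat"
crm delete-contact 42 --reason "User confirmed deletion of duplicate John Smith"
```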

---

## Scenario 7: Webhook Ingestion

**Story:** External form submission creates CRM contact.

**Steps:**

1. **Start webhook server**
   ```
   Command: crm-webhook --port 8901 &
   ```

2. **Submit form data**
   ```
   Command: curl -X POST http://localhost:8901/lead \
     -H "Content-Type: application/json" \
     -d '{"name": "Jordan Lee", "email": "jordan@startup.io", "company": "StartupCo", "message": "Interested in your services"}'
   
   Expected response:
   - 201 Created
   - Contact ID returned
   ```

3. **Verify in CRM**
   ```
   Command: crm find-contact "jordan"
   
   Expected:
   - Contact exists with all fields populated
   - Source: webhook
   ```

**Success criteria:** Zero-touch lead capture from external forms.

---

## Running Scenarios

### Fresh Database Test

```bash
# Reset database
rm ~/.local/share/agent-crm/crm.db
crm init

# Run through scenarios manually or via agent
```

### Automated Validation

```bash
# After each scenario, validate with queries:
crm stats                          # Overall counts
crm query "SELECT * FROM audit_log ORDER BY created_at DESC LIMIT 10"  # Audit trail
```
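
For unattended runs, the JSON output lends itself to simple shell assertions. A sketch, assuming jq is installed and Scenario 1 has just completed:

```bash
# Assert the Scenario 1 end state
test "$(crm stats | jq '.contacts')" -ge 1 || echo "FAIL: expected at least one contact"
test "$(crm list-deals --stage won | jq 'length')" -eq 1 || echo "FAIL: expected one won deal"
test "$(crm stats | jq '.overdue_tasks')" -eq 0 || echo "FAIL: overdue tasks remain"
```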

---

## Satisfaction Criteria

Following the StrongDM "dark factories" approach:

| Scenario | Pass Condition |
|----------|---------------|
| 1. First Contact → Win | Full lifecycle, all entities linked, audit complete |
| 2. Email Ingest | Correct entity extraction, >80% accuracy on fields |
| 3. Pipeline Review | Accurate totals, useful formatting |
| 4. Task Management | Date parsing works, completion tracked |
| 5. Stale Alert | Alert fires within 1 day of threshold |
| 6. Confirmation | Never auto-executes high-stakes without confirm |
| 7. Webhook | Contact created with correct source |

**Overall pass:** at least 6 of 7 scenarios pass, with no critical failures.
