feat: initial scaffold for Rock Cosmetics Adit ingestion (#1784)

- pyproject.toml with rock-ingest CLI entrypoint
- config, db, ingest, migrate modules mirroring agency-marketing-ingest patterns
- column_map.py with stubbed COLUMN_MAP — fill once Adit CSV export lands
- 3 SQL migrations: schema bootstrap, imports tracking, leads table with GHL match_status
- drop/ directory for Adit export files (CSV first, xlsx fallback)
- GHL_CLINIC_NUM TBD pending Sir Alex Therrien confirmation
This commit is contained in:
Eugine Lopez 2026-06-20 02:43:03 +08:00
commit d63cfe81c6
14 changed files with 799 additions and 0 deletions

9
.gitignore vendored Normal file
View file

@ -0,0 +1,9 @@
.env
__pycache__/
*.pyc
*.pyo
.venv/
dist/
*.egg-info/
.ruff_cache/
.pyright/

0
drop/.gitkeep Normal file
View file

52
pyproject.toml Normal file
View file

@ -0,0 +1,52 @@
[project]
name = "rock-cosmetics-ingestion"
version = "0.1.0"
description = "Adit export ingestion for Rock Cosmetics — feeds APID Attribution Dashboard (#1784)."
readme = "README.md"
requires-python = ">=3.12"
license = { text = "Proprietary" }
authors = [{ name = "Eugine Lopez" }]
dependencies = [
"openpyxl>=3.1",
"psycopg[binary,pool]>=3.2",
"pydantic>=2.9",
"pydantic-settings>=2.6",
"typer>=0.13",
]
[project.scripts]
rock-ingest = "rock_cosmetics_ingest.cli:app"
[dependency-groups]
dev = [
"pytest>=8.3",
"pyright>=1.1",
"ruff>=0.7",
]
[build-system]
requires = ["uv_build>=0.4"]
build-backend = "uv_build"
[tool.uv.build-backend]
module-name = "rock_cosmetics_ingest"
[tool.ruff]
target-version = "py312"
line-length = 100
src = ["src", "tests"]
[tool.ruff.lint]
select = ["E", "W", "F", "I", "B", "UP", "SIM", "RUF"]
ignore = ["E501"]
[tool.pyright]
include = ["src", "tests"]
typeCheckingMode = "basic"
pythonVersion = "3.12"
reportMissingImports = "error"
reportMissingTypeStubs = false
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-ra --strict-markers"

View file

View file

@ -0,0 +1,132 @@
"""CLI entry point — `rock-ingest <command>`."""
from __future__ import annotations
import logging
from pathlib import Path
import typer
from rock_cosmetics_ingest.config import load_settings
app = typer.Typer(name="rock-ingest", help="Adit export ingestion for Rock Cosmetics (#1784).")
def _setup_logging(level: str) -> None:
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(name)s%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
@app.command()
def migrate() -> None:
"""Run pending SQL migrations."""
settings = load_settings()
_setup_logging(settings.log_level)
from rock_cosmetics_ingest.migrate import apply_migrations
applied = apply_migrations(settings)
typer.echo(f"Applied {len(applied)} migration(s): {applied}" if applied else "No pending migrations.")
@app.command()
def migrate_status() -> None:
"""Show migration status."""
settings = load_settings()
_setup_logging(settings.log_level)
from rock_cosmetics_ingest.migrate import status
for version, name, applied in status(settings):
typer.echo(f" [{'x' if applied else ' '}] {version}_{name}")
@app.command()
def ingest(
file: Path = typer.Argument(..., help="CSV or XLSX Adit export file"),
match_after: bool = typer.Option(True, "--match/--no-match", help="Run GHL patient matching after ingest"),
) -> None:
"""Ingest a single Adit export file."""
settings = load_settings()
_setup_logging(settings.log_level)
if not file.exists():
typer.echo(f"File not found: {file}", err=True)
raise typer.Exit(1)
from rock_cosmetics_ingest.ingest import ingest_file
result = ingest_file(file, settings)
if result.skipped:
typer.echo(f"Skipped — already ingested as import {result.import_id}")
return
typer.echo(f"Ingested {result.row_count} rows → import {result.import_id}")
if match_after:
typer.echo("Patient matching not yet wired — run `rock-ingest match` once GHL_CLINIC_NUM is confirmed.")
@app.command()
def ingest_drop() -> None:
"""Ingest all new CSV/XLSX files in DROP_DIR."""
settings = load_settings()
_setup_logging(settings.log_level)
drop = Path(settings.drop_dir)
if not drop.is_dir():
typer.echo(f"DROP_DIR {drop} does not exist.", err=True)
raise typer.Exit(1)
files = sorted(p for p in drop.iterdir() if p.suffix.lower() in (".csv", ".xlsx"))
if not files:
typer.echo("No files found in drop directory.")
return
from rock_cosmetics_ingest.ingest import ingest_file
for f in files:
typer.echo(f"Processing {f.name}")
try:
result = ingest_file(f, settings)
if result.skipped:
typer.echo(f" Skipped (already ingested as {result.import_id})")
continue
typer.echo(f" Ingested {result.row_count} rows → import {result.import_id}")
except Exception as exc:
typer.echo(f" ERROR: {exc}", err=True)
@app.command()
def db_status() -> None:
"""Show lead counts by match status."""
settings = load_settings()
_setup_logging(settings.log_level)
from rock_cosmetics_ingest.db import connect, qualified
from psycopg.sql import SQL
sql = SQL(
"""
SELECT l.match_status, count(*) AS n
FROM {leads} l
GROUP BY l.match_status
ORDER BY l.match_status
"""
).format(leads=qualified(settings.rock_schema, "rock_cosmetics_leads"))
with connect(settings) as conn:
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
if not rows:
typer.echo("No leads found.")
return
total = 0
for row in rows:
typer.echo(f" {row['match_status']:<14} {row['n']:>6}")
total += row["n"]
typer.echo(f" {'TOTAL':<14} {total:>6}")
if __name__ == "__main__":
app()

View file

@ -0,0 +1,46 @@
"""
Adit export column mapping for Rock Cosmetics (#1784).
Fill in source-header keys (commented placeholders) once the Adit CSV export
lands via Sir Alex Therrien (info@lionsalesfunnels.com on app.adit.com).
Export preference: CSV > xls/xlsx > PDF (last resort).
Contact: Judy Tran / advperio28@gmail.com (signs as Judy Smith).
Target field names are standardised to match the APID Attribution Dashboard schema.
"""
from __future__ import annotations
# ── Adit export → internal field mapping ─────────────────────────────────────
# Keys = exact column header strings from the Adit CSV/XLSX export.
# Values = internal field names used throughout this pipeline.
# Fill once the export sample is available.
COLUMN_MAP: dict[str, str] = {
# --- Patient identity (required for GHL name + DOB matching) ---
# "<Adit header>": "first_name",
# "<Adit header>": "last_name",
# "<Adit header>": "dob", # format TBD — MM/DD/YYYY likely
# --- Appointment / lead data ---
# "<Adit header>": "appointment_date",
# "<Adit header>": "appointment_status", # scheduled / completed / cancelled
# "<Adit header>": "procedure_type",
# "<Adit header>": "lead_source", # referral channel from Adit
# --- Campaign / attribution ---
# "<Adit header>": "campaign",
# "<Adit header>": "report_date",
# --- Performance metrics (include if present in Adit export) ---
# "<Adit header>": "impressions",
# "<Adit header>": "clicks",
# "<Adit header>": "cost_usd",
# "<Adit header>": "conversions",
# "<Adit header>": "ctr",
# "<Adit header>": "avg_cpc_usd",
}
# ── Field type sets used by ingest.py coercion ────────────────────────────────
DATE_FIELDS: frozenset[str] = frozenset({"dob", "report_date", "appointment_date"})
INT_FIELDS: frozenset[str] = frozenset({"impressions", "clicks", "conversions"})
DECIMAL_FIELDS: frozenset[str] = frozenset({"cost_usd", "avg_cpc_usd", "ctr"})

View file

@ -0,0 +1,37 @@
"""Runtime configuration sourced from environment / .env."""
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
case_sensitive=False,
)
database_url: str = Field(..., alias="DATABASE_URL")
# Schema where Rock Cosmetics Adit data is written.
rock_schema: str = Field(default="rock_cosmetics", alias="ROCK_SCHEMA")
# Schema where GHL CRM data lives (for patient matching).
ghl_schema: str = Field(default="ghl", alias="GHL_SCHEMA")
# GHL clinic filter — ClinicNum for Rock Cosmetics. TBD until Sir Alex Therrien
# confirms from Adit. Set GHL_CLINIC_NUM in .env once confirmed.
ghl_clinic_custom_field: str = Field(
default="D3Mm4PHTcOSZrEgwpgXb", alias="GHL_CLINIC_CUSTOM_FIELD"
)
ghl_clinic_num: int = Field(default=0, alias="GHL_CLINIC_NUM")
# Directory where Adit export files land before ingestion.
drop_dir: str = Field(default="./drop", alias="DROP_DIR")
log_level: str = Field(default="INFO", alias="LOG_LEVEL")
def load_settings() -> Settings:
return Settings() # type: ignore[call-arg]

View file

@ -0,0 +1,53 @@
"""Postgres connection helpers."""
import logging
import time
from collections.abc import Iterator
from contextlib import contextmanager
from typing import cast
from psycopg import Connection, OperationalError
from psycopg.rows import DictRow, dict_row
from psycopg.sql import SQL, Composed, Identifier
from rock_cosmetics_ingest.config import Settings
logger = logging.getLogger(__name__)
def _connect_with_retry(dsn: str, *, total_timeout_s: float = 30.0) -> Connection[DictRow]:
deadline = time.monotonic() + total_timeout_s
delay = 0.5
last_err: OperationalError | None = None
while True:
try:
return cast(
Connection[DictRow],
Connection.connect(dsn, row_factory=dict_row), # pyright: ignore[reportArgumentType]
)
except OperationalError as exc:
last_err = exc
remaining = deadline - time.monotonic()
if remaining <= 0:
raise
logger.warning("db not reachable (%s); retrying in %.1fs", exc, delay)
time.sleep(min(delay, remaining))
delay = min(delay * 2, 5.0)
raise RuntimeError("connect loop exited without success") from last_err # type: ignore[unreachable]
@contextmanager
def connect(settings: Settings) -> Iterator[Connection[DictRow]]:
conn = _connect_with_retry(settings.database_url)
try:
yield conn
finally:
conn.close()
def schema_ident(schema: str) -> Identifier:
return Identifier(schema)
def qualified(schema: str, table: str) -> Composed:
return SQL("{}.{}").format(Identifier(schema), Identifier(table))

View file

@ -0,0 +1,272 @@
"""
Read Adit export files (CSV or XLSX) and insert rows into rock_cosmetics_leads.
Flow:
1. Hash the file reject if already ingested (idempotent).
2. Create a rock_cosmetics_imports record.
3. Parse every data row; apply COLUMN_MAP normalization.
4. Compute row_key (SHA-256 of normalised identity fields) + report_period.
5. Upsert into rock_cosmetics_leads skip rows already seen in this period.
6. Mark the import complete.
Export source: Adit (app.adit.com), login info@lionsalesfunnels.com.
Contact: Judy Tran / advperio28@gmail.com.
Dependency: manual export flag for automation in a follow-up task (#1784 note).
"""
from __future__ import annotations
import csv
import hashlib
import json
import logging
import re
import unicodedata
from dataclasses import dataclass, field
from datetime import date, timedelta
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any
from psycopg import Connection
from psycopg.rows import DictRow
from psycopg.sql import SQL
from rock_cosmetics_ingest.column_map import COLUMN_MAP, DATE_FIELDS, DECIMAL_FIELDS, INT_FIELDS
from rock_cosmetics_ingest.config import Settings
from rock_cosmetics_ingest.db import connect, qualified
logger = logging.getLogger(__name__)
_DATE_FORMATS = ["%Y-%m-%d", "%m/%d/%Y", "%m-%d-%Y", "%d/%m/%Y"]
_WS_RE = re.compile(r"\s+")
def _norm_str(s: str | None) -> str:
if not s:
return ""
s = unicodedata.normalize("NFC", s.strip().lower())
return _WS_RE.sub(" ", s)
def _sunday_of(d: date) -> date:
return d - timedelta(days=(d.weekday() + 1) % 7)
def _compute_row_key(norm: dict[str, Any]) -> str | None:
first = _norm_str(norm.get("first_name"))
last = _norm_str(norm.get("last_name"))
dob = norm.get("dob")
if not first or not last or not dob:
return None
raw = f"{first}|{last}|{dob!s}".encode()
return hashlib.sha256(raw).hexdigest()
def _infer_report_period(norm: dict[str, Any], import_date: date) -> date | None:
rd = norm.get("report_date")
if isinstance(rd, date):
return _sunday_of(rd)
return _sunday_of(import_date)
def _parse_date(val: str) -> date | None:
from datetime import datetime
val = val.strip()
if not val:
return None
for fmt in _DATE_FORMATS:
try:
return datetime.strptime(val, fmt).date()
except ValueError:
continue
logger.warning("could not parse date %r — stored as None", val)
return None
def _coerce(field_name: str, val: str) -> Any:
val = val.strip()
if field_name in DATE_FIELDS:
return _parse_date(val)
if field_name in INT_FIELDS:
if not val:
return None
try:
return int(val.replace(",", ""))
except ValueError:
return None
if field_name in DECIMAL_FIELDS:
if not val:
return None
try:
return Decimal(val.replace("$", "").replace(",", "").replace("%", "").strip())
except InvalidOperation:
return None
return val or None
def _normalize_row(raw: dict[str, str], column_map: dict[str, str]) -> dict[str, Any]:
out: dict[str, Any] = {}
for src_col, val in raw.items():
internal = column_map.get(src_col)
if internal:
out[internal] = _coerce(internal, val)
return out
@dataclass
class IngestResult:
import_id: str
filename: str
row_count: int
skipped: bool = False
errors: list[str] = field(default_factory=list)
def _read_xlsx(path: Path) -> list[dict[str, str]]:
try:
import openpyxl
except ImportError as e:
raise RuntimeError("openpyxl required for .xlsx: pip install openpyxl") from e
wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
ws = wb.active
rows = list(ws.iter_rows(values_only=True)) # type: ignore[union-attr]
wb.close()
if not rows:
return []
headers = [str(h).strip() if h is not None else "" for h in rows[0]]
return [{headers[i]: (str(v).strip() if v is not None else "") for i, v in enumerate(row)} for row in rows[1:]]
def _read_csv(path: Path) -> list[dict[str, str]]:
with path.open(newline="", encoding="utf-8-sig") as f:
return [dict(row) for row in csv.DictReader(f)]
def _file_hash(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def _already_ingested(conn: Connection[DictRow], schema: str, file_hash: str) -> str | None:
with conn.cursor() as cur:
cur.execute(
SQL("SELECT id FROM {tbl} WHERE file_hash = %s").format(
tbl=qualified(schema, "rock_cosmetics_imports")
),
(file_hash,),
)
row = cur.fetchone()
return str(row["id"]) if row else None
def _create_import(conn: Connection[DictRow], schema: str, filename: str, file_hash: str) -> str:
with conn.cursor() as cur:
cur.execute(
SQL(
"INSERT INTO {tbl} (filename, file_hash, status)"
" VALUES (%s, %s, 'pending') RETURNING id"
).format(tbl=qualified(schema, "rock_cosmetics_imports")),
(filename, file_hash),
)
row = cur.fetchone()
conn.commit()
return str(row["id"]) # type: ignore[index]
def _insert_leads(
conn: Connection[DictRow],
schema: str,
import_id: str,
raw_rows: list[dict[str, str]],
column_map: dict[str, str],
) -> int:
cols = [
"import_id", "row_num", "raw_data",
"first_name", "last_name", "dob",
"appointment_date", "appointment_status", "procedure_type", "lead_source",
"campaign", "report_date",
"impressions", "clicks", "cost_usd", "conversions", "ctr", "avg_cpc_usd",
"row_key", "report_period",
]
placeholders = ", ".join(["%s"] * len(cols))
insert = SQL(
f"INSERT INTO {{tbl}} ({', '.join(cols)}) VALUES ({placeholders})"
" ON CONFLICT (row_key, report_period) WHERE row_key IS NOT NULL AND report_period IS NOT NULL DO NOTHING"
).format(tbl=qualified(schema, "rock_cosmetics_leads"))
today = date.today()
inserted = 0
with conn.cursor() as cur:
for i, raw in enumerate(raw_rows, start=1):
n = _normalize_row(raw, column_map)
row_key = _compute_row_key(n)
report_period = _infer_report_period(n, today)
cur.execute(insert, (
import_id, i, json.dumps(raw),
n.get("first_name"), n.get("last_name"), n.get("dob"),
n.get("appointment_date"), n.get("appointment_status"),
n.get("procedure_type"), n.get("lead_source"),
n.get("campaign"), n.get("report_date"),
n.get("impressions"), n.get("clicks"), n.get("cost_usd"),
n.get("conversions"), n.get("ctr"), n.get("avg_cpc_usd"),
row_key, report_period,
))
inserted += cur.rowcount
conn.commit()
return inserted
def _finish_import(
conn: Connection[DictRow], schema: str, import_id: str, row_count: int, error: str | None = None
) -> None:
with conn.cursor() as cur:
cur.execute(
SQL("UPDATE {tbl} SET status=%s, row_count=%s, error_msg=%s WHERE id=%s::uuid").format(
tbl=qualified(schema, "rock_cosmetics_imports")
),
("error" if error else "complete", row_count, error, import_id),
)
conn.commit()
def ingest_file(path: Path, settings: Settings, column_map: dict[str, str] | None = None) -> IngestResult:
if column_map is None:
column_map = COLUMN_MAP
schema = settings.rock_schema
suffix = path.suffix.lower()
if suffix in (".xlsx", ".xls"):
raw_rows = _read_xlsx(path)
elif suffix == ".csv":
raw_rows = _read_csv(path)
else:
raise ValueError(f"Unsupported file type {suffix!r}. Supported: .csv, .xlsx")
file_hash = _file_hash(path)
with connect(settings) as conn:
existing = _already_ingested(conn, schema, file_hash)
if existing:
logger.info("file %s already ingested as import %s — skipping", path.name, existing)
return IngestResult(import_id=existing, filename=path.name, row_count=len(raw_rows), skipped=True)
import_id = _create_import(conn, schema, path.name, file_hash)
logger.info("created import %s for %s (%d rows)", import_id, path.name, len(raw_rows))
try:
inserted = _insert_leads(conn, schema, import_id, raw_rows, column_map)
skipped_rows = len(raw_rows) - inserted
if skipped_rows:
logger.info("%d row(s) skipped — already seen in this period", skipped_rows)
_finish_import(conn, schema, import_id, inserted)
except Exception as exc:
_finish_import(conn, schema, import_id, 0, error=str(exc))
raise
logger.info("ingested %d / %d rows from %s", inserted, len(raw_rows), path.name)
return IngestResult(import_id=import_id, filename=path.name, row_count=inserted)

View file

@ -0,0 +1,119 @@
"""SQL migration runner."""
import hashlib
import logging
import re
from dataclasses import dataclass
from importlib import resources
from importlib.resources.abc import Traversable
from psycopg import Connection
from psycopg.rows import DictRow
from psycopg.sql import SQL
from rock_cosmetics_ingest.config import Settings
from rock_cosmetics_ingest.db import connect, qualified, schema_ident
logger = logging.getLogger(__name__)
_VERSION_RE = re.compile(r"^(\d{4})_([a-z0-9_]+)\.sql$")
_PLACEHOLDER = "{{schema}}"
_GHL_PLACEHOLDER = "{{ghl_schema}}"
@dataclass(frozen=True, slots=True)
class Migration:
version: str
name: str
raw_sql: str
def render(self, schema: str, ghl_schema: str = "ghl") -> str:
sql = self.raw_sql.replace(_PLACEHOLDER, '"' + schema.replace('"', '""') + '"')
return sql.replace(_GHL_PLACEHOLDER, '"' + ghl_schema.replace('"', '""') + '"')
def sha256(self, schema: str, ghl_schema: str = "ghl") -> str:
return hashlib.sha256(self.render(schema, ghl_schema).encode("utf-8")).hexdigest()
def _migrations_dir() -> Traversable:
return resources.files("rock_cosmetics_ingest").joinpath("migrations")
def discover_migrations() -> list[Migration]:
out: list[Migration] = []
for entry in sorted(_migrations_dir().iterdir(), key=lambda p: p.name):
m = _VERSION_RE.match(entry.name)
if not m:
continue
out.append(Migration(version=m.group(1), name=m.group(2), raw_sql=entry.read_text(encoding="utf-8")))
return out
def _ensure_schema_and_table(conn: Connection[DictRow], schema: str) -> None:
with conn.cursor() as cur:
cur.execute(SQL("CREATE SCHEMA IF NOT EXISTS {}").format(schema_ident(schema)))
cur.execute(
SQL(
"""
CREATE TABLE IF NOT EXISTS {tbl} (
version text PRIMARY KEY,
name text NOT NULL,
sha256 text NOT NULL,
applied_at timestamptz NOT NULL DEFAULT now()
)
"""
).format(tbl=qualified(schema, "_migration"))
)
conn.commit()
def _applied(conn: Connection[DictRow], schema: str) -> dict[str, str]:
with conn.cursor() as cur:
cur.execute(SQL("SELECT version, sha256 FROM {tbl}").format(tbl=qualified(schema, "_migration")))
rows = cur.fetchall()
return {row["version"]: row["sha256"] for row in rows}
def apply_migrations(settings: Settings) -> list[str]:
schema = settings.rock_schema
migrations = discover_migrations()
insert_sql = SQL("INSERT INTO {tbl} (version, name, sha256) VALUES (%s, %s, %s)").format(
tbl=qualified(schema, "_migration")
)
ghl_schema = settings.ghl_schema
applied_now: list[str] = []
with connect(settings) as conn:
_ensure_schema_and_table(conn, schema)
already = _applied(conn, schema)
for mig in migrations:
expected_hash = mig.sha256(schema, ghl_schema)
if mig.version in already:
if already[mig.version] != expected_hash:
raise RuntimeError(
f"Migration {mig.version}_{mig.name} edited after being applied. "
"Write a new migration instead."
)
continue
logger.info("applying migration %s_%s", mig.version, mig.name)
with conn.cursor() as cur:
cur.execute(mig.render(schema, ghl_schema).encode("utf-8"))
cur.execute(insert_sql, (mig.version, mig.name, expected_hash))
conn.commit()
applied_now.append(mig.version)
if applied_now:
logger.info("applied %d migration(s): %s", len(applied_now), applied_now)
else:
logger.info("no pending migrations")
return applied_now
def status(settings: Settings) -> list[tuple[str, str, bool]]:
schema = settings.rock_schema
migrations = discover_migrations()
with connect(settings) as conn:
_ensure_schema_and_table(conn, schema)
already = _applied(conn, schema)
return [(m.version, m.name, m.version in already) for m in migrations]
__all__ = ["Migration", "apply_migrations", "discover_migrations", "status"]

View file

@ -0,0 +1,11 @@
-- 0001 — schema bootstrap and shared trigger function.
CREATE SCHEMA IF NOT EXISTS {{schema}};
CREATE OR REPLACE FUNCTION {{schema}}.set_synced_at()
RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
NEW.synced_at = now();
RETURN NEW;
END;
$$;

View file

@ -0,0 +1,13 @@
-- 0002 — import tracking table.
-- Each Adit export file gets one row; idempotent re-ingest via file_hash.
CREATE TABLE IF NOT EXISTS {{schema}}.rock_cosmetics_imports (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
filename text NOT NULL,
file_hash text NOT NULL UNIQUE,
status text NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'complete', 'error')),
row_count int,
error_msg text,
imported_at timestamptz NOT NULL DEFAULT now()
);

View file

@ -0,0 +1,55 @@
-- 0003 — leads table.
-- One row per Adit export row. row_key + report_period dedup prevents
-- double-counting if the same export is re-ingested or re-sent.
-- Column map placeholders will fill in as the Adit export format is confirmed.
CREATE TABLE IF NOT EXISTS {{schema}}.rock_cosmetics_leads (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
import_id uuid NOT NULL REFERENCES {{schema}}.rock_cosmetics_imports(id),
row_num int NOT NULL,
raw_data jsonb,
-- Patient identity (populated once COLUMN_MAP is filled)
first_name text,
last_name text,
dob date,
-- Appointment / lead data
appointment_date date,
appointment_status text,
procedure_type text,
lead_source text,
-- Campaign attribution
campaign text,
report_date date,
-- Performance metrics
impressions int,
clicks int,
cost_usd numeric(12, 2),
conversions int,
ctr numeric(8, 4),
avg_cpc_usd numeric(12, 2),
-- GHL patient matching
match_status text NOT NULL DEFAULT 'pending'
CHECK (match_status IN ('pending', 'matched', 'review', 'unmatched', 'no_usable_dob')),
matched_contact_id text,
matched_at timestamptz,
-- Dedup
row_key text,
report_period date,
created_at timestamptz NOT NULL DEFAULT now(),
UNIQUE (row_key, report_period) WHERE row_key IS NOT NULL AND report_period IS NOT NULL
);
CREATE INDEX IF NOT EXISTS rock_cosmetics_leads_import_id_idx
ON {{schema}}.rock_cosmetics_leads (import_id);
CREATE INDEX IF NOT EXISTS rock_cosmetics_leads_match_status_idx
ON {{schema}}.rock_cosmetics_leads (match_status)
WHERE match_status = 'pending';

0
tests/__init__.py Normal file
View file