commit d63cfe81c681fa2edc7998e3fef7d22c427ca0ef Author: eugine.lopez Date: Sat Jun 20 02:43:03 2026 +0800 feat: initial scaffold for Rock Cosmetics Adit ingestion (#1784) - pyproject.toml with rock-ingest CLI entrypoint - config, db, ingest, migrate modules mirroring agency-marketing-ingest patterns - column_map.py with stubbed COLUMN_MAP — fill once Adit CSV export lands - 3 SQL migrations: schema bootstrap, imports tracking, leads table with GHL match_status - drop/ directory for Adit export files (CSV first, xlsx fallback) - GHL_CLINIC_NUM TBD pending Sir Alex Therrien confirmation diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b0a1c74 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.env +__pycache__/ +*.pyc +*.pyo +.venv/ +dist/ +*.egg-info/ +.ruff_cache/ +.pyright/ diff --git a/drop/.gitkeep b/drop/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3aea3b7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,52 @@ +[project] +name = "rock-cosmetics-ingestion" +version = "0.1.0" +description = "Adit export ingestion for Rock Cosmetics — feeds APID Attribution Dashboard (#1784)." 
+readme = "README.md" +requires-python = ">=3.12" +license = { text = "Proprietary" } +authors = [{ name = "Eugine Lopez" }] +dependencies = [ + "openpyxl>=3.1", + "psycopg[binary,pool]>=3.2", + "pydantic>=2.9", + "pydantic-settings>=2.6", + "typer>=0.13", +] + +[project.scripts] +rock-ingest = "rock_cosmetics_ingest.cli:app" + +[dependency-groups] +dev = [ + "pytest>=8.3", + "pyright>=1.1", + "ruff>=0.7", +] + +[build-system] +requires = ["uv_build>=0.4"] +build-backend = "uv_build" + +[tool.uv.build-backend] +module-name = "rock_cosmetics_ingest" + +[tool.ruff] +target-version = "py312" +line-length = 100 +src = ["src", "tests"] + +[tool.ruff.lint] +select = ["E", "W", "F", "I", "B", "UP", "SIM", "RUF"] +ignore = ["E501"] + +[tool.pyright] +include = ["src", "tests"] +typeCheckingMode = "basic" +pythonVersion = "3.12" +reportMissingImports = "error" +reportMissingTypeStubs = false + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-ra --strict-markers" diff --git a/src/rock_cosmetics_ingest/__init__.py b/src/rock_cosmetics_ingest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/rock_cosmetics_ingest/cli.py b/src/rock_cosmetics_ingest/cli.py new file mode 100644 index 0000000..c791783 --- /dev/null +++ b/src/rock_cosmetics_ingest/cli.py @@ -0,0 +1,132 @@ +"""CLI entry point — `rock-ingest `.""" + +from __future__ import annotations + +import logging +from pathlib import Path + +import typer + +from rock_cosmetics_ingest.config import load_settings + +app = typer.Typer(name="rock-ingest", help="Adit export ingestion for Rock Cosmetics (#1784).") + + +def _setup_logging(level: str) -> None: + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format="%(asctime)s %(levelname)s %(name)s — %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", + ) + + +@app.command() +def migrate() -> None: + """Run pending SQL migrations.""" + settings = load_settings() + _setup_logging(settings.log_level) + from 
@app.command()
def ingest_drop() -> None:
    """Ingest all new CSV/XLSX files in DROP_DIR.

    Files are processed in sorted order. A failure on one file is reported on
    stderr and does not stop the remaining files, but the command exits with
    status 1 after the batch so a scheduler or wrapper script can detect it.

    Raises:
        typer.Exit: With code 1 when DROP_DIR is not a directory, or when at
            least one file failed to ingest.
    """
    settings = load_settings()
    _setup_logging(settings.log_level)
    drop = Path(settings.drop_dir)
    if not drop.is_dir():
        typer.echo(f"DROP_DIR {drop} does not exist.", err=True)
        raise typer.Exit(1)

    # is_file() keeps out directories that merely carry a matching suffix.
    files = sorted(
        p for p in drop.iterdir() if p.is_file() and p.suffix.lower() in (".csv", ".xlsx")
    )
    if not files:
        typer.echo("No files found in drop directory.")
        return

    from rock_cosmetics_ingest.ingest import ingest_file

    failed = 0
    for f in files:
        typer.echo(f"Processing {f.name} …")
        try:
            result = ingest_file(f, settings)
        except Exception as exc:
            # Deliberately broad: one bad export must not block the rest of the
            # batch. The failure is counted so the exit status reflects it.
            failed += 1
            typer.echo(f" ERROR: {exc}", err=True)
            continue
        if result.skipped:
            typer.echo(f" Skipped (already ingested as {result.import_id})")
        else:
            typer.echo(f" Ingested {result.row_count} rows → import {result.import_id}")

    if failed:
        typer.echo(f"{failed} of {len(files)} file(s) failed.", err=True)
        raise typer.Exit(1)
+COLUMN_MAP: dict[str, str] = { + # --- Patient identity (required for GHL name + DOB matching) --- + # "": "first_name", + # "": "last_name", + # "": "dob", # format TBD — MM/DD/YYYY likely + + # --- Appointment / lead data --- + # "": "appointment_date", + # "": "appointment_status", # scheduled / completed / cancelled + # "": "procedure_type", + # "": "lead_source", # referral channel from Adit + + # --- Campaign / attribution --- + # "": "campaign", + # "": "report_date", + + # --- Performance metrics (include if present in Adit export) --- + # "": "impressions", + # "": "clicks", + # "": "cost_usd", + # "": "conversions", + # "": "ctr", + # "": "avg_cpc_usd", +} + +# ── Field type sets used by ingest.py coercion ──────────────────────────────── +DATE_FIELDS: frozenset[str] = frozenset({"dob", "report_date", "appointment_date"}) +INT_FIELDS: frozenset[str] = frozenset({"impressions", "clicks", "conversions"}) +DECIMAL_FIELDS: frozenset[str] = frozenset({"cost_usd", "avg_cpc_usd", "ctr"}) diff --git a/src/rock_cosmetics_ingest/config.py b/src/rock_cosmetics_ingest/config.py new file mode 100644 index 0000000..8f7d1af --- /dev/null +++ b/src/rock_cosmetics_ingest/config.py @@ -0,0 +1,37 @@ +"""Runtime configuration sourced from environment / .env.""" + +from pydantic import Field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + extra="ignore", + case_sensitive=False, + ) + + database_url: str = Field(..., alias="DATABASE_URL") + + # Schema where Rock Cosmetics Adit data is written. + rock_schema: str = Field(default="rock_cosmetics", alias="ROCK_SCHEMA") + + # Schema where GHL CRM data lives (for patient matching). + ghl_schema: str = Field(default="ghl", alias="GHL_SCHEMA") + + # GHL clinic filter — ClinicNum for Rock Cosmetics. TBD until Sir Alex Therrien + # confirms from Adit. Set GHL_CLINIC_NUM in .env once confirmed. 
def _connect_with_retry(dsn: str, *, total_timeout_s: float = 30.0) -> Connection[DictRow]:
    """Open a dict-row connection, retrying while the connection attempt fails.

    Retries on ``OperationalError`` with a back-off that starts at 0.5 s,
    doubles after each attempt and is capped at 5 s, until ``total_timeout_s``
    has elapsed.

    Args:
        dsn: Connection string passed to ``Connection.connect``.
        total_timeout_s: Overall retry budget in seconds, measured on the
            monotonic clock.

    Returns:
        An open connection whose cursors return rows as dicts.

    Raises:
        OperationalError: The last connection error, once the budget is spent.
    """
    deadline = time.monotonic() + total_timeout_s
    delay = 0.5
    while True:
        try:
            return cast(
                Connection[DictRow],
                Connection.connect(dsn, row_factory=dict_row),  # pyright: ignore[reportArgumentType]
            )
        except OperationalError as exc:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise
            # Never sleep past the deadline, and log the pause actually taken.
            pause = min(delay, remaining)
            logger.warning("db not reachable (%s); retrying in %.1fs", exc, pause)
            time.sleep(pause)
            delay = min(delay * 2, 5.0)
def qualified(schema: str, table: str) -> Composed:
    """Return ``schema.table`` as a composed SQL fragment.

    Both parts are wrapped in ``Identifier``, so they are rendered as SQL
    identifiers rather than spliced in as raw text.
    """
    return SQL("{}.{}").format(Identifier(schema), Identifier(table))
def _parse_date(val: str) -> date | None:
    """Parse a date cell from an export row.

    The formats in ``_DATE_FORMATS`` are tried in order, so an ambiguous value
    such as ``03/04/2024`` resolves month-first. If none matches, the value is
    tried as an ISO 8601 date or datetime and its date part is used.

    Args:
        val: Raw cell text; surrounding whitespace is ignored.

    Returns:
        The parsed date, or None when the cell is empty or unparseable (the
        latter is logged as a warning).
    """
    from datetime import datetime

    val = val.strip()
    if not val:
        return None
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(val, fmt).date()
        except ValueError:
            continue
    # _read_xlsx stringifies every cell with str(v). A spreadsheet date cell
    # that arrives as a datetime object therefore looks like
    # "2024-01-05 00:00:00", which no strptime format above accepts.
    try:
        parsed = datetime.fromisoformat(val)
    except ValueError:
        logger.warning("could not parse date %r — stored as None", val)
        return None
    return parsed.date()
def _read_csv(path: Path) -> list[dict[str, str]]:
    """Read a CSV export into one dict per data row, keyed by header text.

    The file is decoded as UTF-8 and a leading byte-order mark is dropped
    (``utf-8-sig``). Rows shorter than the header get ``""`` for the missing
    cells, so every header maps to a string.

    Args:
        path: Location of the CSV file.

    Returns:
        The data rows in file order; an empty list when there are none.
    """
    with path.open(newline="", encoding="utf-8-sig") as f:
        # DictReader's default filler for short rows is None. That breaks the
        # dict[str, str] contract and makes a later val.strip() raise.
        # NOTE(review): cells beyond the header width are still collected by
        # DictReader under the key None; downstream mapping ignores that key.
        return [dict(row) for row in csv.DictReader(f, restval="")]
def _insert_leads(
    conn: Connection[DictRow],
    schema: str,
    import_id: str,
    raw_rows: list[dict[str, str]],
    column_map: dict[str, str],
) -> int:
    """Insert the rows of one import into ``rock_cosmetics_leads``.

    Each raw row is normalised through ``column_map``, given a ``row_key`` and
    a ``report_period``, and inserted together with its JSON-encoded raw form.
    Rows that collide on ``(row_key, report_period)`` are skipped by
    ``ON CONFLICT ... DO NOTHING``. A single commit is issued after the loop.

    Args:
        conn: Open connection used for the inserts and the final commit.
        schema: Schema that holds ``rock_cosmetics_leads``.
        import_id: Id of the owning import record, stored on every row.
        raw_rows: Parsed export rows, header text -> cell text.
        column_map: Export header -> internal field name.

    Returns:
        The sum of ``cur.rowcount`` over all inserts.
    """
    # Column order must match the parameter tuple passed to execute() below.
    cols = [
        "import_id", "row_num", "raw_data",
        "first_name", "last_name", "dob",
        "appointment_date", "appointment_status", "procedure_type", "lead_source",
        "campaign", "report_date",
        "impressions", "clicks", "cost_usd", "conversions", "ctr", "avg_cpc_usd",
        "row_key", "report_period",
    ]
    placeholders = ", ".join(["%s"] * len(cols))
    # Only the fixed column names above go through the f-string; the table name
    # is composed by qualified() and row values are bound through %s.
    insert = SQL(
        f"INSERT INTO {{tbl}} ({', '.join(cols)}) VALUES ({placeholders})"
        " ON CONFLICT (row_key, report_period) WHERE row_key IS NOT NULL AND report_period IS NOT NULL DO NOTHING"
    ).format(tbl=qualified(schema, "rock_cosmetics_leads"))

    # Passed to _infer_report_period as the import date for every row.
    today = date.today()
    inserted = 0
    with conn.cursor() as cur:
        # row_num is 1-based.
        for i, raw in enumerate(raw_rows, start=1):
            n = _normalize_row(raw, column_map)
            row_key = _compute_row_key(n)
            report_period = _infer_report_period(n, today)
            cur.execute(insert, (
                import_id, i, json.dumps(raw),
                n.get("first_name"), n.get("last_name"), n.get("dob"),
                n.get("appointment_date"), n.get("appointment_status"),
                n.get("procedure_type"), n.get("lead_source"),
                n.get("campaign"), n.get("report_date"),
                n.get("impressions"), n.get("clicks"), n.get("cost_usd"),
                n.get("conversions"), n.get("ctr"), n.get("avg_cpc_usd"),
                row_key, report_period,
            ))
            inserted += cur.rowcount
    conn.commit()
    return inserted
def ingest_file(path: Path, settings: Settings, column_map: dict[str, str] | None = None) -> IngestResult:
    """Ingest one export file into the leads table.

    The file is parsed and hashed. If an import with the same hash exists, the
    call returns a skipped result. Otherwise an import record is created, the
    rows are inserted, and the import is marked complete or, on failure, error.

    Args:
        path: Export file; ``.csv`` and ``.xlsx`` are supported.
        settings: Runtime settings (database URL and target schema).
        column_map: Export header -> internal field name. Defaults to
            ``COLUMN_MAP``.

    Returns:
        An ``IngestResult``. When skipped, ``import_id`` is the earlier import
        and ``row_count`` is the number of rows parsed from the file; otherwise
        ``row_count`` is the number of rows inserted.

    Raises:
        ValueError: If the file suffix is not supported.
        Exception: Any error raised while inserting rows is re-raised after the
            import has been marked as failed.
    """
    if column_map is None:
        column_map = COLUMN_MAP

    schema = settings.rock_schema
    suffix = path.suffix.lower()

    # Legacy .xls is rejected here, matching the message below. The only
    # spreadsheet reader in this module is the openpyxl-based _read_xlsx, and
    # openpyxl does not read the old binary .xls format.
    if suffix == ".xlsx":
        raw_rows = _read_xlsx(path)
    elif suffix == ".csv":
        raw_rows = _read_csv(path)
    else:
        raise ValueError(f"Unsupported file type {suffix!r}. Supported: .csv, .xlsx")

    file_hash = _file_hash(path)

    with connect(settings) as conn:
        existing = _already_ingested(conn, schema, file_hash)
        if existing:
            logger.info("file %s already ingested as import %s — skipping", path.name, existing)
            return IngestResult(import_id=existing, filename=path.name, row_count=len(raw_rows), skipped=True)

        import_id = _create_import(conn, schema, path.name, file_hash)
        logger.info("created import %s for %s (%d rows)", import_id, path.name, len(raw_rows))

        try:
            inserted = _insert_leads(conn, schema, import_id, raw_rows, column_map)
            skipped_rows = len(raw_rows) - inserted
            if skipped_rows:
                logger.info("%d row(s) skipped — already seen in this period", skipped_rows)
            _finish_import(conn, schema, import_id, inserted)
        except Exception as exc:
            # A failed statement leaves the transaction aborted, and the server
            # rejects further commands until it is rolled back. Without this
            # the status UPDATE fails and the import stays 'pending'.
            conn.rollback()
            try:
                _finish_import(conn, schema, import_id, 0, error=str(exc))
            except Exception:
                # Bookkeeping must not hide the error that caused the failure.
                logger.exception("could not record failure of import %s", import_id)
            # NOTE(review): the import row keeps this file_hash, so a retry of
            # the same file is reported as already ingested — confirm intended.
            raise

    logger.info("ingested %d / %d rows from %s", inserted, len(raw_rows), path.name)
    return IngestResult(import_id=import_id, filename=path.name, row_count=inserted)
def discover_migrations() -> list[Migration]:
    """Load the packaged SQL migrations, ordered by file name.

    Only entries named ``NNNN_name.sql`` (per ``_VERSION_RE``) are considered;
    anything else in the migrations directory is ignored.

    Returns:
        One ``Migration`` per matching file, in ascending file-name order.

    Raises:
        RuntimeError: If two files share the same four-digit version.
    """
    out: list[Migration] = []
    seen: dict[str, str] = {}
    for entry in sorted(_migrations_dir().iterdir(), key=lambda p: p.name):
        m = _VERSION_RE.match(entry.name)
        if not m:
            continue
        version, name = m.group(1), m.group(2)
        # version is the primary key of the bookkeeping table. Catching a
        # duplicate here avoids running the second file's SQL only to fail
        # when its bookkeeping row is inserted.
        if version in seen:
            raise RuntimeError(
                f"Duplicate migration version {version}: {seen[version]} and {entry.name}."
            )
        seen[version] = entry.name
        out.append(Migration(version=version, name=name, raw_sql=entry.read_text(encoding="utf-8")))
    return out
def apply_migrations(settings: Settings) -> list[str]:
    """Apply every migration that is not yet recorded for the target schema.

    Migrations are taken in the order returned by ``discover_migrations``. A
    version that is already recorded is skipped, after checking that the hash
    of its rendered SQL still matches the stored hash.

    Args:
        settings: Supplies the database URL, the target schema and the GHL
            schema substituted into the migration SQL.

    Returns:
        The versions applied by this call, in application order.

    Raises:
        RuntimeError: If a recorded migration's rendered SQL no longer hashes
            to the stored value.
    """
    schema = settings.rock_schema
    migrations = discover_migrations()
    insert_sql = SQL("INSERT INTO {tbl} (version, name, sha256) VALUES (%s, %s, %s)").format(
        tbl=qualified(schema, "_migration")
    )
    ghl_schema = settings.ghl_schema
    applied_now: list[str] = []
    with connect(settings) as conn:
        _ensure_schema_and_table(conn, schema)
        # Snapshot taken once, before the loop; it is not refreshed afterwards.
        already = _applied(conn, schema)
        for mig in migrations:
            # The hash covers the SQL after placeholder substitution, so it
            # depends on the schema names as well as on the file contents.
            expected_hash = mig.sha256(schema, ghl_schema)
            if mig.version in already:
                if already[mig.version] != expected_hash:
                    raise RuntimeError(
                        f"Migration {mig.version}_{mig.name} edited after being applied. "
                        "Write a new migration instead."
                    )
                continue
            logger.info("applying migration %s_%s", mig.version, mig.name)
            with conn.cursor() as cur:
                cur.execute(mig.render(schema, ghl_schema).encode("utf-8"))
                cur.execute(insert_sql, (mig.version, mig.name, expected_hash))
            # One commit per migration, covering its SQL and its bookkeeping row.
            conn.commit()
            applied_now.append(mig.version)
    if applied_now:
        logger.info("applied %d migration(s): %s", len(applied_now), applied_now)
    else:
        logger.info("no pending migrations")
    return applied_now
b/src/rock_cosmetics_ingest/migrations/0001_create_schema.sql @@ -0,0 +1,11 @@ +-- 0001 — schema bootstrap and shared trigger function. + +CREATE SCHEMA IF NOT EXISTS {{schema}}; + +CREATE OR REPLACE FUNCTION {{schema}}.set_synced_at() +RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + NEW.synced_at = now(); + RETURN NEW; +END; +$$; diff --git a/src/rock_cosmetics_ingest/migrations/0002_create_imports.sql b/src/rock_cosmetics_ingest/migrations/0002_create_imports.sql new file mode 100644 index 0000000..db442ce --- /dev/null +++ b/src/rock_cosmetics_ingest/migrations/0002_create_imports.sql @@ -0,0 +1,13 @@ +-- 0002 — import tracking table. +-- Each Adit export file gets one row; idempotent re-ingest via file_hash. + +CREATE TABLE IF NOT EXISTS {{schema}}.rock_cosmetics_imports ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + filename text NOT NULL, + file_hash text NOT NULL UNIQUE, + status text NOT NULL DEFAULT 'pending' + CHECK (status IN ('pending', 'complete', 'error')), + row_count int, + error_msg text, + imported_at timestamptz NOT NULL DEFAULT now() +); diff --git a/src/rock_cosmetics_ingest/migrations/0003_create_leads.sql b/src/rock_cosmetics_ingest/migrations/0003_create_leads.sql new file mode 100644 index 0000000..cc17083 --- /dev/null +++ b/src/rock_cosmetics_ingest/migrations/0003_create_leads.sql @@ -0,0 +1,55 @@ +-- 0003 — leads table. +-- One row per Adit export row. row_key + report_period dedup prevents +-- double-counting if the same export is re-ingested or re-sent. +-- Column map placeholders will fill in as the Adit export format is confirmed. 
CREATE TABLE IF NOT EXISTS {{schema}}.rock_cosmetics_leads (
    id                  uuid        PRIMARY KEY DEFAULT gen_random_uuid(),
    import_id           uuid        NOT NULL REFERENCES {{schema}}.rock_cosmetics_imports(id),
    row_num             int         NOT NULL,
    raw_data            jsonb,

    -- Patient identity (populated once COLUMN_MAP is filled)
    first_name          text,
    last_name           text,
    dob                 date,

    -- Appointment / lead data
    appointment_date    date,
    appointment_status  text,
    procedure_type      text,
    lead_source         text,

    -- Campaign attribution
    campaign            text,
    report_date         date,

    -- Performance metrics
    impressions         int,
    clicks              int,
    cost_usd            numeric(12, 2),
    conversions         int,
    ctr                 numeric(8, 4),
    avg_cpc_usd         numeric(12, 2),

    -- GHL patient matching
    match_status        text        NOT NULL DEFAULT 'pending'
                        CHECK (match_status IN ('pending', 'matched', 'review', 'unmatched', 'no_usable_dob')),
    matched_contact_id  text,
    matched_at          timestamptz,

    -- Dedup (uniqueness is enforced by the partial index below)
    row_key             text,
    report_period       date,

    created_at          timestamptz NOT NULL DEFAULT now()
);

-- Dedup key. A table-level UNIQUE constraint cannot carry a WHERE clause, so
-- the rule is a partial unique index. Its predicate must stay identical to the
-- ON CONFLICT (row_key, report_period) WHERE ... clause used by the ingest
-- INSERT, which relies on this index as its conflict arbiter.
CREATE UNIQUE INDEX IF NOT EXISTS rock_cosmetics_leads_row_key_period_uidx
    ON {{schema}}.rock_cosmetics_leads (row_key, report_period)
    WHERE row_key IS NOT NULL AND report_period IS NOT NULL;

CREATE INDEX IF NOT EXISTS rock_cosmetics_leads_import_id_idx
    ON {{schema}}.rock_cosmetics_leads (import_id);

CREATE INDEX IF NOT EXISTS rock_cosmetics_leads_match_status_idx
    ON {{schema}}.rock_cosmetics_leads (match_status)
    WHERE match_status = 'pending';