Claude Code · Implementation Spec

Lead Enricher
Implementation Spec

Tài liệu đầy đủ để Claude Code SSH vào Mac mini và triển khai hệ thống crawl email từ danh sách công ty CSV. Đọc hết trước khi bắt đầu.

RuntimeMac mini M4 · Python 3.11+
Primary ToolJina Reader API
Input/OutputCSV local
Scale1k – 5k domains/batch
Est. Cost~$0.07 / 5k leads

Lead Enricher — Technical Implementation Spec

Tài liệu này dành cho Claude Code SSH vào Mac mini để triển khai toàn bộ hệ thống. Đọc hết trước khi bắt đầu. Không bỏ qua bất kỳ bước nào.


0. Context & Mục tiêu

Bài toán: Tự động crawl website từ danh sách công ty (CSV), extract email/phone/social, ghi kết quả vào output CSV.

Scale: 1,000 – 5,000 domains per batch
Target data: Email (ưu tiên nhất), phone, Facebook, LinkedIn, Zalo
Runtime: Mac mini M4, macOS, Python 3.11+
Input: input.csv lưu local
Output: output.csv lưu local (enriched)


1. Environment

Machine : Mac mini M4, macOS Sequoia
Python  : 3.11+ (kiểm tra bằng python3 --version)
Path    : ~/lead-enricher/
User    : chạy dưới user hiện tại, không cần sudo

1.1 Tạo project

mkdir -p ~/lead-enricher/logs
cd ~/lead-enricher
python3 -m venv venv
source venv/bin/activate

1.2 Install dependencies

pip install \
  httpx[asyncio]==0.27.0 \
  beautifulsoup4==4.12.3 \
  dnspython==2.6.1 \
  python-dotenv==1.0.1 \
  pandas==2.2.2 \
  playwright==1.44.0 \
  psutil==5.9.8 \
  lxml==5.2.2

playwright install chromium

1.3 File .env

Tạo file ~/lead-enricher/.env:

JINA_API_KEY=jina_xxxxxxxxxxxxxx

Lấy API key tại: https://jina.ai/reader → Get API Key (free, không cần thẻ)


2. Cấu trúc project

~/lead-enricher/
├── .env
├── config.yaml
├── main.py
├── market.py
├── crawler.py
├── extractor.py
├── validator.py
├── csv_handler.py
├── logs/
├── input.csv          ← user chuẩn bị
└── output.csv         ← generated

3. File config.yaml

# Jina Reader settings
jina:
  base_url: "https://r.jina.ai"
  engine: "browser"          # force Chrome render
  proxy: "auto"              # rotate residential IPs
  timeout: 30                # seconds per request

# Crawl settings
crawl:
  concurrency: 20            # số requests song song (tăng lên 50 nếu RAM > 8GB)
  max_urls_per_domain: 5     # số URL patterns thử per domain
  head_timeout: 5            # seconds cho HEAD check
  retry_max: 2               # số lần retry khi fail
  retry_delay: [5, 15]       # seconds giữa các retry
  batch_write: 50            # ghi CSV mỗi N rows
  rate_limit_delay: 1.0      # seconds giữa requests đến cùng 1 domain

# Playwright fallback
playwright:
  enabled: true
  timeout: 15000             # ms
  js_enabled: false          # tắt JS cho safety, bật nếu cần
  block_resources:
    - "image"
    - "media"
    - "font"
    - "stylesheet"

# CSV
csv:
  input: "input.csv"
  output: "output.csv"
  encoding: "utf-8-sig"      # đọc đúng tiếng Việt trong Excel

# Logging
log:
  level: "INFO"
  file: "logs/run.log"

4. File market.py

Detect thị trường từ country column hoặc TLD → chọn URL pattern set phù hợp.

# market.py
from __future__ import annotations
import re
from urllib.parse import urlparse

# Map TLD → market key
TLD_MAP: dict[str, str] = {
    ".vn": "vietnamese", ".com.vn": "vietnamese",
    ".kr": "korean",     ".co.kr": "korean",
    ".jp": "japanese",   ".co.jp": "japanese",
    ".cn": "chinese",    ".com.cn": "chinese",
    ".de": "german",
    ".fr": "french",
    ".it": "italian",
    ".es": "spanish",
    ".nl": "dutch",
    ".com": "english",   ".net": "english",
    ".io": "english",    ".co": "english",
    ".org": "english",
}

# Map country code → market key
COUNTRY_MAP: dict[str, str] = {
    "VN": "vietnamese",
    "KR": "korean",
    "JP": "japanese",
    "CN": "chinese",
    "DE": "german",
    "FR": "french",
    "IT": "italian",
    "ES": "spanish",
    "NL": "dutch",
    "US": "english", "GB": "english",
    "AU": "english", "SG": "english",
    "CA": "english",
}

# URL patterns theo market
URL_PATTERNS: dict[str, list[str]] = {
    "vietnamese": [
        "/lien-he", "/lien-he-voi-chung-toi", "/lienhe",
        "/ve-chung-toi", "/gioi-thieu", "/gioi-thieu-cong-ty",
        "/thong-tin-lien-he", "/ban-lanh-dao",
        "/contact", "/about",
    ],
    "english": [
        "/contact", "/contact-us", "/about", "/about-us",
        "/team", "/our-team", "/leadership", "/staff",
        "/pages/contact",      # Shopify
        "/get-in-touch", "/reach-us",
    ],
    "korean": [
        "/contact", "/about", "/company",
        "/contact-us", "/inquiry", "/contactus",
        "/about-us", "/company/about",
    ],
    "japanese": [
        "/contact", "/about", "/company",
        "/inquiry", "/access",
        "/corporate/contact", "/company/profile",
        "/company/overview",
    ],
    "chinese": [
        "/contact", "/about", "/aboutus",
        "/contactus", "/company", "/gywm",
    ],
    "german": [
        "/impressum",          # bắt buộc theo luật DE → hit rate cao
        "/kontakt", "/ueber-uns", "/uber-uns",
        "/contact", "/unternehmen", "/ansprechpartner",
    ],
    "french": [
        "/mentions-legales",   # bắt buộc theo luật FR → hit rate cao
        "/contact", "/contactez-nous", "/nous-contacter",
        "/a-propos", "/qui-sommes-nous", "/equipe",
    ],
    "italian": [
        "/contatti", "/chi-siamo", "/contact", "/about",
    ],
    "spanish": [
        "/contacto", "/contactanos", "/contact",
        "/sobre-nosotros", "/quienes-somos", "/equipo",
    ],
    "dutch": [
        "/contact", "/over-ons", "/contacteer-ons", "/bedrijf",
    ],
}

# Language signals để detect từ homepage content
LANGUAGE_SIGNALS: dict[str, list[str]] = {
    "vietnamese": ["liên hệ", "về chúng tôi", "giới thiệu", "công ty", "điện thoại"],
    "korean":     ["연락처", "회사소개", "문의하기", "소개"],
    "japanese":   ["お問い合わせ", "会社概要", "アクセス", "お問合せ"],
    "chinese":    ["联系我们", "关于我们", "公司简介", "联系方式"],
    "german":     ["impressum", "kontakt", "über uns", "datenschutz"],
    "french":     ["contactez", "à propos", "mentions légales", "équipe"],
    "spanish":    ["contáctenos", "sobre nosotros", "contacto", "quiénes somos"],
}


def detect_market(domain: str, country: str = "", homepage_text: str = "") -> str:
    """
    Priority:
    1. country column từ CSV (most reliable)
    2. TLD của domain
    3. Language signals từ homepage content (fallback .com)
    """
    # Priority 1: country column
    if country:
        key = COUNTRY_MAP.get(country.strip().upper())
        if key:
            return key

    # Priority 2: TLD
    domain_lower = domain.lower().rstrip("/")
    # Check 2-part TLD trước (.com.vn, .co.kr)
    for tld, market in TLD_MAP.items():
        if domain_lower.endswith(tld):
            return market

    # Priority 3: detect từ homepage content
    if homepage_text:
        text_lower = homepage_text.lower()
        scores: dict[str, int] = {}
        for lang, signals in LANGUAGE_SIGNALS.items():
            scores[lang] = sum(1 for s in signals if s in text_lower)
        best = max(scores, key=scores.get)
        if scores[best] > 0:
            return best

    return "english"


def get_patterns(market: str) -> list[str]:
    """
    Trả về pattern list cho market.
    Non-English markets luôn append English patterns làm fallback.
    Dedup giữ thứ tự.
    """
    primary = URL_PATTERNS.get(market, URL_PATTERNS["english"])
    if market == "english":
        return primary

    # Append English fallback, dedup
    combined = primary + URL_PATTERNS["english"]
    seen: set[str] = set()
    result: list[str] = []
    for p in combined:
        if p not in seen:
            seen.add(p)
            result.append(p)
    return result

5. File validator.py

DNS check và URL safety validation.

# validator.py
from __future__ import annotations
import ipaddress
import socket
import urllib.parse
import dns.resolver

# Private IP ranges để block SSRF
_PRIVATE_NETS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("::1/128"),
]

# Known parked domain signals
PARKED_SIGNALS = [
    "domain for sale", "buy this domain", "this domain is parked",
    "domain is for sale", "make an offer", "godaddy.com/domains",
    "namecheap.com", "dan.com", "sedo.com", "hugedomains.com",
    "afternic.com", "undeveloped.com", "domain đang được rao bán",
]


def is_safe_url(url: str) -> bool:
    """Block private IPs (SSRF protection) và non-http schemes."""
    try:
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in ("http", "https"):
            return False
        hostname = parsed.hostname or ""
        # Thử parse như IP
        try:
            ip = ipaddress.ip_address(hostname)
            for net in _PRIVATE_NETS:
                if ip in net:
                    return False
        except ValueError:
            pass  # hostname bình thường
        return True
    except Exception:
        return False


def check_dns(domain: str, timeout: float = 3.0) -> bool:
    """Kiểm tra domain có resolve được không."""
    try:
        socket.setdefaulttimeout(timeout)
        socket.getaddrinfo(domain, 80)
        return True
    except (socket.gaierror, socket.timeout):
        return False


def validate_email_domain(email: str) -> bool:
    """Kiểm tra domain của email có MX record không."""
    try:
        domain = email.split("@")[1].strip()
        dns.resolver.resolve(domain, "MX", lifetime=5)
        return True
    except Exception:
        return False


def is_parked(text: str) -> bool:
    """Detect parked/for-sale domain từ content."""
    if not text or len(text.strip()) < 300:
        return True
    text_lower = text.lower()
    return any(sig in text_lower for sig in PARKED_SIGNALS)

6. File extractor.py

Extract email, phone, social links từ Markdown/HTML. Handle Cloudflare obfuscation.

# extractor.py
from __future__ import annotations
import re
from bs4 import BeautifulSoup

# ── Regex patterns ──────────────────────────────────────────────────────────

EMAIL_RE = re.compile(
    r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
    re.IGNORECASE,
)

# Phone: hỗ trợ VN (+84, 0xxx) và international
PHONE_RE = re.compile(
    r"(?:\+84|0084|0)(?:\s*[-.\s]?\s*)"
    r"(?:3[2-9]|5[689]|7[06-9]|8[0-9]|9[0-9])"
    r"(?:\s*[-.\s]?\s*\d){7}"
    r"|(?:\+\d{1,3}[\s.-]?)?\(?\d{2,4}\)?[\s.-]?\d{3,4}[\s.-]?\d{3,4}",
    re.IGNORECASE,
)

FACEBOOK_RE = re.compile(
    r"(?:https?://)?(?:www\.)?facebook\.com/(?!sharer|share|plugins|photo)"
    r"[a-zA-Z0-9._%+\-/]+",
    re.IGNORECASE,
)

LINKEDIN_RE = re.compile(
    r"(?:https?://)?(?:www\.)?linkedin\.com/(?:company|in)/[a-zA-Z0-9.\-_/]+",
    re.IGNORECASE,
)

ZALO_RE = re.compile(
    r"(?:https?://)?(?:zalo\.me|chat\.zalo\.me)/[a-zA-Z0-9.\-_/+]+",
    re.IGNORECASE,
)

# Emails để loại bỏ (false positives)
EMAIL_BLACKLIST = {
    "example@example.com", "test@test.com", "email@email.com",
    "user@domain.com", "info@example.com", "support@example.com",
    "noreply@example.com", "no-reply@example.com",
    "admin@example.com", "webmaster@example.com",
}

# File extensions → không phải email
EMAIL_FAKE_DOMAINS = {
    "png", "jpg", "jpeg", "gif", "svg", "webp",
    "css", "js", "ts", "jsx", "tsx", "vue",
    "woff", "woff2", "ttf", "eot",
}


def decode_cf_email(encoded: str) -> str:
    """Decode Cloudflare XOR email obfuscation."""
    try:
        b = bytes.fromhex(encoded)
        key = b[0]
        return "".join(chr(c ^ key) for c in b[1:])
    except Exception:
        return ""


def _clean_email(email: str) -> str | None:
    """Validate và clean email."""
    email = email.lower().strip()
    if email in EMAIL_BLACKLIST:
        return None
    # Check fake extension
    domain_ext = email.split(".")[-1]
    if domain_ext in EMAIL_FAKE_DOMAINS:
        return None
    # Phải có @ và ít nhất 1 dấu . sau @
    if "@" not in email:
        return None
    local, domain = email.split("@", 1)
    if "." not in domain or len(local) < 1:
        return None
    return email


def extract_from_html(html: str) -> dict:
    """Extract từ raw HTML — handle Cloudflare obfuscation."""
    emails: set[str] = set()
    phones: set[str] = set()
    socials: dict[str, str] = {}

    soup = BeautifulSoup(html, "lxml")

    # Cloudflare obfuscated emails — data-cfemail attribute
    for tag in soup.select("[data-cfemail]"):
        decoded = decode_cf_email(tag.get("data-cfemail", ""))
        if "@" in decoded:
            cleaned = _clean_email(decoded)
            if cleaned:
                emails.add(cleaned)

    # Cloudflare href encoded: /cdn-cgi/l/email-protection#<hex>
    for a in soup.find_all("a", href=True):
        href = str(a.get("href", ""))
        if "/cdn-cgi/l/email-protection#" in href:
            encoded = href.split("#")[-1]
            decoded = decode_cf_email(encoded)
            if "@" in decoded:
                cleaned = _clean_email(decoded)
                if cleaned:
                    emails.add(cleaned)

    # mailto: links
    for a in soup.find_all("a", href=True):
        href = str(a.get("href", ""))
        if href.startswith("mailto:"):
            email = href.replace("mailto:", "").split("?")[0].strip()
            cleaned = _clean_email(email)
            if cleaned:
                emails.add(cleaned)

    # Plain text scan
    text = soup.get_text(" ")
    for m in EMAIL_RE.finditer(text):
        cleaned = _clean_email(m.group())
        if cleaned:
            emails.add(cleaned)

    # Phone
    for m in PHONE_RE.finditer(text):
        phones.add(m.group().strip())

    # Social
    for m in FACEBOOK_RE.finditer(html):
        socials.setdefault("facebook", m.group())
    for m in LINKEDIN_RE.finditer(html):
        socials.setdefault("linkedin", m.group())
    for m in ZALO_RE.finditer(html):
        socials.setdefault("zalo", m.group())

    return {
        "emails": sorted(emails),
        "phones": sorted(phones),
        "socials": socials,
    }


def extract_from_markdown(text: str) -> dict:
    """Extract từ Jina Markdown output."""
    emails: set[str] = set()
    phones: set[str] = set()
    socials: dict[str, str] = {}

    # Emails
    for m in EMAIL_RE.finditer(text):
        cleaned = _clean_email(m.group())
        if cleaned:
            emails.add(cleaned)

    # Phones
    for m in PHONE_RE.finditer(text):
        phones.add(m.group().strip())

    # Social links
    for m in FACEBOOK_RE.finditer(text):
        socials.setdefault("facebook", m.group())
    for m in LINKEDIN_RE.finditer(text):
        socials.setdefault("linkedin", m.group())
    for m in ZALO_RE.finditer(text):
        socials.setdefault("zalo", m.group())

    return {
        "emails": sorted(emails),
        "phones": sorted(phones),
        "socials": socials,
    }


def merge_results(*results: dict) -> dict:
    """Merge kết quả từ nhiều pages."""
    emails: set[str] = set()
    phones: set[str] = set()
    socials: dict[str, str] = {}

    for r in results:
        emails.update(r.get("emails", []))
        phones.update(r.get("phones", []))
        socials.update(r.get("socials", {}))

    return {
        "emails": sorted(emails),
        "phones": sorted(phones),
        "socials": socials,
    }

7. File crawler.py

Core crawl logic — HEAD check, Jina fetch, Playwright fallback.

# crawler.py
from __future__ import annotations
import asyncio
import logging
from urllib.parse import urlparse

import httpx
import psutil
from playwright.async_api import async_playwright

from extractor import extract_from_markdown, extract_from_html, merge_results
from validator import is_safe_url, is_parked

logger = logging.getLogger(__name__)

# Status codes
STATUS_OK            = "ok"
STATUS_DEAD          = "dead"
STATUS_PARKED        = "parked"
STATUS_BLOCKED       = "blocked_403"
STATUS_NO_EMAIL      = "no_email_found"
STATUS_REDIRECTED    = "ok_redirected"
STATUS_SERVER_ERROR  = "server_error"
STATUS_TIMEOUT       = "timeout"


class Crawler:
    def __init__(self, config: dict):
        self.jina_key     = config["jina_api_key"]
        self.jina_base    = config["jina"]["base_url"]
        self.jina_timeout = config["jina"]["timeout"]
        self.head_timeout = config["crawl"]["head_timeout"]
        self.retry_max    = config["crawl"]["retry_max"]
        self.retry_delay  = config["crawl"]["retry_delay"]
        self.pw_enabled   = config["playwright"]["enabled"]
        self.pw_timeout   = config["playwright"]["timeout"]
        self.pw_js        = config["playwright"]["js_enabled"]
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            follow_redirects=True,
            timeout=self.head_timeout,
            headers={"User-Agent": "Mozilla/5.0 (compatible; LeadEnricher/1.0)"},
        )
        return self

    async def __aexit__(self, *_):
        if self._client:
            await self._client.aclose()

    # ── HEAD CHECK ───────────────────────────────────────────────────────────

    async def head_check(self, url: str) -> tuple[bool, int, str]:
        """
        Returns: (exists, status_code, final_url)
        exists=True nếu URL có thể crawl được
        """
        if not is_safe_url(url):
            return False, 0, url
        try:
            r = await self._client.head(url, timeout=self.head_timeout)
            # Một số server không support HEAD → fallback GET stream
            if r.status_code == 405:
                r = await self._client.get(
                    url, timeout=self.head_timeout,
                    headers={"Range": "bytes=0-0"},  # chỉ lấy 1 byte
                )
            final_url = str(r.url)
            code = r.status_code

            if code == 200:
                return True, code, final_url
            elif code in (301, 302, 303, 307, 308):
                return True, code, final_url
            elif code == 403:
                return True, code, final_url   # tồn tại nhưng bị block, thử Jina
            elif code == 429:
                return False, code, final_url  # rate limited
            elif code == 404:
                return False, code, final_url
            else:
                return False, code, final_url

        except (httpx.ConnectError, httpx.TimeoutException, httpx.ConnectTimeout):
            return False, 0, url
        except Exception as e:
            logger.debug(f"HEAD check error {url}: {e}")
            return False, 0, url

    # ── JINA FETCH ───────────────────────────────────────────────────────────

    async def jina_fetch(self, url: str, retry: int = 0) -> str | None:
        """Fetch URL qua Jina Reader, trả về Markdown string."""
        jina_url = f"{self.jina_base}/{url}"
        headers = {
            "Authorization": f"Bearer {self.jina_key}",
            "X-Engine": "browser",
            "X-Proxy": "auto",
            "X-Return-Format": "markdown",
            "X-Timeout": str(self.jina_timeout),
        }
        try:
            async with httpx.AsyncClient(timeout=self.jina_timeout + 5) as client:
                r = await client.get(jina_url, headers=headers)
                if r.status_code == 200:
                    return r.text
                elif r.status_code == 429 and retry < self.retry_max:
                    delay = self.retry_delay[min(retry, len(self.retry_delay) - 1)]
                    logger.info(f"Jina rate limited, waiting {delay}s...")
                    await asyncio.sleep(delay)
                    return await self.jina_fetch(url, retry + 1)
                else:
                    logger.debug(f"Jina {r.status_code} for {url}")
                    return None
        except asyncio.TimeoutError:
            if retry < self.retry_max:
                await asyncio.sleep(self.retry_delay[0])
                return await self.jina_fetch(url, retry + 1)
            return None
        except Exception as e:
            logger.debug(f"Jina fetch error {url}: {e}")
            return None

    # ── PLAYWRIGHT FALLBACK ──────────────────────────────────────────────────

    async def playwright_fetch(self, url: str) -> str | None:
        """Fallback khi Jina fail. Chạy headless Chrome thật."""
        # Safety: check CPU trước khi spin Chrome
        if psutil.cpu_percent(interval=0.5) > 85:
            logger.warning("CPU too high, skipping Playwright")
            return None
        try:
            async with async_playwright() as pw:
                browser = await pw.chromium.launch(
                    headless=True,
                    args=[
                        "--disable-extensions",
                        "--disable-plugins",
                        "--blink-settings=imagesEnabled=false",
                        "--disable-web-security",
                        "--no-first-run",
                    ],
                )
                context = await browser.new_context(
                    java_script_enabled=self.pw_js,
                    user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                               "AppleWebKit/537.36 (KHTML, like Gecko) "
                               "Chrome/125.0.0.0 Safari/537.36",
                )
                page = await context.new_page()

                # Block heavy resources
                async def block_resource(route):
                    if route.request.resource_type in ("image", "media", "font"):
                        await route.abort()
                    else:
                        await route.continue_()
                await page.route("**/*", block_resource)

                page.set_default_timeout(self.pw_timeout)
                await page.goto(url, wait_until="domcontentloaded")
                html = await page.content()
                await browser.close()
                return html
        except Exception as e:
            logger.debug(f"Playwright error {url}: {e}")
            return None

    # ── MAIN CRAWL LOGIC ─────────────────────────────────────────────────────

    async def crawl_domain(
        self,
        domain: str,
        patterns: list[str],
        market: str,
    ) -> dict:
        """
        Crawl 1 domain qua tất cả các steps.
        Returns dict với email, phone, social, status.
        """
        base_url = f"https://{domain}"
        all_results = []

        # ── STEP 2: Homepage quick scan ──────────────────────────────────────
        logger.info(f"[{domain}] Scanning homepage...")
        homepage_md = await self.jina_fetch(base_url)

        if homepage_md:
            if is_parked(homepage_md):
                return self._result(STATUS_PARKED)

            r = extract_from_markdown(homepage_md)
            if r["emails"]:
                logger.info(f"[{domain}] ✓ Email found on homepage")
                return self._result(STATUS_OK, r, source=base_url)
            all_results.append(r)

        # ── STEP 3+4: Loop URL patterns ──────────────────────────────────────
        for path in patterns:
            url = f"{base_url}{path}"

            # HEAD check
            exists, code, final_url = await self.head_check(url)

            if not exists:
                if code == 404:
                    logger.debug(f"[{domain}] 404 {path}, skipping")
                    continue
                elif code == 429:
                    logger.info(f"[{domain}] Rate limited on HEAD, waiting...")
                    await asyncio.sleep(30)
                    exists, code, final_url = await self.head_check(url)
                    if not exists:
                        continue
                elif code == 0:
                    continue
                else:
                    continue

            logger.info(f"[{domain}] Fetching {path} (HEAD={code})")

            # Jina fetch
            content_md = await self.jina_fetch(final_url)

            if content_md:
                r = extract_from_markdown(content_md)
                if r["emails"]:
                    logger.info(f"[{domain}] ✓ Email found at {path}")
                    merged = merge_results(*all_results, r)
                    return self._result(STATUS_OK, merged, source=final_url)
                all_results.append(r)
            elif self.pw_enabled and code == 403:
                # Playwright fallback chỉ khi Jina fail + bị block
                logger.info(f"[{domain}] Trying Playwright fallback for {path}")
                html = await self.playwright_fetch(final_url)
                if html:
                    r = extract_from_html(html)
                    if r["emails"]:
                        merged = merge_results(*all_results, r)
                        return self._result(STATUS_OK, merged, source=final_url)
                    all_results.append(r)

            # Rate limit giữa các requests đến cùng domain
            await asyncio.sleep(1.0)

        # Hết patterns
        merged = merge_results(*all_results)
        if merged["phones"] or merged["socials"]:
            # Có phone/social dù không có email → vẫn ghi
            return self._result(STATUS_NO_EMAIL, merged)
        return self._result(STATUS_NO_EMAIL)

    def _result(self, status: str, data: dict | None = None, source: str = "") -> dict:
        result = {
            "status": status,
            "email": "",
            "email_2": "",
            "phone": "",
            "facebook": "",
            "linkedin": "",
            "zalo": "",
            "source_url": source,
        }
        if data:
            emails = data.get("emails", [])
            if emails:
                result["email"] = emails[0]
            if len(emails) > 1:
                result["email_2"] = emails[1]
            phones = data.get("phones", [])
            if phones:
                result["phone"] = phones[0]
            socials = data.get("socials", {})
            result["facebook"] = socials.get("facebook", "")
            result["linkedin"] = socials.get("linkedin", "")
            result["zalo"] = socials.get("zalo", "")
        return result

8. File csv_handler.py

Đọc/ghi CSV với resume support.

# csv_handler.py
from __future__ import annotations
import logging
from pathlib import Path
from datetime import datetime

import pandas as pd

logger = logging.getLogger(__name__)

OUTPUT_COLS = [
    "company_name", "website", "country",
    "email", "email_2", "phone",
    "facebook", "linkedin", "zalo",
    "status", "last_checked", "source_url", "market_detected",
]


def load(path: str) -> pd.DataFrame:
    """Load input CSV. Thêm output columns nếu chưa có."""
    df = pd.read_csv(path, encoding="utf-8-sig", dtype=str)
    df = df.fillna("")

    # Normalize column names
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

    # Đảm bảo có cột website
    if "website" not in df.columns:
        raise ValueError("CSV phải có cột 'website'")

    # Thêm output columns nếu chưa có
    for col in OUTPUT_COLS:
        if col not in df.columns:
            df[col] = ""

    logger.info(f"Loaded {len(df)} rows from {path}")
    return df


def save(df: pd.DataFrame, path: str):
    """Ghi CSV với encoding utf-8-sig (đọc đúng tiếng Việt trong Excel)."""
    df.to_csv(path, index=False, encoding="utf-8-sig")
    logger.debug(f"Saved {len(df)} rows to {path}")


def get_pending(df: pd.DataFrame) -> pd.DataFrame:
    """Lấy rows chưa được xử lý (status rỗng)."""
    pending = df[df["status"] == ""].copy()
    logger.info(f"Pending: {len(pending)} / {len(df)} rows")
    return pending


def update_row(df: pd.DataFrame, idx: int, result: dict, market: str):
    """Cập nhật 1 row với kết quả crawl."""
    df.at[idx, "email"]          = result.get("email", "")
    df.at[idx, "email_2"]        = result.get("email_2", "")
    df.at[idx, "phone"]          = result.get("phone", "")
    df.at[idx, "facebook"]       = result.get("facebook", "")
    df.at[idx, "linkedin"]       = result.get("linkedin", "")
    df.at[idx, "zalo"]           = result.get("zalo", "")
    df.at[idx, "status"]         = result.get("status", "")
    df.at[idx, "source_url"]     = result.get("source_url", "")
    df.at[idx, "market_detected"]= market
    df.at[idx, "last_checked"]   = datetime.now().strftime("%Y-%m-%d %H:%M")


def print_stats(df: pd.DataFrame):
    """In thống kê kết quả."""
    total = len(df)
    done  = df[df["status"] != ""].shape[0]
    ok    = df[df["status"] == "ok"].shape[0]
    dead  = df[df["status"] == "dead"].shape[0]
    parked= df[df["status"] == "parked"].shape[0]
    noem  = df[df["status"] == "no_email_found"].shape[0]

    print(f"""
┌─────────────────────────────────────┐
│  CRAWL STATS                        │
├─────────────────────────────────────┤
│  Total rows     : {total:<5}
│  Processed      : {done:<5}
│  Email found ✓  : {ok:<5} ({ok/max(done,1)*100:.0f}%)          │
│  Dead domain    : {dead:<5}
│  Parked         : {parked:<5}
│  No email found : {noem:<5}
└─────────────────────────────────────┘
""")

9. File main.py

Orchestrator — đọc CSV, chạy pipeline, ghi kết quả.

# main.py
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
from pathlib import Path
from urllib.parse import urlparse

import yaml
from dotenv import load_dotenv

from market import detect_market, get_patterns
from validator import check_dns
from crawler import Crawler, STATUS_DEAD
from csv_handler import load, save, get_pending, update_row, print_stats

# ── Setup ────────────────────────────────────────────────────────────────────

load_dotenv()

def setup_logging(log_file: str, level: str = "INFO"):
    Path(log_file).parent.mkdir(exist_ok=True)
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(log_file, encoding="utf-8"),
        ],
    )

def load_config(path: str = "config.yaml") -> dict:
    with open(path) as f:
        cfg = yaml.safe_load(f)
    cfg["jina_api_key"] = os.getenv("JINA_API_KEY", "")
    if not cfg["jina_api_key"]:
        raise ValueError("JINA_API_KEY chưa được set trong .env")
    return cfg

def normalize_domain(raw: str) -> str:
    """Lấy domain thuần từ URL bất kỳ."""
    raw = raw.strip()
    if not raw.startswith("http"):
        raw = "https://" + raw
    return urlparse(raw).netloc.lstrip("www.")

# ── Core worker ──────────────────────────────────────────────────────────────

async def process_row(
    idx: int,
    row: dict,
    crawler: Crawler,
    semaphore: asyncio.Semaphore,
    dry_run: bool = False,
) -> tuple[int, dict, str]:
    """Xử lý 1 domain. Returns (idx, result, market)."""
    async with semaphore:
        domain  = normalize_domain(str(row.get("website", "")))
        country = str(row.get("country", ""))
        logger   = logging.getLogger(__name__)

        if not domain:
            return idx, {"status": "invalid_url"}, "unknown"

        logger.info(f"Processing [{idx}] {domain}")

        # ── STEP 1: DNS check ────────────────────────────────────────────────
        if not check_dns(domain):
            logger.info(f"[{domain}] DNS failed → DEAD")
            return idx, {"status": STATUS_DEAD}, "unknown"

        # ── STEP 0: Market detection ─────────────────────────────────────────
        market   = detect_market(domain, country)
        patterns = get_patterns(market)
        logger.info(f"[{domain}] Market: {market}, {len(patterns)} patterns")

        if dry_run:
            return idx, {"status": "dry_run_ok"}, market

        # ── STEP 2-5: Crawl ──────────────────────────────────────────────────
        result = await crawler.crawl_domain(domain, patterns, market)
        return idx, result, market

# ── Main ─────────────────────────────────────────────────────────────────────

async def main():
    parser = argparse.ArgumentParser(description="Lead Enricher")
    parser.add_argument("--input",   default="input.csv",  help="Input CSV path")
    parser.add_argument("--output",  default="output.csv", help="Output CSV path")
    parser.add_argument("--config",  default="config.yaml")
    parser.add_argument("--limit",   type=int, default=0,  help="Giới hạn số rows (0=tất cả)")
    parser.add_argument("--market",  default="",           help="Filter theo market cụ thể")
    parser.add_argument("--dry-run", action="store_true",  help="Không ghi output")
    args = parser.parse_args()

    cfg = load_config(args.config)
    setup_logging(cfg["log"]["file"], cfg["log"]["level"])
    logger = logging.getLogger(__name__)

    # Load CSV
    import shutil
    # Copy input → output nếu output chưa tồn tại
    if not Path(args.output).exists():
        shutil.copy2(args.input, args.output)

    df      = load(args.output)
    pending = get_pending(df)

    if args.market:
        pending = pending[pending["country"].str.upper() == args.market.upper()]

    if args.limit > 0:
        pending = pending.head(args.limit)

    logger.info(f"Starting: {len(pending)} rows to process")

    if pending.empty:
        logger.info("No pending rows. All done!")
        print_stats(df)
        return

    concurrency = cfg["crawl"]["concurrency"]
    batch_write = cfg["crawl"]["batch_write"]
    semaphore   = asyncio.Semaphore(concurrency)

    async with Crawler(cfg) as crawler:
        tasks = [
            process_row(idx, row.to_dict(), crawler, semaphore, args.dry_run)
            for idx, row in pending.iterrows()
        ]

        done_count = 0
        for coro in asyncio.as_completed(tasks):
            idx, result, market = await coro
            update_row(df, idx, result, market)
            done_count += 1

            status = result.get("status", "")
            email  = result.get("email", "")
            domain = normalize_domain(str(df.at[idx, "website"]))
            icon   = "✓" if email else "·"
            logger.info(f"[{icon}] {domain}{status} {email}")

            # Batch write
            if done_count % batch_write == 0 and not args.dry_run:
                save(df, args.output)
                logger.info(f"Progress: {done_count}/{len(pending)} saved")

    # Final save
    if not args.dry_run:
        save(df, args.output)

    print_stats(df)
    logger.info(f"Done. Output: {args.output}")


if __name__ == "__main__":
    asyncio.run(main())

10. Input CSV format

File input.csv mà user chuẩn bị — tối thiểu cần cột website:

company_name,website,country
Công ty ABC,abc.vn,VN
XYZ Corporation,https://xyz.com,US
Korea Co Ltd,korea.co.kr,KR
Firma GmbH,firma.de,DE
Unknown Market,somesite.com,

Lưu ý: - country optional nhưng recommended để market detection chính xác - website có thể là domain thuần (abc.vn) hoặc full URL (https://abc.vn) - Encoding: UTF-8 hoặc UTF-8-BOM đều OK


11. Các lệnh chạy

cd ~/lead-enricher
source venv/bin/activate

# Test Jina API key
curl -H "Authorization: Bearer $(grep JINA_API_KEY .env | cut -d= -f2)" \
     https://r.jina.ai/https://example.com | head -20

# Dry run 50 rows đầu (không ghi file)
python main.py --input input.csv --dry-run --limit 50

# Chạy thật 200 rows đầu
python main.py --input input.csv --output output.csv --limit 200

# Chạy full batch
python main.py --input input.csv --output output.csv

# Resume (tự skip rows đã có status)
python main.py --input output.csv --output output.csv

# Chỉ chạy market VN
python main.py --input input.csv --output output_vn.csv --market VN

# Background với log
nohup python main.py --input input.csv --output output.csv \
      > logs/run_$(date +%y%m%d_%H%M).log 2>&1 &

# Xem progress
tail -f logs/run_*.log

12. Checklist triển khai

□ 1. python3 --version  →  >= 3.11
□ 2. pip install xong không lỗi
□ 3. playwright install chromium  xong không lỗi
□ 4. .env có JINA_API_KEY
□ 5. Test curl Jina → trả về Markdown
□ 6. input.csv có cột 'website'
□ 7. python main.py --dry-run --limit 10  → không crash
□ 8. python main.py --limit 50  → output.csv có data
□ 9. Mở output.csv kiểm tra email quality
□ 10. Chạy full batch

13. Troubleshooting

Lỗi Nguyên nhân Fix
JINA_API_KEY chưa được set .env không đúng path Chạy cat .env kiểm tra
playwright install lỗi Thiếu system deps brew install chromium
ModuleNotFoundError Chưa activate venv source venv/bin/activate
Hit rate < 20% Jina API key hết token Kiểm tra jina.ai/dashboard
Toàn bộ dead DNS timeout quá thấp Tăng head_timeout lên 10
Output CSV lỗi encoding Mở bằng Excel: Data → From Text/CSV → UTF-8
CPU too high warning Playwright chạy nhiều Giảm concurrency xuống 10

14. Expected output

Sau khi chạy xong 5,000 domains:

┌─────────────────────────────────────┐
│  CRAWL STATS                        │
├─────────────────────────────────────┤
│  Total rows     : 5000              │
│  Processed      : 5000              │
│  Email found ✓  : ~2500 (50%)       │
│  Dead domain    : ~850              │
│  Parked         : ~250              │
│  No email found : ~1250             │
└─────────────────────────────────────┘

Thời gian ước tính: ~1.5–2 giờ với concurrency=20 trên Mac mini M4.


Spec version: 1.0 — 2026-05-28
Runtime: Mac mini M4, macOS, Python 3.11+