全量抓取还是增量采集?二手房数据采集实战解析

简介: 本文以链家二手房数据采集为例,探讨全量抓取与增量采集的优劣与适用场景,并结合代理IP技术实现高效、稳定的爬虫方案。通过SQLite/PostgreSQL存储、内容哈希去重、定时任务调度等手段,构建可持续运行的数据更新与统计系统。适用于房产数据分析、市场监测等场景,兼顾资源效率与数据质量。

爬虫代理

项目背景

很多做数据采集的同学都会遇到一个老问题:到底是一次性把网站的数据全部抓取下来,还是定期只更新新增和变化的部分?
我之前在做二手房市场监测的时候,就碰到过这个选择。当时目标是对比不同城市、不同小区的挂牌房源,看看价格走势和交易活跃度。如果抓取策略不对,不仅会浪费资源,还可能导致数据质量不高。 所以,本文就结合「链家二手房」这个实际站点,聊聊全量抓取和增量采集的取舍,并通过一个实战小项目,展示如何结合代理 IP 技术去实现定期的数据获取和统计。

数据目标

目标字段(示例)

  • 基础识别:house_id(从 URL 或页面特征提取)、titleurl
  • 位置维度:citydistrict(区)、bizcircle(商圈/板块)、community(小区名)
  • 核心指标:total_price(万元)、unit_price(元/㎡)、area(㎡)、room_type(几室几厅)
  • 时间戳:first_seen_at(首次发现时间)、last_seen_at(最后一次看到)
  • 变更检测:content_hash(用于判断记录是否变更)

存储设计

  • 使用 SQLite(轻量)/ PostgreSQL(生产可选)持久化记录;
  • house_id 作为主键,配合 content_hash 实现 幂等写入增量更新
  • 定期产出 统计汇总,如“按区/小区的挂牌数量、均价、面积分布”。

统计示例

  • district 维度:挂牌量、平均单价、价格分位
  • community 维度:挂牌量 Top N、均价 Top N
  • 趋势维度(可扩展):每日新增挂牌量、下架量(需引入“消失检测”)

技术选型

  • 在数据获取方式上,常见有两种:
  1. 全量抓取
    • 每次任务都从头到尾抓一遍。
    • 优点:不会漏数据。
    • 缺点:压力大,耗时耗流量,重复数据多。
  2. 增量采集
    • 每次只采集“新增”或“变化”的部分,比如根据发布时间筛选。
    • 优点:节省资源,数据更新快。
    • 缺点:需要额外逻辑来判断哪些是新数据,哪些是修改过的数据。
我的经验是:
  • 前期数据基线不足时,用全量抓取先把底子打好
  • 后期维护阶段,采用增量采集,避免重复抓取大量无效信息
在网络层面,由于链家有一定的访问频率限制,所以必须结合代理池。这里我选用了 爬虫代理服务 ,支持用户名密码认证,可以减少封禁风险。

模块实现(代码可直接运行/改造)

运行环境:Python 3.10+
安装依赖:pip install requests curl_cffi lxml beautifulsoup4 fake-useragent sqlalchemy pandas apscheduler

0)统一配置(目标入口、代理、数据库)

# -*- coding: utf-8 -*-
"""
项目:贝壳二手房抓取 - 全量 vs 增量
说明:示例代码仅作教学演示,请遵守目标站点条款与 robots.txt。
"""

import os, re, time, random, hashlib, json, datetime as dt
from typing import List, Dict, Optional, Tuple

import requests
from curl_cffi import requests as cffi_requests  # 更拟真TLS栈,可在受限站点兜底
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from sqlalchemy import create_engine, text
import pandas as pd
from apscheduler.schedulers.blocking import BlockingScheduler

# -- 代理(参考:亿牛云爬虫代理)--------
# 请替换为你的真实配置(域名、端口、用户名、密码)
PROXY_HOST = os.getenv("YINIU_HOST", "proxy.16yun.cn")     # 示例域名
PROXY_PORT = os.getenv("YINIU_PORT", "3100")                  # 示例端口
PROXY_USER = os.getenv("YINIU_USER", "16YUN")
PROXY_PASS = os.getenv("YINIU_PASS", "16IP")

PROXY = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
PROXIES = {
   "http": PROXY, "https": PROXY}

# ------------------ 目标与数据库 ------------------
BASE_URL = "https://www.ke.com/ershoufang/"
CITY = "sh"  # 城市简码可按需切换,如:bj、gz、sz;或直接用根入口配合筛选
DB_URL = os.getenv("DB_URL", "sqlite:///houses.db")
engine = create_engine(DB_URL, echo=False, future=True)

# ------------------ 抓取模式开关 ------------------
MODE = os.getenv("MODE", "incremental")  # 可选:'full' / 'incremental'

1)建表 & 工具函数(主键、哈希、幂等)

DDL = """
CREATE TABLE IF NOT EXISTS house (
    house_id TEXT PRIMARY KEY,
    title TEXT,
    url TEXT,
    city TEXT,
    district TEXT,
    bizcircle TEXT,
    community TEXT,
    total_price REAL,
    unit_price REAL,
    area REAL,
    room_type TEXT,
    content_hash TEXT,
    first_seen_at TEXT,
    last_seen_at TEXT
);

CREATE TABLE IF NOT EXISTS cursor_state (
    key TEXT PRIMARY KEY,
    value TEXT
);
"""

with engine.begin() as conn:
    for stmt in DDL.strip().split(";"):
        s = stmt.strip()
        if s:
            conn.execute(text(s))

def sha1(obj: Dict) -> str:
    """对核心字段做内容哈希,用于变更检测。"""
    payload = json.dumps(obj, sort_keys=True, ensure_ascii=False)
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()

def now_iso():
    return dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"

def load_state(key: str, default: str="") -> str:
    with engine.begin() as conn:
        r = conn.execute(text("SELECT value FROM cursor_state WHERE key=:k"), {
   "k": key}).fetchone()
        return r[0] if r else default

def save_state(key: str, value: str):
    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO cursor_state(key, value) VALUES(:k, :v)
            ON CONFLICT(key) DO UPDATE SET value=excluded.value
        """), {
   "k": key, "v": value})

2)请求层

ua = UserAgent()

def get_session(use_cffi: bool=False):
    """
    默认使用 requests;当遇到严格 TLS/指纹校验时,可切到 curl_cffi(use_cffi=True)。
    """
    s = cffi_requests.Session() if use_cffi else requests.Session()
    s.headers.update({
   
        "User-Agent": ua.random,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        # 可按需设置 Cookie,提高连续性(遵守站点条款)
    })
    s.proxies.update(PROXIES)
    s.timeout = 20
    return s

def fetch_html(url: str, session=None, max_retry=5, sleep_base=1.2) -> Optional[str]:
    """带退避重试与随机抖动的请求封装。"""
    if session is None:
        session = get_session()
    for i in range(max_retry):
        try:
            resp = session.get(url, allow_redirects=True)
            if resp.status_code == 200 and "text/html" in resp.headers.get("Content-Type", ""):
                return resp.text
            # 软性限流/异常,退避
            time.sleep(sleep_base * (2 ** i) + random.random())
        except Exception:
            time.sleep(sleep_base * (2 ** i) + random.random())
    return None

3)解析层(列表页抽取 + 详情增强可选)

贝壳列表页的结构在不同城市/版本可能有差异,以下选择“尽量稳健”的选择器(需要你根据线上实际 HTML 小幅调整)。

def parse_list(html: str) -> List[Dict]:
    soup = BeautifulSoup(html, "lxml")
    items = []
    # 典型结构:ul.sellListContent > li 或 div.content > div.leftContent > ul > li
    for li in soup.select("ul.sellListContent > li, li.clear"):
        try:
            a = li.select_one("a[data-el='ershoufang']")
            if not a:
                continue
            url = a.get("href", "").strip()
            title = a.get_text(strip=True)
            # house_id 可能包含在 URL 中,例如 .../123456789.html
            m = re.search(r"/(\d+)\.html", url)
            house_id = m.group(1) if m else hashlib.md5(url.encode()).hexdigest()[:16]

            # 价格与单价
            total_price = None
            tp = li.select_one(".totalPrice span")
            if tp:
                try:
                    total_price = float(tp.get_text(strip=True))
                except: pass

            unit_price = None
            up = li.select_one(".unitPrice span")
            if up:
                # e.g. "单价50012元/平"
                digits = re.findall(r"\d+", up.get_text())
                if digits:
                    unit_price = float(digits[0])

            # 面积/户型/小区等在 .houseInfo 或 .positionInfo 中
            area, room_type, community, district, bizcircle = None, None, None, None, None
            info = li.select_one(".houseInfo")
            if info:
                text_all = info.get_text(" ", strip=True)
                # 经验规则:如 "某小区 | 2室1厅 | 60.12平米 | 南 北 | ..."
                parts = [p.strip() for p in text_all.split("|")]
                if parts:
                    community = parts[0]
                for p in parts:
                    if "平米" in p:
                        try: area = float(re.findall(r"[0-9.]+", p)[0])
                        except: pass
                    if "室" in p:
                        room_type = p

            pos = li.select_one(".positionInfo")
            if pos:
                # 形如 "浦东 - 金桥"
                pos_text = pos.get_text(" ", strip=True)
                seg = [x.strip() for x in pos_text.split("-")]
                if len(seg) >= 1: district = seg[0]
                if len(seg) >= 2: bizcircle = seg[1]

            items.append({
   
                "house_id": house_id,
                "title": title,
                "url": url,
                "district": district,
                "bizcircle": bizcircle,
                "community": community,
                "total_price": total_price,
                "unit_price": unit_price,
                "area": area,
                "room_type": room_type
            })
        except Exception:
            continue
    return items

4)写库(幂等插入/更新 + 增量哈希)

def upsert_items(city: str, rows: List[Dict]) -> Tuple[int, int]:
    """
    返回 (inserted, updated)
    - 以 house_id 为主键;
    - 通过 content_hash 判定变更;
    """
    inserted = updated = 0
    now = now_iso()
    with engine.begin() as conn:
        for r in rows:
            payload = {
   
                "title": r.get("title"),
                "url": r.get("url"),
                "city": city,
                "district": r.get("district"),
                "bizcircle": r.get("bizcircle"),
                "community": r.get("community"),
                "total_price": r.get("total_price"),
                "unit_price": r.get("unit_price"),
                "area": r.get("area"),
                "room_type": r.get("room_type"),
            }
            h = sha1(payload)
            # 查询是否存在
            cur = conn.execute(text("SELECT content_hash, first_seen_at FROM house WHERE house_id=:id"),
                               {
   "id": r["house_id"]}).fetchone()
            if cur is None:
                # 新插入
                conn.execute(text("""
                    INSERT INTO house (house_id, title, url, city, district, bizcircle, community, total_price,
                                       unit_price, area, room_type, content_hash, first_seen_at, last_seen_at)
                    VALUES (:house_id, :title, :url, :city, :district, :bizcircle, :community, :total_price, 
                            :unit_price, :area, :room_type, :content_hash, :first_seen_at, :last_seen_at)
                """), {
   
                    **{
   "house_id": r["house_id"]}, **payload,
                    "content_hash": h, "first_seen_at": now, "last_seen_at": now
                })
                inserted += 1
            else:
                old_hash, first_seen = cur
                if h != old_hash:
                    # 内容变更才更新
                    conn.execute(text("""
                        UPDATE house
                        SET title=:title, url=:url, city=:city, district=:district, bizcircle=:bizcircle,
                            community=:community, total_price=:total_price, unit_price=:unit_price,
                            area=:area, room_type=:room_type, content_hash=:content_hash, last_seen_at=:last_seen_at
                        WHERE house_id=:house_id
                    """), {
   **{
   "house_id": r["house_id"]}, **payload, "content_hash": h, "last_seen_at": now})
                    updated += 1
                else:
                    # 无变更,仅刷新最后看到时间(可选)
                    conn.execute(text("UPDATE house SET last_seen_at=:t WHERE house_id=:id"),
                                 {
   "t": now, "id": r["house_id"]})
    return inserted, updated

5)分页抓取与“模式切换”(全量 vs 增量)

城市 + 区 为示例(如上海“浦东”“闵行”等),也可切换为“关键词搜索”或“小区名精确搜索”。

def district_entry(city_code: str, district_slug: str, page: int) -> str:
    """
    入口 URL 形态举例:
    https://{city}.ke.com/ershoufang/{district}/pg{page}/
    """
    return f"https://{city_code}.ke.com/ershoufang/{district_slug}/pg{page}/"

def crawl_district(city_code: str, district_slug: str, mode: str="incremental", max_pages: int=100):
    """
    mode:
      - 'full':从第1页扫到 max_pages 或遇到空页停止
      - 'incremental':从第1页开始,直到出现全部“已见过”的连续页范围即可停止
    增量的停止条件可根据业务调优(例如最近3页全为已见记录则停)。
    """
    print(f"[{mode}] crawling district={district_slug}")
    session = get_session()
    seen_streak = 0
    total_inserted = total_updated = 0

    for pg in range(1, max_pages + 1):
        url = district_entry(city_code, district_slug, pg)
        html = fetch_html(url, session=session)
        if not html:
            print(f"  page {pg}: fetch failed, stop.")
            break

        items = parse_list(html)
        if not items:
            print(f"  page {pg}: empty, stop.")
            break

        inserted, updated = upsert_items(city_code, items)
        total_inserted += inserted
        total_updated += updated

        print(f"  page {pg}: inserted={inserted}, updated={updated}, total={len(items)}")

        if mode == "incremental":
            # 简单启发式:若连续2页均无新增(inserted==0),认为到达“旧区间”,可停止
            if inserted == 0:
                seen_streak += 1
                if seen_streak >= 2:
                    print("  incremental early-stop (no new items).")
                    break
            else:
                seen_streak = 0

        # 低频+抖动,减轻风控
        time.sleep(random.uniform(1.2, 2.5))

    print(f"==> district {district_slug} done. inserted={total_inserted}, updated={total_updated}")

6)地理位置 / 小区名搜索入口(多维组合)

你可以维护一个“任务清单”:若使用 全量模式 就覆盖更广的区/小区集合;若 增量模式 则只盯住核心板块或关键词。

DISTRICTS = [
    "pudong", "minhang", "xuhui", "jingan", "huangpu",  # 示例,需对应站点实际 slug
]

COMMUNITIES = [
    # 小区维度的精准检索入口(不同城市/站点可能使用 ?q= 或 /rs{keyword}/ 语法,需按实际调整)
    # 例如: https://sh.ke.com/ershoufang/rs万科城市花园/
    "万科城市花园", "仁恒河滨花园"
]

def community_search_url(city_code: str, keyword: str, page: int) -> str:
    # 常见形态:/ershoufang/rs{keyword}/pg{page}/
    return f"https://{city_code}.ke.com/ershoufang/rs{keyword}/pg{page}/"

def crawl_community(city_code: str, keyword: str, mode: str="incremental", max_pages: int=50):
    print(f"[{mode}] crawling community={keyword}")
    session = get_session()
    seen_streak = 0
    total_inserted = total_updated = 0

    for pg in range(1, max_pages + 1):
        url = community_search_url(city_code, keyword, pg)
        html = fetch_html(url, session=session)
        if not html:
            print(f"  page {pg}: fetch failed, stop.")
            break
        items = parse_list(html)
        if not items:
            print(f"  page {pg}: empty, stop.")
            break

        inserted, updated = upsert_items(city_code, items)
        total_inserted += inserted
        total_updated += updated

        print(f"  page {pg}: inserted={inserted}, updated={updated}, total={len(items)}")

        if mode == "incremental":
            if inserted == 0:
                seen_streak += 1
                if seen_streak >= 2:
                    print("  incremental early-stop (no new items).")
                    break
            else:
                seen_streak = 0
        time.sleep(random.uniform(1.0, 2.0))

    print(f"==> community {keyword} done. inserted={total_inserted}, updated={total_updated}")

7)统计与导出(定期归纳)

def daily_stats(date_tag: Optional[str]=None) -> Dict[str, pd.DataFrame]:
    """
    产出几个常用视图:按区/小区的挂牌量与均价。
    """
    with engine.begin() as conn:
        df = pd.read_sql(text("SELECT * FROM house"), conn)

    if df.empty:
        print("No data yet.")
        return {
   }

    # 过滤异常值(可自定义)
    df = df[(df["unit_price"].notna()) & (df["unit_price"] > 0)]

    # district 维度
    g_d = df.groupby("district").agg(
        listings=("house_id", "nunique"),
        avg_unit_price=("unit_price", "mean"),
        p50_unit_price=("unit_price", "median")
    ).reset_index().sort_values("listings", ascending=False)

    # community 维度
    g_c = df.groupby("community").agg(
        listings=("house_id", "nunique"),
        avg_unit_price=("unit_price", "mean")
    ).reset_index().sort_values("listings", ascending=False)

    # Top榜(示例)
    top_comm = g_c.head(20).copy()

    date_tag = date_tag or dt.datetime.now().strftime("%Y%m%d")
    out_dir = f"exports_{date_tag}"
    os.makedirs(out_dir, exist_ok=True)
    g_d.to_csv(os.path.join(out_dir, "district_stats.csv"), index=False)
    g_c.to_csv(os.path.join(out_dir, "community_stats.csv"), index=False)
    top_comm.to_csv(os.path.join(out_dir, "top_community.csv"), index=False)

    print(f"Exported to {out_dir}/")
    return {
   "district": g_d, "community": g_c, "top_community": top_comm}

8)调度(每天 08:00 / 16:00 运行)

def job_run():
    mode = MODE  # 环境变量切换
    # 1) 区维度
    for d in DISTRICTS:
        crawl_district(CITY, d, mode=mode, max_pages=80 if mode=="full" else 30)
    # 2) 小区关键词维度
    for kw in COMMUNITIES:
        crawl_community(CITY, kw, mode=mode, max_pages=40 if mode=="full" else 15)
    # 3) 统计导出
    daily_stats()

if __name__ == "__main__":
    # 方式A:直接执行一次
    job_run()

    # 方式B:APScheduler 定时
    # scheduler = BlockingScheduler(timezone="Asia/Shanghai")
    # scheduler.add_job(job_run, "cron", hour="8,16", minute=0)
    # scheduler.start()

结语

在真实业务里,“全量 vs 增量”从来不是二选一,而是 阶段性权衡工程化妥协。建议你将两种模式都纳入框架能力:用全量做“基线校准”,用增量做“日常维护”,再辅以内容哈希、早停策略、代理与频控,既稳且快,长期运营成本最低。

相关文章
|
8月前
|
数据采集 消息中间件 监控
单机与分布式:社交媒体热点采集的实践经验
在舆情监控与数据分析中,单机脚本适合小规模采集如微博热榜,而小红书等大规模、高时效性需求则需分布式架构。通过Redis队列、代理IP与多节点协作,可提升采集效率与稳定性,适应数据规模与变化速度。架构选择应根据实际需求,兼顾扩展性与维护成本。
244 2
|
8月前
|
人工智能 搜索推荐 算法
流行趋势到底能不能预测?用数据分析告诉你真相!
流行趋势到底能不能预测?用数据分析告诉你真相!
417 9
|
8月前
|
人工智能 监控 算法
构建时序感知的智能RAG系统:让AI自动处理动态数据并实时更新知识库
本文系统构建了一个基于时序管理的智能体架构,旨在应对动态知识库(如财务报告、技术文档)在问答任务中的演进与不确定性。通过六层设计(语义分块、原子事实提取、实体解析、时序失效处理、知识图构建、优化知识库),实现了从原始文档到结构化、时间感知知识库的转化。该架构支持RAG和多智能体系统,提升了推理逻辑性与准确性,并通过LangGraph实现自动化工作流,强化了对持续更新信息的处理能力。
1074 5
|
8月前
|
人工智能 数据可视化 算法
企业想做数智化,数据仓库架构你得先搞懂!
在数智化浪潮下,数据驱动已成为企业竞争力的核心。然而,许多企业在转型过程中忽视了数据仓库这一关键基础。本文深入解析数据仓库的重要性,厘清其与数据库的区别,详解ODS、DWD、DWS、ADS分层逻辑,并提供从0到1搭建数据仓库的五步实战方法,助力企业夯实数智化底座,实现数据治理与业务协同的真正落地。
企业想做数智化,数据仓库架构你得先搞懂!
|
8月前
|
搜索推荐 数据挖掘 API
微店商品详情API开发指南
本文介绍了如何通过微店的micro.item_get接口获取商品详情,涵盖商品信息、描述、图片及销量等数据,并提供Python调用示例,适用于电商开发、数据分析与个性化推荐场景。
|
人工智能 关系型数据库 文件存储
DIY nas 之--照片管理工具PhotoPrism
PhotoPrism是一个开源的照片管理工具,是一款由AI驱动的应用程序,主要用于浏览、组织和分享您的照片集。
1071 3
|
机器学习/深度学习 监控 搜索推荐
从零开始构建:使用Hologres打造个性化推荐系统的完整指南
【10月更文挑战第9天】随着互联网技术的发展,个性化推荐系统已经成为许多在线服务不可或缺的一部分。一个好的推荐系统可以显著提高用户体验,增加用户粘性,并最终提升业务的转化率。本指南将详细介绍如何使用阿里云的Hologres数据库来构建一个高效的个性化推荐系统。我们将涵盖从数据准备、模型训练到实时推荐的整个流程。
892 0
|
Linux Docker 容器
Centos安装docker(linux安装docker)——超详细小白可操作手把手教程,包好用!!!
本篇博客重在讲解Centos安装docker,经博主多次在不同服务器上测试,极其的稳定,尤其是阿里的服务器,一路复制命令畅通无阻。
22350 5
Centos安装docker(linux安装docker)——超详细小白可操作手把手教程,包好用!!!
|
数据采集 运维 JavaScript
淘宝反爬虫机制的主要手段有哪些?
淘宝的反爬虫机制包括用户身份识别与验证、请求特征分析、页面内容保护、浏览器指纹识别和蜜罐技术。通过User-Agent识别、Cookie验证、账号异常检测、请求频率限制、动态页面生成、验证码机制等手段,有效防止爬虫非法抓取数据。
|
数据安全/隐私保护 C++
c++实现http客户端和服务端的开源库以及Base64加密密码
c++实现http客户端和服务端的开源库以及Base64加密密码
488 0