全部
常见问题
产品动态
精选推荐
功能建议

分析中 已回复 待规划 {{opt.name}}
分析中 已回复 待规划
Python 获取 1688 商品采集 API 接口 | 工厂货源自动化对接商品信息 | 无需选品

管理 管理 编辑 删除

在跨境电商和供应链管理中,1688 作为国内最大的 B2B 工厂货源平台,拥有海量商品数据。传统的人工选品方式效率低下、容易出错,而通过 API 接口实现自动化采集,可以彻底解放人力,实现"无需选品"的智能货源对接。本文将深入讲解如何使用 Python 对接 1688 商品采集 API,构建工厂货源自动化系统。



一、方案概述:为什么需要自动化采集?

1.1 传统选品的痛点

痛点影响解决方案
手动翻页比价耗时耗力,效率低下API 批量采集
数据不及时错过最佳拿货时机实时监控更新
信息不完整SKU、库存、代发价遗漏全字段自动获取
多平台操作重复劳动,容易出错一键铺货对接
反爬限制IP 封禁,数据中断官方合规接口

1.2 自动化采集的核心优势

通过 API 接口实现 1688 商品采集,可以:

  • 无需人工选品:设定规则后系统自动筛选符合条件的商品
  • 实时数据同步:价格、库存、销量变动自动更新
  • 全字段获取:标题、价格、SKU、主图、详情、销量、代发价一次性获取
  • 多平台铺货:采集后直接上架到淘宝、拼多多、抖音、跨境平台
  • 7×24 小时监控:自动发现爆款、价格预警、库存提醒


二、1688 商品采集 API 体系

1688 开放平台及第三方数据服务提供了丰富的商品采集接口:

接口功能适用场景
item_get获取单个商品详情深度分析、SKU 监控
item_search按关键词搜索商品批量选品、类目采集
item_search_img按图搜索商品(拍立淘)找同款、竞品分析
item_search_shop获取店铺所有商品整店采集、供应商管理
item_search_suggest获取搜索词推荐SEO 优化、关键词挖掘
item_fee获取商品快递费用运费核算、利润计算
seller_info获取店铺详情供应商资质审核
item_password淘口令解析链接转换、推广分析
cat_get获取商品分类类目导航、数据归类


三、环境准备与依赖安装

# 安装必要的 Python 库
pip install requests
pip install pandas          # 数据处理与导出
pip install openpyxl        # Excel 导出支持
pip install schedule        # 定时任务
pip install python-dotenv   # 环境变量管理
pip install loguru          # 日志记录


四、核心实现:商品采集 API 客户端

4.1 基础 API 客户端


# -*- coding: utf-8 -*-
"""
1688 商品采集 API 客户端
支持:商品详情获取、关键词搜索、店铺采集、图片搜索
"""

import requests
import hashlib
import time
import json
import os
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, asdict
from datetime import datetime
from urllib.parse import urlencode
from loguru import logger


@dataclass
class ProductItem:
    """1688 商品数据结构"""
    item_id: str
    title: str
    price: float
    original_price: float
    wholesale_price: float      # 批发价
    agent_price: float          # 代发价
    min_order: int              # 最小起订量
    unit: str
    stock: int
    sales_30d: int              # 30天销量
    total_sales: int
    location: str               # 发货地
    supplier_name: str
    supplier_id: str
    supplier_level: str         # 诚信通等级
    is_support_mix: bool        # 是否支持混批
    is_support_agent: bool      # 是否支持一件代发
    main_image: str
    images: List[str]
    detail_url: str
    category: str
    attributes: Dict[str, str]  # 商品属性
    skus: List[Dict]            # SKU 规格
    create_time: str
    update_time: str


class Alibaba1688Collector:
    """
    1688 商品采集器
    支持官方 API 和第三方数据服务
    """
    
    def __init__(self, api_key: str, api_secret: str, base_url: Optional[str] = None):
        """
        初始化采集器
        
        Args:
            api_key: API 调用 Key
            api_secret: API 调用密钥
            base_url: API 基础地址(默认使用第三方服务地址)
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = base_url or "https://api.openclaw.com/1688"
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded;charset=utf-8"
        })
        
        # 请求统计
        self.request_count = 0
        self.success_count = 0
        
        logger.info(f"1688 采集器初始化完成 | 接口地址: {self.base_url}")

    def _generate_sign(self, params: dict) -> str:
        """
        生成 API 签名(MD5)
        规则:参数按 key 排序后拼接 + secret,整体 MD5 大写
        """
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        param_str = "".join([f"{k}{v}" for k, v in sorted_params])
        sign_str = f"{self.api_secret}{param_str}{self.api_secret}"
        return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()

    def _request(self, api_name: str, params: dict, timeout: int = 30) -> dict:
        """
        发送 API 请求
        
        Args:
            api_name: API 接口名称(如 item_get, item_search)
            params: 业务参数
            timeout: 请求超时时间
        
        Returns:
            API 响应字典
        """
        # 构建公共参数
        common_params = {
            "key": self.api_key,
            "api_name": api_name,
            "timestamp": int(time.time()),
            "format": "json",
            "lang": "cn"
        }
        common_params.update(params)
        
        # 生成签名
        common_params["sign"] = self._generate_sign(common_params)
        
        url = f"{self.base_url}/{api_name}"
        self.request_count += 1
        
        try:
            response = self.session.get(url, params=common_params, timeout=timeout)
            response.raise_for_status()
            
            result = response.json()
            if result.get("code") == 0 or result.get("code") == 200:
                self.success_count += 1
                logger.debug(f"请求成功 | API: {api_name} | 耗时: {response.elapsed.total_seconds():.2f}s")
                return result.get("data", result)
            else:
                logger.warning(f"请求失败 | API: {api_name} | 错误: {result.get('msg', '未知错误')}")
                return {"error": result.get("msg", "请求失败"), "code": result.get("code")}
                
        except requests.exceptions.Timeout:
            logger.error(f"请求超时 | API: {api_name}")
            return {"error": "请求超时", "code": -1}
        except requests.exceptions.RequestException as e:
            logger.error(f"请求异常 | API: {api_name} | {str(e)}")
            return {"error": str(e), "code": -2}

    def get_item_detail(self, num_iid: str, get_sales: bool = True, 
                       get_agent: bool = True) -> Optional[ProductItem]:
        """
        获取单个商品详情
        
        Args:
            num_iid: 1688 商品数字 ID
            get_sales: 是否获取销量数据
            get_agent: 是否获取代发价格
        
        Returns:
            ProductItem 对象
        """
        params = {
            "num_iid": num_iid,
            "sales_data": 1 if get_sales else 0,
            "agent": 1 if get_agent else 0
        }
        
        data = self._request("item_get", params)
        
        if "error" in data:
            return None
            
        return self._parse_product(data)

    def search_by_keyword(self, keyword: str, page: int = 1, page_size: int = 20,
                          sort: str = "sales", price_start: Optional[float] = None,
                          price_end: Optional[float] = None) -> List[ProductItem]:
        """
        关键词搜索商品
        
        Args:
            keyword: 搜索关键词
            page: 页码
            page_size: 每页数量(最大 50)
            sort: 排序方式(sales:销量, price_asc:价格升序, price_desc:价格降序, credit:信用)
            price_start: 价格下限
            price_end: 价格上限
        
        Returns:
            ProductItem 列表
        """
        params = {
            "q": keyword,
            "page": page,
            "page_size": page_size,
            "sort": sort
        }
        
        if price_start is not None:
            params["start_price"] = price_start
        if price_end is not None:
            params["end_price"] = price_end
            
        data = self._request("item_search", params)
        
        if "error" in data or "items" not in data:
            return []
            
        return [self._parse_product(item) for item in data.get("items", [])]

    def search_by_image(self, image_url: str, page: int = 1) -> List[ProductItem]:
        """
        按图搜索商品(拍立淘)
        
        Args:
            image_url: 图片 URL
            page: 页码
        
        Returns:
            ProductItem 列表
        """
        params = {
            "imgid": image_url,
            "page": page
        }
        
        data = self._request("item_search_img", params)
        
        if "error" in data:
            return []
            
        return [self._parse_product(item) for item in data.get("items", [])]

    def get_shop_items(self, seller_id: str, page: int = 1, 
                       page_size: int = 20) -> List[ProductItem]:
        """
        获取店铺所有商品
        
        Args:
            seller_id: 卖家 ID(供应商 ID)
            page: 页码
            page_size: 每页数量
        
        Returns:
            ProductItem 列表
        """
        params = {
            "seller_id": seller_id,
            "page": page,
            "page_size": page_size
        }
        
        data = self._request("item_search_shop", params)
        
        if "error" in data:
            return []
            
        return [self._parse_product(item) for item in data.get("items", [])]

    def _parse_product(self, data: dict) -> ProductItem:
        """解析商品数据为结构化对象"""
        return ProductItem(
            item_id=str(data.get("num_iid", "")),
            title=data.get("title", ""),
            price=float(data.get("price", 0) or 0),
            original_price=float(data.get("original_price", 0) or 0),
            wholesale_price=float(data.get("wholesale_price", 0) or 0),
            agent_price=float(data.get("agent_price", 0) or 0),
            min_order=int(data.get("min_num", 1) or 1),
            unit=data.get("unit", "件"),
            stock=int(data.get("stock", 0) or 0),
            sales_30d=int(data.get("sales", 0) or 0),
            total_sales=int(data.get("total_sales", 0) or 0),
            location=data.get("location", ""),
            supplier_name=data.get("supplier", ""),
            supplier_id=str(data.get("seller_id", "")),
            supplier_level=data.get("supplier_level", ""),
            is_support_mix=data.get("is_support_mix", False),
            is_support_agent=data.get("is_support_agent", False),
            main_image=data.get("pic_url", ""),
            images=data.get("item_imgs", []),
            detail_url=data.get("detail_url", ""),
            category=data.get("category", ""),
            attributes=data.get("props", {}),
            skus=data.get("skus", []),
            create_time=data.get("created_time", ""),
            update_time=data.get("modified_time", "")
        )

    def get_stats(self) -> dict:
        """获取采集统计"""
        return {
            "total_requests": self.request_count,
            "success_requests": self.success_count,
            "success_rate": f"{(self.success_count / max(self.request_count, 1) * 100):.1f}%"
        }


# ==================== 使用示例 ====================

if __name__ == "__main__":
    # 初始化采集器(替换为真实凭证)
    collector = Alibaba1688Collector(
        api_key="your_api_key",
        api_secret="your_api_secret"
    )
    
    # 示例 1:获取单个商品详情
    print("=" * 60)
    print("示例 1:获取商品详情")
    print("=" * 60)
    
    item_id = "702356889901"  # 替换为真实商品 ID
    product = collector.get_item_detail(item_id)
    
    if product:
        print(f"商品ID: {product.item_id}")
        print(f"标题: {product.title}")
        print(f"售价: ¥{product.price}")
        print(f"批发价: ¥{product.wholesale_price}")
        print(f"代发价: ¥{product.agent_price}")
        print(f"起订量: {product.min_order} {product.unit}")
        print(f"库存: {product.stock}")
        print(f"30天销量: {product.sales_30d}")
        print(f"发货地: {product.location}")
        print(f"供应商: {product.supplier_name}")
        print(f"支持代发: {'是' if product.is_support_agent else '否'}")
        print(f"SKU数量: {len(product.skus)}")
    else:
        print("获取商品详情失败")
        


五、自动化选品引擎:无需人工筛选

5.1 智能选品规则引擎

from typing import List, Callable
from dataclasses import dataclass

@dataclass
class SelectionRule:
    """选品规则配置"""
    name: str                           # 规则名称
    min_price: float = 0               # 最低价格
    max_price: float = float('inf')    # 最高价格
    min_sales_30d: int = 0             # 最低月销量
    min_stock: int = 0                  # 最低库存
    must_support_agent: bool = False    # 必须支持一件代发
    must_support_mix: bool = False      # 必须支持混批
    supplier_levels: List[str] = None   # 供应商等级要求
    exclude_keywords: List[str] = None   # 排除关键词
    include_keywords: List[str] = None  # 必须包含关键词
    min_profit_margin: float = 0.3      # 最低利润率(基于代发价)


class AutoSelector:
    """
    自动化选品器
    根据预设规则自动筛选符合条件的商品
    """
    
    def __init__(self, collector: Alibaba1688Collector):
        self.collector = collector
        self.rules: List[SelectionRule] = []
        self.selected_products: List[ProductItem] = []
        
    def add_rule(self, rule: SelectionRule):
        """添加选品规则"""
        self.rules.append(rule)
        logger.info(f"添加选品规则: {rule.name}")
        
    def evaluate_product(self, product: ProductItem, rule: SelectionRule) -> bool:
        """
        评估商品是否符合规则
        
        Returns:
            bool: 是否符合所有条件
        """
        # 价格区间检查
        if not (rule.min_price <= product.price <= rule.max_price):
            return False
            
        # 销量检查
        if product.sales_30d < rule.min_sales_30d:
            return False
            
        # 库存检查
        if product.stock < rule.min_stock:
            return False
            
        # 一件代发检查
        if rule.must_support_agent and not product.is_support_agent:
            return False
            
        # 混批检查
        if rule.must_support_mix and not product.is_support_mix:
            return False
            
        # 供应商等级检查
        if rule.supplier_levels and product.supplier_level not in rule.supplier_levels:
            return False
            
        # 关键词排除
        if rule.exclude_keywords:
            for kw in rule.exclude_keywords:
                if kw.lower() in product.title.lower():
                    return False
                    
        # 关键词必须包含
        if rule.include_keywords:
            for kw in rule.include_keywords:
                if kw.lower() not in product.title.lower():
                    return False
                    
        # 利润率估算(简化计算)
        if rule.min_profit_margin > 0 and product.agent_price > 0:
            estimated_retail = product.price * 1.5  # 假设零售价为售价的 1.5 倍
            profit_margin = (estimated_retail - product.agent_price) / estimated_retail
            if profit_margin < rule.min_profit_margin:
                return False
                
        return True
        
    def select_by_keyword(self, keyword: str, max_pages: int = 5) -> List[ProductItem]:
        """
        根据关键词自动选品
        
        Args:
            keyword: 搜索关键词
            max_pages: 最大搜索页数
        
        Returns:
            符合规则的商品列表
        """
        all_selected = []
        
        for page in range(1, max_pages + 1):
            logger.info(f"正在搜索第 {page} 页 | 关键词: {keyword}")
            
            products = self.collector.search_by_keyword(
                keyword=keyword,
                page=page,
                page_size=50,
                sort="sales"  # 按销量排序,优先爆款
            )
            
            if not products:
                break
                
            for product in products:
                # 应用所有规则
                for rule in self.rules:
                    if self.evaluate_product(product, rule):
                        all_selected.append(product)
                        logger.info(f"✅ 选中商品: {product.title[:30]}... | "
                                  f"销量:{product.sales_30d} | 价格:¥{product.price}")
                        break  # 符合任一规则即可
                        
            # 限流控制
            time.sleep(1)
            
        self.selected_products.extend(all_selected)
        logger.info(f"选品完成 | 共选中 {len(all_selected)} 个商品")
        return all_selected
        
    def select_by_supplier(self, seller_id: str) -> List[ProductItem]:
        """
        整店采集并筛选
        
        Args:
            seller_id: 供应商店铺 ID
        
        Returns:
            符合规则的商品列表
        """
        all_selected = []
        page = 1
        
        while True:
            products = self.collector.get_shop_items(seller_id, page=page, page_size=50)
            
            if not products:
                break
                
            for product in products:
                for rule in self.rules:
                    if self.evaluate_product(product, rule):
                        all_selected.append(product)
                        break
                        
            page += 1
            time.sleep(1)
            
        self.selected_products.extend(all_selected)
        return all_selected
        
    def export_to_excel(self, filename: str = "selected_products.xlsx"):
        """导出选品结果到 Excel"""
        import pandas as pd
        
        data = []
        for p in self.selected_products:
            data.append({
                "商品ID": p.item_id,
                "标题": p.title,
                "售价": p.price,
                "批发价": p.wholesale_price,
                "代发价": p.agent_price,
                "起订量": p.min_order,
                "库存": p.stock,
                "30天销量": p.sales_30d,
                "发货地": p.location,
                "供应商": p.supplier_name,
                "支持代发": "是" if p.is_support_agent else "否",
                "链接": p.detail_url
            })
            
        df = pd.DataFrame(data)
        df.to_excel(filename, index=False, engine='openpyxl')
        logger.info(f"选品结果已导出: {filename} | 共 {len(data)} 条记录")


# ==================== 自动化选品示例 ====================

if __name__ == "__main__":
    collector = Alibaba1688Collector(
        api_key="your_api_key",
        api_secret="your_api_secret"
    )
    
    selector = AutoSelector(collector)
    
    # 配置选品规则:寻找适合一件代发的低价爆款
    selector.add_rule(SelectionRule(
        name="一件代发爆款",
        min_price=10,
        max_price=100,
        min_sales_30d=100,
        min_stock=50,
        must_support_agent=True,
        supplier_levels=["实力商家", "超级工厂"],
        min_profit_margin=0.4
    ))
    
    # 配置规则:寻找高利润混批商品
    selector.add_rule(SelectionRule(
        name="高利润混批",
        min_price=50,
        max_price=500,
        min_sales_30d=50,
        must_support_mix=True,
        include_keywords=["定制", "批发"],
        min_profit_margin=0.5
    ))
    
    # 自动选品
    results = selector.select_by_keyword("蓝牙耳机", max_pages=3)
    
    # 导出结果
    selector.export_to_excel("bluetooth_earphones_selected.xlsx")
    


六、定时监控与价格预警系统


import schedule
import time
from datetime import datetime
from typing import Dict, Callable

class PriceMonitor:
    """
    价格监控与预警系统
    自动监控商品价格、库存变动,触发告警
    """
    
    def __init__(self, collector: Alibaba1688Collector):
        self.collector = collector
        self.monitored_items: Dict[str, dict] = {}  # 监控中的商品
        self.alert_handlers: List[Callable] = []
        self.price_history: Dict[str, List[tuple]] = {}  # 价格历史
        
    def add_item(self, item_id: str, target_price: Optional[float] = None):
        """
        添加监控商品
        
        Args:
            item_id: 商品 ID
            target_price: 目标价格(低于此价格触发提醒)
        """
        product = self.collector.get_item_detail(item_id)
        if product:
            self.monitored_items[item_id] = {
                "product": product,
                "target_price": target_price,
                "last_price": product.price,
                "last_stock": product.stock,
                "added_at": datetime.now()
            }
            self.price_history[item_id] = [(datetime.now(), product.price)]
            logger.info(f"添加监控: {product.title[:30]}... | 当前价: ¥{product.price}")
            
    def add_alert_handler(self, handler: Callable):
        """添加告警处理器"""
        self.alert_handlers.append(handler)
        
    def _check_changes(self, item_id: str):
        """检查商品变动"""
        monitor = self.monitored_items[item_id]
        old_product = monitor["product"]
        
        # 重新获取最新数据
        new_product = self.collector.get_item_detail(item_id)
        if not new_product:
            return
            
        monitor["product"] = new_product
        
        alerts = []
        
        # 价格变动检测
        if abs(new_product.price - monitor["last_price"]) > 0.01:
            change_pct = (new_product.price - monitor["last_price"]) / monitor["last_price"] * 100
            direction = "上涨" if change_pct > 0 else "下降"
            alerts.append({
                "type": "price_change",
                "title": f"价格{direction} {abs(change_pct):.1f}%",
                "content": f"{new_product.title[:30]}... | "
                          f"¥{monitor['last_price']} → ¥{new_product.price}",
                "item_id": item_id,
                "old_price": monitor["last_price"],
                "new_price": new_product.price
            })
            monitor["last_price"] = new_product.price
            self.price_history[item_id].append((datetime.now(), new_product.price))
            
        # 库存变动检测
        if old_product.stock == 0 and new_product.stock > 0:
            alerts.append({
                "type": "restock",
                "title": "商品补货",
                "content": f"{new_product.title[:30]}... | 库存: {new_product.stock}",
                "item_id": item_id
            })
        elif old_product.stock > 0 and new_product.stock == 0:
            alerts.append({
                "type": "out_of_stock",
                "title": "商品缺货",
                "content": f"{new_product.title[:30]}...",
                "item_id": item_id
            })
            
        # 目标价格达成
        if monitor["target_price"] and new_product.price <= monitor["target_price"]:
            alerts.append({
                "type": "target_price",
                "title": "达到目标价格",
                "content": f"{new_product.title[:30]}... | "
                          f"当前¥{new_product.price} ≤ 目标¥{monitor['target_price']}",
                "item_id": item_id,
                "target_price": monitor["target_price"]
            })
            
        # 销量激增检测(日销量 > 100)
        daily_sales = new_product.sales_30d - old_product.sales_30d
        if daily_sales > 100:
            alerts.append({
                "type": "sales_surge",
                "title": "销量激增",
                "content": f"{new_product.title[:30]}... | 日增 {daily_sales} 单",
                "item_id": item_id,
                "daily_sales": daily_sales
            })
            
        # 触发告警
        for alert in alerts:
            self._trigger_alert(alert)
            
    def _trigger_alert(self, alert: dict):
        """触发告警"""
        logger.warning(f"【{alert['title']}】{alert['content']}")
        for handler in self.alert_handlers:
            try:
                handler(alert)
            except Exception as e:
                logger.error(f"告警处理器异常: {e}")
                
    def check_all(self):
        """检查所有监控商品"""
        logger.info(f"开始批量检查 | 共 {len(self.monitored_items)} 个商品")
        for item_id in list(self.monitored_items.keys()):
            self._check_changes(item_id)
            time.sleep(2)  # 限流
            
    def start_monitoring(self, interval_minutes: int = 30):
        """
        启动定时监控
        
        Args:
            interval_minutes: 检查间隔(分钟)
        """
        schedule.every(interval_minutes).minutes.do(self.check_all)
        logger.info(f"定时监控已启动 | 间隔: {interval_minutes} 分钟")
        
        while True:
            schedule.run_pending()
            time.sleep(1)


# ==================== 告警处理器示例 ====================

def dingtalk_alert(alert: dict):
    """钉钉机器人告警"""
    # 实现钉钉 Webhook 发送
    pass

def email_alert(alert: dict):
    """邮件告警"""
    # 实现邮件发送
    pass

def wechat_alert(alert: dict):
    """企业微信告警"""
    # 实现企业微信发送
    pass


# ==================== 监控示例 ====================

if __name__ == "__main__":
    collector = Alibaba1688Collector(
        api_key="your_api_key",
        api_secret="your_api_secret"
    )
    
    monitor = PriceMonitor(collector)
    
    # 添加监控商品
    monitor.add_item("702356889901", target_price=15.0)
    monitor.add_item("702356889902", target_price=20.0)
    
    # 添加告警处理器
    monitor.add_alert_handler(dingtalk_alert)
    monitor.add_alert_handler(wechat_alert)
    
    # 启动监控(每 30 分钟检查一次)
    monitor.start_monitoring(interval_minutes=30)
    


七、多平台铺货对接


import json

class PlatformPublisher:
    """
    多平台铺货发布器
    将采集的 1688 商品信息转换为各平台格式并发布
    """
    
    def __init__(self):
        self.platforms = {}
        
    def register_platform(self, name: str, publisher: Callable):
        """注册平台发布器"""
        self.platforms[name] = publisher
        
    def publish(self, product: ProductItem, platforms: List[str]) -> Dict[str, bool]:
        """
        发布商品到指定平台
        
        Args:
            product: 商品信息
            platforms: 目标平台列表
        
        Returns:
            各平台发布结果
        """
        results = {}
        for platform in platforms:
            if platform in self.platforms:
                try:
                    self.platforms[platform](product)
                    results[platform] = True
                    logger.info(f"发布成功 | 平台: {platform} | 商品: {product.title[:30]}")
                except Exception as e:
                    results[platform] = False
                    logger.error(f"发布失败 | 平台: {platform} | {e}")
            else:
                results[platform] = False
                logger.warning(f"未注册平台: {platform}")
        return results
        
    def generate_taobao_format(self, product: ProductItem) -> dict:
        """生成淘宝上架格式"""
        return {
            "title": product.title,
            "price": product.price * 1.5,  # 加价 50%
            "original_price": product.price * 2,
            "quantity": min(product.stock, 100),
            "category": self._map_category(product.category, "taobao"),
            "images": product.images[:5],
            "detail": self._generate_detail_html(product),
            "express_fee": 0  # 包邮
        }
        
    def generate_pdd_format(self, product: ProductItem) -> dict:
        """生成拼多多上架格式"""
        return {
            "goods_name": product.title,
            "market_price": int(product.price * 2),
            "price": int(product.price * 1.3),
            "quantity": min(product.stock, 1000),
            "carousel_gallery": product.images[:10],
            "detail_gallery": product.images,
            "category_id": self._map_category(product.category, "pdd")
        }
        
    def generate_douyin_format(self, product: ProductItem) -> dict:
        """生成抖音小店上架格式"""
        return {
            "name": product.title[:30],  # 抖音标题限制 30 字
            "pic": product.main_image,
            "description": product.title,
            "price": int(product.price * 1.4),
            "stock_num": product.stock,
            "category_id": self._map_category(product.category, "douyin"),
            "specs": self._convert_skus(product.skus)
        }
        
    def _map_category(self, category: str, platform: str) -> str:
        """类目映射(简化版,实际需维护映射表)"""
        mappings = {
            "taobao": {"数码": "14", "服装": "16"},
            "pdd": {"数码": "8424", "服装": "12896"},
            "douyin": {"数码": "20001", "服装": "20005"}
        }
        return mappings.get(platform, {}).get(category, "")
        
    def _generate_detail_html(self, product: ProductItem) -> str:
        """生成详情页 HTML"""
        html = f"<h1>{product.title}</h1>"
        html += f"<p>发货地: {product.location}</p>"
        html += f"<p>供应商: {product.supplier_name}</p>"
        for img in product.images:
            html += f'<img src="{img}" />'
        return html
        
    def _convert_skus(self, skus: List[dict]) -> List[dict]:
        """转换 SKU 格式"""
        return [{
            "spec_id": s.get("sku_id", ""),
            "spec_name": s.get("properties", ""),
            "price": s.get("price", 0),
            "stock_num": s.get("quantity", 0)
        } for s in skus]


# ==================== 铺货示例 ====================

if __name__ == "__main__":
    publisher = PlatformPublisher()
    
    # 注册各平台发布器(实际需实现具体 API 调用)
    publisher.register_platform("taobao", lambda p: print(f"发布到淘宝: {p.title}"))
    publisher.register_platform("pdd", lambda p: print(f"发布到拼多多: {p.title}"))
    publisher.register_platform("douyin", lambda p: print(f"发布到抖音: {p.title}"))
    
    # 模拟商品
    product = ProductItem(
        item_id="123456",
        title="测试商品",
        price=10.0,
        original_price=20.0,
        wholesale_price=8.0,
        agent_price=12.0,
        min_order=1,
        unit="件",
        stock=100,
        sales_30d=50,
        total_sales=500,
        location="义乌",
        supplier_name="测试供应商",
        supplier_id="789",
        supplier_level="实力商家",
        is_support_mix=True,
        is_support_agent=True,
        main_image="http://example.com/1.jpg",
        images=["http://example.com/1.jpg", "http://example.com/2.jpg"],
        detail_url="http://1688.com/item/123456",
        category="数码",
        attributes={},
        skus=[],
        create_time="",
        update_time=""
    )
    
    # 发布到多平台
    results = publisher.publish(product, ["taobao", "pdd", "douyin"])
    print(f"发布结果: {results}")
    


八、完整自动化流程整合


class AutoSourcingPipeline:
    """
    自动化货源对接流水线
    实现从采集、选品、监控到铺货的全流程自动化
    """
    
    def __init__(self, api_key: str, api_secret: str):
        self.collector = Alibaba1688Collector(api_key, api_secret)
        self.selector = AutoSelector(self.collector)
        self.monitor = PriceMonitor(self.collector)
        self.publisher = PlatformPublisher()
        
    def run_full_pipeline(
        self,
        keywords: List[str],
        rules: List[SelectionRule],
        target_platforms: List[str],
        monitor_interval: int = 30
    ):
        """
        运行完整自动化流程
        
        Args:
            keywords: 搜索关键词列表
            rules: 选品规则列表
            target_platforms: 目标铺货平台
            monitor_interval: 监控间隔(分钟)
        """
        logger.info("=" * 60)
        logger.info("启动 1688 自动化货源对接流水线")
        logger.info("=" * 60)
        
        # 步骤 1:配置选品规则
        for rule in rules:
            self.selector.add_rule(rule)
            
        # 步骤 2:批量采集与选品
        all_selected = []
        for keyword in keywords:
            logger.info(f"正在采集关键词: {keyword}")
            selected = self.selector.select_by_keyword(keyword, max_pages=5)
            all_selected.extend(selected)
            
        # 去重
        unique_items = {p.item_id: p for p in all_selected}
        all_selected = list(unique_items.values())
        
        logger.info(f"选品完成 | 共 {len(all_selected)} 个唯一商品")
        
        # 步骤 3:添加到监控系统
        for product in all_selected:
            self.monitor.add_item(product.item_id)
            
        # 步骤 4:铺货到目标平台
        publish_results = []
        for product in all_selected:
            results = self.publisher.publish(product, target_platforms)
            publish_results.append({
                "item_id": product.item_id,
                "title": product.title,
                "results": results
            })
            
        # 步骤 5:启动价格监控
        self.monitor.start_monitoring(monitor_interval)
        
        return {
            "total_selected": len(all_selected),
            "publish_results": publish_results
        }


# ==================== 完整运行示例 ====================

if __name__ == "__main__":
    # 初始化流水线
    pipeline = AutoSourcingPipeline(
        api_key="your_api_key",
        api_secret="your_api_secret"
    )
    
    # 配置选品规则
    rules = [
        SelectionRule(
            name="一件代发低价爆款",
            min_price=5,
            max_price=50,
            min_sales_30d=200,
            must_support_agent=True,
            min_profit_margin=0.4
        ),
        SelectionRule(
            name="高利润定制款",
            min_price=100,
            max_price=500,
            min_sales_30d=50,
            include_keywords=["定制", "logo"],
            min_profit_margin=0.5
        )
    ]
    
    # 运行完整流程
    result = pipeline.run_full_pipeline(
        keywords=["蓝牙耳机", "手机壳", "数据线"],
        rules=rules,
        target_platforms=["taobao", "pdd"],
        monitor_interval=30
    )
    
    print(f"\n流水线运行完成 | 选中 {result['total_selected']} 个商品")
    


九、关键注意事项


注意事项说明
API 凭证安全api_secret 严禁硬编码,使用环境变量或密钥管理服务
请求频率控制默认限流 50 次/分钟,超出会返回 429 错误
数据合规使用仅用于自有业务分析,不得转售或恶意爬取
商品信息更新1688 商品信息可能实时变动,建议定时刷新
供应商沟通一件代发需与供应商确认库存和发货时效
多语言支持跨境场景可设置 lang=en 获取英文数据


十、总结


功能模块核心能力实现方式
商品采集详情/搜索/店铺/图片搜索item_get / item_search / item_search_shop
智能选品自动筛选符合条件的商品AutoSelector 规则引擎
价格监控实时价格/库存变动告警PriceMonitor 定时检查
多平台铺货一键上架到淘宝/拼多多/抖音PlatformPublisher 格式转换
数据导出Excel/CSV 批量导出pandas + openpyxl
通过这套 Python 自动化方案,你可以实现真正的"无需选品"——设定规则后,系统自动从 1688 工厂货源中筛选、监控、铺货,大幅降低人工成本,提升电商运营效率。









如遇任何疑问或有进一步的需求,请随时与我私信或者评论联系

{{voteData.voteSum}} 人已参与
支持
反对
请登录后查看

123c001fa85d 最后编辑于2026-05-27 17:52:48

快捷回复
回复
回复
回复({{post_count}}) {{!is_user ? '我的回复' :'全部回复'}}
排序 默认正序 回复倒序 点赞倒序

{{item.user_info.nickname ? item.user_info.nickname : item.user_name}} LV.{{ item.user_info.bbs_level || item.bbs_level }}

作者 管理员 企业

{{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest == 1? '取消推荐': '推荐'}}
{{item.is_suggest == 1? '取消推荐': '推荐'}} 【已收集】
{{item.floor}}# 沙发 板凳 地板 {{item.floor}}# 【已收集】
{{item.user_info.title || '暂无简介'}}
附件

{{itemf.name}}

{{item.created_at}}  {{item.ip_address}}
打赏
已打赏¥{{item.reward_price}}
{{item.like_count}}
分享
{{item.showReply ? '取消回复' : '回复'}}
删除
回复
回复

{{itemc.user_info.nickname}}

{{itemc.user_name}}

回复 {{itemc.comment_user_info.nickname}}

附件

{{itemf.name}}

{{itemc.created_at}}
打赏
已打赏¥{{itemc.reward_price}}
{{itemc.like_count}}
{{itemc.showReply ? '取消回复' : '回复'}}
删除
回复
回复
收起 展开更多
查看更多
打赏
已打赏¥{{reward_price}}
19
{{like_count}}
{{collect_count}}
添加回复 ({{post_count}})

相关推荐

快速安全登录

使用微信扫码登录
回复
回复
问题:
问题自动获取的帖子内容,不准确时需要手动修改. [获取答案]
答案:
提交
bug 需求 取 消 确 定
打赏金额
当前余额:¥{{rewardUserInfo.reward_price}}
{{item.price}}元
请输入 0.1-{{reward_max_price}} 范围内的数值
打赏成功
¥{{price}}
完成 确认打赏

微信登录/注册

切换手机号登录

{{ bind_phone ? '绑定手机' : '手机登录'}}

{{codeText}}
切换微信登录/注册
暂不绑定
CRMEB客服
CRMEB咨询热线 400-8888-794

扫码领取产品资料

功能清单
思维导图
安装教程
CRMEB开源商城下载 源码下载 CRMEB帮助文档 帮助文档
返回顶部 返回顶部
CRMEB客服