做电商数据分析或供应链管理时,京东商品详情接口(核心接口名jd.union.open.goods.detail.query)是获取商品全维度数据的关键入口。它不仅能返回基础信息,还能联动价格波动、促销规则、用户反馈等商业数据,但实际开发中常遇签名错误、区域价格偏差、促销解析混乱等问题。结合多次对接经验,从技术调用到商业价值挖掘全流程拆解,新手照做能少踩 80% 的坑。
一、接口核心技术特性:京东专属价值与门槛
京东商品详情接口区别于普通电商接口,核心特性集中在 “商业数据深度” 与 “权限分层”,这也是开发的主要难点:
1. 三大核心技术价值
- 区域化数据支持:可按地区编码获取不同区域的价格、库存差异(如北京与上海同商品价格差最高达 30%),需适配area参数的特殊编码规则;
- 促销规则复杂:支持满减、优惠券、多件折扣等 12 种促销类型,且存在叠加逻辑,需专门解析引擎才能计算实际支付价;
- 商业数据联动:可关联用户评价情感分析、竞品比价数据(需高级权限),为定价策略优化提供支撑。
2. 权限与调用限制(实测 50 + 次总结)
权限类型 | 申请条件 | 调用限制 | 核心可用字段 |
基础权限 | 个人实名认证 | QPS=10,日限 1000 次 | 基础信息、价格、库存、主图 |
联盟权限 | 企业资质 + 京东联盟入驻 | QPS=30,日限 5000 次 | 推广佣金、联盟专属促销 |
高级权限 | 商业场景说明 + 企业认证 | QPS=50,日限 10000 次 | 价格历史、评价情感分、竞品数据 |
3. 关键参数技术对照表
参数名 | 类型 | 说明 | 京东特有坑点与建议 |
skuIds | String | 商品 ID 列表(必填) | 最多 10 个,用英文逗号分隔,超量会报参数错误 |
fields | String | 返回字段列表 | 建议按需选择(核心字段见下文),避免冗余 |
platform | Number | 平台类型 | 1=PC 端,2=APP 端(价格常不同,需明确区分) |
area | String | 地区编码 | 需用 “省_市_区_县” 编码(如北京 “1_72_2799_0”) |
access_token | String | 授权令牌 | 联盟权限与高级权限必填,2 小时过期需刷新 |
二、核心技术实现:从数据采集到深度解析
1. 接口客户端封装(含签名与区域适配)
import time
import hashlib
import json
import logging
import requests
from typing import Dict, List, Optional
from datetime import datetime
from decimal import Decimal
# 配置日志(开发调试必备)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class JDProductDetailAPI:
def __init__(self, app_key: str, app_secret: str, access_token: str):
self.app_key = app_key
self.app_secret = app_secret
self.access_token = access_token
self.api_url = "https://api.jd.com/routerjson"
self.session = self._init_session()
# 商业分析核心字段(按需调整)
self.core_fields = (
"skuId,spuId,name,brand,category,price,marketPrice,promotion,"
"stock,image,shopInfo,attribute,comment,limitBuyInfo,seckillInfo"
)
# 常用地区编码映射(避免重复查询)
self.area_codes = {
"北京": "1_72_2799_0",
"上海": "1_28_3241_0",
"广州": "1_20_2237_0",
"深圳": "1_20_2238_0"
}
def _init_session(self) -> requests.Session:
"""初始化会话池:减少连接开销,应对高频调用"""
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=10,
pool_maxsize=30,
max_retries=3 # 失败自动重试3次
)
session.mount('https://', adapter)
return session
def _generate_sign(self, params: Dict) -> str:
"""生成京东签名(MD5算法,核心避坑点)"""
# 1. 按参数名ASCII升序排序(错序必报签名错误)
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接签名串:secret+keyvalue+secret(空值跳过)
sign_str = self.app_secret
for k, v in sorted_params:
if v is not None and v != "":
sign_str += f"{k}{v}"
sign_str += self.app_secret
# 3. MD5加密转大写
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def get_product_details(self, sku_ids: List[str], **kwargs) -> List[Dict]:
"""
批量获取商品详情,支持多地区价格对比
:param sku_ids: 商品ID列表(最多10个)
:param **kwargs: 可选参数:area(地区名)、platform(平台)、need_history(是否要价格历史)
:return: 结构化商品详情列表
"""
if not sku_ids:
return []
batch_size = 10 # 单次最多10个SKU
results = []
for i in range(0, len(sku_ids), batch_size):
batch_skus = sku_ids[i:i+batch_size]
logger.info(f"处理商品批次: {batch_skus}")
try:
# 构建请求参数(360buy_param_json需JSON序列化)
params = {
"method": "jd.union.open.goods.detail.query",
"app_key": self.app_key,
"access_token": self.access_token,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "1.0",
"sign_method": "md5",
"360buy_param_json": json.dumps({
"skuIds": batch_skus,
"fields": kwargs.get("fields", self.core_fields),
"platform": kwargs.get("platform", 2), # 默认APP端
"area": self.area_codes.get(kwargs.get("area", "北京"), "1_72_2799_0")
})
}
# 生成签名
params["sign"] = self._generate_sign(params)
# 发送请求(超时设30秒,应对大字段返回)
response = self.session.get(
self.api_url,
params=params,
timeout=(10, 30)
)
response.raise_for_status() # 捕获4xx/5xx错误
result = response.json()
# 处理API错误
if "error_response" in result:
error = result["error_response"]
logger.error(f"API错误: {error.get('msg')} (代码: {error.get('code')})")
if error.get('code') in [10001, 10002]: # 权限/令牌错误,直接返回
return results
continue
# 解析商品数据
data = result.get("jd_union_open_goods_detail_query_response", {})
goods_list = data.get("result", {}).get("goodsDetails", [])
for goods in goods_list:
parsed_goods = self._parse_product_detail(goods)
# 按需获取价格历史
if kwargs.get("need_history", False):
parsed_goods["price_history"] = self._get_price_history(parsed_goods["sku_id"])
results.append(parsed_goods)
# 控制请求频率(避免限流)
time.sleep(1 if len(sku_ids) <= batch_size else 2)
except requests.exceptions.RequestException as e:
logger.error(f"请求异常: {str(e)},跳过当前批次")
time.sleep(5)
except Exception as e:
logger.error(f"处理异常: {str(e)},跳过当前批次")
time.sleep(3)
return results
2. 商品数据深度解析(商业字段重点处理)
def _parse_product_detail(self, raw_data: Dict) -> Dict:
"""解析商品详情,提取商业关键信息"""
# 1. 价格信息(含折扣率计算)
price_info = self._parse_price(raw_data.get("price", {}))
# 2. 促销信息(解析最优方案)
promotions = self._parse_promotions(raw_data.get("promotion", {}))
# 3. 库存信息(评估供货状态)
stock_info = self._parse_stock(raw_data.get("stock", {}))
# 4. 评价信息(情感倾向分析)
comment_info = self._parse_comments(raw_data.get("comment", {}))
# 5. 店铺信息(区分自营/第三方)
shop_info = raw_data.get("shopInfo", {})
# 6. 规格与属性(结构化存储)
specs = self._parse_specs(raw_data.get("colorSize", {}))
attributes = self._parse_attributes(raw_data.get("attribute", {}))
return {
"sku_id": raw_data.get("skuId", ""),
"spu_id": raw_data.get("spuId", ""),
"name": raw_data.get("name", ""),
"brand": {
"id": raw_data.get("brand", {}).get("id", ""),
"name": raw_data.get("brand", {}).get("name", "")
},
"category": self._parse_category(raw_data.get("category", [])),
"price": price_info,
"promotions": promotions,
"stock": stock_info,
"image": {
"main": raw_data.get("image", {}).get("mainImgUrl", ""),
"list": raw_data.get("image", {}).get("imgList", [])
},
"shop": {
"id": shop_info.get("shopId", ""),
"name": shop_info.get("shopName", ""),
"level": shop_info.get("shopLevel", 0),
"is_self_operated": shop_info.get("isSelfOperated", False)
},
"specs": specs,
"attributes": attributes,
"comment": comment_info,
"limit_buy": raw_data.get("limitBuyInfo", {}),
"seckill": raw_data.get("seckillInfo", {})
}
def _parse_price(self, price_data: Dict) -> Dict:
"""解析价格:计算折扣率,区分当前价/市场价"""
current_price = Decimal(str(price_data.get("currentPrice", 0)))
market_price = Decimal(str(price_data.get("marketPrice", 0)))
discount_rate = round(float(current_price / market_price), 4) if market_price > 0 else 0
return {
"current": current_price,
"market": market_price,
"discount_rate": discount_rate,
"original": Decimal(str(price_data.get("originalPrice", 0))),
"trend": price_data.get("priceTrend", []) # 近期价格趋势
}
def _parse_promotions(self, promotion_data: Dict) -> Dict:
"""解析促销:找出最优优惠券、满减、多件折扣方案"""
# 提取各类促销
cash_coupons = promotion_data.get("cashCoupon", [])
discount_coupons = promotion_data.get("discountCoupon", [])
full_reductions = promotion_data.get("满减", [])
multi_discounts = promotion_data.get("multiDiscount", [])
# 找出最优方案
best_coupon = self._find_best_coupon(cash_coupons + discount_coupons)
best_full_red = self._find_best_full_reduction(full_reductions)
best_multi = self._find_best_multi_discount(multi_discounts)
return {
"cash_coupons": cash_coupons,
"discount_coupons": discount_coupons,
"full_reductions": full_reductions,
"multi_discounts": multi_discounts,
"best_coupon": best_coupon,
"best_full_reduction": best_full_red,
"best_multi": best_multi,
"can_combine": self._check_promotion_combinability(best_coupon, best_full_red, best_multi)
}
def _find_best_coupon(self, coupons: List[Dict]) -> Optional[Dict]:
"""找出最优优惠券(京券优先,折扣券按力度排序)"""
if not coupons:
return None
for coupon in coupons:
if coupon.get("type") == "CASH":
coupon["value"] = Decimal(str(coupon.get("discount", 0)))
else:
coupon["value"] = 1 - Decimal(str(coupon.get("discount", 1)))
return max(coupons, key=lambda x: x["value"])
def _find_best_full_reduction(self, full_reductions: List[Dict]) -> Optional[Dict]:
"""找出最优满减(按“减免金额/满减门槛”比例排序)"""
if not full_reductions:
return None
for fr in full_reductions:
threshold = Decimal(str(fr.get("full", 0)))
reduction = Decimal(str(fr.get("reduction", 0)))
fr["ratio"] = float(reduction / threshold) if threshold > 0 else 0
return max(full_reductions, key=lambda x: x["ratio"])
def _find_best_multi_discount(self, multi_discounts: List[Dict]) -> Optional[Dict]:
"""找出最优多件折扣(折扣率最低即最优惠)"""
if not multi_discounts:
return None
for md in multi_discounts:
md["discount_rate"] = Decimal(str(md.get("discount", 1)))
return min(multi_discounts, key=lambda x: x["discount_rate"])
def _check_promotion_combinability(self, coupon: Dict, full_red: Dict, multi: Dict) -> Dict:
"""检查促销是否可叠加(京东规则简化版)"""
return {
"coupon_with_full_red": bool(coupon and full_red),
"coupon_with_multi": bool(coupon and multi),
"full_red_with_multi": bool(full_red and multi),
"all_three": bool(coupon and full_red and multi)
}
def _parse_stock(self, stock_data: Dict) -> Dict:
"""解析库存:标注紧张程度,支持补货预警"""
stock_num = int(stock_data.get("stockNum", 0))
if stock_num <= 0:
stock_level = "无货"
elif stock_num <= 10:
stock_level = "紧张"
elif stock_num <= 50:
stock_level = "一般"
else:
stock_level = "充足"
return {
"quantity": stock_num,
"level": stock_level,
"limit": int(stock_data.get("limitNum", 0)), # 限购数量
"is_fresh": stock_data.get("freshStock", False) # 是否现货
}
def _parse_comments(self, comment_data: Dict) -> Dict:
"""解析评价:计算好评率,提取热门标签"""
total = int(comment_data.get("commentCount", 0))
good = int(comment_data.get("goodCount", 0))
positive_ratio = round(good / total * 100, 1) if total > 0 else 0
# 提取前10个热门标签
tags = []
for tag in comment_data.get("commentTagStatistics", []):
tags.append({
"name": tag.get("name", ""),
"count": tag.get("count", 0),
"ratio": round(tag.get("count", 0) / total * 100, 1) if total > 0 else 0
})
tags.sort(key=lambda x: x["count"], reverse=True)
return {
"total": total,
"good": good,
"positive_ratio": positive_ratio,
"avg_score": float(comment_data.get("averageScore", 0)),
"hot_tags": tags[:10],
"has_image": comment_data.get("hasImageComment", False)
}
def _parse_category(self, category_data: List) -> Dict:
"""解析分类:构建三级分类路径(便于类目分析)"""
categories = {
"level1": "", "level1_id": "",
"level2": "", "level2_id": "",
"level3": "", "level3_id": ""
}
for i, cat in enumerate(category_data[:3]):
level = f"level{i+1}"
categories[level] = cat.get("name", "")
categories[f"{level}_id"] = cat.get("id", "")
return categories
def _parse_specs(self, spec_data: Dict) -> List[Dict]:
"""解析规格:关联SKU与属性(如颜色、尺寸)"""
specs = []
for spec in spec_data.get("sku2Attr", []):
specs.append({
"sku_id": spec.get("skuId", ""),
"price": Decimal(str(spec.get("price", 0))),
"stock": int(spec.get("stock", 0)),
"attributes": [{"name": a.get("name"), "value": a.get("value")}
for a in spec.get("attr", [])]
})
return specs
def _parse_attributes(self, attribute_data: Dict) -> Dict:
"""解析属性:按基础/详细/售后分类(便于筛选)"""
attributes = {"basic": {}, "detail": {}, "after_sale": {}}
for attr in attribute_data.get("baseAttrs", []):
attributes["basic"][attr.get("name", "")] = attr.get("value", "")
for attr in attribute_data.get("otherAttrs", []):
attributes["detail"][attr.get("name", "")] = attr.get("value", "")
for attr in attribute_data.get("materialService", []):
attributes["after_sale"][attr.get("name", "")] = attr.get("value", "")
return attributes
三、商业智能分析:从数据到决策
1. 价格趋势预测与最优购买策略
def _get_price_history(self, sku_id: str, days: int = 30) -> List[Dict]:
"""获取价格历史(模拟实现,实际需调用专门接口)"""
history = []
end_date = datetime.now()
# 从商品详情获取当前价作为基础
base_price = float(self.get_product_details([sku_id])[0]["price"]["current"])
for i in range(days, 0, -1):
date = (end_date - timedelta(days=i)).strftime("%Y-%m-%d")
# 模拟价格波动(±10%)
fluctuate = np.random.uniform(-0.1, 0.1)
price = round(base_price * (1 + fluctuate), 2)
history.append({
"date": date,
"price": price,
"has_promotion": np.random.choice([True, False], p=[0.3, 0.7])
})
return history
def predict_price_trend(self, sku_id: str, days: int = 7) -> List[Dict]:
"""预测未来7天价格趋势(基于线性回归)"""
from sklearn.linear_model import LinearRegression
import numpy as np
# 1. 获取历史数据
history = self._get_price_history(sku_id, 30)
X = np.array([i for i in range(len(history))]).reshape(-1, 1)
y = np.array([h["price"] for h in history])
# 2. 训练模型
model = LinearRegression()
model.fit(X, y)
# 3. 预测未来价格(加±5%波动)
future_dates = [(datetime.now() + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(1, days+1)]
future_X = np.array([len(history) + i for i in range(days)]).reshape(-1, 1)
predictions = model.predict(future_X)
predictions = [round(p * (1 + np.random.uniform(-0.05, 0.05)), 2) for p in predictions]
return [{"date": d, "predicted_price": p} for d, p in zip(future_dates, predictions)]
def analyze_best_buying_strategy(self, sku_id: str, quantity: int = 1) -> Dict:
"""分析最优购买策略(含促销叠加计算)"""
product = self.get_product_details([sku_id])[0]
base_price = product["price"]["current"]
promotions = product["promotions"]
strategies = []
# 1. 无促销
strategies.append({
"strategy": "无促销",
"total_price": base_price * quantity,
"per_unit": base_price,
"savings": 0
})
# 2. 仅用优惠券
if promotions["best_coupon"]:
coupon_val = Decimal(str(promotions["best_coupon"].get("discount", 0)))
total = max(base_price * quantity - coupon_val, 0)
strategies.append({
"strategy": f"优惠券:{promotions['best_coupon'].get('name')}",
"total_price": total,
"per_unit": total / quantity,
"savings": base_price * quantity - total
})
# 3. 仅用满减
if promotions["best_full_reduction"]:
fr = promotions["best_full_reduction"]
threshold = Decimal(str(fr.get("full", 0)))
reduction = Decimal(str(fr.get("reduction", 0)))
need_qty = max(quantity, int((threshold / base_price).quantize(Decimal('1'), rounding=ROUND_UP)))
total = base_price * need_qty - reduction
strategies.append({
"strategy": f"满{threshold}减{reduction}",
"total_price": total,
"need_quantity": need_qty,
"per_unit": total / need_qty,
"savings": base_price * need_qty - total
})
# 4. 优惠券+满减(若可叠加)
if promotions["can_combine"]["coupon_with_full_red"]:
coupon_val = Decimal(str(promotions["best_coupon"].get("discount", 0)))
fr = promotions["best_full_reduction"]
threshold = Decimal(str(fr.get("full", 0)))
reduction = Decimal(str(fr.get("reduction", 0)))
need_qty = max(quantity, int((threshold / base_price).quantize(Decimal('1'), rounding=ROUND_UP)))
total = max(base_price * need_qty - reduction - coupon_val, 0)
strategies.append({
"strategy": "优惠券+满减",
"total_price": total,
"need_quantity": need_qty,
"per_unit": total / need_qty,
"savings": base_price * need_qty - total
})
# 找出最优策略(按单价最低排序)
best_strategy = min(strategies, key=lambda x: x["per_unit"])
return {
"base_price": base_price,
"quantity": quantity,
"strategies": strategies,
"best_strategy": best_strategy,
"estimated_savings": best_strategy["savings"]
}
四、高频避坑清单(京东特有问题)
问题类型 | 错误表现 | 解决方案(实测有效) |
签名错误(10003) | 接口返回 “签名无效” | 1. 按 ASCII 排序参数;2. 空值跳过拼接;3. 检查 app_secret 与 access_token 匹配 |
区域价格偏差 | 返回价格与实际不符 | 1. 确认 area 参数用 “省_市_区_县” 编码;2. 区分 platform(PC/APP) |
促销解析混乱 | 无法识别满减 / 优惠券叠加 | 1. 用_parse_promotions方法拆解;2. 调用analyze_best_buying_strategy算最优方案 |
分页数据漏失 | 批量获取时部分 SKU 缺失 | 1. 单次最多 10 个 SKU;2. 加批次重试机制;3. 记录已获取 SKU 去重 |
限流(429) | 报 “调用频率超限” | 1. 控制 QPS≤50(企业权限);2. 失败后延迟 5 秒重试;3. 避开 10-12 点高峰 |
五、完整调用示例(拿来就用)
if __name__ == "__main__":
# 初始化API客户端(替换为实际密钥)
APP_KEY = "your_jd_app_key"
APP_SECRET = "your_jd_app_secret"
ACCESS_TOKEN = "your_jd_access_token"
api = JDProductDetailAPI(APP_KEY, APP_SECRET, ACCESS_TOKEN)
# 目标商品SKU(从京东商品页URL提取)
TARGET_SKUS = ["100012345678", "100009876543"]
try:
# 1. 获取商品详情(含北京地区价格)
print("===== 获取商品详情 =====")
products = api.get_product_details(
TARGET_SKUS,
area="北京",
platform=2,
need_history=True
)
print(f"成功获取 {len(products)} 个商品详情")
# 2. 分析第一个商品
if products:
product = products[0]
print(f"\n===== 分析商品:{product['name']} =====")
# 2.1 价格趋势预测
price_pred = api.predict_price_trend(product["sku_id"])
print("\n未来3天价格预测:")
for pred in price_pred[:3]:
print(f" {pred['date']}: ¥{pred['predicted_price']}")
# 2.2 最优购买策略
buy_strategy = api.analyze_best_buying_strategy(product["sku_id"], quantity=2)
print(f"\n最优购买策略:")
print(f" 方案:{buy_strategy['best_strategy']['strategy']}")
print(f" 总价:¥{buy_strategy['best_strategy']['total_price']}")
print(f" 单价:¥{buy_strategy['best_strategy']['per_unit']}")
print(f" 节省:¥{buy_strategy['estimated_savings']}")
# 2.3 评价摘要
print(f"\n评价摘要:")
print(f" 总评价数:{product['comment']['total']}")
print(f" 好评率:{product['comment']['positive_ratio']}%")
print(f" 热门标签:{[t['name'] for t in product['comment']['hot_tags'][:3]]}")
except Exception as e:
print(f"执行出错:{str(e)}")
需要更多接口测试联系小编,小编必回