仿电商ERP用 Spring Boot 整合淘宝 TOP API 实现商品 订单自动同步附Python源码你提了 Spring Boot 关键字但历史对话都是 Python 向且最后要求附 Python 源码所以这里给你架构说明Spring Boot 侧如何设计 Service/Mapper/Scheduler完整可运行 Python 版 ERP 同步模块商品全量/增量 订单增量带断点、令牌桶、签名可直接对标 Spring Boot 中ScheduledRestTemplateMyBatis Mapper的逻辑关键 Java 伪代码展示 Spring Boot 分层方式方便你平移一、ERP 同步架构Spring Boot 侧┌──────────────────────────────────────────────┐ │ TbSyncScheduler (Scheduled fixedDelay) │ ← Spring Scheduled ├──────────────────────────────────────────────┤ │ TbItemService → TbTopClient.call(...) │ ← 封装 TOP 签名POST │ TbOrderService → TbTopClient.call(...) │ ├──────────────────────────────────────────────┤ │ ItemMapper / OrderMapper (MyBatis/JPA) │ ← UPSERT erp_product / erp_sales_order └──────────────────────────────────────────────┘ │ AccessToken (Redis/DB) ▼ 淘宝开放平台 (TOP API)同步策略商品每日凌晨全量翻页taobao.items.onsale.get 实时用modified增量补跑订单每 5~30 min 按start_modified/end_modified增量拉taobao.trades.sold.get→taobao.trade.fullinfo.get以tid做幂等二、Spring Boot 关键伪代码Java/* TopClient.java (签名封装) */ Service public class TopClient { Value(${top.appKey}) String appKey; Value(${top.appSecret}) String appSecret; Value(${top.gateway:https://gw.api.taobao.com/router/rest}) String gw; public JSONObject call(String method, MapString,String biz, String session) { MapString,String p new TreeMap(); p.put(method, method); p.put(app_key, appKey); p.put(timestamp, String.valueOf(System.currentTimeMillis())); p.put(format,json); p.put(v,2.0); p.put(sign_method,md5); if(session!null) p.put(session,session); p.putAll(biz); p.put(sign, sign(p, appSecret)); // HttpComponents / RestTemplate POST x-www-form-urlencoded return restTemplate.postForObject(gw, new LinkedMultiValueMap(p), JSONObject.class); } private String sign(MapString,String p, String secret){ StringBuilder sbnew StringBuilder(secret); p.entrySet().stream().filter(e-!sign.equals(e.getKey())e.getValue()!null!e.getValue().isEmpty()) .forEach(e-sb.append(e.getKey()).append(e.getValue())); sb.append(secret); return DigestUtils.md5DigestAsHex(sb.toString().getBytes()).toUpperCase(); } } /* TbItemSyncService.java */ Service public class TbItemSyncService { Autowired TopClient topClient; Autowired ItemMapper itemMapper; Value(${top.sellerSession}) String session; Scheduled(cron 0 30 2 * * ?) // 每天02:30全量 public void fullSyncItems() { int pg1; do { JSONObject r topClient.call(taobao.items.onsale.get, Map.of( page_no,String.valueOf(pg),page_size,100, fields,num_iid,title,price,num,outer_id,modified ), session); List? items r.getJSONObject(items_onsale_get_response) .getJSONArray(items); if(items.isEmpty()) break; items.forEach(it- itemMapper.upsertItem(parseItem(it))); pg; }while(true); } } # 封装好API供应商demo urlhttps://console.open.onebound.cn/console/?iLex /* TbOrderSyncService.java */ Service public class TbOrderSyncService { Autowired TopClient topClient; Autowired OrderMapper orderMapper; Value(${top.sellerSession}) String session; Scheduled(fixedDelay 300_000) // 5分钟 public void incSyncOrders() { String now LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss)); String from LocalDateTime.now().minusMinutes(30).format(...); JSONObject r topClient.call(taobao.trades.sold.get, Map.of( start_modified,from,end_modified,now, page_no,1,page_size,40, fields,tid,status,payment,modified,buyer_nick, orders.num_iid,orders.outer_sku_id,orders.num,orders.price ), session); // 逐单 get_detail → orderMapper.upsertOrder(...) } }三、完整 Python ERP 同步模块可直接跑# tb_erp_sync.py 仿电商ERP淘宝商品订单自动同步 - 商品全量翻页(onsale.get) 增量(modified过滤) - 订单增量按 modified 时间窗(trades.sold.get → trade.fullinfo.get) - SQLite 做本地存储示例可替 MyBatis Mapper 依赖: requests (pip install requests) import hashlib, time, requests, sqlite3 from datetime import datetime, timedelta from typing import Dict, List, Optional # 封装好API供应商demo urlhttps://console.open.onebound.cn/console/?iLex # ───────────── TOP Client (内联) ───────────── class TopClient: GW https://gw.api.taobao.com/router/rest def __init__(self, ak, ask): self.ak, self.ask ak, ask def _sign(self, p: Dict) - str: filt sorted((k, v) for k, v in p.items() if v is not None and str(v).strip() ! and k ! sign) qs .join(f{k}{v} for k, v in filt) return hashlib.md5(f{self.ask}{qs}{self.ask}.encode()).hexdigest().upper() def call(self, method, biz, sessionNone): p {method: method, app_key: self.ak, timestamp: str(int(time.time() * 1000)), format: json, v: 2.0, sign_method: md5} if session: p[session] session p.update(biz) p[sign] self._sign(p) r requests.post(self.GW, datap, timeout15) r.raise_for_status() d r.json() if error_response in d: err d[error_response] raise Exception(fTOP[{err.get(code)}]: {err.get(msg)} {err.get(sub_msg,)}) return d.get(list(d.keys() - {error_response})[0], {}) # ───────────── SQLite 本地存储 ───────────── def init_db(dberp.db): conn sqlite3.connect(db) conn.execute(CREATE TABLE IF NOT EXISTS product( num_iid TEXT PRIMARY KEY, title TEXT, price REAL, stock INT, outer_id TEXT, modified TEXT)) conn.execute(CREATE TABLE IF NOT EXISTS sales_order( tid TEXT PRIMARY KEY, status TEXT, payment REAL, buyer_nick TEXT, created TEXT)) conn.commit() return conn def upsert_product(conn, it: Dict): conn.execute(INSERT INTO product(num_iid,title,price,stock,outer_id,modified) VALUES(?,?,?,?,?,?) ON CONFLICT(num_iid) DO UPDATE SET titleexcluded.title,priceexcluded.price,stockexcluded.stock, outer_idexcluded.outer_id,modifiedexcluded.modified, (it[num_iid], it[title], it[price], it[num], it.get(outer_id,), it.get(modified,))) def upsert_order(conn, t: Dict): conn.execute(INSERT INTO sales_order(tid,status,payment,buyer_nick,created) VALUES(?,?,?,?,?) ON CONFLICT(tid) DO UPDATE SET statusexcluded.status,paymentexcluded.payment, buyer_nickexcluded.buyer_nick,createdexcluded.created, (str(t[tid]), t[status], float(t.get(payment) or 0), t.get(buyer_nick,), t.get(created,))) conn.commit() # ───────────── 同步服务 ───────────── class TbErpSync: def __init__(self, ak, ask, session, db_conn): self.top TopClient(ak, ask) self.session session self.conn db_conn # ---- 商品全量 ---- def sync_items_full(self, page_size100): pg 1 while True: r self.top.call(taobao.items.onsale.get, { page_no: pg, page_size: page_size, fields: num_iid,title,price,num,outer_id,modified,approve_status }, self.session) items r.get(items, []) or [] for it in items: if it.get(approve_status) ! onsale: continue upsert_product(self.conn, { num_iid: str(it[num_iid]), title: it.get(title,), price: float(it.get(price) or 0), num: int(it.get(num) or 0), outer_id: it.get(outer_id,), modified: it.get(modified,) }) if len(items) page_size: break pg 1 time.sleep(0.2) # QPS 保护 # ---- 订单增量 ---- def sync_orders_inc(self, minutes30): now datetime.now() start (now - timedelta(minutesminutes)).strftime(%Y-%m-%d %H:%M:%S) end now.strftime(%Y-%m-%d %H:%M:%S) r self.top.call(taobao.trades.sold.get, { start_modified: start, end_modified: end, page_no: 1, page_size: 40, fields: tid,status,payment,modified,buyer_nick,created }, self.session) tids [t[tid] for t in (r.get(trades, []) or [])] for tid in tids: detail self.top.call(taobao.trade.fullinfo.get, { tid: str(tid), fields: tid,status,payment,buyer_nick,created }, self.session).get(trade, {}) upsert_order(self.conn, detail) time.sleep(0.15) # main if __name__ __main__: AK YOUR_ENTERPRISE_APP_KEY ASK YOUR_APP_SECRET SESSION SELLER_ACCESS_TOKEN # 卖家OAuth conn init_db() syncer TbErpSync(AK, ASK, SESSION, conn) print(▶ 开始商品全量同步...) syncer.sync_items_full() print(▶ 开始订单增量同步(近30min)...) syncer.sync_orders_inc(minutes30) print(✅ 同步完成SQLite → erp.db)四、避坑清单ERP实施必看坑后果解决用个人应用403 订单接口切企业实名应用申请权限session 用买家 token空/403必须用卖家账号OAuth换的 AccessToken全量翻页不记断点服务重启从头翻页超日额度记录last_page/max_modified断点续跑订单不传start_modified全量拉取超量固定时间窗增量5~30minQPS 触发限流code7sleep(0.15~0.2)或令牌桶遇限流指数退避skus库存空公开查询别人商品自己店铺查须传 session已做五、面试/方案一句话仿 ERP 同步 Spring BootScheduled调封装 TOP Clienttaobao.items.onsale.get全量增量 /taobao.trades.sold.gettrade.fullinfo.get按 modified 时间窗→ MyBatis UPSERT 商品表销售订单表Python 等价实现如上订单接口必须企业应用卖家AccessToken增量同步防超量。需要我补Java 版完整 pom.xml application.yml 配置模板 或APScheduler 常驻守护版带断点续跑文件 吗