連接TiDB Serveless
Cloudflare Workers / Pages Functions 运行在 V8 Isolates(非 Node.js 环境),不支持原生 TCP 套接字(net/tls 模块),因此标准数据库驱动(如 mysql2, pg, pymysql, redis 等)无法直接使用。
必须使用 基于 HTTP/WebSocket 的连接器 或 支持 Cloudflare Workers 的专用驱动。
以下是连接 TiDB Serverless 及其他主流数据库的具体可行方案,按推荐程度排序。
一、 连接 TiDB Serverless(MySQL 协议)
TiDB Serverless 强制要求 TLS (SSL) 连接,且端口通常为 4000。
方案 1:@tidbcloud/serverless —— 官方首选,零依赖,最简单 ⭐⭐⭐⭐⭐
TiDB Cloud 官方专为 Serverless/Edge 环境设计的 HTTP 驱动。内部通过 HTTP API 代理 MySQL 协议。
- 优点:无需连接池配置、自动处理 TLS、TypeScript 支持完美、体积极小、支持事务。
- 缺点:每次查询走 HTTP,有轻微延迟(通常 < 50ms);不支持流式结果集(需一次性加载)。
npm install @tidbcloud/serverless// workers.ts / pages/api/xxx.ts
import { createClient } from '@tidbcloud/serverless';
// 1. 在 Cloudflare Dashboard -> Workers -> Settings -> Variables and Secrets 添加:
// TIDB_HOST = "gateway01.xxx.ap-northeast-1.prod.aws.tidbcloud.com" (不带端口)
// TIDB_PORT = "4000" (可选, 默认4000)
// TIDB_USER = "xxx.root"
// TIDB_PASSWORD = "xxx"
// TIDB_DATABASE = "test"
const client = createClient({
host: env.TIDB_HOST,
port: Number(env.TIDB_PORT || 4000),
username: env.TIDB_USER,
password: env.TIDB_PASSWORD,
database: env.TIDB_DATABASE,
// ssl: true // 默认开启
});
export default {
async fetch(request, env, ctx): Promise<Response> {
try {
// 简单查询
const [rows] = await client.query('SELECT VERSION() as version, NOW() as now');
// 参数化查询 (防注入)
const [users] = await client.query('SELECT id, name FROM users WHERE status = ? AND age > ?', ['active', 18]);
// 事务支持
// await client.execute('BEGIN');
// await client.execute('INSERT INTO logs (msg) VALUES (?)', ['test']);
// await client.execute('COMMIT');
return Response.json({ success: true, data: { version: rows[0].version, users } });
} catch (e: any) {
return Response.json({ success: false, error: e.message }, { status: 500 });
}
},
} satisfies ExportedHandler<Env>;注意:@tidbcloud/serverless 目前不支持 PREPARE/EXECUTE 协议层面的预处理语句复用(但 query(sql, params) 已做参数化转义防注入),不支持游标/流式读取超大结果集。
方案 2:mysql2 + cloudflare:sockets (TCP Connect API) —— 原生 TCP 体验,功能最全 ⭐⭐⭐⭐
Cloudflare 现已开放 connect() API (TCP Sockets) 进入 GA 阶段。可直接建立 TLS TCP 连接,使用标准 mysql2 驱动。
- 优点:完整 MySQL 协议支持(预处理语句复用、流式读取、二进制协议、连接池复用)。
- 缺点:需要付费 Workers 计划 才能使用 TCP Sockets(免费版每天 10 万次请求额度内不含 TCP Sockets,或需绑定信用卡开启付费)。代码较复杂,需手动处理 TLS 握手。
npm install mysql2// wrangler.toml 需配置兼容性
// compatibility_flags = ["nodejs_compat", "global_navigator"]
// compatibility_date = "2024-09-01" (或更新)
import { createConnection } from 'mysql2/promise';
import { connect } from 'cloudflare:sockets';
export default {
async fetch(request, env, ctx): Promise<Response> {
// 1. 建立原始 TCP 连接
const socket = connect(`${env.TIDB_HOST}:${env.TIDB_PORT || 4000}`, {
tls: true, // TiDB 强制 TLS
// servername: env.TIDB_HOST // SNI, 通常自动推断
});
// 2. 将 Socket 适配为 Node.js net.Socket 兼容对象 (需要 nodejs_compat)
// mysql2 需要一个类似 net.Socket 的对象
// 这里利用 Cloudflare 提供的工具或自行封装
// 简易封装示例 (实际生产建议用成熟封装库如 @mysql-js/client 或自行完善):
// 注意:直接用 mysql2 在 Workers 里跑 TCP 还比较繁琐,需处理 readable/writable stream 转 duplex
// 社区成熟方案:使用 `mysql2/promise` 配合 `cloudflare:sockets` 的封装库
// 例如:<https://github.com/cloudflare/workers-sdk/tree/main/packages/cloudflare-sockets>
// 伪代码逻辑:
// const conn = await createConnection({
// stream: socket, // 需要实现 Duplex 接口
// user: env.TIDB_USER,
// password: env.TIDB_PASSWORD,
// database: env.TIDB_DATABASE,
// ssl: { rejectUnauthorized: true } // 在 socket 层已做 TLS
// });
// const [rows] = await conn.execute('SELECT 1');
// await conn.end();
return Response.json({ msg: "TCP Socket 方案需付费计划且代码较复杂,建议方案1或方案3" });
}
}现实建议:除非你有付费 Workers 计划且极度依赖预处理语句性能/流式读取,否则方案 1 完全够用。
方案 3:通用 HTTP 代理层 —— 兼容性最强,适合所有数据库 ⭐⭐⭐
在 Railway / Render / Fly.io / 自建服务器 部署一个轻量 HTTP 网关(如 PostgREST, Hasura, Dify API, 或自写 FastAPI/Node.js + 连接池),Worker 只调 HTTP。
- 架构:Worker (HTTP) -> Gateway (HTTP, 内网/公网) -> TiDB (TCP, 连接池复用)
- 优点:Worker 代码极简 (fetch),支持任何数据库(TiDB, PG, Mongo, Redis, Milvus...),连接池在网关侧复用,性能最稳,绕过 Workers 限制。
- 缺点:多一跳网络延迟;需维护一个常驻服务(但可用免费额度 Railway/Render 跑)。
自写网关示例 - gateway/index.ts (部署在 Railway):
// Hono + mysql2 (连接池复用)
import { Hono } from 'hono';
import { createPool } from 'mysql2/promise';
const app = new Hono();
const pool = createPool({
host: process.env.TIDB_HOST,
port: 4000,
user: process.env.TIDB_USER,
password: process.env.TIDB_PASSWORD,
database: process.env.TIDB_DATABASE,
ssl: { rejectUnauthorized: true },
waitForConnections: true,
connectionLimit: 10, // 连接池
queueLimit: 0
});
app.post('/query', async (c) => {
const { sql, params } = await c.req.json();
if (!sql) return c.json({ error: 'sql required' }, 400);
try {
const [rows] = await pool.execute(sql, params || []);
return c.json({ data: rows });
} catch (e: any) {
return c.json({ error: e.message }, 500);
}
});
export default app;Worker 端调用:
export default {
async fetch(request, env) {
const res = await fetch(`${env.GATEWAY_URL}/query`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${env.GATEWAY_TOKEN}` },
body: JSON.stringify({ sql: 'SELECT * FROM users WHERE id = ?', params: [1] })
});
return res; // 直接透传
}
}二、 连接其他主流数据库(Workers 原生 HTTP 方案)
| 数据库 | 官方/推荐 HTTP 驱动 | 关键包名 | 备注 |
|---|---|---|---|
| PostgreSQL (Neon, Supabase, Timescale) | Neon Serverless Driver / pg (via HTTP) | @neondatabase/serverless ⭐⭐⭐⭐⭐ | Neon 官方出品,完美支持 Workers,支持事务、预处理、流式。Supabase/Timescale 兼容。 |
| PostgreSQL (通用) | postgres.js (修改版) / wa-sqlite (WASM) | @vercel/postgres (基于 @neondatabase/serverless) | Vercel 封装版,用法类似。 |
| MySQL (PlanetScale) | PlanetScale Serverless Driver | @planetscale/database ⭐⭐⭐⭐⭐ | 专为 HTTP/Edge 设计,基于 Vitess 架构。 |
| MySQL (通用/TiDB/其他) | 方案 1 (@tidbcloud/serverless) 或 方案 3 (自建网关) | - | 无通用官方 HTTP 驱动。 |
| Redis / KV | Upstash Redis (REST API) ⭐⭐⭐⭐⭐ | @upstash/redis | 最佳选择。HTTP 协议、免费额度大、支持 Redis 命令全集、边缘极快。 |
| Redis (通用) | ioredis / redis | 不支持原生 TCP | 必须走 Upstash 或自建 HTTP 代理。 |
| MongoDB | Data API (Atlas) / mongodb (via HTTP) | @mongodb-js/data-api | Atlas 免费集群开启 Data API,走 HTTPS。 |
| SQLite (边缘本地) | D1 (Cloudflare 原生) / wa-sqlite / sql.js | cloudflare:d1 (Binding) | D1 是首选 (绑定即用)。需文件持久化可用 WASM 版 SQLite (wa-sqlite 性能强)。 |
| 向量库 | Upstash Vector / TiDB (SQL) / 自建网关 | @upstash/vector | Upstash Vector 免费、HTTP 原生。TiDB 用方案 1。Zilliz/Qdrant 需自建网关。 |
| 对象存储 | Cloudflare R2 (S3 API) | aws-sdk / s3 (via R2 Bucket Binding) | 绑定 R2 Bucket 最快 (零配置、零延迟、免费)。或用 S3 SDK 走 HTTP。 |
三、 实战:知识库 RAG 在 Workers 上的最小化架构
场景:用户提问 -> Worker 生成 Embedding -> 向量检索 -> 重排 -> LLM 生成 -> 回流式回答。
技术栈选型(全免费、全 Serverless、全 HTTP)
| 环节 | 选型 | 连接方式 |
|---|---|---|
| 向量/主存储 | TiDB Serverless (5GB 免费) | @tidbcloud/serverless (HTTP) |
| 缓存/会话/限流 | Upstash Redis (10k req/天 免费) | @upstash/redis (HTTP) |
| 文件存储 | Cloudflare R2 (10GB 免费) | env.MY_BUCKET (Binding) |
| Embedding | SiliconFlow BAAI/bge-m3 / Gemini text-embedding-004 | fetch (OpenAI 兼容 API) |
| Rerank | SiliconFlow BAAI/bge-reranker-v2-m3 | fetch |
| LLM | SiliconFlow Qwen2.5-72B / Gemini 1.5 Flash / DeepSeek | fetch (流式 SSE) |
| 前端 | Cloudflare Pages (React/Vue/Next.js) | 部署同仓库 |
Worker 核心代码骨架
// src/index.ts
import { createClient } from '@tidbcloud/serverless';
import { Redis } from '@upstash/redis';
import { Hono } from 'hono'; // 推荐用 Hono 做路由/中间件,极轻量
import { stream } from 'hono/streaming';
import { cors } from 'hono/cors';
type Bindings = {
// TiDB
TIDB_HOST: string;
TIDB_USER: string;
TIDB_PASSWORD: string;
TIDB_DATABASE: string;
// Upstash
UPSTASH_REDIS_REST_URL: string;
UPSTASH_REDIS_REST_TOKEN: string;
// R2
MY_BUCKET: R2Bucket;
// API Keys
SILICONFLOW_API_KEY: string;
// Env
ENVIRONMENT: string;
};
const app = new Hono<{ Bindings: Bindings }>();
app.use('*', cors()); // 允许前端跨域
// 初始化客户端 (全局复用,但注意 Workers 无状态,每次请求新建开销极小)
const getTidb = (env: Bindings) => createClient({
host: env.TIDB_HOST, username: env.TIDB_USER, password: env.TIDB_PASSWORD, database: env.TIDB_DATABASE
});
const getRedis = (env: Bindings) => new Redis({ url: env.UPSTASH_REDIS_REST_URL, token: env.UPSTASH_REDIS_REST_TOKEN });
// --- 嵌入向量 ---
async function embed(texts: string[], env: Bindings): Promise<number[][]> {
const res = await fetch('<https://api.siliconflow.cn/v1/embeddings>', {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${env.SILICONFLOW_API_KEY}` },
body: JSON.stringify({ model: 'BAAI/bge-m3', input: texts, encoding_format: 'float' })
});
const data = await res.json();
return data.data.map((d: any) => d.embedding);
}
// --- 重排 ---
async function rerank(query: string, docs: string[], env: Bindings): Promise<number[]> {
const res = await fetch('<https://api.siliconflow.cn/v1/rerank>', {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${env.SILICONFLOW_API_KEY}` },
body: JSON.stringify({ model: 'BAAI/bge-reranker-v2-m3', query, documents: docs, top_n: 3 })
});
const data = await res.json();
return data.results.map((r: any) => r.index); // 返回排序后的原始索引
}
// --- LLM 流式生成 ---
async function* generateStream(messages: any[], env: Bindings): AsyncGenerator<string> {
const res = await fetch('<https://api.siliconflow.cn/v1/chat/completions>', {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${env.SILICONFLOW_API_KEY}` },
body: JSON.stringify({ model: 'Qwen/Qwen2.5-72B-Instruct', messages, stream: true, temperature: 0.3 })
});
const reader = res.body?.getReader();
const decoder = new TextDecoder();
if (!reader) return;
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\\n').filter(l => l.startsWith('data: '));
for (const line of lines) {
if (line === 'data: [DONE]') return;
try {
const json = JSON.parse(line.slice(6));
const content = json.choices[0]?.delta?.content;
if (content) yield content;
} catch (e) { /* ignore parse error */ }
}
}
}
// --- 主路由:RAG 问答 ---
app.post('/api/chat', async (c) => {
const { question, history = [] } = await c.req.json();
const env = c.env;
const tidb = getTidb(env);
const redis = getRedis(env);
// 1. 限流 (IP 维度)
const ip = c.req.header('CF-Connecting-IP') || 'unknown';
const ratelimit = await redis.incr(`ratelimit:${ip}`);
if (ratelimit === 1) await redis.expire(`ratelimit:${ip}`, 60); // 1分钟窗口
if (ratelimit > 30) return c.json({ error: 'Rate limited' }, 429);
// 2. 向量化问题
const [qVec] = await embed([question], env);
// 3. TiDB 向量检索 (SQL + 向量距离)
// 表结构假设: chunks(id, doc_id, content, embedding VECTOR(1024), metadata JSON)
const searchSql = `
SELECT id, content, metadata, VEC_COSINE_DISTANCE(embedding, ?) AS dist
FROM knowledge_chunks
ORDER BY dist ASC
LIMIT 10
`;
// @tidbcloud/serverless query 参数需手动序列化向量为字符串 '[0.1, 0.2...]'
const vecStr = `[${qVec.join(',')}]`;
const [candidates] = await tidb.query(searchSql, [vecStr]);
if (candidates.length === 0) {
return stream(c, async (stream) => {
await stream.write('暂无相关知识库内容。');
});
}
// 4. Rerank 重排
const contents = candidates.map((c: any) => c.content);
const rerankedIndices = await rerank(question, contents, env);
const topChunks = rerankedIndices.map(i => candidates[i]).slice(0, 3);
// 5. 构建 Prompt
const context = topChunks.map((c: any, i: number) => `[来源 ${i+1}] ${c.content}`).join('\\n\\n');
const systemPrompt = `你是专业助手。请仅根据以下参考资料回答问题,引用来源编号如[1]。若无法回答请说不知道。\\n\\n参考资料:\\n${context}`;
const messages = [
{ role: 'system', content: systemPrompt },
...history.slice(-6), // 保留最近 3 轮对话
{ role: 'user', content: question }
];
// 6. 流式返回
return stream(c, async (stream) => {
// 发送来源信息给前端 (可选,自定义协议)
await stream.write(`__SOURCES__${JSON.stringify(topChunks.map(c => ({id: c.id, metadata: c.metadata})))}\\n`);
for await (const chunk of generateStream(messages, env)) {
await stream.write(chunk);
}
});
});
// --- 健康检查 ---
app.get('/health', (c) => c.json({ ok: true, ts: Date.now() }));
export default app;wrangler.toml 配置
name = "my-rag-worker"
main = "src/index.ts"
compatibility_date = "2024-10-01"
compatibility_flags = ["nodejs_compat"] # 部分库可能需要
# 绑定 R2
[[r2_buckets]]
binding = "MY_BUCKET"
bucket_name = "my-knowledge-files"
# 环境变量 (敏感信息请在 Dashboard -> Settings -> Variables/Secrets 设置,不要写这里)
[vars]
TIDB_HOST = "gateway01.xxx.ap-northeast-1.prod.aws.tidbcloud.com"
TIDB_USER = "xxx.root"
TIDB_DATABASE = "rag_db"
UPSTASH_REDIS_REST_URL = "<https://xxxxx.upstash.io>"
# TIDB_PASSWORD, UPSTASH_REDIS_REST_TOKEN, SILICONFLOW_API_KEY 放在 Secrets 里部署命令
# 1. 登录
npx wrangler login
# 2. 创建 R2 Bucket (首次)
npx wrangler r2 bucket create my-knowledge-files
# 3. 部署 (会提示输入 Secrets)
npx wrangler deploy
# 4. 开发调试 (本地跑,连远程资源)
npx wrangler dev --remote四、 避坑指南 & 性能优化
| 问题 | 解决方案 |
|---|---|
| 冷启动延迟 | Workers 冷启动通常 < 50ms。TiDB/Upstash 首次建连需 TLS 握手 (~100-300ms)。预热:用 Cron Trigger 每 5 分钟 fetch 一下健康检查接口。 |
| TiDB 向量参数绑定 | @tidbcloud/serverless 当前 query(sql, params) 对 VECTOR 类型参数支持有限。最稳妥:手动拼接向量字符串 '[0.1,0.2...]' 放入 SQL 字符串,或用 CONCAT 构建。确保参数化防注入用于其他字段。 |
| 流式输出中断 | Cloudflare Workers CPU 时间限制:免费版 10ms,付费版 50ms/请求 (可累积)。流式输出长文本极易超时。<br>对策:1. 精简 Prompt/Context。2. 用小模型 (如 Qwen2.5-7B / Gemini Flash-8B)。3. 必须开通 Workers Paid Plan ($5/月) 获取 30s CPU 时间上限。 |
| 大文件上传/下载 | Worker 请求体限制 100MB (Pages 更大)。大文件必须走 R2 预签名 URL:Worker 生成 PUT/GET 签名 URL -> 前端直传/直下 R2 -> 完成回调 Worker 入库元数据。 |
| CORS | hono/cors 中间件搞定。注意 Access-Control-Allow-Origin 设为具体域名而非 * (若需 Cookie/Auth)。 |
| 类型安全 | 用 hono + zod 做请求校验。@tidbcloud/serverless 返回 any[],建议定义接口 interface ChunkRow { id: number; content: string; dist: number; } 并断言。 |
| 日志调试 | console.log 在 wrangler tail / Dashboard Logs 可见。生产环境接 Sentry 或 Logtail (免费额度足够)。 |
五、 总结:选哪条路?
| 你的情况 | 推荐方案 |
|---|---|
| 只想快速跑通 RAG,用 TiDB,免费党 | 方案 1:@tidbcloud/serverless + Upstash Redis + R2 Binding + SiliconFlow/Gemini API。全程 HTTP、零运维、免费额度极大。 |
| 已有付费 Workers ($5/月),追求极致性能/流式/长连接 | 方案 2:cloudflare:sockets + mysql2 (或 pg)。原生 TCP,支持连接池复用、Prepared Statements、流式读取。 |
| 多数据库混用 / 团队协作 / 想用 Prisma/Drizzle ORM | 方案 3:自建 HTTP 网关 (部署在 Railway/Render 免费层)。Worker 只管 fetch,网关管连接池、ORM、事务、复杂 SQL。最灵活、最稳。 |
| 主力是 PostgreSQL (Neon/Supabase) | 直接用 @neondatabase/serverless (或 @vercel/postgres)。体验最好,原生支持事务、流式。 |
| 主力是 Redis | 必须用 Upstash (@upstash/redis)。原生 HTTP,免费大方,边缘极快。别想自建 Redis 暴露给 Worker。 |
一句话建议:知识库个人项目/启动期,直接上 @tidbcloud/serverless + Upstash + R2 + SiliconFlow/Gemini,全链路 Serverless HTTP,代码最少,坑最少,免费额度用到爽。