一、为什么需要并发请求限制
在API接口测试中,无限制的并发请求会带来以下风险:
| 风险类型 | 具体表现 | 后果 |
|---|
| 服务端过载 | 压垮被测系统,导致服务宕机 | 测试环境瘫痪,影响其他团队 |
| 测试结果失真 | 服务端拒绝服务或限流,无法获取真实性能数据 | 误判系统容量 |
| 资源浪费 | 网络带宽、连接池耗尽 | 测试机自身崩溃 |
| 数据污染 | 并发写操作导致数据竞争 | 脏数据、测试失败 |
| 安全风险 | 触发WAF/防火墙拦截 | IP被封禁 核心原则:并发测试不是"请求越多越好",而是"在可控范围内模拟真实负载"。 |
二、并发限制的核心概念
2.1 关键指标定义
┌─────────────────────────────────────────────────────────┐
│ 并发控制金字塔 │
├─────────────────────────────────────────────────────────┤
│ 虚拟用户数 (VUs) │
│ ↓ 每个用户可发起 │
│ 并发连接数 (Connections) │
│ ↓ 每个连接可发送 │
│ 请求速率 (RPS/QPS) │
│ ↓ 受限于 │
│ 吞吐量 (Throughput) │
└─────────────────────────────────────────────────────────┘
| 指标 | 说明 | 典型限制场景 |
|---|
| VUs (Virtual Users) | 模拟的并发用户数 | 登录会话数限制 |
| RPS (Requests Per Second) | 每秒请求数 | API网关限流 |
| Connections | 并发TCP连接数 | 服务器文件描述符限制 |
| Throughput | 实际数据传输速率 | 带宽瓶颈 |
2.2 限流算法对比
| 算法 | 原理 | 优点 | 缺点 | 适用场景 |
|---|
| 固定窗口 | 按时间窗口计数 | 简单直观 | 窗口边界突发流量 | 简单限流 |
| 滑动窗口 | 平滑统计时间窗口 | 避免突发 | 实现复杂度高 | 精确限流 |
| 令牌桶 | 以固定速率生成令牌 | 允许突发流量 | 需要预热 | API网关 |
| 漏桶 | 请求进入队列匀速处理 | 平滑输出 | 无法应对突发 | 流量整形 |
三、测试工具中的并发控制实践
3.1 JMeter 并发限制配置
<!-- 线程组:控制虚拟用户数 -->
<ThreadGroup testname="API并发测试" guiclass="ThreadGroupGui">
<!-- 关键参数 -->
<stringProp name="ThreadGroup.num_threads">100</stringProp> <!-- 并发线程数 -->
<stringProp name="ThreadGroup.ramp_time">60</stringProp> <!-- 启动时间(秒) -->
<stringProp name="ThreadGroup.duration">300</stringProp> <!-- 持续时间 -->
<!-- 吞吐量控制器:限制RPS -->
<ConstantThroughputTimer guiclass="TestBeanGUI">
<doubleProp>500.0</doubleProp> <!-- 目标吞吐量:500请求/分钟 -->
</ConstantThroughputTimer>
</ThreadGroup>
<!-- HTTP请求默认值:连接池限制 -->
<ConfigTestElement guiclass="HttpDefaultsGui">
<stringProp name="HTTPSampler.connect_timeout">5000</stringProp> <!-- 连接超时5s -->
<stringProp name="HTTPSampler.response_timeout">10000</stringProp> <!-- 响应超时10s -->
<boolProp name="HTTPSampler.concurrentDwn">true</boolProp>
<stringProp name="HTTPSampler.concurrentPool">10</stringProp> <!-- 单线程连接池 -->
</ConfigTestElement>
JMeter 最佳实践:
- 使用 Stepping Thread Group 逐步加压,避免瞬间洪峰
- 配置 HTTP Cache Manager 避免重复请求静态资源
- 启用 Backend Listener 实时监控,超阈值自动熔断
3.2 k6 脚本化并发控制
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';
// 错误率监控
const errorRate = new Rate('errors');
// 场景化并发配置
export const options = {
scenarios: {
// 场景1:正常负载(基准测试)
baseline: {
executor: 'constant-vus',
vus: 50,
duration: '5m',
tags: { scenario: 'baseline' }
},
// 场景2:逐步加压(容量测试)
ramp_up: {
executor: 'ramping-vus',
startVUs: 0,
stages: [
{ duration: '2m', target: 100 }, // 2分钟 ramp up 到100 VUs
{ duration: '5m', target: 100 }, // 保持5分钟
{ duration: '2m', target: 200 }, // 继续加压到200 VUs
{ duration: '5m', target: 200 }, // 保持观察
{ duration: '2m', target: 0 }, // 逐步减压
],
gracefulRampDown: '30s',
},
// 场景3:固定RPS(速率限制测试)
fixed_rps: {
executor: 'constant-arrival-rate',
rate: 1000, // 目标:每秒1000个请求
timeUnit: '1s',
duration: '10m',
preAllocatedVUs: 100,
maxVUs: 200,
}
},
// 全局阈值限制
thresholds: {
http_req_duration: ['p(95)<500'], // 95%请求<500ms
http_req_failed: ['rate<0.01'], // 错误率<1%
errors: ['rate<0.05'], // 自定义错误率<5%
},
};
export default function () {
// 请求头配置:模拟真实用户
const params = {
headers: {
'Content-Type': 'application/json',
'X-Test-ID': `test-${__VU}-${__ITER}`, // 唯一标识,便于追踪
},
timeout: '10s', // 单请求超时
};
const res = http.get('https://api.example.com/endpoint', params);
// 断言检查
const success = check(res, {
'status is 200': (r) => r.status === 200,
'response time < 500ms': (r) => r.timings.duration < 500,
'body is valid JSON': (r) => {
try {
JSON.parse(r.body);
return true;
} catch (e) {
return false;
}
}
});
errorRate.add(!success);
// 思考时间:模拟真实用户行为间隔
sleep(Math.random() * 2 + 1); // 1-3秒随机间隔
}
3.3 Python Locust 并发控制
from locust import HttpUser, task, between, events
from locust.runners import MasterRunner
import json
import time
class APIUser(HttpUser):
# 思考时间:每个任务间隔1-5秒
wait_time = between(1, 5)
# 固定RPS限制(通过自定义调度器)
fixed_count = 100 # 总请求数限制
def on_start(self):
"""每个虚拟用户启动时执行:登录获取token"""
self.client.headers.update({
"Content-Type": "application/json",
"X-Client-Version": "test-1.0"
})
# 登录获取token
response = self.client.post("/api/login", json={
"username": f"test_user_{self.user_id}",
"password": "test_pass"
})
self.token = response.json().get("token")
self.client.headers["Authorization"] = f"Bearer {self.token}"
@task(3) # 权重3:高频接口
def query_data(self):
"""查询接口:只读,可高并发"""
with self.client.get(
"/api/data",
catch_response=True,
timeout=10 # 超时控制
) as response:
if response.status_code == 429: # 触发限流
response.failure("Rate limited by server")
time.sleep(1) # 退避等待
elif response.status_code != 200:
response.failure(f"Unexpected status: {response.status_code}")
@task(1) # 权重1:低频接口
def create_order(self):
"""写接口:降低并发,避免数据冲突"""
# 使用标签控制并发分组
with self.client.post(
"/api/orders",
json={"item_id": 123, "quantity": 1},
catch_response=True,
tags={"type": "write", "critical": "true"}
) as response:
if response.status_code == 201:
# 记录订单ID用于后续清理
order_id = response.json().get("order_id")
self.environment.runner.stats.log_request(
"custom", "order_created", response.elapsed.total_seconds(), 0
)
@task(0) # 不执行,仅作示例
def batch_operation(self):
"""批量操作:严格限制并发数"""
# 使用信号量控制(需自定义实现)
if acquire_semaphore("batch_limit", max_concurrent=5):
try:
self.client.post("/api/batch", json={"ids": [1,2,3,4,5]})
finally:
release_semaphore("batch_limit")
# 自定义事件:全局速率限制
@events.request.add_listener
def on_request(request_type, name, response_time, response_length,
response, context, exception, **kwargs):
"""全局请求拦截:实现自适应限流"""
if response and response.status_code == 503: # 服务不可用
# 动态降低并发:通知runner减少VUs
if isinstance(runner.environment.runner, MasterRunner):
current_vus = runner.environment.runner.user_count
if current_vus > 10:
runner.environment.runner.quit() # 紧急降级
# 启动命令:限制并发
# locust -f locustfile.py --users 100 --spawn-rate 10 --run-time 10m
四、服务端配合:被测系统的限流策略
4.1 Nginx 层限流(保护被测系统)
# 限流区域定义
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
limit_conn_zone $binary_remote_addr zone=addr:10m;
server {
listen 80;
server_name api-test.example.com;
location /api/ {
# 漏桶限流:突发20个,延迟处理
limit_req zone=api_limit burst=20 nodelay;
# 连接数限制:单IP最多10个并发连接
limit_conn addr 10;
# 超过限流返回503,便于测试工具识别
limit_req_status 503;
limit_conn_status 503;
proxy_pass http://backend;
# 添加响应头:告知客户端限流状态
add_header X-RateLimit-Limit 10 always;
add_header X-RateLimit-Remaining $limit_req_remaining always;
}
# 健康检查接口:不限流
location /health {
access_log off;
return 200 "OK";
}
}
4.2 应用层限流(Spring Boot 示例)
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api")
public class TestController {
private static final String TEST_SERVICE = "testService";
@GetMapping("/data")
@RateLimiter(name = TEST_SERVICE, fallbackMethod = "rateLimitFallback")
@CircuitBreaker(name = TEST_SERVICE, fallbackMethod = "circuitBreakerFallback")
public ResponseEntity<String> getData() {
// 正常业务逻辑
return ResponseEntity.ok("Data retrieved successfully");
}
// 限流降级方法
public ResponseEntity<String> rateLimitFallback(Throwable t) {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.body("Rate limit exceeded. Please try later.");
}
// 熔断降级方法
public ResponseEntity<String> circuitBreakerFallback(Throwable t) {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body("Service temporarily unavailable.");
}
}
// 配置(application.yml)
resilience4j:
ratelimiter:
instances:
testService:
limitForPeriod: 100 # 周期内允许请求数
limitRefreshPeriod: 1s # 周期时长
timeoutDuration: 0 # 获取许可等待时间
registerHealthIndicator: true
五、并发测试的监控与调优
5.1 实时监控看板
| 监控层级 | 关键指标 | 告警阈值 |
|---|
| 客户端 | RPS、响应时间、错误率、VUs | 错误率 > 5% |
| 网络层 | 带宽利用率、连接数、TCP重传率 | 带宽 > 80% |
| 应用层 | GC频率、线程池使用率、队列长度 | 线程池 > 90% |
| 系统层 | CPU、内存、磁盘I/O、文件描述符 | CPU > 85% |
5.2 自适应并发调整算法
class AdaptiveConcurrencyController:
"""自适应并发控制器:根据系统反馈动态调整"""
def __init__(self, initial_vus=10, max_vus=1000):
self.current_vus = initial_vus
self.max_vus = max_vus
self.error_threshold = 0.05 # 5%错误率阈值
self.latency_threshold = 1000 # 1秒延迟阈值
def adjust(self, metrics):
"""
metrics: {
'error_rate': 0.02,
'p99_latency': 800,
'throughput': 500
}
"""
error_rate = metrics['error_rate']
latency = metrics['p99_latency']
# 状态机决策
if error_rate > self.error_threshold or latency > self.latency_threshold:
# 系统过载:降速
self.current_vus = max(self.current_vus * 0.8, 1)
action = "decrease"
elif error_rate < 0.01 and latency < self.latency_threshold * 0.5:
# 系统健康:加速
self.current_vus = min(self.current_vus * 1.2, self.max_vus)
action = "increase"
else:
action = "maintain"
return {
'target_vus': int(self.current_vus),
'action': action,
'reason': f"error_rate={error_rate}, latency={latency}"
}
六、总结:并发测试检查清单
□ 测试前
□ 明确测试目标(容量/稳定性/压力/破坏性)
□ 确认被测环境资源配额(CPU/内存/连接数)
□ 配置服务端限流保护(防止误伤生产环境)
□ 准备数据清理脚本(避免脏数据累积)
□ 测试中
□ 采用渐进式加压(避免瞬间洪峰)
□ 实时监控错误率和响应时间
□ 记录关键指标基线数据
□ 准备紧急停止方案(Kill Switch)
□ 测试后
□ 生成性能报告(RPS/延迟/错误率曲线)
□ 识别性能瓶颈(数据库/缓存/网络)
□ 提出优化建议(代码/配置/架构)
□ 归档测试脚本和配置
核心原则:"限流是为了更好的测试" — 通过合理的并发控制,既能保护被测系统,又能获得准确、可复现的性能数据。