全部
常见问题
产品动态
精选推荐
功能建议

分析中 已回复 待规划 {{opt.name}}
分析中 已回复 待规划
API接口测试中的并发请求限制:原理、策略与实践

管理 管理 编辑 删除

一、为什么需要并发请求限制

在API接口测试中,无限制的并发请求会带来以下风险:

风险类型具体表现后果
服务端过载压垮被测系统,导致服务宕机测试环境瘫痪,影响其他团队
测试结果失真服务端拒绝服务或限流,无法获取真实性能数据误判系统容量
资源浪费网络带宽、连接池耗尽测试机自身崩溃
数据污染并发写操作导致数据竞争脏数据、测试失败
安全风险触发WAF/防火墙拦截IP被封禁
核心原则:并发测试不是"请求越多越好",而是"在可控范围内模拟真实负载"。


二、并发限制的核心概念

2.1 关键指标定义


┌─────────────────────────────────────────────────────────┐
│                    并发控制金字塔                         │
├─────────────────────────────────────────────────────────┤
│  虚拟用户数 (VUs)                                        │
│     ↓ 每个用户可发起                                     │
│  并发连接数 (Connections)                               │
│     ↓ 每个连接可发送                                     │
│  请求速率 (RPS/QPS)                                      │
│     ↓ 受限于                                             │
│  吞吐量 (Throughput)                                     │
└─────────────────────────────────────────────────────────┘

指标说明典型限制场景
VUs (Virtual Users)模拟的并发用户数登录会话数限制
RPS (Requests Per Second)每秒请求数API网关限流
Connections并发TCP连接数服务器文件描述符限制
Throughput实际数据传输速率带宽瓶颈

2.2 限流算法对比


算法原理优点缺点适用场景
固定窗口按时间窗口计数简单直观窗口边界突发流量简单限流
滑动窗口平滑统计时间窗口避免突发实现复杂度高精确限流
令牌桶以固定速率生成令牌允许突发流量需要预热API网关
漏桶请求进入队列匀速处理平滑输出无法应对突发流量整形


三、测试工具中的并发控制实践

3.1 JMeter 并发限制配置


<!-- 线程组:控制虚拟用户数 -->
<ThreadGroup testname="API并发测试" guiclass="ThreadGroupGui">
  <!-- 关键参数 -->
  <stringProp name="ThreadGroup.num_threads">100</stringProp>      <!-- 并发线程数 -->
  <stringProp name="ThreadGroup.ramp_time">60</stringProp>        <!-- 启动时间(秒) -->
  <stringProp name="ThreadGroup.duration">300</stringProp>         <!-- 持续时间 -->
  
  <!-- 吞吐量控制器:限制RPS -->
  <ConstantThroughputTimer guiclass="TestBeanGUI">
    <doubleProp>500.0</doubleProp>  <!-- 目标吞吐量:500请求/分钟 -->
  </ConstantThroughputTimer>
</ThreadGroup>

<!-- HTTP请求默认值:连接池限制 -->
<ConfigTestElement guiclass="HttpDefaultsGui">
  <stringProp name="HTTPSampler.connect_timeout">5000</stringProp>  <!-- 连接超时5s -->
  <stringProp name="HTTPSampler.response_timeout">10000</stringProp> <!-- 响应超时10s -->
  <boolProp name="HTTPSampler.concurrentDwn">true</boolProp>
  <stringProp name="HTTPSampler.concurrentPool">10</stringProp>      <!-- 单线程连接池 -->
</ConfigTestElement>

JMeter 最佳实践
  • 使用 Stepping Thread Group 逐步加压,避免瞬间洪峰
  • 配置 HTTP Cache Manager 避免重复请求静态资源
  • 启用 Backend Listener 实时监控,超阈值自动熔断


3.2 k6 脚本化并发控制


import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';

// 错误率监控
const errorRate = new Rate('errors');

// 场景化并发配置
export const options = {
  scenarios: {
    // 场景1:正常负载(基准测试)
    baseline: {
      executor: 'constant-vus',
      vus: 50,
      duration: '5m',
      tags: { scenario: 'baseline' }
    },
    
    // 场景2:逐步加压(容量测试)
    ramp_up: {
      executor: 'ramping-vus',
      startVUs: 0,
      stages: [
        { duration: '2m', target: 100 },   // 2分钟 ramp up 到100 VUs
        { duration: '5m', target: 100 },   // 保持5分钟
        { duration: '2m', target: 200 },   // 继续加压到200 VUs
        { duration: '5m', target: 200 },   // 保持观察
        { duration: '2m', target: 0 },     // 逐步减压
      ],
      gracefulRampDown: '30s',
    },
    
    // 场景3:固定RPS(速率限制测试)
    fixed_rps: {
      executor: 'constant-arrival-rate',
      rate: 1000,           // 目标:每秒1000个请求
      timeUnit: '1s',
      duration: '10m',
      preAllocatedVUs: 100,
      maxVUs: 200,
    }
  },
  
  // 全局阈值限制
  thresholds: {
    http_req_duration: ['p(95)<500'],  // 95%请求<500ms
    http_req_failed: ['rate<0.01'],    // 错误率<1%
    errors: ['rate<0.05'],             // 自定义错误率<5%
  },
};

export default function () {
  // 请求头配置:模拟真实用户
  const params = {
    headers: {
      'Content-Type': 'application/json',
      'X-Test-ID': `test-${__VU}-${__ITER}`,  // 唯一标识,便于追踪
    },
    timeout: '10s',  // 单请求超时
  };
  
  const res = http.get('https://api.example.com/endpoint', params);
  
  // 断言检查
  const success = check(res, {
    'status is 200': (r) => r.status === 200,
    'response time < 500ms': (r) => r.timings.duration < 500,
    'body is valid JSON': (r) => {
      try {
        JSON.parse(r.body);
        return true;
      } catch (e) {
        return false;
      }
    }
  });
  
  errorRate.add(!success);
  
  // 思考时间:模拟真实用户行为间隔
  sleep(Math.random() * 2 + 1);  // 1-3秒随机间隔
}


3.3 Python Locust 并发控制


from locust import HttpUser, task, between, events
from locust.runners import MasterRunner
import json
import time

class APIUser(HttpUser):
    # 思考时间:每个任务间隔1-5秒
    wait_time = between(1, 5)
    
    # 固定RPS限制(通过自定义调度器)
    fixed_count = 100  # 总请求数限制
    
    def on_start(self):
        """每个虚拟用户启动时执行:登录获取token"""
        self.client.headers.update({
            "Content-Type": "application/json",
            "X-Client-Version": "test-1.0"
        })
        
        # 登录获取token
        response = self.client.post("/api/login", json={
            "username": f"test_user_{self.user_id}",
            "password": "test_pass"
        })
        self.token = response.json().get("token")
        self.client.headers["Authorization"] = f"Bearer {self.token}"
    
    @task(3)  # 权重3:高频接口
    def query_data(self):
        """查询接口:只读,可高并发"""
        with self.client.get(
            "/api/data",
            catch_response=True,
            timeout=10  # 超时控制
        ) as response:
            if response.status_code == 429:  # 触发限流
                response.failure("Rate limited by server")
                time.sleep(1)  # 退避等待
            elif response.status_code != 200:
                response.failure(f"Unexpected status: {response.status_code}")
    
    @task(1)  # 权重1:低频接口
    def create_order(self):
        """写接口:降低并发,避免数据冲突"""
        # 使用标签控制并发分组
        with self.client.post(
            "/api/orders",
            json={"item_id": 123, "quantity": 1},
            catch_response=True,
            tags={"type": "write", "critical": "true"}
        ) as response:
            if response.status_code == 201:
                # 记录订单ID用于后续清理
                order_id = response.json().get("order_id")
                self.environment.runner.stats.log_request(
                    "custom", "order_created", response.elapsed.total_seconds(), 0
                )
    
    @task(0)  # 不执行,仅作示例
    def batch_operation(self):
        """批量操作:严格限制并发数"""
        # 使用信号量控制(需自定义实现)
        if acquire_semaphore("batch_limit", max_concurrent=5):
            try:
                self.client.post("/api/batch", json={"ids": [1,2,3,4,5]})
            finally:
                release_semaphore("batch_limit")

# 自定义事件:全局速率限制
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, 
               response, context, exception, **kwargs):
    """全局请求拦截:实现自适应限流"""
    if response and response.status_code == 503:  # 服务不可用
        # 动态降低并发:通知runner减少VUs
        if isinstance(runner.environment.runner, MasterRunner):
            current_vus = runner.environment.runner.user_count
            if current_vus > 10:
                runner.environment.runner.quit()  # 紧急降级

# 启动命令:限制并发
# locust -f locustfile.py --users 100 --spawn-rate 10 --run-time 10m


四、服务端配合:被测系统的限流策略

4.1 Nginx 层限流(保护被测系统)


# 限流区域定义
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
limit_conn_zone $binary_remote_addr zone=addr:10m;

server {
    listen 80;
    server_name api-test.example.com;
    
    location /api/ {
        # 漏桶限流:突发20个,延迟处理
        limit_req zone=api_limit burst=20 nodelay;
        
        # 连接数限制:单IP最多10个并发连接
        limit_conn addr 10;
        
        # 超过限流返回503,便于测试工具识别
        limit_req_status 503;
        limit_conn_status 503;
        
        proxy_pass http://backend;
        
        # 添加响应头:告知客户端限流状态
        add_header X-RateLimit-Limit 10 always;
        add_header X-RateLimit-Remaining $limit_req_remaining always;
    }
    
    # 健康检查接口:不限流
    location /health {
        access_log off;
        return 200 "OK";
    }
}

4.2 应用层限流(Spring Boot 示例)


import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api")
public class TestController {
    
    private static final String TEST_SERVICE = "testService";
    
    @GetMapping("/data")
    @RateLimiter(name = TEST_SERVICE, fallbackMethod = "rateLimitFallback")
    @CircuitBreaker(name = TEST_SERVICE, fallbackMethod = "circuitBreakerFallback")
    public ResponseEntity<String> getData() {
        // 正常业务逻辑
        return ResponseEntity.ok("Data retrieved successfully");
    }
    
    // 限流降级方法
    public ResponseEntity<String> rateLimitFallback(Throwable t) {
        return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
            .body("Rate limit exceeded. Please try later.");
    }
    
    // 熔断降级方法
    public ResponseEntity<String> circuitBreakerFallback(Throwable t) {
        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
            .body("Service temporarily unavailable.");
    }
}

// 配置(application.yml)
resilience4j:
  ratelimiter:
    instances:
      testService:
        limitForPeriod: 100        # 周期内允许请求数
        limitRefreshPeriod: 1s      # 周期时长
        timeoutDuration: 0          # 获取许可等待时间
        registerHealthIndicator: true
        


五、并发测试的监控与调优

5.1 实时监控看板


监控层级关键指标告警阈值
客户端RPS、响应时间、错误率、VUs错误率 > 5%
网络层带宽利用率、连接数、TCP重传率带宽 > 80%
应用层GC频率、线程池使用率、队列长度线程池 > 90%
系统层CPU、内存、磁盘I/O、文件描述符CPU > 85%

5.2 自适应并发调整算法


class AdaptiveConcurrencyController:
    """自适应并发控制器:根据系统反馈动态调整"""
    
    def __init__(self, initial_vus=10, max_vus=1000):
        self.current_vus = initial_vus
        self.max_vus = max_vus
        self.error_threshold = 0.05  # 5%错误率阈值
        self.latency_threshold = 1000  # 1秒延迟阈值
        
    def adjust(self, metrics):
        """
        metrics: {
            'error_rate': 0.02,
            'p99_latency': 800,
            'throughput': 500
        }
        """
        error_rate = metrics['error_rate']
        latency = metrics['p99_latency']
        
        # 状态机决策
        if error_rate > self.error_threshold or latency > self.latency_threshold:
            # 系统过载:降速
            self.current_vus = max(self.current_vus * 0.8, 1)
            action = "decrease"
        elif error_rate < 0.01 and latency < self.latency_threshold * 0.5:
            # 系统健康:加速
            self.current_vus = min(self.current_vus * 1.2, self.max_vus)
            action = "increase"
        else:
            action = "maintain"
            
        return {
            'target_vus': int(self.current_vus),
            'action': action,
            'reason': f"error_rate={error_rate}, latency={latency}"
        }
        


六、总结:并发测试检查清单


□ 测试前
  □ 明确测试目标(容量/稳定性/压力/破坏性)
  □ 确认被测环境资源配额(CPU/内存/连接数)
  □ 配置服务端限流保护(防止误伤生产环境)
  □ 准备数据清理脚本(避免脏数据累积)

□ 测试中
  □ 采用渐进式加压(避免瞬间洪峰)
  □ 实时监控错误率和响应时间
  □ 记录关键指标基线数据
  □ 准备紧急停止方案(Kill Switch)

□ 测试后
  □ 生成性能报告(RPS/延迟/错误率曲线)
  □ 识别性能瓶颈(数据库/缓存/网络)
  □ 提出优化建议(代码/配置/架构)
  □ 归档测试脚本和配置
  
  核心原则"限流是为了更好的测试" — 通过合理的并发控制,既能保护被测系统,又能获得准确、可复现的性能数据。


{{voteData.voteSum}} 人已参与
支持
反对
请登录后查看

123c001fa85d 最后编辑于2026-04-15 17:31:42

快捷回复
回复
回复
回复({{post_count}}) {{!is_user ? '我的回复' :'全部回复'}}
排序 默认正序 回复倒序 点赞倒序

{{item.user_info.nickname ? item.user_info.nickname : item.user_name}} LV.{{ item.user_info.bbs_level || item.bbs_level }}

作者 管理员 企业

{{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest == 1? '取消推荐': '推荐'}}
{{item.is_suggest == 1? '取消推荐': '推荐'}} 【已收集】
{{item.floor}}# 沙发 板凳 地板 {{item.floor}}# 【已收集】
{{item.user_info.title || '暂无简介'}}
附件

{{itemf.name}}

{{item.created_at}}  {{item.ip_address}}
打赏
已打赏¥{{item.reward_price}}
{{item.like_count}}
分享
{{item.showReply ? '取消回复' : '回复'}}
删除
回复
回复

{{itemc.user_info.nickname}}

{{itemc.user_name}}

回复 {{itemc.comment_user_info.nickname}}

附件

{{itemf.name}}

{{itemc.created_at}}
打赏
已打赏¥{{itemc.reward_price}}
{{itemc.like_count}}
{{itemc.showReply ? '取消回复' : '回复'}}
删除
回复
回复
收起 展开更多
查看更多
打赏
已打赏¥{{reward_price}}
75
{{like_count}}
{{collect_count}}
添加回复 ({{post_count}})

相关推荐

快速安全登录

使用微信扫码登录
回复
回复
问题:
问题自动获取的帖子内容,不准确时需要手动修改. [获取答案]
答案:
提交
bug 需求 取 消 确 定
打赏金额
当前余额:¥{{rewardUserInfo.reward_price}}
{{item.price}}元
请输入 0.1-{{reward_max_price}} 范围内的数值
打赏成功
¥{{price}}
完成 确认打赏

微信登录/注册

切换手机号登录

{{ bind_phone ? '绑定手机' : '手机登录'}}

{{codeText}}
切换微信登录/注册
暂不绑定
CRMEB客服
CRMEB咨询热线 400-8888-794

扫码领取产品资料

功能清单
思维导图
安装教程
CRMEB开源商城下载 源码下载 CRMEB帮助文档 帮助文档
返回顶部 返回顶部
CRMEB客服