ChatGPT 私房菜:OpenAI Api 开发接口限流怎么办?一个妙招
如何优雅地处理更多提示词
前言
《ChatGPT私房菜》系列文章,是在 OpenAI 开放给开发者使用的文档基础上写出来的。是为了给准备做ChatGPT相关应用的开发者参考。
限流
一般的App应用,在并发量高企之后,会考虑人为进行限流。基于以下考虑:
OpenAI 的接口算力资源有限,所以网页端早期扩容之前,经常出现宕机的情况。OpenAI也想不到,ChatGPT如此火热,准备的服务器不够用。
开发者使用api token对接大模型,遇到限流,应该正确处理,以便用户比较容易接受。
额度
OpenAI 的接口限流,按照用户身份,分为三类,免费用户、付费用户(开通后48小时内)、付费用户(开通48小时后)。
根据接口功能,大致分为两类,一类是文本补全,一类是代码补全。
下面列出了可能的情况,最新的数据,你可以在 OpenAI 的官网找到。
用户身份 |
文本补全 向量化 |
代码生成 代码编辑 |
免费用户 |
20次请求/分钟 15万token/分钟 |
20次请求/分钟 15万token/分钟 |
付费用户 (开通48小时内) |
60次请求/分钟 25万token(davinci模型)/分钟 |
20次请求/分钟 15万token/分钟 |
付费用户 开通48小时后 |
3000次请求/分钟 25万token/分钟 |
20次请求/分钟 15万token/分钟 |
代码示例
一般遇到频率限制,会抛出类似以下的异常:
1
2
3
|
Rate limit reached for default-code-davinci-002 in organization org-exampleorgid123 on tokens per min.
Limit: 10000.000000 / min. Current: 10020.000000 / min.
Contact support@openai.com if you continue to have issues.
|
下面是一段错误的示例,会触发频次异常。
1
2
3
4
5
6
7
8
9
|
import openai
# 多次请求
for _ in range(100):
openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello"}],
max_tokens=10,
)
|
上面代码中,没有使用并发请求,仅同步循环多次。根据响应的速度,极大概率触发频次过高的限制。
错误重试
最简单、也是最容易想到的,就是自动重试。
比如一分钟只有20次请求额度,如果达到限额,触发了异常。那么,记录下这次异常,使用重试策略。比如:
- 第1次异常,间隔 10ms 重试;
- 仍然失败,即第2次异常,间隔 15ms 再次重试;
- 第3次异常,50ms 重试;
- ……
以此类推,错误次数多,触发的延迟就越长。我们不需要自己重新造轮子,下面是两个方便且流行的库。
1,Tenacity
使用Python 的 Tenacity 库,文档地址点这里:https://tenacity.readthedocs.io/en/latest/
下面是是代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import openai
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def completion_with_backoff(**kwargs):
return openai.Completion.create(**kwargs)
completion_with_backoff(model="text-davinci-002", prompt="Once upon a time,")
|
最主要的是使用了 Tenacity 提供的修饰器 @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
。
- wait_random_exponential:使用了指数随机退避策略,等待时间在1到60秒之间。也就是说,每次失败后,重试间隔时间,会以指数增长的方式逐渐增加,但最多不超过60秒。
- stop_after_attempt(6): 最多重试 6 次,之后就不会再请求了。
2,backoff
与 Tenacity 一样,是控制重试的Python库,号称是更优雅的实现。
backoff仓库地址: https://github.com/litl/backoff
下面是代码示例:
1
2
3
4
5
6
7
8
9
10
|
import backoff
import openai
@backoff.on_exception(backoff.expo, openai.error.RateLimitError)
def completions_with_backoff(**kwargs):
return openai.Completion.create(**kwargs)
completions_with_backoff(model="text-davinci-002", prompt="Once upon a time,")
|
backoff 提供的装饰器,自动捕捉函数方法抛出的异常,根据配置的动作执行重试。
3,手动实现
如果上面两个重试策略,不满足你的开发使用。自己造一个轮子也是OK的。
OpenAI 给出了一个示例,下面是修饰器的原始实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
import random
import time
import openai
# define a retry decorator
def retry_with_exponential_backoff(
func,
initial_delay: float = 1,
exponential_base: float = 2,
jitter: bool = True,
max_retries: int = 10,
errors: tuple = (openai.error.RateLimitError,),
):
"""Retry a function with exponential backoff."""
def wrapper(*args, **kwargs):
# Initialize variables
num_retries = 0
delay = initial_delay
# Loop until a successful response or max_retries is hit or an exception is raised
while True:
try:
return func(*args, **kwargs)
# Retry on specified errors
except errors as e:
# Increment retries
num_retries += 1
# Check if max retries has been reached
if num_retries > max_retries:
raise Exception(
f"Maximum number of retries ({max_retries}) exceeded."
)
# Increment the delay
delay *= exponential_base * (1 + jitter * random.random())
# Sleep for the delay
time.sleep(delay)
# Raise exceptions for any errors not specified
except Exception as e:
raise e
return wrapper
|
使用的时候:
1
2
3
4
5
6
|
@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):
return openai.Completion.create(**kwargs)
completions_with_backoff(model="text-davinci-002", prompt="Once upon a time,")
|
怎么扩容
上一节的错误重试,整体控制触发异常之后的情况。本节介绍请求扩容。
1,延时
对于ChatGPT的流式输出接口,更好的方式,是增加请求之间的延迟,让用户不易察觉。
比如,一分钟20次请求限制,给每个用户添加 3-6 秒的延迟。
下面是一段示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import time
import openai
# 对请求添加延时
def delayed_completion(delay_in_seconds: float = 1, **kwargs):
"""Delay a completion by a specified amount of time."""
# Sleep for the delay
time.sleep(delay_in_seconds)
# Call the Completion API and return the result
return openai.Completion.create(**kwargs)
# 根据频次,计算延时
rate_limit_per_minute = 20
delay = 60.0 / rate_limit_per_minute
delayed_completion(
delay_in_seconds=delay,
model="text-davinci-002",
prompt="Once upon a time,"
)
|
2,批处理
一个请求只处理一个prompt提示词输入,确实是有点慢。那么,为了提高请求的吞吐量,可以把多个请求的 prompt,一次性塞到一个请求内,是不是并发处理能力立马就上来了。
事实上,OpenAI是允许和鼓励这么使用的。
一个请求,一个提示词,代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import openai
num_stories = 10
prompt = "Once upon a time,"
for _ in range(num_stories):
response = openai.Completion.create(
model="curie",
prompt=prompt,
max_tokens=20,
)
# print story
print(prompt + response.choices[0].text)
|
注意 prompt 这个参数,只赋值了一个字符串。其实,这个参数,可以传数组,容纳N个字符串。下面是示例代码:
1
2
3
4
5
6
7
8
9
10
11
|
import openai
num_stories = 10
prompts = ["Once upon a time,"] * num_stories
# 批处理,一个请求,放入 10 个提示词
response = openai.Completion.create(
model="curie",
prompt=prompts,
max_tokens=20,
)
|
这样做,会带来一个问题,就是需要在响应中,要明确区分出某个响应,是针对某个请求的。方法是使用索引进行对照:
1
2
3
4
5
6
7
8
|
# 根据索引,把响应和请求对应起来
stories = [""] * len(prompts)
for choice in response.choices:
stories[choice.index] = prompts[choice.index] + choice.text
# 打印输出
for story in stories:
print(story)
|
响应内容中,choice.index
就是输入的 prompts.index
是一一对照的。即时ChatGPT生成响应的次序不同,但响应中明确指出了此响应多对照的提示词数组中的索引位置。
最后
本文重点讲了两个内容,一个是遇到OpenAI限流异常时,如何优雅地处理,或如何避免触发限流;另一个是通过批处理、延时的方法,最大程度提高吞吐量。
我是@程序员小助手,专注编程知识,圈子动态的IT领域原创作者。