OpenAI rate limits (both caps apply per minute):

| Account type | Requests per minute | Tokens per minute |
| ------------ | ------------------- | ----------------- |
| Free account | 20 | 150,000 |
| Paid account | 30,000 | 250,000 |

Exceeding either cap returns an error like the following:
RateLimitError: Rate limit reached for default-codex in organization org-{id} on requests per min. Limit: 20.000000 / min. Current: 24.000000 / min. Contact support@openai.com if you continue to have issues or if you’d like to request an increase.
Two approaches to resolving rate limit errors are given below.

Approach 1

Use a custom decorator to back off and retry rate-limited requests.

The code is as follows:
import random
import time

import openai


def retry_with_exponential_backoff(
    func,
    initial_delay: float = 1,
    exponential_base: float = 2,
    jitter: bool = True,
    max_retries: int = 10,
    errors: tuple = (openai.error.RateLimitError,),
):
    """Retry a function with exponential backoff when a rate limit error is raised."""

    def wrapper(*args, **kwargs):
        num_retries = 0
        delay = initial_delay

        while True:
            try:
                return func(*args, **kwargs)
            except errors:
                num_retries += 1
                if num_retries > max_retries:
                    raise Exception(
                        f"Maximum number of retries ({max_retries}) exceeded."
                    )
                # Grow the delay exponentially, plus random jitter so that
                # concurrent clients do not all retry at the same moment.
                delay *= exponential_base * (1 + jitter * random.random())
                time.sleep(delay)
            except Exception:
                raise

    return wrapper


@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):
    return openai.Completion.create(**kwargs)


completions_with_backoff(model="text-davinci-003", prompt="your prompt here")
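The decorator is endpoint-agnostic. As a sketch (not from the original post), the same pre-1.0 openai SDK lets it wrap the chat endpoint as well:

@retry_with_exponential_backoff
def chat_with_backoff(**kwargs):
    return openai.ChatCompletion.create(**kwargs)


chat_with_backoff(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "your prompt here"}],
)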
Approach 2

Throttle requests by inserting a fixed delay before each call:
import time

import openai


def delayed_completion(delay_in_seconds: float = 1, **kwargs):
    """Sleep for delay_in_seconds, then call the Completion API and return the result."""
    time.sleep(delay_in_seconds)
    return openai.Completion.create(**kwargs)


# Spread requests evenly across each minute to stay under the rate limit.
rate_limit_per_minute = 20
delay = 60.0 / rate_limit_per_minute

delayed_completion(
    delay_in_seconds=delay,
    model="text-davinci-003",
    prompt="your prompt here"
)
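With rate_limit_per_minute = 20, each call is preceded by a 60 / 20 = 3 second pause, which keeps a single synchronous client at or below the free-tier request limit.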
The second approach holds up well on a single API key, but it is clearly insufficient under heavy concurrency. For that case, the outline below builds a parallel processor that respects both the request and token limits.

Implementation outline:
1. Define a task ID generator

def task_id_generator_function():
    """Generate integers 0, 1, 2, and so on."""
    task_id = 0
    while True:
        yield task_id
        task_id += 1
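As a quick sanity check (not part of the original script), the generator simply hands out sequential IDs:

gen = task_id_generator_function()
print(next(gen), next(gen), next(gen))  # prints: 0 1 2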
2、定义任务字典状态
@dataclass
class StatusTracker:
num_tasks_started: int = 0
num_tasks_in_progress: int = 0
num_tasks_succeeded: int = 0
num_tasks_failed: int = 0
num_rate_limit_errors: int = 0
num_api_errors: int = 0
num_other_errors: int = 0
time_of_last_rate_limit_error: int = 0
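A minimal sketch of how the tracker is updated as a request moves through the pipeline (field names as defined above):

tracker = StatusTracker()
tracker.num_tasks_started += 1
tracker.num_tasks_in_progress += 1
# ...once the request completes successfully...
tracker.num_tasks_in_progress -= 1
tracker.num_tasks_succeeded += 1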
3、统计请求中的令牌数。仅支持完成和嵌入请求
def num_tokens_consumed_from_request(
request_json: dict,
api_endpoint: str,
token_encoding_name: str,
):
encoding = tiktoken.get_encoding(token_encoding_name)
if api_endpoint == "completions":
prompt = request_json["prompt"]
max_tokens = request_json.get("max_tokens", 15)
n = request_json.get("n", 1)
completion_tokens = n * max_tokens
if isinstance(prompt, str):
prompt_tokens = len(encoding.encode(prompt))
num_tokens = prompt_tokens + completion_tokens
return num_tokens
elif isinstance(prompt, list):
prompt_tokens = sum([len(encoding.encode(p)) for p in prompt])
num_tokens = prompt_tokens + completion_tokens
return num_tokens
else:
raise TypeError('Expecting either string or list of strings for "prompt" field in completion request')
elif api_endpoint == "embeddings":
input = request_json["input"]
if isinstance(input, str): # single input
num_tokens = len(encoding.encode(input))
return num_tokens
elif isinstance(input, list): # multiple inputs
num_tokens = sum([len(encoding.encode(i)) for i in input])
return num_tokens
else:
raise TypeError('Expecting either string or list of strings for "inputs" field in embedding request')
else:
raise NotImplementedError(f'API endpoint "{api_endpoint}" not implemented in this script')
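For example (a hypothetical request body; "cl100k_base" is one of tiktoken's standard encoding names):

request = {"prompt": "Hello, world", "max_tokens": 50, "n": 1}
# prompt tokens + n * max_tokens reserved for the completion
print(num_tokens_consumed_from_request(request, "completions", "cl100k_base"))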
4、定义主函数
async def process_api_requests_from_file(
requests_filepath: str,
save_filepath: str,
request_url: str,
api_key: str,
max_requests_per_minute: float,
max_tokens_per_minute: float,
token_encoding_name: str,
max_attempts: int,
logging_level: int,
):
seconds_to_pause_after_rate_limit_error = 15
seconds_to_sleep_each_loop = 0.001
logging.basicConfig(level=logging_level)
logging.debug(f"Logging initialized at level {logging_level}")
api_endpoint = api_endpoint_from_url(request_url)
request_header = {"Authorization": f"Bearer {api_key}"}
queue_of_requests_to_retry = asyncio.Queue()
task_id_generator = task_id_generator_function()
status_tracker = StatusTracker()
next_request = None
available_request_capacity = max_requests_per_minute
available_token_capacity = max_tokens_per_minute
last_update_time = time.time()
file_not_finished = True
logging.debug(f"Initialization complete.")
with open(requests_filepath) as file:
requests = file.__iter__()
logging.debug(f"File opened. Entering main loop")
while True:
if next_request is None:
if queue_of_requests_to_retry.empty() is False:
next_request = queue_of_requests_to_retry.get_nowait()
logging.debug(f"Retrying request {next_request.task_id}: {next_request}")
elif file_not_finished:
try:
request_json = eval(next(requests))
next_request = APIRequest(
task_id=next(task_id_generator),
request_json=request_json,
token_consumption=num_tokens_consumed_from_request(request_json, api_endpoint, token_encoding_name),
attempts_left=max_attempts,
)
status_tracker.num_tasks_started += 1
status_tracker.num_tasks_in_progress += 1
logging.debug(f"Reading request {next_request.task_id}: {next_request}")
except StopIteration:
logging.debug("Read file exhausted")
file_not_finished = False
current_time = time.time()
seconds_since_update = current_time - last_update_time
available_request_capacity = min(
available_request_capacity + max_requests_per_minute * seconds_since_update / 60.0,
max_requests_per_minute,
)
available_token_capacity = min(
available_token_capacity + max_tokens_per_minute * seconds_since_update / 60.0,
max_tokens_per_minute,
)
last_update_time = current_time
if next_request:
next_request_tokens = next_request.token_consumption
if (
available_request_capacity >= 1
and available_token_capacity >= next_request_tokens
):
available_request_capacity -= 1
available_token_capacity -= next_request_tokens
next_request.attempts_left -= 1
asyncio.create_task(
next_request.call_API(
request_url=request_url,
request_header=request_header,
retry_queue=queue_of_requests_to_retry,
save_filepath=save_filepath,
status_tracker=status_tracker,
)
)
next_request = None
if status_tracker.num_tasks_in_progress == 0:
break
await asyncio.sleep(seconds_to_sleep_each_loop)
seconds_since_rate_limit_error = (time.time() - status_tracker.time_of_last_rate_limit_error)
if seconds_since_rate_limit_error < seconds_to_pause_after_rate_limit_error:
remaining_seconds_to_pause = (seconds_to_pause_after_rate_limit_error - seconds_since_rate_limit_error)
await asyncio.sleep(remaining_seconds_to_pause)
logging.warn(f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}")
logging.info(f"""Parallel processing complete. Results saved to {save_filepath}""")
if status_tracker.num_tasks_failed > 0:
logging.warning(f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed. Errors logged to {save_filepath}.")
if status_tracker.num_rate_limit_errors > 0:
logging.warning(f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate.")
The above covers the approaches to handling RateLimitError under concurrent workloads.