fixbug: llm.timeout not working #1060

Merged: 4 commits, Mar 21, 2024
Changes from 2 commits
1 change: 1 addition & 0 deletions config/config2.example.yaml
@@ -4,6 +4,7 @@ llm:
   api_key: "YOUR_API_KEY"
   model: "gpt-4-turbo-preview" # or gpt-3.5-turbo-1106 / gpt-4-1106-preview
   proxy: "YOUR_PROXY" # for LLM API requests
+  # timeout: 600 # Optional.
   pricing_plan: "" # Optional. If invalid, it will be automatically filled in with the value of the `model`.
   # Azure-exclusive pricing plan mappings:
   # - gpt-3.5-turbo 4k: "gpt-3.5-turbo-1106"
6 changes: 3 additions & 3 deletions metagpt/actions/action_node.py
@@ -416,7 +416,7 @@ async def _aask_v1(
         images: Optional[Union[str, list[str]]] = None,
         system_msgs: Optional[list[str]] = None,
         schema="markdown", # compatible to original format
-        timeout=3,
+        timeout=0,
     ) -> (str, BaseModel):
         """Use ActionOutput to wrap the output of aask"""
         content = await self.llm.aask(prompt, system_msgs, images=images, timeout=timeout)
@@ -448,7 +448,7 @@ def set_llm(self, llm):
     def set_context(self, context):
         self.set_recursive("context", context)

-    async def simple_fill(self, schema, mode, images: Optional[Union[str, list[str]]] = None, timeout=3, exclude=None):
+    async def simple_fill(self, schema, mode, images: Optional[Union[str, list[str]]] = None, timeout=0, exclude=None):
         prompt = self.compile(context=self.context, schema=schema, mode=mode, exclude=exclude)

         if schema != "raw":
@@ -473,7 +473,7 @@ async def fill(
         mode="auto",
         strgy="simple",
         images: Optional[Union[str, list[str]]] = None,
-        timeout=3,
+        timeout=0,
         exclude=[],
     ):
         """Fill the node(s) with mode.
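A note on the recurring 3 -> 0 default change seen here and in every provider below: 0 is falsy in Python, so it acts as an "unset" sentinel that the new get_timeout helper (added in metagpt/provider/base_llm.py below) can resolve against the config, whereas the old default of 3 was always treated as a real 3-second timeout. A minimal sketch of the sentinel behavior, with an illustrative config value:

config_timeout = 600  # illustrative llm.timeout from the config

for per_call in (0, 3, 30):
    # 0 is falsy, so `or` treats it as "not provided" and falls through.
    print(per_call, "->", per_call or config_timeout)
# 0 -> 600   (sentinel: defer to the config)
# 3 -> 3     (the old default always overrode the config)
# 30 -> 30   (an explicit caller value still wins)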
2 changes: 1 addition & 1 deletion metagpt/configs/llm_config.py
@@ -74,7 +74,7 @@ class LLMConfig(YamlModel):
     stream: bool = False
     logprobs: Optional[bool] = None # https://cookbook.openai.com/examples/using_logprobs
     top_logprobs: Optional[int] = None
-    timeout: int = 60
+    timeout: int = 600

     # For Network
     proxy: Optional[str] = None
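The config-level default also rises from 60 to 600 seconds. A minimal standalone sketch of how the field behaves, using plain Pydantic in place of MetaGPT's YamlModel (a simplification for illustration, not the real class):

from typing import Optional

from pydantic import BaseModel


class LLMConfigSketch(BaseModel):
    timeout: int = 600  # mirrors the diff: 600 instead of 60
    proxy: Optional[str] = None


print(LLMConfigSketch().timeout)             # 600: no timeout key in the YAML
print(LLMConfigSketch(timeout=120).timeout)  # 120: explicit config value wins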
8 changes: 4 additions & 4 deletions metagpt/provider/anthropic_api.py
@@ -41,15 +41,15 @@ def _update_costs(self, usage: Usage, model: str = None, local_calc_usage: bool
     def get_choice_text(self, resp: Message) -> str:
         return resp.content[0].text

-    async def _achat_completion(self, messages: list[dict], timeout: int = 3) -> Message:
+    async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> Message:
         resp: Message = await self.aclient.messages.create(**self._const_kwargs(messages))
         self._update_costs(resp.usage, self.model)
         return resp

-    async def acompletion(self, messages: list[dict], timeout: int = 3) -> Message:
-        return await self._achat_completion(messages, timeout=timeout)
+    async def acompletion(self, messages: list[dict], timeout: int = 0) -> Message:
+        return await self._achat_completion(messages, timeout=self.get_timeout(timeout))

-    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 3) -> str:
+    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
         stream = await self.aclient.messages.create(**self._const_kwargs(messages, stream=True))
         collected_content = []
         usage = Usage(input_tokens=0, output_tokens=0)
26 changes: 15 additions & 11 deletions metagpt/provider/base_llm.py
@@ -23,6 +23,7 @@
 )

 from metagpt.configs.llm_config import LLMConfig
+from metagpt.const import LLM_API_TIMEOUT
 from metagpt.logs import logger
 from metagpt.schema import Message
 from metagpt.utils.common import log_and_reraise
@@ -108,7 +109,7 @@ async def aask(
         system_msgs: Optional[list[str]] = None,
         format_msgs: Optional[list[dict[str, str]]] = None,
         images: Optional[Union[str, list[str]]] = None,
-        timeout=3,
+        timeout=0,
         stream=True,
     ) -> str:
         if system_msgs:
@@ -124,31 +125,31 @@
         else:
             message.extend(msg)
         logger.debug(message)
-        rsp = await self.acompletion_text(message, stream=stream, timeout=timeout)
+        rsp = await self.acompletion_text(message, stream=stream, timeout=self.get_timeout(timeout))
         return rsp

     def _extract_assistant_rsp(self, context):
         return "\n".join([i["content"] for i in context if i["role"] == "assistant"])

-    async def aask_batch(self, msgs: list, timeout=3) -> str:
+    async def aask_batch(self, msgs: list, timeout=0) -> str:
         """Sequential questioning"""
         context = []
         for msg in msgs:
             umsg = self._user_msg(msg)
             context.append(umsg)
-            rsp_text = await self.acompletion_text(context, timeout=timeout)
+            rsp_text = await self.acompletion_text(context, timeout=self.get_timeout(timeout))
             context.append(self._assistant_msg(rsp_text))
         return self._extract_assistant_rsp(context)

-    async def aask_code(self, messages: Union[str, Message, list[dict]], timeout=3, **kwargs) -> dict:
+    async def aask_code(self, messages: Union[str, Message, list[dict]], timeout=0, **kwargs) -> dict:
         raise NotImplementedError

     @abstractmethod
-    async def _achat_completion(self, messages: list[dict], timeout=3):
+    async def _achat_completion(self, messages: list[dict], timeout=0):
         """_achat_completion implemented by inherited class"""

     @abstractmethod
-    async def acompletion(self, messages: list[dict], timeout=3):
+    async def acompletion(self, messages: list[dict], timeout=0):
         """Asynchronous version of completion
         All GPTAPIs are required to provide the standard OpenAI completion interface
         [
@@ -159,7 +160,7 @@ async def acompletion(self, messages: list[dict], timeout=3):
         """

     @abstractmethod
-    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 3) -> str:
+    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
         """_achat_completion_stream implemented by inherited class"""

     @retry(
@@ -169,11 +170,11 @@ async def _achat_completion_stream(self, messages: list[dict], timeout: int = 3)
         retry=retry_if_exception_type(ConnectionError),
         retry_error_callback=log_and_reraise,
     )
-    async def acompletion_text(self, messages: list[dict], stream: bool = False, timeout: int = 3) -> str:
+    async def acompletion_text(self, messages: list[dict], stream: bool = False, timeout: int = 0) -> str:
         """Asynchronous version of completion. Return str. Support stream-print"""
         if stream:
-            return await self._achat_completion_stream(messages, timeout=timeout)
-        resp = await self._achat_completion(messages, timeout=timeout)
+            return await self._achat_completion_stream(messages, timeout=self.get_timeout(timeout))
+        resp = await self._achat_completion(messages, timeout=self.get_timeout(timeout))
         return self.get_choice_text(resp)

     def get_choice_text(self, rsp: dict) -> str:
@@ -236,3 +237,6 @@ def with_model(self, model: str):
         """Set model and return self. For example, `with_model("gpt-3.5-turbo")`."""
         self.config.model = model
         return self
+
+    def get_timeout(self, timeout: int) -> int:
+        return timeout or self.config.timeout or LLM_API_TIMEOUT
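The new get_timeout helper is the heart of the fix: every call site above now routes through it, so an explicit per-call timeout wins, then config.timeout (the llm.timeout key, default 600 after this PR), then the library-wide LLM_API_TIMEOUT constant. A minimal standalone sketch of the resolution chain; the constant's value below is an assumption for illustration, not taken from metagpt.const:

LLM_API_TIMEOUT = 300  # assumed fallback value for illustration


def get_timeout(timeout: int, config_timeout: int) -> int:
    # First truthy value wins: per-call argument, then config, then constant.
    return timeout or config_timeout or LLM_API_TIMEOUT


assert get_timeout(0, 0) == 300    # nothing set: library fallback
assert get_timeout(0, 600) == 600  # llm.timeout from the config applies
assert get_timeout(30, 600) == 30  # explicit per-call value wins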
8 changes: 4 additions & 4 deletions metagpt/provider/dashscope_api.py
@@ -202,16 +202,16 @@ def completion(self, messages: list[dict]) -> GenerationOutput:
         self._update_costs(dict(resp.usage))
         return resp.output

-    async def _achat_completion(self, messages: list[dict], timeout: int = 3) -> GenerationOutput:
+    async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> GenerationOutput:
         resp: GenerationResponse = await self.aclient.acall(**self._const_kwargs(messages, stream=False))
         self._check_response(resp)
         self._update_costs(dict(resp.usage))
         return resp.output

-    async def acompletion(self, messages: list[dict], timeout=3) -> GenerationOutput:
-        return await self._achat_completion(messages, timeout=timeout)
+    async def acompletion(self, messages: list[dict], timeout=0) -> GenerationOutput:
+        return await self._achat_completion(messages, timeout=self.get_timeout(timeout))

-    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 3) -> str:
+    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
         resp = await self.aclient.acall(**self._const_kwargs(messages, stream=True))
         collected_content = []
         usage = {}
2 changes: 1 addition & 1 deletion metagpt/provider/general_api_base.py
@@ -573,7 +573,7 @@ async def arequest_raw(
                 total=request_timeout[1],
             )
         else:
-            timeout = aiohttp.ClientTimeout(total=request_timeout if request_timeout else TIMEOUT_SECS)
+            timeout = aiohttp.ClientTimeout(total=request_timeout or TIMEOUT_SECS)

         if files:
             # TODO: Use `aiohttp.MultipartWriter` to create the multipart form data here.
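This hunk is a pure simplification: `x if x else d` and `x or d` are equivalent, and both map None and 0 to the default. A small sketch, with an assumed value for TIMEOUT_SECS:

import aiohttp

TIMEOUT_SECS = 600  # assumed module default for illustration


def build_timeout(request_timeout):
    # `or` collapses None and 0 to the default, exactly like the old ternary.
    return aiohttp.ClientTimeout(total=request_timeout or TIMEOUT_SECS)


print(build_timeout(None).total)  # 600
print(build_timeout(0).total)     # 600
print(build_timeout(30).total)    # 30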
8 changes: 4 additions & 4 deletions metagpt/provider/google_gemini_api.py
@@ -88,16 +88,16 @@ def completion(self, messages: list[dict]) -> "GenerateContentResponse":
         self._update_costs(usage)
         return resp

-    async def _achat_completion(self, messages: list[dict], timeout: int = 3) -> "AsyncGenerateContentResponse":
+    async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> "AsyncGenerateContentResponse":
         resp: AsyncGenerateContentResponse = await self.llm.generate_content_async(**self._const_kwargs(messages))
         usage = await self.aget_usage(messages, resp.text)
         self._update_costs(usage)
         return resp

-    async def acompletion(self, messages: list[dict], timeout=3) -> dict:
-        return await self._achat_completion(messages, timeout=timeout)
+    async def acompletion(self, messages: list[dict], timeout=0) -> dict:
+        return await self._achat_completion(messages, timeout=self.get_timeout(timeout))

-    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 3) -> str:
+    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
         resp: AsyncGenerateContentResponse = await self.llm.generate_content_async(
             **self._const_kwargs(messages, stream=True)
         )
14 changes: 7 additions & 7 deletions metagpt/provider/human_provider.py
@@ -18,7 +18,7 @@ class HumanProvider(BaseLLM):
     def __init__(self, config: LLMConfig):
         pass

-    def ask(self, msg: str, timeout=3) -> str:
+    def ask(self, msg: str, timeout=0) -> str:
         logger.info("It's your turn, please type in your response. You may also refer to the context below")
         rsp = input(msg)
         if rsp in ["exit", "quit"]:
@@ -31,20 +31,20 @@ async def aask(
         system_msgs: Optional[list[str]] = None,
         format_msgs: Optional[list[dict[str, str]]] = None,
         generator: bool = False,
-        timeout=3,
+        timeout=0,
     ) -> str:
-        return self.ask(msg, timeout=timeout)
+        return self.ask(msg, timeout=self.get_timeout(timeout))

-    async def _achat_completion(self, messages: list[dict], timeout=3):
+    async def _achat_completion(self, messages: list[dict], timeout=0):
         pass

-    async def acompletion(self, messages: list[dict], timeout=3):
+    async def acompletion(self, messages: list[dict], timeout=0):
         """dummy implementation of abstract method in base"""
         return []

-    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 3) -> str:
+    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
         pass

-    async def acompletion_text(self, messages: list[dict], stream=False, timeout=3) -> str:
+    async def acompletion_text(self, messages: list[dict], stream=False, timeout=0) -> str:
         """dummy implementation of abstract method in base"""
         return ""
13 changes: 6 additions & 7 deletions metagpt/provider/ollama_api.py
@@ -5,7 +5,6 @@
 import json

 from metagpt.configs.llm_config import LLMConfig, LLMType
-from metagpt.const import LLM_API_TIMEOUT
 from metagpt.logs import log_llm_stream
 from metagpt.provider.base_llm import BaseLLM
 from metagpt.provider.general_api_requestor import GeneralAPIRequestor
@@ -50,28 +49,28 @@ def _decode_and_load(self, chunk: bytes, encoding: str = "utf-8") -> dict:
         chunk = chunk.decode(encoding)
         return json.loads(chunk)

-    async def _achat_completion(self, messages: list[dict], timeout: int = 3) -> dict:
+    async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> dict:
         resp, _, _ = await self.client.arequest(
             method=self.http_method,
             url=self.suffix_url,
             params=self._const_kwargs(messages),
-            request_timeout=LLM_API_TIMEOUT,
+            request_timeout=self.get_timeout(timeout),
         )
         resp = self._decode_and_load(resp)
         usage = self.get_usage(resp)
         self._update_costs(usage)
         return resp

-    async def acompletion(self, messages: list[dict], timeout=3) -> dict:
-        return await self._achat_completion(messages, timeout=timeout)
+    async def acompletion(self, messages: list[dict], timeout=0) -> dict:
+        return await self._achat_completion(messages, timeout=self.get_timeout(timeout))

-    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 3) -> str:
+    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
         stream_resp, _, _ = await self.client.arequest(
             method=self.http_method,
             url=self.suffix_url,
             stream=True,
             params=self._const_kwargs(messages, stream=True),
-            request_timeout=LLM_API_TIMEOUT,
+            request_timeout=self.get_timeout(timeout),
         )

         collected_content = []
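For the Ollama provider this hunk is the reported bug itself: the old code pinned request_timeout to the LLM_API_TIMEOUT constant, so neither the timeout argument nor llm.timeout from the config ever reached the HTTP request. A toy before/after with illustrative values:

LLM_API_TIMEOUT = 300  # illustrative constant


def effective_timeout_before(timeout: int = 3) -> int:
    return LLM_API_TIMEOUT  # argument accepted but silently ignored


def effective_timeout_after(timeout: int = 0, config_timeout: int = 600) -> int:
    return timeout or config_timeout or LLM_API_TIMEOUT  # resolved per call


print(effective_timeout_before(30))  # 300: the caller's 30s was discarded
print(effective_timeout_after(30))   # 30: the caller's value is now honored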
26 changes: 13 additions & 13 deletions metagpt/provider/openai_api.py
@@ -79,9 +79,9 @@ def _get_proxy_params(self) -> dict:
         return params

-    async def _achat_completion_stream(self, messages: list[dict], timeout=3) -> str:
+    async def _achat_completion_stream(self, messages: list[dict], timeout=0) -> str:
         response: AsyncStream[ChatCompletionChunk] = await self.aclient.chat.completions.create(
-            **self._cons_kwargs(messages, timeout=timeout), stream=True
+            **self._cons_kwargs(messages, timeout=self.get_timeout(timeout)), stream=True
         )
         usage = None
         collected_messages = []
@@ -109,28 +109,28 @@ async def _achat_completion_stream(self, messages: list[dict], timeout=3) -> str
         self._update_costs(usage)
         return full_reply_content

-    def _cons_kwargs(self, messages: list[dict], timeout=3, **extra_kwargs) -> dict:
+    def _cons_kwargs(self, messages: list[dict], timeout=0, **extra_kwargs) -> dict:
         kwargs = {
             "messages": messages,
             "max_tokens": self._get_max_tokens(messages),
             # "n": 1, # Some services do not provide this parameter, such as mistral
             # "stop": None, # default it's None and gpt4-v can't have this one
             "temperature": self.config.temperature,
             "model": self.model,
-            "timeout": max(self.config.timeout, timeout),
+            "timeout": self.get_timeout(timeout),
         }
         if extra_kwargs:
             kwargs.update(extra_kwargs)
         return kwargs

-    async def _achat_completion(self, messages: list[dict], timeout=3) -> ChatCompletion:
-        kwargs = self._cons_kwargs(messages, timeout=timeout)
+    async def _achat_completion(self, messages: list[dict], timeout=0) -> ChatCompletion:
+        kwargs = self._cons_kwargs(messages, timeout=self.get_timeout(timeout))
         rsp: ChatCompletion = await self.aclient.chat.completions.create(**kwargs)
         self._update_costs(rsp.usage)
         return rsp

-    async def acompletion(self, messages: list[dict], timeout=3) -> ChatCompletion:
-        return await self._achat_completion(messages, timeout=timeout)
+    async def acompletion(self, messages: list[dict], timeout=0) -> ChatCompletion:
+        return await self._achat_completion(messages, timeout=self.get_timeout(timeout))

     @retry(
         wait=wait_random_exponential(min=1, max=60),
@@ -139,24 +139,24 @@ async def acompletion(self, messages: list[dict], timeout=3) -> ChatCompletion:
         retry=retry_if_exception_type(APIConnectionError),
         retry_error_callback=log_and_reraise,
     )
-    async def acompletion_text(self, messages: list[dict], stream=False, timeout=3) -> str:
+    async def acompletion_text(self, messages: list[dict], stream=False, timeout=0) -> str:
         """when streaming, print each token in place."""
         if stream:
             return await self._achat_completion_stream(messages, timeout=timeout)

-        rsp = await self._achat_completion(messages, timeout=timeout)
+        rsp = await self._achat_completion(messages, timeout=self.get_timeout(timeout))
         return self.get_choice_text(rsp)

     async def _achat_completion_function(
-        self, messages: list[dict], timeout: int = 3, **chat_configs
+        self, messages: list[dict], timeout: int = 0, **chat_configs
     ) -> ChatCompletion:
         messages = process_message(messages)
-        kwargs = self._cons_kwargs(messages=messages, timeout=timeout, **chat_configs)
+        kwargs = self._cons_kwargs(messages=messages, timeout=self.get_timeout(timeout), **chat_configs)
         rsp: ChatCompletion = await self.aclient.chat.completions.create(**kwargs)
         self._update_costs(rsp.usage)
         return rsp

-    async def aask_code(self, messages: list[dict], timeout: int = 3, **kwargs) -> dict:
+    async def aask_code(self, messages: list[dict], timeout: int = 0, **kwargs) -> dict:
         """Use function of tools to ask a code.
         Note: Keep kwargs consistent with https://platform.openai.com/docs/api-reference/chat/create
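Beyond the sentinel change, _cons_kwargs swaps max(self.config.timeout, timeout) for self.get_timeout(timeout), which changes precedence: under the old rule the larger value always won, so a caller could never request a timeout shorter than the configured one. A sketch of the difference, with illustrative values:

def old_rule(config_timeout: int, timeout: int) -> int:
    return max(config_timeout, timeout)  # the larger value always won


def new_rule(config_timeout: int, timeout: int, fallback: int = 300) -> int:
    return timeout or config_timeout or fallback  # the caller's value wins


print(old_rule(600, 30))  # 600: a 30s per-call request was silently raised
print(new_rule(600, 30))  # 30: the per-call value is now honored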
8 changes: 4 additions & 4 deletions metagpt/provider/qianfan_api.py
@@ -107,15 +107,15 @@ def completion(self, messages: list[dict]) -> JsonBody:
         self._update_costs(resp.body.get("usage", {}))
         return resp.body

-    async def _achat_completion(self, messages: list[dict], timeout: int = 3) -> JsonBody:
+    async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> JsonBody:
         resp = await self.aclient.ado(**self._const_kwargs(messages=messages, stream=False))
         self._update_costs(resp.body.get("usage", {}))
         return resp.body

-    async def acompletion(self, messages: list[dict], timeout: int = 3) -> JsonBody:
-        return await self._achat_completion(messages, timeout=timeout)
+    async def acompletion(self, messages: list[dict], timeout: int = 0) -> JsonBody:
+        return await self._achat_completion(messages, timeout=self.get_timeout(timeout))

-    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 3) -> str:
+    async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
         resp = await self.aclient.ado(**self._const_kwargs(messages=messages, stream=True))
         collected_content = []
         usage = {}