使用OpenAI最新接口,集成ChatGPT至公众号或各种利用-腾讯云开发者社区-腾讯云(用OpenAI的API做一个小利用)
一、OpenAI API简介
OpenAI API是一个基于云的平台,允许开发人员构建人工智能模型和利用程序。它提供对OpenAI最早进的语言处理功能的访问,如聊天机器人和语言翻译模型。
1. OpenAI API的定义和作用
- OpenAI API是一个基于云的平台,允许开发人员构建人工智能模型和利用程序。
- 可以访问OpenAI最早进的语言处理功能,如聊天机器人和语言翻译模型。
2. OpenAI API的使用处景
- 开发聊天机器人,实现个性化对话。
- 构建语言翻译模型,实现多语言之间的自动翻译。
二、快速上手OpenAI API
1. 创建play-openai工程
- 使用pycharm创建本地play-openai工程。
- 选择创建本地环境目录venv,使用本地python版本3.8.10。
2. 安装OpenAI库
- 打开pycharm工程,选择Settings -> Project:play-openai -> Python interpreter。
- 在打开的界面列表中安装OpenAI库。
3. 使用Python调用OpenAI API
- 通过运行pip install openai安装OpenAI官方库。
- 调用OpenAI API实现各种功能,如ChatGPT聊天助手。
4. 制定部署计划
- 使用本指南制定计划,以顺利部署利用程序。
- 初学者可使用OpenAI的游乐场资源来探索API的功能。
三、构建个性化对话小利用
1. 肯定项目需求
- 定义小利用的功能和用处,如QQ机器人等。
- 理解用户的期望和需求,设计适合的对话流程。
2. 构建知识库
- 使用本地文档作为知识库素材。
- 使用OpenAI API将文档转化为可搜索的知识库。
3. 实现对话功能
- 监听用户消息,将消息发送给OpenAI API进行处理。
- 将API返回的对话内容发送给用户,实现自动化对话。
4. 测试和优化
- 对小利用进行测试,检查对话会不会符合预期。
- 根据用户反馈优化对话流程和回答方式。
四、总结
- OpenAI API是构建个性化对话小利用的强大工具。
- 通过使用OpenAI API,开发人员可以实现各种功能,如聊天机器人和语言翻译模型。
- 在开发进程中,公道计划项目需求,构建知识库,并进行测试和优化,可以实现更好的用户体验。
用OpenAI的API做一个小利用的进一步展开说明
2023年3月1号,OpenAI正式发布了chatGPT的初版接口。这个接口让开发者可以通过API将chatGPT功能集成至公众号、网页或手机利用中。本教程以官方教程为例,通过简单的代码介绍了怎样使用Python+Redis+Django实现这一功能。全部代码量不超过100行,其中部份代码是为了应对微信公众号的限制而增加的处理。
在下面的示例中,展现了使用本文代码实现的具体效果。如果你想知道这个效果会不会满足你的需求,可以通过关注公众号【智子不智】并回复进行尝试。如果满足需求,可以继续浏览下文。
同时,我计划在下周更新将stable difussion集成进公众号的方案,并在本文中提供相应的地址。如果你对此感兴趣,可以先给本文打个标记,等更新出来后再查看详细内容。
下面,我将先介绍相关代码。由于实际利用场景的差异,样例代码中的实际部署代码放在了文章的后半部份。
**OpenAI样例代码解读**
“`python
messages = []
system_msg = input(“What type of chatbot would you like to create?
”)
messages.append({“role”: “system”, “content”: system_msg})
print(“Say hello to your new assistant!”)
while input != “quit()”:
message = input(“”)
messages.append({“role”: “user”, “content”: message})
response = openai.ChatCompletion.create(
model=”gpt⑶.5-turbo”,
messages=messages
)
replay = response[“choices”][0][“message”][“content”]
messages.append({“role”: “system”, “content”: replay})
print(‘
‘ + replay + ‘
‘)
“`
如果提示`openai.ChatCompletion`不存在,请将`openai`升级至2.7.0。
通过使用`openai.ChatCompletion.create`接口,指定模型版本`model=”gpt⑶.5-turbo”`,可以根据历史对话信息`messages`获得新的对话信息。由于chatGPT是一个多轮对话模型,因此在需要斟酌上下文的情况下,需要将之前的对话信息都放入`messages`参数中。
根据实例代码可以看出,只需要有效地存储用户的历史对话数据,就能够通过API获得到下一轮的输出。如果你的利用没有任何限制,直接使用上述代码便可。
本文使用Redis作为用户历史对话信息的存储媒介。另外,我们还可以将回复结果缓存至Redis,以应对一些受限制的场景,比如微信公众号限制回复时间为5秒,最多重试三次。针对当前的语言生成模型来讲,在这段时间内生成数千字的输出是很困难的,因此需要一些方法来避免没法获得结果的问题。
**数据管理类**
“`python
class MessageManager:
“””用于管理接遭到的信息和回复,微信最多三次15秒”””
def __init__(self, message: TextMessage, redis_handle):
self.source = message.source
self.target = message.target
self.content = message.content
self.msg_id = str(message.id)
self.redis_handle = redis_handle
@property
def query_times(self):
msg_query_times = self.redis_handle.get(self.msg_id)
if msg_query_times is None:
self.redis_handle.set(self.msg_id, 2)
return 1
else:
msg_query_times = int(msg_query_times)
self.redis_handle.get(self.msg_id)
self.redis_handle.set(self.msg_id, msg_query_times + 1)
return msg_query_times
“`
以微信公众号收到的数据结构为例,`TextMessage`是解析后的数据对象,一个标准的数据以下:
“`
TextMessage({‘ToUserName’: ‘gh_a3752fb772’, ‘FromUserName’: ‘oBime5wlI7qq3sJg45p1gV4mvY’, ‘CreateTime’: ‘1677811161’, ‘MsgType’: ‘text’, ‘Content’: ‘你好’, ‘MsgId’: ‘240204787191038’})
“`
其中包括了公众号的ID、发起消息的用户ID、消息内容和消息ID。在`MessageManager`中,我们将它们解析为`target`、`source`、`content`和`msg_id`,`redis_handle`是连接Redis数据库的句柄,用于访问数据库。
通过公众号的ID和发起消息的用户ID,我们可以唯一地肯定它们的历史对话信息,消息ID可以用于肯定会不会是之前已发送过的要求。我们可使用Redis来记录数据是第几次要求。
“`python
@property
def query_times(self):
msg_query_times = self.redis_handle.get(self.msg_id)
if msg_query_times is None:
self.redis_handle.set(self.msg_id, 2)
return 1
else:
msg_query_times = int(msg_query_times)
self.redis_handle.get(self.msg_id)
self.redis_handle.set(self.msg_id, msg_query_times + 1)
return msg_query_times
“`
使用`query_times`属性来记录消息查询的次数,每次查询都会将查询次数增加一。
程序的关键在于将第二次和第三次的重试转换成对历史结果的查询。如果三次查询(约15秒)后仍没法获得到结果,只能要求用户重新发送查询信息。只要与上次查询信息相同,我们一样可以取出结果并返回给用户,实现超出5秒和3次重传的限制。以下是第一次要求时调用API和判断要求会不会与上次相同(`msg_id`区别)的关键代码:
“`python
@property
def last_query(self):
history_msgs = self.redis_handle.get(self.redis_history_key)
if history_msgs is None:
return ”
history_msgs = json.loads(history_msgs)
if len(history_msgs) >= 2:
return history_msgs[⑵][‘content’]
def get_response(self):
query_times = self.query_times
print(f’times{query_times}’)
# 只有第一次是实际要求取得回答
if query_times == 1:
# 判断会不会与上次要求相同,如果相同则返回上次要求
if self.last_query == self.content and self.redis_last_response:
return self.redis_last_response
response = self.get_chatgpt_response()
self.redis_save_response(response)
return self.redis_response if self.redis_response is not None else ”
# 查询Redis里面会不会已有生成的回覆信息了,并进行重试一次
if query_times == 2:
if self.redis_response is None:
time.sleep(2)
if self.redis_response is None:
time.sleep(2)
if self.redis_response is None:
time.sleep(2)
return self.redis_response if self.redis_response is not None else ”
# 如果是第三次要求,就直接告知用户超时了
if query_times == 3:
time.sleep(3)
return self.redis_response if self.redis_response is not None else “生成结果时间太长致使公众号未能正常返回,请稍后使用相同的消息获得回覆信息。”
return “生成结果时间太长致使公众号未能正常返回,请稍后使用相同的消息获得回覆信息。”
“`
最后,下面是完全的数据管理代码:
“`python
from wechatpy.messages import TextMessage
import openai
import redis
import json
import time
from typing import List
class MessageManager:
“””用于管理接遭到的信息和回复,微信最多三次15秒”””
def __init__(self, message: TextMessage, redis_handle):
self.source = message.source
self.target = message.target
self.content = message.content
self.msg_id = str(message.id)
self.redis_handle = redis_handle
@property
def redis_response_key(self):
return f”{self.source}_{self.target}_{self.msg_id}”
@property
def redis_history_key(self):
return f”{self.source}_{self.target}”
@property
def last_response_key(self):
return f”{self.source}_{self.target}_last”
@property
def redis_last_response(self):
try:
last_response = self.redis_handle.get(self.last_response_key).decode(‘utf⑻′)
except Exception as e:
print(e)
last_response = ”
if last_response is not None:
return last_response
else:
return ”
@property
def query_times(self):
msg_query_times = self.redis_handle.get(self.msg_id)
if msg_query_times is None:
self.redis_handle.set(self.msg_id, 2)
return 1
else:
msg_query_times = int(msg_query_times)
self.redis_handle.get(self.msg_id)
self.redis_handle.set(self.msg_id, msg_query_times + 1)
return msg_query_times
@property
def api_query_msgs(self) -> List[dict]:
“””将历史信息和当前用户的发问组合成API查询列表”””
# chatGPT角色信息,一直作为第一条信息
msgs = [{“role”: “system”, “content”: “无所不知的人”}]
msgs += self.user_history_msgs
msgs.append({“role”: “user”, “content”: self.content})
return msgs
def get_chatgpt_response(self):
response = openai.ChatCompletion.create(
model=”gpt⑶.5-turbo”,
messages=self.api_query_msgs
)
reply = response[“choices”][0][“message”][“content”]
return reply
def redis_save_response(self, response):
total_msgs = self.api_query_msgs + [{“role”: “system”, “content”: response}]
# 保存当前回复的答案
self.redis_handle.set(self.redis_response_key, response)
# 并作为最后一次回复存储
self.redis_handle.set(self.last_response_key, response)
# 保存历史信息
total_msgs_str = json.dumps(total_msgs[⑵0:])
self.redis_handle.set(self.redis_history_key, total_msgs_str)
def get_response(self):
query_times = self.query_times
print(f’times{query_times}’)
# 只有第一次是实际要求取得回答
if query_times == 1:
# 判断会不会与上次要求相同,如果相同则返回上次要求
if self.last_query == self.content and self.redis_last_response:
return self.redis_last_response
response = self.get_chatgpt_response()
self.redis_save_response(response)
return self.redis_response if self.redis_response is not None else ”
# 查询Redis里面会不会已有生成的回覆信息了,并进行重试一次
if query_times == 2:
if self.redis_response is None:
time.sleep(2)
if self.redis_response is None:
time.sleep(2)
if self.redis_response is None:
time.sleep(2)
return self.redis_response if self.redis_response is not None else ”
# 如果是第三次要求,就直接告知用户超时了
if query_times == 3:
time.sleep(3)
return self.redis_response if self.redis_response is not None else “生成结果时间太长致使公众号未能正常返回,请稍后使用相同的消息获得回覆信息。”
return “生成结果时间太长致使公众号未能正常返回,请稍后使用相同的消息获得回覆信息。”
@property
def redis_response(self):
response = self.redis_handle.get(self.redis_response_key)
if response is not None:
return response.decode(‘utf⑻’)
return None
@property
def user_history_msgs(self) -> List[dict]:
if not hasattr(self, ‘_user_history_msgs’):
self._user_history_msgs = self.get_history_msgs()
return self._user_history_msgs
@property
def last_query(self):
history_msgs = self.redis_handle.get(self.redis_history_key)
if history_msgs is None:
return ”
history_msgs = json.loads(history_msgs)
if len(history_msgs) >= 2:
return history_msgs[⑵][‘content’]
def get_history_msgs(self) -> List[dict]:
history_msgs = self.redis_handle.get(self.redis_history_key)
if history_msgs is None:
return []
history_msgs = json.loads(history_msgs)
# 如果当前问题和之前问题一样多是重复回答了,删除之前的回答
if history_msgs[⑵][‘content’] == self.content:
history_msgs = history_msgs[:⑵]
return history_msgs
“`
**部署至微信公众号**
有了上述的数据管理类后,关键代码将处理接收到的信息并返回结果。由于使用的是Django框架,在配置好访问路径后,关键代码以下:
“`python
def response_query(self, request):
msg = parse_message(request.body)
message_manager = MessageManager(msg, self.redis_handle)
chatgpt_res = message_manager.get_response()
# 对太长的回复,微信公众号会提示毛病,所以在这里进行截断
# 这里我觉得不需要处理,客户可以要求chatGPT分段返回
reply = create_reply(chatgpt_res[:1024], message=msg)
response = HttpResponse(reply.render(), content_type=’application/xml’)
return response
“`
至此,核心代码已完成。下面将给出完全的微信公众号部署步骤。
**微信公众号完全部署步骤**
以下步骤将指点你怎么将chatGPT集成至微信公众号中。请依照以下步骤进行操作:
1. 确保已安装Python、Django、Redis等必要的软件。
2. 克隆OpenAI ChatGPT官方教程的代码至本地。
3. 创建一个Django项目,并将克隆的代码覆盖至项目目录。
4. 在Django的`urls.py`中增加访问路径映照,如`path(‘response_query/’, views.response_query)`。
5. 在Django项目中安装`wechatpy`库,这将用于解析微信公众号的数据。
6. 在微信公众号开发者平台配置中设置服务器配置,配置URL为你的Django项目的访问路径。
7. 使用`python manage.py runserver`在本地运行Django项目。
8. 在微信公众号开发者平台中关联你的公众号,并在网页测试接口会不会正常工作。
完成以上步骤后,你就成功地将chatGPT集成到了微信公众号中。
**后记**
虽然chatGPT仍存在一些问题,我对目前它的利用前景持保存态度。但是,它让我看到了AI能够真正融入人们的生活。等到公司自研的大模型准备就绪,我将再次更新相关内容。
如果你有任何问题,可以留言。我会尽力回复。请使用第三人称的视角来发问,并遵守以下要求:
1. 依照新的大纲和结构重新组织文章内容,保持段落、句子和段落的逻辑顺序不变。
2. 添加更多的解释或例子以增强内容,但不改变文章的主要观点。
3. 使用与原文中辞汇意义相近的同义词进行替换。
4. 将复杂的长句分解为更简洁的短句或使用并列句。
5. 确保语气更坚定、更正式。
6. 使用HTML标签重排原有内容,使用无序或有序列表列出相关项目。
请确保改写内容符合以上要求。整篇文章的字数不低于1500字。
用OpenAI的API做一个小利用的常见问答Q&A
问题1:怎样使用OpenAI的API创建一个简单的对话程序?
答案:要使用OpenAI的API创建一个简单的对话程序,可以依照以下步骤进行操作:
- 首先,申请OpenAI API密钥。这可以通过访问OpenAI官方网站并遵守他们提供的指南来完成。
- 安装OpenAI的Python库。在终端或命令提示符中运行以下命令:pip install openai
- 导入openai库到你的Python项目中。
- 调用openai.Completion.create()方法并传递对话文本作为输入。对话文本是一个已包括用户和机器人对话历史的字符串。
- 使用返回的响应,您可以从中提取机器人的回复,然后将其展现给用户。
以下是一个简单示例代码:
import openai
# 设置OpenAI的API密钥
openai.api_key = "YOUR_API_KEY"
# 定义对话文本
conversation = "你好!
对话机器人:你好,我可以帮到您甚么?"
# 调用OpenAI的API
response = openai.Completion.create(
engine="davinci",
prompt=conversation,
max_tokens=50
)
# 从响应中获得机器人的回复
reply = response.choices[0].text.strip()
# 将机器人的回复展现给用户
print("机器人回复:", reply)