zhaoqingang
2024-11-22 aa7ebd168dd512dcc99b4c91335819d73afd55b6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import json
 
import httpx
 
from Log import logger
 
 
class BasicService:
    """Async HTTP client for the excel-talk and questions endpoints under ``base_url``."""

    def __init__(self, base_url: str):
        """Remember the base URL that every endpoint path is appended to."""
        self.base_url = base_url

    def _check_response(self, response: "httpx.Response"):
        """Return the decoded JSON body of ``response``.

        Raises:
            Exception: if the status code is neither 200 nor 201; the message
                carries the response text.
        """
        if response.status_code not in (200, 201):
            raise Exception(f"Failed to fetch data from API: {response.text}")
        return response.json()

    @staticmethod
    def _filename_from_content_disposition(content_disposition):
        """Extract the ``filename=`` value from a Content-Disposition header.

        Returns ``'unknown_filename'`` when the header is missing, has no
        ``filename=`` parameter, or the parameter is empty, instead of raising.
        """
        if not content_disposition or 'filename=' not in content_disposition:
            return 'unknown_filename'

        value = content_disposition.split('filename=', 1)[1].strip()
        if value.startswith('"'):
            # Quoted form: take everything up to the closing quote so that
            # parameters following the filename are not glued onto it.
            closing = value.find('"', 1)
            value = value[1:closing] if closing != -1 else value[1:]
        else:
            # Unquoted token: it ends at the next parameter separator.
            value = value.split(';', 1)[0].strip()
        return value or 'unknown_filename'

    async def download_from_url(self, url, params=None):
        """GET ``url`` and return ``(content, filename, content_type)``.

        ``filename`` comes from the Content-Disposition header (falling back
        to ``'unknown_filename'``); ``content_type`` is the raw Content-Type
        header value, or ``None`` when the header is absent.

        Raises:
            Exception: if the response status code is not 200.
        """
        async with httpx.AsyncClient() as client:
            async with client.stream('GET', url, params=params) as response:
                if response.status_code != 200:
                    raise Exception(f"Failed to download: {response.status_code}")

                filename = self._filename_from_content_disposition(
                    response.headers.get('Content-Disposition')
                )
                content_type = response.headers.get('Content-Type')

                # Read the whole streamed body into memory.
                content = await response.aread()
                return content, filename, content_type

    async def excel_talk_image_download(self, file_id: str):
        """Download from ``/exceltalk/download/image``, passing ``file_id`` as ``images_name``."""
        url = f"{self.base_url}/exceltalk/download/image"
        return await self.download_from_url(url, params={'images_name': file_id})

    async def excel_talk_excel_download(self, file_id: str):
        """Download from ``/exceltalk/download/excel``, passing ``file_id`` as ``excel_name``."""
        url = f"{self.base_url}/exceltalk/download/excel"
        return await self.download_from_url(url, params={'excel_name': file_id})

    async def excel_talk_upload(self, chat_id: str, filename: str, file_content: bytes):
        """Upload one file to ``/exceltalk/upload/files`` and return the JSON reply.

        The file is sent as a multipart ``files`` part with content type
        ``application/octet-stream``; ``chat_id`` and ``is_col='0'`` are sent
        as query parameters.

        Raises:
            Exception: if the status code is neither 200 nor 201.
        """
        url = f"{self.base_url}/exceltalk/upload/files"
        params = {'chat_id': chat_id, 'is_col': '0'}

        # Multipart form payload: (field name, (filename, content, content type)).
        files = [('files', (filename, file_content, 'application/octet-stream'))]

        async with httpx.AsyncClient() as client:
            response = await client.post(
                url,
                files=files,
                params=params
            )
            return self._check_response(response)

    async def excel_talk(self, question: str, chat_id: str):
        """Stream the answer from ``/exceltalk/talk`` as an async generator.

        Yields every truthy JSON value that ``process_buffer`` manages to
        decode from the accumulated response bytes, then a final
        ``{"message": "", "type": "close"}`` event. On a non-200 status a
        single ``"Error: <status>"`` string is yielded instead.

        If the consumer closes the generator early nothing more is yielded:
        yielding while ``GeneratorExit`` is being handled makes Python raise
        ``RuntimeError: async generator ignored GeneratorExit``.
        """
        url = f"{self.base_url}/exceltalk/talk"
        params = {'chat_id': chat_id}
        data = {"query": question}
        headers = {'Content-Type': 'application/json'}
        buffer = bytearray()
        async with httpx.AsyncClient(timeout=300.0) as client:
            async with client.stream("POST", url, params=params, json=data, headers=headers) as response:
                if response.status_code != 200:
                    yield f"Error: {response.status_code}"
                    return

                closed_by_consumer = False
                try:
                    async for chunk in response.aiter_bytes():
                        json_data = process_buffer(chunk, buffer)
                        if json_data:
                            yield json_data
                            # Start accumulating the next message from scratch.
                            buffer.clear()
                except GeneratorExit:
                    # The consumer stopped iterating; we must not yield again.
                    closed_by_consumer = True
                    raise
                finally:
                    # Logged once the stream is finished, however it ended.
                    logger.info("All messages received and processed - over")
                    if not closed_by_consumer:
                        yield {"message": "", "type": "close"}

    async def questions_talk(self, question, chat_id: str):
        """POST ``question`` as a ``text/plain`` body to ``/questions/talk``.

        Returns the decoded JSON reply.

        Raises:
            Exception: if the status code is neither 200 nor 201.
        """
        # Trace marker only, so it is logged at INFO rather than ERROR.
        logger.info("---------------questions_talk--------------------------")
        url = f"{self.base_url}/questions/talk"
        params = {'chat_id': chat_id}
        headers = {'Content-Type': 'text/plain'}
        async with httpx.AsyncClient(timeout=1800) as client:
            # NOTE(review): httpx recommends ``content=`` for raw str/bytes
            # bodies; ``data=`` is kept because the type of ``question`` is not
            # visible here -- confirm against callers before switching.
            response = await client.post(
                url,
                data=question,
                headers=headers,
                params=params
            )
            return self._check_response(response)

    async def questions_talk_word_download(self, file_id: str):
        """Download from ``/questions/download/word``, passing ``file_id`` as ``word_name``."""
        url = f"{self.base_url}/questions/download/word"
        return await self.download_from_url(url, params={'word_name': file_id})
 
 
def process_buffer(data, buffer):
    """Append a streamed chunk to ``buffer`` and try to decode it as JSON.

    Args:
        data: the bytes chunk just received; a leading ``data:`` marker is
            removed before the chunk is appended.
        buffer: a ``bytearray`` accumulating the current message. It is
            extended in place and never cleared here; the caller resets it
            after a successful parse.

    Returns:
        The decoded JSON value when the accumulated bytes form a complete
        document, otherwise ``None`` (more chunks are needed).
    """
    if data.startswith(b'data:'):
        # Drop the 'data:' marker.
        data = data[5:]

    # The chunk is appended verbatim: a chunk boundary may fall inside a JSON
    # string, where leading/trailing whitespace is significant. json.loads
    # already ignores whitespace around the document itself.
    buffer.extend(data)
    try:
        return json.loads(buffer)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Incomplete document, or a multi-byte UTF-8 character split across
        # chunks (json.loads raises UnicodeDecodeError for that); keep
        # accumulating.
        return None