import sys import asyncio from PySide6.QtCore import QObject, Signal sys.path.append("..") import websocket import websockets import threading import time # import json # from websocket import WebSocketApp from config.douyin_pb import PushFrame, Response, ChatMessage, GiftMessage import gzip class Zhuaqu(QObject): signal_zhuanqu_liwu_main = Signal(str) def decode_message(self,ws, msg): frame = PushFrame() frame.ParseFromString(msg) ori_byte = gzip.decompress(frame.payload) response = Response() response.ParseFromString(ori_byte) if ws is not None and response.needAck: s = PushFrame() s.payloadType = "ack" s.payload = response.internalExt.encode('utf-8') s.logid = frame.logid ws.send(s.SerializeToString()) for item in response.messagesList: if item.method == "WebcastGiftMessage": message = GiftMessage() message.ParseFromString(item.payload) info = f"礼物>{message.user.nickname}>赠送: {message.totalCount}个{message.gift.name}>secid:{message.user.sec_uid}" # print(info) elif item.method == "WebcastChatMessage": message = ChatMessage() message.ParseFromString(item.payload) info = f"弹幕>{message.user.nickname}>说: {message.content}>secid:{message.user.sec_uid}" # print(info) self.signal_zhuanqu_liwu_main.emit(info) # else: # # print(item.method) # continue def on_message(self,ws, message): self.decode_message(ws,message) def on_error(self,ws, error): # print("Error:", error) print('失败') def on_close(self,ws): print("Connection closed") def on_open(self,ws): print("Connection established") # 在新线程中启动新服务 def start(self,ttwid,websocket_url): print(ttwid) print(websocket_url) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0", } # 替换为抓包获得的WebSocket地址 ws = websocket.WebSocketApp(websocket_url, # header=headers, cookie=f"ttwid={ttwid}", on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) ws.on_open = self.on_open ws.run_forever() # 第二个服务 async def send_messages(websocket): global info_send count = 0 while True: if info_send: await websocket.send(info_send) info_send=None # message = f"消息 {count}" # await websocket.send(message) # print(f"发送消息:{message}") # count += 1 await asyncio.sleep(1) async def handler(websocket, path): try: await send_messages(websocket) except websockets.exceptions.ConnectionClosedError: print("连接已关闭") def w_start(): asyncio.set_event_loop(asyncio.new_event_loop()) # 启动 WebSocket 服务端并等待连接 start_server = websockets.serve(handler, "localhost", 9000) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() def start_second_websocket(): print('第二个服务已经启动===') thread2 = threading.Thread(target=w_start) thread2.start() time.sleep(100) # if __name__ == "__main__": # Zhua_qu().zhuaqu_start()