天下程序员首页cx06.com
管理 |  登录 |  注册
袁鹏飞的小屋
笔记(共39个) > python

python拿某音数据socket,同步版

阅读 0赞 0回复 2024-10-07 17:16:38
import sys
import asyncio
from PySide6.QtCore import QObject, Signal
sys.path.append("..")
import websocket


import websockets
import threading
import time
# import json
# from websocket import WebSocketApp
from config.douyin_pb import PushFrame, Response, ChatMessage, GiftMessage
import gzip
class Zhuaqu(QObject):
    signal_zhuanqu_liwu_main = Signal(str)
    def decode_message(self,ws, msg):
        frame = PushFrame()
        frame.ParseFromString(msg)
        ori_byte = gzip.decompress(frame.payload)
        response = Response()
        response.ParseFromString(ori_byte)
        if ws is not None and response.needAck:
            s = PushFrame()
            s.payloadType = "ack"
            s.payload = response.internalExt.encode('utf-8')
            s.logid = frame.logid
            ws.send(s.SerializeToString())
        for item in response.messagesList:
            if item.method == "WebcastGiftMessage":
                message = GiftMessage()
                message.ParseFromString(item.payload)
                info = f"礼物>{message.user.nickname}>赠送: {message.totalCount}个{message.gift.name}>secid:{message.user.sec_uid}"
                # print(info)
                
            elif item.method == "WebcastChatMessage":
                message = ChatMessage()
                message.ParseFromString(item.payload)
                info = f"弹幕>{message.user.nickname}>说: {message.content}>secid:{message.user.sec_uid}"
                # print(info)
            self.signal_zhuanqu_liwu_main.emit(info)
            # else:
            #     # print(item.method)
            #     continue
    def on_message(self,ws, message):
        self.decode_message(ws,message)
    def on_error(self,ws, error):
        # print("Error:", error)
        print('失败')
    def on_close(self,ws):
        print("Connection closed")
    def on_open(self,ws):
        print("Connection established")
        # 在新线程中启动新服务
    def start(self,ttwid,websocket_url):
        print(ttwid)
        print(websocket_url)
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0",
        }
        # 替换为抓包获得的WebSocket地址
        ws = websocket.WebSocketApp(websocket_url,
            # header=headers,
            cookie=f"ttwid={ttwid}",
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close)
        ws.on_open = self.on_open
        ws.run_forever()

# 第二个服务

async def send_messages(websocket):
    global info_send
    count = 0
    while True:
        
        if info_send:
            await websocket.send(info_send)
            info_send=None
        # message = f"消息 {count}"
        # await websocket.send(message)
        # print(f"发送消息:{message}")
        # count += 1
        await asyncio.sleep(1)
async def handler(websocket, path):
    try:
        await send_messages(websocket)
    except websockets.exceptions.ConnectionClosedError:
        print("连接已关闭")
def w_start():
    
    asyncio.set_event_loop(asyncio.new_event_loop())
    # 启动 WebSocket 服务端并等待连接
    start_server = websockets.serve(handler, "localhost", 9000)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
def start_second_websocket():
    print('第二个服务已经启动===')
    thread2 = threading.Thread(target=w_start)
    thread2.start()
    time.sleep(100)


# if __name__ == "__main__":
#     Zhua_qu().zhuaqu_start()



赞(0)

文章作者置顶的回复

全部回复列表 当前第(1)页

添加回复,文明发言,会审核.(服务区回复可以发广告)

作者最新笔记
天下程序员 www.cx06.com 程序员的网上家园!
作者微信:13126507001