跳转至

Python 请求库

参考:https://medium.com/@louis.rosevi/python-web-sockets-5-different-ways-5ffb1a9015f9

websockets

https://websockets.readthedocs.io

pip install websockets

server

import asyncio
import websockets

"""
1. 编写异步任务处理函数 handler
2. 创建 server 对象
3. 异步运行 server 对象
4. 启动服务端:python server.py
"""

"""
handler基础写法
客户端断开连接时,服务端会引发ConnectionClosedOK异常
异常信息会对服务端日志产生噪音
"""
# async def handler(websocket):
#     while True:
#         message = await websocket.recv()
#         print(message)
"""
handler改进写法,捕获异常
"""
# async def handler(websocket):
#     while True:
#         try:
#             message = await websocket.recv()
#         except websockets.ConnectionClosedOK:
#             break
#         print(message)
"""
handler最优写法
websockets 提供了快捷方式接收信息,无需自己处理异常
"""
async def handler(websocket):
    async for message in websocket:
        print(message)


async def main():
    """
    main() 协程调用 serve() 来启动 Websockets 服务器
    创建一个 Future 可等待对象,暂停协程,等待 Future 对象完成
    但没有任何机制设置其完成状态(比如 set_result 或 set_exception),所以将永远处于等待状态
    即 WebSockets 服务会持续运行,直到程序被外部信号终止
    """
    async with websockets.serve(handler, "localhost", 8001):
        await asyncio.Future()


if __name__ == "__main__":
    """入口点:创建一个 asyncio 事件循环,运行 main() 协程,然后关闭循环"""
    asyncio.run(main())

client

  • 命令行
python -m websockets ws://localhost:8001
  • 代码实现
import asyncio
import websockets

"""
1. 建立与服务器的连接:connect()
2. 收发消息:recv(), send()
3. 关闭连接:close()
"""

async def hello():
    uri = "ws://localhost:8001"
    async with websockets.connect(uri) as websocket:
        await websocket.send("hello")

        # res = await websocket.recv()
        # print(res)

if __name__ == "__main__":
    asyncio.run(hello())

websocket-client

主要用于创建 WebSocket 客户端,且不支持异步,即不兼容 aioredis 等异步库

pip install websocket-client

大多数教程中使用的测试链接已经停止服务:ws://echo.websocket.org,有人搭建了新的替代服务:ws://echo.websocket.events(UI界面: https://echo.websocket.events/.ws),也可以自己搭建:https://github.com/jmalloc/echo-server

短连接

# 创建连接,方式1
# import websocket
# # websocket.enableTrace(True)  # 查看详细连接信息:request/response header、Sent/Rcv raw/decoded
# ws = websocket.WebSocket()
# ws.connect("ws://echo.websocket.events")


# 创建连接,方式2
from websocket import create_connection
ws = create_connection("ws://echo.websocket.events/")

print(ws.getstatus())  # 获取连接状态

print("Sending...")
ws.send("Hello, World")  # 发送请求

result =  ws.recv()  # 返回结果
print("Received '%s'" % result)

ws.close()  # 断开连接


# 输出结果
"""
101
Sending...
Received 'echo.websocket.events sponsored by Lob.com'
"""

使用with自动关闭消息

from contextlib import closing
from websocket import create_connection

with closing(create_connection("wss://testnet-explorer.binance.org/ws/block")) as conn:
    conn.send("Hello, World")
    print(conn.recv())

长连接

import json
import ssl
import sys
import time
from threading import Thread

import websocket

payload = json.dumps({
    "op": "subscribe",
    "args": [{
        "channel": "mark-price",
        "instId": "BTC-USDT"
    }]
})


# 用于接收消息
def on_message(ws, message):
    print(message)


# 用于接收错误异常
def on_error(ws, error):
    print(error)


# 用于关闭连接
def on_close(ws, close_status_code, close_msg):
    print("### closed ###")


# 用于保持连接
def on_open(ws):
    def run(*args):
        for i in range(3):
            ws.send("ping")
            time.sleep(1)

        ws.send(payload)
        time.sleep(1)
        ws.close()
        print("Thread terminating...")
    Thread(target=run).start()



if __name__ == "__main__":
    # websocket.enableTrace(True)  # 显示详细信息
    host = "wss://wspap.okx.com:8443/ws/v5/public?brokerId=9999" if len(sys.argv) < 2 else sys.argv[1]
    ws = websocket.WebSocketApp(
        host,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        # header=headers,
    )
    ws.on_open = on_open
    ws.run_forever(
        sslopt={"cert_reqs": ssl.CERT_NONE},  # 禁用ssl
    )