百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Python Socket.IO服务端开发 python3 socketserver

itomcoil 2024-12-30 04:42 35 浏览

Python Socket.IO服务端开发

1.安装

pip install python-socketio

在该模块中,包括两个 Socket.IO server 类:

  • socketio.Server()

创建与Python标准库兼容的服务器。

  • socketio.AsyncServer()

创建与异步包兼容的服务器。

这两个服务器中的方法是相同的,唯一的区别是在异步服务器中,大多数方法都是作为协程实现的。

2.创建 Server 实例

Socket IO服务器是 socketio.server 类的实例。通过使用 socketio 包装该实例,可以将其转换为标准的 WSGI 应用程序。以下是 WSGIApp 类:

import socketio

# create a Socket.IO server
sio = socketio.Server()

# wrap with a WSGI application
app = socketio.WSGIApp(sio)

对于基于异步的服务器,socketio.AsyncServer 类提供相同的功能,但采用协程的格式。如果需要,socketio.ASGIApp 类可以将服务器转换为标准的 ASGI 应用程序:

# create a Socket.IO server
sio = socketio.AsyncServer()

# wrap with ASGI application
app = socketio.ASGIApp(sio)

这两个包装器还可以充当中间件,转发任何不打算发送到套接字的流量。IO服务器连接到另一个应用程序。这允许 Socket.IO 服务器可轻松集成到现有WSGI或ASGI应用程序中:

from wsgi import app  # a Flask, Django, etc. application
app = socketio.WSGIApp(sio, app)

3.事件

3.1定义事件

Socket.IO 协议是基于事件的。当客户端想要与服务器通信时,它会发出一个事件。每个事件都有一个名称和一个参数列表。服务器向 socketio.server 注册事件处理程序函数可以通过 event() 或 socketio.Server.on()装饰器:

@sio.event
def my_event(sid, data):
    pass

@sio.on('my custom event')
def another_event(sid, data):
    pass

第一个示例中,事件名称是从处理程序函数的名称获得的。

第二个示例稍微详细一些,但它允许事件名称与函数名称不同,或者在函数名称中包含非法字符,例如空格。

对于异步服务器,可以选择将事件处理程序作为协程提供:

@sio.event
async def my_event(sid, data):
    pass

sid 参数是 Socket.IO会话id,每个客户端连接的唯一标识符。给定客户端发送的所有事件都将具有相同的 sid 值。

3.2捕获所有事件

对于没有事件处理程序的任何事件,都会调用“全部捕获”事件处理程序。您可以使用 “*” 作为事件名称来定义catch-all处理程序:

@sio.on('*')
def catch_all(event, sid, data):
    pass

异步服务器也可以使用协程:

@sio.on('*')
async def catch_all(event, sid, data):
   pass

捕获所有事件处理程序接收事件名称作为第一个参数。其余参数与常规事件处理程序相同。

connect和disconnect事件必须明确定义,并且不能在catch-all事件处理程序上调用。

3.3 Connect 与 Disconnect 事件

连接和断开事件是特殊的;当客户端连接或断开与服务器的连接时,它们被自动调用:

@sio.event
def connect(sid, environ, auth):
    print('connect ', sid)

@sio.event
def disconnect(sid):
    print('disconnect ', sid)

connect 事件是执行用户身份验证以及应用程序中用户实体与分配给客户端的 sid 之间的任何必要映射的理想场所。environ 参数是一个标准 WSGI 格式的字典,包含请求信息,包括HTTP头。auth 参数包含客户端传递的任何身份验证详细信息,如果客户端未传递任何信息,则为 None。检查请求后,连接事件处理程序可以返回 False 以拒绝与客户端的连接。

有时,将数据传回被拒绝的客户机是有用的。在这种情况下,不要返回 False 可以引发socketio.exceptions.ConnectionRejectedError,并且它的所有参数都将与拒绝消息一起发送到客户端:

@sio.event
def connect(sid, environ):
    raise ConnectionRefusedError('authentication failed')

3.4 发送事件

Socket.IO是一种双向协议,因此服务器可以随时向其连接的客户端发送事件。socketio.Server.emit()方法用于此任务:

sio.emit('my event', {'data': 'foobar'})

有时,服务器可能只想将事件发送给特定的客户端。这可以通过向 emit 调用添加一个 room 参数来实现:

sio.emit('my event', {'data': 'foobar'}, room=user_sid)

socketio.Server.emit() 方法接受事件名称、str 类型的消息有效载荷、字节、列表、dict 或 tuple 以及收件人房间。发送元组时,其中的元素必须是其他四种允许的类型中的任何一种。元组的元素将作为多个参数传递给客户端事件处理程序函数。room 参数用于标识应该接收事件的客户端,并设置为分配给该客户端与服务器连接的 sid值。如果省略,则向所有连接的客户端广播事件。

3.5 事件回调

当客户端向服务器发送事件时,它可以选择提供回调函数,作为确认服务器已处理该事件的一种方式调用该函数。虽然这完全由客户端管理,但服务器可以提供要传递给回调函数的值的列表,只需从处理程序函数返回这些值即可:

@sio.event
def my_event(sid, data):
    # handle the message
    return "OK", 123

同样,服务器可以请求在客户端处理事件后调用回调函数。socketio.Server.emit() 方法有一个可选的回调参数,可以设置为可调用。如果给定此参数,则将在客户端处理事件后调用可调用函数,客户端返回的任何值都将作为参数传递给此函数。不建议在向多个客户端广播时使用回调函数,因为回调函数将为接收消息的每个客户端调用一次。

4.名称空间

Socket IO协议支持多个逻辑连接,所有逻辑连接都在同一物理连接上复用。客户端可以通过在每个连接上指定不同的命名空间来打开多个连接。客户端将名称空间作为主机名和端口之后的路径名http://example.com:8000/chat将打开到命名空间 /chat 的连接。

每个名称空间都独立于其他名称空间进行处理,具有单独的会话ID(sid)、事件处理程序和文件。使用多个命名空间的应用程序在设置其事件处理程序时,使用 socketio 中所有方法中可用的可选命名空间参数指定正确的命名空间,这一点很重要。服务器类:

@sio.event(namespace='/chat')
def my_custom_event(sid, data):
    pass

@sio.on('my custom event', namespace='/chat')
def my_custom_event(sid, data):
    pass

发出事件时,命名空间可选参数用于指定发送事件的命名空间。如果省略了命名空间参数,则使用默认的Socket,使用了名为 / 的 I O命名空间。

4.1 基于类的命名空间

作为基于装饰器的事件处理程序的替代,可以将属于命名空间的事件处理器创建为 socketio 子类的方法。命名空间:

class MyCustomNamespace(socketio.Namespace):
    def on_connect(self, sid, environ):
        pass

    def on_disconnect(self, sid):
        pass

    def on_my_event(self, sid, data):
        self.emit('my_response', data)

sio.register_namespace(MyCustomNamespace('/test'))

对于基于异步的服务器,命名空间必须继承自socketio.AsyncNamespace,如果需要,可以将事件处理程序定义为协程:

class MyCustomNamespace(socketio.AsyncNamespace):
    def on_connect(self, sid, environ):
        pass

    def on_disconnect(self, sid):
        pass

    async def on_my_event(self, sid, data):
        await self.emit('my_response', data)

sio.register_namespace(MyCustomNamespace('/test'))

当使用基于类的名称空间时,服务器接收到的任何事件都会被分派到一个名为事件名称的方法,该方法带有_前缀。例如,事件my_event将由名为on_my_event的方法处理。如果收到的事件在命名空间类中没有定义相应的方法,则忽略该事件。在基于类的命名空间中使用的所有事件名称必须使用方法名称中合法的字符。

为了方便在基于类的命名空间中定义方法,命名空间实例包括 socketio 的几个方法的版本。服务器和套接字。当未给定命名空间参数时,默认为正确命名空间的AsyncServer类。

如果事件在基于类的命名空间中有一个处理程序,并且还有一个基于装饰器的函数处理程序,则只调用独立的函数处理。

需要注意的是,基于类的命名空间是单体的。这意味着命名空间类的单个实例用于所有客户端,因此,命名空间实例不能用于存储客户端特定的信息。

5.Rooms

为了便于服务器向相关客户机组发出事件,应用程序可以将其客户机放入“房间”,然后向这些房间发送消息。

在上一节中,socketio.socketio 的 room参数。emit() 方法用于将特定客户端指定为事件的接收方。这是因为在连接时,将为每个客户端创建一个个人房间,并使用分配给连接的sid进行命名。然后,应用程序可以自由创建其他房间,并使用socketio.Server管理其中的客户端 enter_room()和 leave_room() 方法。客户可以根据需要进入任意多个房间,并且可以根据需要在房间之间移动。

@sio.event
def begin_chat(sid):
    sio.enter_room(sid, 'chat_users')

@sio.event
def exit_chat(sid):
    sio.leave_room(sid, 'chat_users')

在聊天应用中,通常希望将事件广播给房间的所有成员,除了一个成员,该成员是诸如聊天消息之类的事件的发起者。socketio.Server.emit() 方法提供了一个可选的 skip_sid 参数,以指示在广播过程中应该跳过的客户端。

@sio.event
def my_message(sid, data):
    sio.emit('my reply', data, room='chat_users', skip_sid=sid)

6.User Sessions

服务器可以在专用于每个连接的客户端的用户会话中维护应用程序特定信息。应用程序可以使用用户会话来编写在连接的整个生命周期中需要保存的用户的任何详细信息,例如用户名或用户ID。

save_session() 和 get_session() 方法用于存储和检索用户会话中的信息:

@sio.event
def connect(sid, environ):
    username = authenticate_user(environ)
    sio.save_session(sid, {'username': username})

@sio.event
def message(sid, data):
    session = sio.get_session(sid)
    print('message from ', session['username'])

对于异步服务器,这些方法是协程:

@sio.event
async def connect(sid, environ):
    username = authenticate_user(environ)
    await sio.save_session(sid, {'username': username})

@sio.event
async def message(sid, data):
    session = await sio.get_session(sid)
    print('message from ', session['username'])

会话也可以使用 session() 上下文管理器进行操作:

@sio.event
def connect(sid, environ):
    username = authenticate_user(environ)
    with sio.session(sid) as session:
        session['username'] = username

@sio.event
def message(sid, data):
    with sio.session(sid) as session:
        print('message from ', session['username'])

对于异步服务器,使用异步上下文管理器:

@sio.event
def connect(sid, environ):
    username = authenticate_user(environ)
    async with sio.session(sid) as session:
        session['username'] = username

@sio.event
def message(sid, data):
    async with sio.session(sid) as session:
        print('message from ', session['username'])

get_session()、save_session() 和 session() 方法采用可选的命名空间参数。如果未提供此参数,会话将附加到默认命名空间。

注意:当客户端断开连接时,用户会话的内容将被销毁。特别是,当客户端在意外断开与服务器的连接后重新连接时,不会保留用户会话内容。

7.部署

7.1 Tornado

Tornado 是一个支持 HTTP 和 WebSocket 的 web 框架。支持此框架需要 Python 3.5 及更高版本。由于与asyncio 的紧密集成,仅支持 Tornado 版本5和更高版本。

如果安装了 Tornado 库,类 socketio.AsyncServer将自动使用 Tornado 进行异步操作。要显式请求使用它,可以在构造函数中提供 async_mode 选项:

sio = socketio.AsyncServer(async_mode='tornado')

为 Socket IO 配置的服务器必须包含Socket.IO的请求处理程序:

app = tornado.web.Application(
    [
        (r"/socket.io/", socketio.get_tornado_handler(sio)),
    ],
    # ... other application options
)

Tornado 应用程序可以定义与 Socket.IO 服务器共存的其他路由。典型的模式是添加服务于客户端应用程序和任何相关静态文件的路由。

Tornado 应用程序随后以通常的方式执行:

app.listen(port)
tornado.ioloop.IOLoop.current().start()

7.2 标准线程

虽然在性能方面不能与 eventlet 和 gevent 相比,但 Socket.IO 服务器也可以配置为使用标准 Python 线程的多线程 web 服务器。这是用于 Werkzeug 等开发服务器的理想设置。

类 socketio 的实例,如果未安装 eventlet 和 gevent,服务器将自动使用线程模式。要显式请求线程模式,可以在构造函数中提供async_mode选项:

sio = socketio.Server(async_mode='threading')

配置用于线程的服务器作为常规 web 应用程序部署,使用任何 WSGI 的多线程服务器。下面的示例使用 Flask 基于 Werkzeug 开发的 web 服务器部署 Socket.IO 应用程序和 Flask web 应用程序:

sio = socketio.Server(async_mode='threading')
app = Flask(__name__)
app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app)

# ... Socket.IO and Flask handler functions ...

if __name__ == '__main__':
    app.run()

面的示例显示了如何启动 Socket。使用 Gunicorn 线程类的 IO 应用程序:

$ gunicorn -w 1 --threads 100 module:app

使用上述配置,服务器将能够处理多达100个并发客户端。

使用标准线程时,WebSocket 通过简单的 WebSocket 包得到支持,必须单独安装。这个包提供了一个多线程WebSocket 服务器,它与 Werkzeug 和 Gunicorn 的线程工作程序兼容。其他多线程 web 服务器不受支持,也不会启用 WebSocket 传输。

8.跨域控制

出于安全原因,默认情况下,服务器强制执行同源策略。实际上,这意味着:

  • 如果传入的HTTP或WebSocket请求包含Origin标头,则此标头必须与连接URL的方案和主机匹配。如果不匹配,则返回400状态代码响应,并拒绝连接。
  • 对不包含Origin标头的传入请求不施加任何限制。

如果需要,可以使用 cors_allowed_origins 选项来允许其他原点。此参数可以设置为字符串以设置单个允许的原点,也可以设置为列表以允许多个原点。可以使用特殊值“*”指示服务器允许所有来源,但这应该小心,因为这可能会使服务器容易受到跨站点请求伪造(CSRF)攻击。

如:

sio = socketio.AsyncServer(async_mode='tornado', cors_allowed_origins='*')

9.示例

server.py

# coding: utf-8
import socketio
from logger import logger
import ask.ask as ask
import globals


sio = socketio.AsyncServer(async_mode='tornado', cors_allowed_origins='*')

@sio.event
async def connect(sid,  environ, auth):
    logger.info('The {0} connect'.format(sid))
    sio.enter_room(sid, room='ROOM_%s' % sid)

@sio.event
async def disconnect(sid):
    logger.info('The {0} disconnect'.format(sid))
    sio.leave_room(sid, room='ROOM_%s' % sid);
    if sid in globals.clients.keys(): del globals.clients[sid]

@sio.on('orgs')
async def orgs(sid, data):
    logger.info('The {0} orgs'.format(sid))
    await sio.emit("orgs", ask.ask_orgs(), room='ROOM_%s' % sid)

@sio.on('storehouses')
async def storehouses(sid, data):
    logger.info('The {0} storehouses of org_id = {1}'.format(sid, data))
    await sio.emit("storehouses", ask.ask_storehouses(data), room='ROOM_%s' % sid)

@sio.on('register')
async def register(sid, data):
    logger.info('The {0} register: {1}'.format(sid, data))
    globals.clients[sid] = data.get('store_house_id')
    await sio.emit("envs", ask.register(data.get('store_house_id')), room='ROOM_%s' % sid)

@sio.on('queryAlarms')
async def alarms(sid, data):
    logger.info('The {0} register: {1}'.format(sid, data))
    await sio.emit("queryAlarms", ask.alarmsByTime(data.get('store_house_id'), data.get('data_minute')), room='ROOM_%s' % sid)

main.py

# coding: utf-8
import sys
import tornado.web
import tornado.ioloop
import socketio
from server import sio
import settings
from logger import logger


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print('Usage: main.py <port>')
        sys.exit(0)

    port = int(sys.argv[1])
    logger.info('Starting at {0}'.format(port))

    app = tornado.web.Application(
        [
            (r"/socket.io/", socketio.get_tornado_handler(sio)),
        ]
    )

    app.listen(port)
    tornado.ioloop.IOLoop.current().start()

在异步服务器中,如果使用一个新的线程定期向客户端发送信息,在线程中调用 sio.emit() 方法时需要采用如下方法:

loop = asyncio.new_event_loop()
loop.run_until_complete(sio.emit("envs", envs, room='ROOM_%s' % socketId))
loop.close()



相关推荐

selenium(WEB自动化工具)

定义解释Selenium是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。支持的浏览器包括IE(7,8,9,10,11),MozillaF...

开发利器丨如何使用ELK设计微服务中的日志收集方案?

【摘要】微服务各个组件的相关实践会涉及到工具,本文将会介绍微服务日常开发的一些利器,这些工具帮助我们构建更加健壮的微服务系统,并帮助排查解决微服务系统中的问题与性能瓶颈等。我们将重点介绍微服务架构中...

高并发系统设计:应对每秒数万QPS的架构策略

当面试官问及"如何应对每秒几万QPS(QueriesPerSecond)"时,大概率是想知道你对高并发系统设计的理解有多少。本文将深入探讨从基础设施到应用层面的解决方案。01、理解...

2025 年每个 JavaScript 开发者都应该了解的功能

大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发。1.Iteratorhelpers开发者...

JavaScript Array 对象

Array对象Array对象用于在变量中存储多个值:varcars=["Saab","Volvo","BMW"];第一个数组元素的索引值为0,第二个索引值为1,以此类推。更多有...

Gemini 2.5编程全球霸榜,谷歌重回AI王座,神秘模型曝光,奥特曼迎战

刚刚,Gemini2.5Pro编程登顶,6美元性价比碾压Claude3.7Sonnet。不仅如此,谷歌还暗藏着更强的编程模型Dragontail,这次是要彻底翻盘了。谷歌,彻底打了一场漂亮的翻...

动力节点最新JavaScript教程(高级篇),深入学习JavaScript

JavaScript是一种运行在浏览器中的解释型编程语言,它的解释器被称为JavaScript引擎,是浏览器的一部分,JavaScript广泛用于浏览器客户端编程,通常JavaScript脚本是通过嵌...

一文看懂Kiro,其 Spec工作流秒杀Cursor,可移植至Claude Code

当Cursor的“即兴编程”开始拖累项目质量,AWS新晋IDEKiro以Spec工作流打出“先规范后编码”的系统工程思维:需求-设计-任务三件套一次生成,文档与代码同步落地,复杂项目不...

「晚安·好梦」努力只能及格,拼命才能优秀

欢迎光临,浏览之前点击上面的音乐放松一下心情吧!喜欢的话给小编一个关注呀!Effortscanonlypass,anddesperatelycanbeexcellent.努力只能及格...

JavaScript 中 some 与 every 方法的区别是什么?

大家好,很高兴又见面了,我是姜茶的编程笔记,我们一起学习前端相关领域技术,共同进步,也欢迎大家关注、点赞、收藏、转发,您的支持是我不断创作的动力在JavaScript中,Array.protot...

10个高效的Python爬虫框架,你用过几个?

小型爬虫需求,requests库+bs4库就能解决;大型爬虫数据,尤其涉及异步抓取、内容管理及后续扩展等功能时,就需要用到爬虫框架了。下面介绍了10个爬虫框架,大家可以学习使用!1.Scrapysc...

12个高效的Python爬虫框架,你用过几个?

实现爬虫技术的编程环境有很多种,Java、Python、C++等都可以用来爬虫。但很多人选择Python来写爬虫,为什么呢?因为Python确实很适合做爬虫,丰富的第三方库十分强大,简单几行代码便可实...

pip3 install pyspider报错问题解决

运行如下命令报错:>>>pip3installpyspider观察上面的报错问题,需要安装pycurl。是到这个网址:http://www.lfd.uci.edu/~gohlke...

PySpider框架的使用

PysiderPysider是一个国人用Python编写的、带有强大的WebUI的网络爬虫系统,它支持多种数据库、任务监控、项目管理、结果查看、URL去重等强大的功能。安装pip3inst...

「机器学习」神经网络的激活函数、并通过python实现激活函数

神经网络的激活函数、并通过python实现whatis激活函数感知机的网络结构如下:左图中,偏置b没有被画出来,如果要表示出b,可以像右图那样做。用数学式来表示感知机:上面这个数学式子可以被改写:...