网络编程之python-socket编程

 对于python的socket编程,在之前仅有非常基础的了解。通常来说我们会利用以下的代码完成我们的socket编程。

1
2
3
4
5
6
7
while True:
# 接受一个新连接:
sock, addr = s.accept() # 阻塞
# 直接进行一些操作
# or 创建新线程来处理TCP连接:
t = threading.Thread(target=tcplink, args=(sock, addr))
t.start()

 直接进行一些操作将会造成非常严重的阻塞问题,直观的来看,似乎采用多线程/进程的方式可以解决这个问题,但是其实并不然如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。1

 很多人可能会采用线程池、进程池的方法减少创建和销毁进程线程的次数,降低系统因为销毁或者创建进程/线程消耗的系统资源。但是在大规模的系统中仍然可能会遇到瓶颈。所以这里我们会采用非阻塞式接口来解决这个问题。

一 非阻塞IO

 在python-socket编程中,设置setblocking的值为False(默认为True),那么现在accept()将不再阻塞。常规的非阻塞式IO的代码如下 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
while True:
try:
conn, addr = sock.accept() # 被动接受TCP客户的连接,等待连接的到来,收不到时会报异常
print('connect by ', addr)
conn_list.append(conn)
conn.setblocking(False) # 设置非阻塞
except BlockingIOError as e:
pass

tmp_list = [conn for conn in conn_list]
for conn in tmp_list:
try:
data = conn.recv(1024) # 接收数据1024字节
if data:
print('收到的数据是{}'.format(data.decode()))
conn.send(data)
else:
print('close conn',conn)
conn.close()
conn_list.remove(conn)
print('还有客户端=>',len(conn_list))
except IOError:
pass

 但是从上述的代码我们可以观察到,虽然非阻塞式IO解决了阻塞的问题,一个进程可以同时干其他的任务,但是非阻塞式IO是非常不被推荐的,由于不断地while(True)可能会导致CPU资源占满,无故消耗了许多不需要消耗的系统资源。

二 多路复用IO

select模型

 把socket交给操作系统去监控,相当于找个代理人(select), 去收快递。快递到了,就通知用户,用户自己去取。

 阻塞I/O只能阻塞一个I/O操作,而I/O复用模型能够阻塞多个I/O操作,所以才叫做多路复用

 使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
s_list = [s.fileno(),]  #fileno()获取套接字的唯一描述符,每个套接字都是唯一不同的
s_dict = {}

while 1:
list_readable,a,b = select(s_list,[],[]) #分别对应: 输入,输出,错误输出
for i in list_readable:
if i == s.fileno():
conn,userinfo = s.accept()
s_list.append(conn.fileno())
s_dict[conn.fileno()] = conn
else:
cs = s_dict[i]
recv_data = cs.recv(1024)
if len(recv_data) <= 0:
s_dict[i].close()
s_dict.pop(i)
s_list.remove(i)
else:
cs.send(recv_data)

 但是select模型的复杂度较高,每次会不断的轮询所负责的所有socket描述符,当某个socket有数据到达了,就通知用户进程。所以select模型主要存在以下的问题 3

  1. 最大并发数限制,因为一个进程所打开的 FD (文件描述符)是有限制的,由 FD_SETSIZE 设置,默认值是 1024/2048 ,因此 Select 模型的最大并发数就被相应限制了。当然我们也可以采用修改FD_SETSIZE从而增加最大并发数。
  2. 效率问题, select 每次调用都会线性扫描全部的 FD 集合,这样效率就会呈现线性下降,把 FD_SETSIZE 改大的后果就是,大家都慢慢来,什么?都超时了。
  3. 内核 / 用户空间 内存拷贝问题,如何让内核把 FD 消息通知给用户空间呢?在这个问题上 select 采取了内存拷贝方法,在FD非常多的时候,非常的耗费时间。

 上述三点可以总结为:1.连接数受限 2.查找配对速度慢 3.数据由内核拷贝到用户态消耗时间

epoll模型

 epoll模型是对select模型的改进,其效率非常高,但仅可用于Unix/Linux操作系统。由于其非常重要,这里完整摘取python源码。 4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#创建socket对象
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#设置IP地址复用
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#ip地址和端口号
server_address = ("127.0.0.1", 8888)
#绑定IP地址
serversocket.bind(server_address)
#监听,并设置最大连接数
serversocket.listen(10)
print "服务器启动成功,监听IP:" , server_address
#服务端设置非阻塞
serversocket.setblocking(False)
#超时时间
timeout = 10
#创建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#注册服务器监听fd到等待读事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
#保存连接客户端消息的字典,格式为{}
message_queues = {}
#文件句柄到所对应对象的字典,格式为{句柄:对象}
fd_to_socket = {serversocket.fileno():serversocket,}

while True:
print "等待活动连接......"
#轮询注册的事件集合,返回值为[(文件句柄,对应的事件),(...),....]
events = epoll.poll(timeout)
if not events:
print "epoll超时无活动连接,重新轮询......"
continue
print "有" , len(events), "个新事件,开始处理......"

for fd, event in events:
socket = fd_to_socket[fd]
#如果活动socket为当前服务器socket,表示有新连接
if socket == serversocket:
connection, address = serversocket.accept()
print "新连接:" , address
#新连接socket设置为非阻塞
connection.setblocking(False)
#注册新连接fd到待读事件集合
epoll.register(connection.fileno(), select.EPOLLIN)
#把新连接的文件句柄以及对象保存到字典
fd_to_socket[connection.fileno()] = connection
#以新连接的对象为键值,值存储在队列中,保存每个连接的信息
message_queues[connection] = Queue.Queue()
#关闭事件
elif event & select.EPOLLHUP:
print 'client close'
#在epoll中注销客户端的文件句柄
epoll.unregister(fd)
#关闭客户端的文件句柄
fd_to_socket[fd].close()
#在字典中删除与已关闭客户端相关的信息
del fd_to_socket[fd]
#可读事件
elif event & select.EPOLLIN:
#接收数据
data = socket.recv(1024)
if data:
print "收到数据:" , data , "客户端:" , socket.getpeername()
#将数据放入对应客户端的字典
message_queues[socket].put(data)
#修改读取到消息的连接到等待写事件集合(即对应客户端收到消息后,再将其fd修改并加入写事件集合)
epoll.modify(fd, select.EPOLLOUT)
else:
print 'closing', address, 'after reading no data'
epoll.unregister(fd)
connections[fd].close()
del connections[fd]
#可写事件
elif event & select.EPOLLOUT:
try:
#从字典中获取对应客户端的信息
msg = message_queues[socket].get_nowait()
except Queue.Empty:
print socket.getpeername() , " queue empty"
#修改文件句柄为读事件
epoll.modify(fd, select.EPOLLIN)
else :
print "发送数据:" , data , "客户端:" , socket.getpeername()
#发送数据
socket.send(msg)

#在epoll中注销服务端文件句柄
epoll.unregister(serversocket.fileno())
#关闭epoll
epoll.close()
#关闭服务器socket
serversocket.close()

 但客户端的代码基本与其他模型无疑,这里对客户端代码做基本记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#创建客户端socket对象
clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#服务端IP地址和端口号元组
server_address = ('127.0.0.1',8888)
#客户端连接指定的IP地址和端口号
clientsocket.connect(server_address)

while True:
#输入数据
data = raw_input('please input:')
#客户端发送数据
clientsocket.sendall(data)
#客户端接收数据
server_data = clientsocket.recv(1024)
print '客户端收到的数据:'server_data
#关闭客户端socket
clientsocket.close()

 通过上述的这些样例,大致了解了python如何利用内核模型进行编程,但是由于缺少编程经验,暂时尚未掌握这些操作在实习开发时如何运用。希望未来能对这个领域进行更深一步的理解,届时将再次更新总结对内核模型的理解。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×