Python之socket模块使用详解(附带解决丢包、粘包问题)
文章目录一、Socket简介1. Socket一、Socket简介1. Socket网络上的两个程序通过一个双向的通信连接实现数据的交换,这个连接的一端称为一个socket。Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网络请求,使主机间或者一台计算机上的进程间可以通讯。他工作于TCP/IP协议中应用层和传输层之间的一个抽象,如下图:...
·
文章目录
一、Socket参数使用介绍
Python使用 socket 模块创建套接字,语法格式如下:
import socket
socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None) # 默认参数
1. socket()参数
family:
socket.AF_INET
- IPv4(默认)socket.AF_INET6
- IPv6socket.AF_UNIX
- 只能够用于单一的Unix系统进程间通信
type:
socket.SOCK_STREAM
- 流式socket, for TCP (默认)socket.SOCK_DGRAM
- 数据报式socket, for UDPsocket.SOCK_RAW
- 原始套接字socket.SOCK_RDM
- 可靠UDP形式socket.SOCK_SEQPACKET
- 可靠的连续数据包服务
2. socket对象内建方法
服务端套接字方法:
s.bind()
- 绑定地址(host,port)到套接字,在AF_INET下,以元组(host,port)的形式表示地址。s.listen()
- 开启TCP监听,操作系统可以挂起的最大连接数量,该值至少为1。s.accept()
- 被动接受TCP客户端连接,(阻塞式)等待连接的到来。
客户端套接字方法:
s.connect()
- 主动初始化TCP服务器连接,一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。s.connect_ex()
- connect()函数的扩展版本,出错时返回出错码,而不是抛出异常。
公共套接字方法:
s.recv()
- 接收TCP数据,数据以byte类型返回,bufsize指定要接收的最大数据量。s.send()
- 发送TCP数据,将string中的数据转化为byte类型发送到连接的套接字,返回值是要发送的字节数量,该数量可能小于string的字节大小。s.sendall()
- 发送完整TCP数据,将string中的数据转化为byte类型发送到连接的套接字,但在返回之前会尝试发送所有数据,成功返回None,失败则抛出异常。s.recvfrom()
- 接收UDP数据,与recv()类似,但返回值是(data,address),其中data是包含接收数据的字符串,address是客户端的套接字地址。s.sendto()
- 发送UDP数据,将数据发送到套接字,参数形式为(data,(address,port))的元组,address为远程服务端地址,返回值是发送的字节数。s.close()
- 关闭套接字。s.getpeername()
- 返回连接套接字的远程地址,返回值通常是元组(ipaddr,port)。s.getsockname()
- 返回套接字自己的地址,通常是一个元组(ipaddr,port)。s.setsockopt(level,optname,value)
- 设置给定套接字选项的值。s.getsockopt(level,optname[.buflen])
- 返回套接字选项的值。s.settimeout(timeout)
- 设置套接字操作的超时时间,timeout是一个浮点数,单位是秒s.gettimeout()
- 返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None。s.fileno()
- 返回套接字的文件描述符。s.setblocking(flag)
- 如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。s.makefile()
- 创建一个与该套接字相关连的文件。
二、丢包、粘包问题解决思路和方法
问题:
Socket有一个缓冲区,缓冲区是一个流,先进先出,发送和取出都可以自定义大小,如果缓冲区的数据未取完,则可能会存在数据堆积。其中【recv(1024)】表示从缓冲区里取最大为1024个字节,但实际取值大小是不确定的,可能会导致丢包,socket发送两条连续数据时,也有可能最终会拼接成一条进行发送,所以也会导致粘包问题的产生。
解决的一些办法和思路:
- 在每条数据发送之间增加停顿时间,如【tiem.sleep(0.5) # 延时0.5s】
- 每次发送后等待对方确认接收数据,确认完毕后再发送下一条(加验证),否则重传
- 减少一次性发送和接收数据的大小,理论上buffer size越小丢包或粘包率就越低,建议在1024~10240之间
下面提供一个解决TCP recv丢包的方法:
def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):
"""
循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息
:param handle: socket句柄
:param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组
:param side: 默认server端
:param do_decode: 是否需要decode,默认True
:param do_print_info: 是否需要打印socket信息,默认True
:return:
"""
while True:
if do_decode:
socket_data = handle.recv(BUFFER_SIZE).decode()
else:
socket_data = handle.recv(BUFFER_SIZE)
if do_print_info:
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
if side == 'server':
print(f'Server received ==> {current_time} - {socket_data}')
else:
print(f'Client received ==> {current_time} - {socket_data}')
# 如果expected_msg为空,跳出循环
if not expected_msg:
break
if isinstance(expected_msg, (list, tuple)):
flag = False
for expect in expected_msg: # 循环判断每个期待字符是否在返回结果中
if expect in socket_data: # 如果有任意一个存在,跳出循环
flag = True
break
if flag:
break
else:
if expected_msg in socket_data:
break
time.sleep(3) # 每隔3秒接收一次socket
return socket_data
原理就是使用while循环不停的接收socket,直到指定的字符出现为止,再跳出循环,这样可以防止socket丢包,也可以保证socket接收的完整性。
三、构建Socket-TCP传输
1. 客户端配置
代码如下:
# -*- coding:utf-8 -*-
import time
import socket
__author__ = 'Evan'
REMOTE_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
SOCKET_TIMEOUT_TIME = 60
def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):
"""
发送socket info,并根据side打印不同的前缀信息
:param handle: socket句柄
:param msg: 要发送的内容
:param side: 默认server端
:param do_encode: 是否需要encode,默认True
:param do_print_info: 是否需要打印socket信息,默认True
:return:
"""
if do_encode:
handle.send(msg.encode())
else:
handle.send(msg)
if do_print_info:
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
if side == 'server':
print(f'Server send --> {current_time} - {msg}')
else:
print(f'Client send --> {current_time} - {msg}')
def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):
"""
循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息
:param handle: socket句柄
:param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组
:param side: 默认server端
:param do_decode: 是否需要decode,默认True
:param do_print_info: 是否需要打印socket信息,默认True
:return:
"""
while True:
if do_decode:
socket_data = handle.recv(BUFFER_SIZE).decode()
else:
socket_data = handle.recv(BUFFER_SIZE)
if do_print_info:
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
if side == 'server':
print(f'Server received ==> {current_time} - {socket_data}')
else:
print(f'Client received ==> {current_time} - {socket_data}')
# 如果expected_msg为空,跳出循环
if not expected_msg:
break
if isinstance(expected_msg, (list, tuple)):
flag = False
for expect in expected_msg: # 循环判断每个期待字符是否在返回结果中
if expect in socket_data: # 如果有任意一个存在,跳出循环
flag = True
break
if flag:
break
else:
if expected_msg in socket_data:
break
time.sleep(3) # 每隔3秒接收一次socket
return socket_data
def start_client_socket():
"""
启动客户端TCP Socket
:return:
"""
ip, port = REMOTE_IP
client = socket.socket() # 使用TCP方式传输
print(f'开始连接服务端 {ip}:{port} ...')
client.connect((ip, port)) # 连接远程服务端
print(f'连接服务端 {ip}:{port} 成功')
client.settimeout(SOCKET_TIMEOUT_TIME) # 设置客户端超时时间
# 与服务端握手,达成一致
send_socket_info(handle=client, side='client', msg='客户端已就绪')
receive_socket_info(handle=client, side='client', expected_msg='服务端已就绪')
# 与服务端交互
while True:
answer = input('请输入要发送给服务端的信息:')
send_socket_info(handle=client, side='client', msg=answer)
socket_data = receive_socket_info(handle=client, side='client', expected_msg='')
if 'quit' in socket_data:
send_socket_info(handle=client, side='client', msg='quit')
break
# 断开socket连接
client.close()
print(f'与服务端 {ip}:{port} 断开连接')
if __name__ == '__main__':
start_client_socket() # 启动客户端socket
2. 服务端配置(阻塞式TCP连接)
代码如下:
# -*- coding:utf-8 -*-
"""
阻塞式TCP连接
"""
import time
import socket
__author__ = 'Evan'
SOCKET_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
SOCKET_TIMEOUT_TIME = 60
def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):
"""
发送socket info,并根据side打印不同的前缀信息
:param handle: socket句柄
:param msg: 要发送的内容
:param side: 默认server端
:param do_encode: 是否需要encode,默认True
:param do_print_info: 是否需要打印socket信息,默认True
:return:
"""
if do_encode:
handle.send(msg.encode())
else:
handle.send(msg)
if do_print_info:
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
if side == 'server':
print(f'Server send --> {current_time} - {msg}')
else:
print(f'Client send --> {current_time} - {msg}')
def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):
"""
循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息
:param handle: socket句柄
:param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组
:param side: 默认server端
:param do_decode: 是否需要decode,默认True
:param do_print_info: 是否需要打印socket信息,默认True
:return:
"""
while True:
if do_decode:
socket_data = handle.recv(BUFFER_SIZE).decode()
else:
socket_data = handle.recv(BUFFER_SIZE)
if do_print_info:
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
if side == 'server':
print(f'Server received ==> {current_time} - {socket_data}')
else:
print(f'Client received ==> {current_time} - {socket_data}')
# 如果expected_msg为空,跳出循环
if not expected_msg:
break
if isinstance(expected_msg, (list, tuple)):
flag = False
for expect in expected_msg: # 循环判断每个期待字符是否在返回结果中
if expect in socket_data: # 如果有任意一个存在,跳出循环
flag = True
break
if flag:
break
else:
if expected_msg in socket_data:
break
time.sleep(3) # 每隔3秒接收一次socket
return socket_data
def start_server_socket():
"""
启动服务端TCP Socket
:return:
"""
ip, port = SOCKET_IP
server = socket.socket() # 使用TCP方式传输
server.bind((ip, port)) # 绑定IP与端口
server.listen(5) # 设置最大连接数为5
print(f'服务端 {ip}:{port} 开启')
# 不断循环,接受客户端请求
while True:
print('等待客户端连接...')
conn, address = server.accept() # 使用accept阻塞式等待客户端请求,如果多个客户端同时访问,排队一个一个进
print(f'当前连接客户端:{address}')
conn.settimeout(SOCKET_TIMEOUT_TIME) # 设置服务端超时时间
# 与客户端握手,达成一致
receive_socket_info(handle=conn, expected_msg='客户端已就绪')
send_socket_info(handle=conn, msg='服务端已就绪')
# 不断接收客户端发来的消息
while True:
socket_data = receive_socket_info(handle=conn, expected_msg='')
if 'quit' in socket_data:
send_socket_info(handle=conn, msg='quit')
break
answer = input('请回复客户端的信息:')
send_socket_info(handle=conn, msg=answer)
# 断开socket连接
conn.close()
print(f'与客户端 {ip}:{port} 断开连接')
if __name__ == '__main__':
start_server_socket() # 启动服务端socket
3. 服务端配置(非阻塞式TCP连接)
代码如下:
# -*- coding:utf-8 -*-
"""
非阻塞式TCP连接
"""
import time
import socketserver
__author__ = 'Evan'
SOCKET_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
SOCKET_TIMEOUT_TIME = 60
class UnblockSocketServer(socketserver.BaseRequestHandler):
# 继承socketserver.BaseRequestHandler类
# 首先执行setup方法,然后执行handle方法,最后执行finish方法
# 如果handle方法报错,则会跳过
# setup与finish无论如何都会执行
# 一般只定义handle方法即可
def setup(self):
print('开启非阻塞式连接...')
@staticmethod
def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):
"""
发送socket info,并根据side打印不同的前缀信息
:param handle: socket句柄
:param msg: 要发送的内容
:param side: 默认server端
:param do_encode: 是否需要encode,默认True
:param do_print_info: 是否需要打印socket信息,默认True
:return:
"""
if do_encode:
handle.send(msg.encode())
else:
handle.send(msg)
if do_print_info:
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
if side == 'server':
print(f'Server send --> {current_time} - {msg}')
else:
print(f'Client send --> {current_time} - {msg}')
@staticmethod
def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):
"""
循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息
:param handle: socket句柄
:param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组
:param side: 默认server端
:param do_decode: 是否需要decode,默认True
:param do_print_info: 是否需要打印socket信息,默认True
:return:
"""
while True:
if do_decode:
socket_data = handle.recv(BUFFER_SIZE).decode()
else:
socket_data = handle.recv(BUFFER_SIZE)
if do_print_info:
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
if side == 'server':
print(f'Server received ==> {current_time} - {socket_data}')
else:
print(f'Client received ==> {current_time} - {socket_data}')
# 如果expected_msg为空,跳出循环
if not expected_msg:
break
if isinstance(expected_msg, (list, tuple)):
flag = False
for expect in expected_msg: # 循环判断每个期待字符是否在返回结果中
if expect in socket_data: # 如果有任意一个存在,跳出循环
flag = True
break
if flag:
break
else:
if expected_msg in socket_data:
break
time.sleep(3) # 每隔3秒接收一次socket
return socket_data
def handle(self):
"""
所有和客户端交互的操作写在这里
:return:
"""
conn = self.request # 获取socket句柄
# 与客户端握手,达成一致
self.receive_socket_info(handle=conn, expected_msg='客户端已就绪')
self.send_socket_info(handle=conn, msg='服务端已就绪')
# 不断接收客户端发来的消息
while True:
socket_data = self.receive_socket_info(handle=conn, expected_msg='')
if 'quit' in socket_data:
self.send_socket_info(handle=conn, msg='quit')
break
answer = input('请回复客户端的信息:')
self.send_socket_info(handle=conn, msg=answer)
# 断开socket连接
conn.close()
def finish(self):
print('连接关闭')
def main():
# 创建多线程实例
server = socketserver.ThreadingTCPServer(SOCKET_IP, UnblockSocketServer)
# 开启异步多线程,等待连接
server.timeout = SOCKET_TIMEOUT_TIME # 设置服务端超时时间
print(f'服务端 {SOCKET_IP[0]}:{SOCKET_IP[1]} 开启')
server.serve_forever() # 永久运行
if __name__ == '__main__':
main()
4. 交互过程demo
客户端:
服务端:
四、构建Socket-UDP传输
1. 客户端配置
代码如下:
# -*- coding:utf-8 -*-
import socket
__author__ = 'Evan'
REMOTE_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
SOCKET_TIMEOUT_TIME = 60
def start_client_socket():
"""
启动客户端UDP Socket
:return:
"""
ip, port = REMOTE_IP
client = socket.socket(type=socket.SOCK_DGRAM) # 使用TCP方式传输
print(f'开始连接服务端 {ip}:{port} ...')
client.connect((ip, port)) # 连接远程服务端
print(f'连接服务端 {ip}:{port} 成功')
# 与服务端交互
while True:
answer = input('请输入要发送给服务端的信息:')
client.sendto(answer.encode(), REMOTE_IP) # 使用sendto发送UDP消息,address填入服务端IP和端口
if 'quit' in answer:
break
# 断开socket连接
client.close()
print(f'与服务端 {ip}:{port} 断开连接')
if __name__ == '__main__':
start_client_socket() # 启动客户端socket
2. 服务端配置
代码如下:
# -*- coding:utf-8 -*-
import socket
__author__ = 'Evan'
SOCKET_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
def start_server_socket():
"""
启动服务端UDP Socket
:return:
"""
ip, port = SOCKET_IP
server = socket.socket(type=socket.SOCK_DGRAM) # 使用UDP方式传输
server.bind((ip, port)) # 绑定IP与端口
print(f'服务端 {ip}:{port} 开启')
# 不断循环,接受客户端发来的消息
while True:
socket_data, address = server.recvfrom(BUFFER_SIZE)
print('收到客户端 -> {} 发来的消息: {}'.format(address, socket_data.decode()))
if __name__ == '__main__':
start_server_socket()
3. 交互过程demo
客户端:
服务端:
Finish!
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献3条内容
所有评论(0)