一、Socket参数使用介绍

Python使用 socket 模块创建套接字,语法格式如下:

import socket
socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)  # 默认参数

1. socket()参数

family:

  • socket.AF_INET - IPv4(默认)
  • socket.AF_INET6 - IPv6
  • socket.AF_UNIX - 只能够用于单一的Unix系统进程间通信

type:

  • socket.SOCK_STREAM - 流式socket, for TCP (默认)
  • socket.SOCK_DGRAM - 数据报式socket, for UDP
  • socket.SOCK_RAW - 原始套接字
  • socket.SOCK_RDM - 可靠UDP形式
  • socket.SOCK_SEQPACKET - 可靠的连续数据包服务

2. socket对象内建方法

服务端套接字方法:

  • s.bind() - 绑定地址(host,port)到套接字,在AF_INET下,以元组(host,port)的形式表示地址。
  • s.listen() - 开启TCP监听,操作系统可以挂起的最大连接数量,该值至少为1。
  • s.accept() - 被动接受TCP客户端连接,(阻塞式)等待连接的到来。

客户端套接字方法:

  • s.connect() - 主动初始化TCP服务器连接,一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。
  • s.connect_ex() - connect()函数的扩展版本,出错时返回出错码,而不是抛出异常。

公共套接字方法:

  • s.recv() - 接收TCP数据,数据以byte类型返回,bufsize指定要接收的最大数据量。
  • s.send() - 发送TCP数据,将string中的数据转化为byte类型发送到连接的套接字,返回值是要发送的字节数量,该数量可能小于string的字节大小。
  • s.sendall() - 发送完整TCP数据,将string中的数据转化为byte类型发送到连接的套接字,但在返回之前会尝试发送所有数据,成功返回None,失败则抛出异常。
  • s.recvfrom() - 接收UDP数据,与recv()类似,但返回值是(data,address),其中data是包含接收数据的字符串,address是客户端的套接字地址。
  • s.sendto() - 发送UDP数据,将数据发送到套接字,参数形式为(data,(address,port))的元组,address为远程服务端地址,返回值是发送的字节数。
  • s.close() - 关闭套接字。
  • s.getpeername() - 返回连接套接字的远程地址,返回值通常是元组(ipaddr,port)。
  • s.getsockname() - 返回套接字自己的地址,通常是一个元组(ipaddr,port)。
  • s.setsockopt(level,optname,value) - 设置给定套接字选项的值。
  • s.getsockopt(level,optname[.buflen]) - 返回套接字选项的值。
  • s.settimeout(timeout) - 设置套接字操作的超时时间,timeout是一个浮点数,单位是秒
  • s.gettimeout() - 返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None。
  • s.fileno() - 返回套接字的文件描述符。
  • s.setblocking(flag) - 如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。
  • s.makefile() - 创建一个与该套接字相关连的文件。

二、丢包、粘包问题解决思路和方法

问题:

Socket有一个缓冲区,缓冲区是一个流,先进先出,发送和取出都可以自定义大小,如果缓冲区的数据未取完,则可能会存在数据堆积。其中【recv(1024)】表示从缓冲区里取最大为1024个字节,但实际取值大小是不确定的,可能会导致丢包,socket发送两条连续数据时,也有可能最终会拼接成一条进行发送,所以也会导致粘包问题的产生。

解决的一些办法和思路:

  1. 在每条数据发送之间增加停顿时间,如【tiem.sleep(0.5) # 延时0.5s】
  2. 每次发送后等待对方确认接收数据,确认完毕后再发送下一条(加验证),否则重传
  3. 减少一次性发送和接收数据的大小,理论上buffer size越小丢包或粘包率就越低,建议在1024~10240之间

下面提供一个解决TCP recv丢包的方法:

def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):
    """
    循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息
    :param handle: socket句柄
    :param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组
    :param side: 默认server端
    :param do_decode: 是否需要decode,默认True
    :param do_print_info: 是否需要打印socket信息,默认True
    :return:
    """
    while True:
        if do_decode:
            socket_data = handle.recv(BUFFER_SIZE).decode()
        else:
            socket_data = handle.recv(BUFFER_SIZE)

        if do_print_info:
            current_time = time.strftime('%Y-%m-%d %H:%M:%S')
            if side == 'server':
                print(f'Server received ==> {current_time} - {socket_data}')
            else:
                print(f'Client received ==> {current_time} - {socket_data}')

        # 如果expected_msg为空,跳出循环
        if not expected_msg:
            break

        if isinstance(expected_msg, (list, tuple)):
            flag = False
            for expect in expected_msg:  # 循环判断每个期待字符是否在返回结果中
                if expect in socket_data:  # 如果有任意一个存在,跳出循环
                    flag = True
                    break
            if flag:
                break
        else:
            if expected_msg in socket_data:
                break
        time.sleep(3)  # 每隔3秒接收一次socket
    return socket_data

原理就是使用while循环不停的接收socket,直到指定的字符出现为止,再跳出循环,这样可以防止socket丢包,也可以保证socket接收的完整性。

三、构建Socket-TCP传输

1. 客户端配置

代码如下:

# -*- coding:utf-8 -*-
import time
import socket

__author__ = 'Evan'


REMOTE_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
SOCKET_TIMEOUT_TIME = 60


def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):
    """
    发送socket info,并根据side打印不同的前缀信息
    :param handle: socket句柄
    :param msg: 要发送的内容
    :param side: 默认server端
    :param do_encode: 是否需要encode,默认True
    :param do_print_info: 是否需要打印socket信息,默认True
    :return:
    """
    if do_encode:
        handle.send(msg.encode())
    else:
        handle.send(msg)

    if do_print_info:
        current_time = time.strftime('%Y-%m-%d %H:%M:%S')
        if side == 'server':
            print(f'Server send --> {current_time} - {msg}')
        else:
            print(f'Client send --> {current_time} - {msg}')


def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):
    """
    循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息
    :param handle: socket句柄
    :param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组
    :param side: 默认server端
    :param do_decode: 是否需要decode,默认True
    :param do_print_info: 是否需要打印socket信息,默认True
    :return:
    """
    while True:
        if do_decode:
            socket_data = handle.recv(BUFFER_SIZE).decode()
        else:
            socket_data = handle.recv(BUFFER_SIZE)

        if do_print_info:
            current_time = time.strftime('%Y-%m-%d %H:%M:%S')
            if side == 'server':
                print(f'Server received ==> {current_time} - {socket_data}')
            else:
                print(f'Client received ==> {current_time} - {socket_data}')

        # 如果expected_msg为空,跳出循环
        if not expected_msg:
            break

        if isinstance(expected_msg, (list, tuple)):
            flag = False
            for expect in expected_msg:  # 循环判断每个期待字符是否在返回结果中
                if expect in socket_data:  # 如果有任意一个存在,跳出循环
                    flag = True
                    break
            if flag:
                break
        else:
            if expected_msg in socket_data:
                break
        time.sleep(3)  # 每隔3秒接收一次socket
    return socket_data


def start_client_socket():
    """
    启动客户端TCP Socket
    :return:
    """
    ip, port = REMOTE_IP
    client = socket.socket()  # 使用TCP方式传输
    print(f'开始连接服务端 {ip}:{port} ...')
    client.connect((ip, port))  # 连接远程服务端
    print(f'连接服务端 {ip}:{port} 成功')
    client.settimeout(SOCKET_TIMEOUT_TIME)  # 设置客户端超时时间

    # 与服务端握手,达成一致
    send_socket_info(handle=client, side='client', msg='客户端已就绪')
    receive_socket_info(handle=client, side='client', expected_msg='服务端已就绪')

    # 与服务端交互
    while True:
        answer = input('请输入要发送给服务端的信息:')
        send_socket_info(handle=client, side='client', msg=answer)

        socket_data = receive_socket_info(handle=client, side='client', expected_msg='')
        if 'quit' in socket_data:
            send_socket_info(handle=client, side='client', msg='quit')
            break

    # 断开socket连接
    client.close()
    print(f'与服务端 {ip}:{port} 断开连接')


if __name__ == '__main__':
    start_client_socket()  # 启动客户端socket

2. 服务端配置(阻塞式TCP连接)

代码如下:

# -*- coding:utf-8 -*-
"""
阻塞式TCP连接
"""
import time
import socket

__author__ = 'Evan'


SOCKET_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
SOCKET_TIMEOUT_TIME = 60


def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):
    """
    发送socket info,并根据side打印不同的前缀信息
    :param handle: socket句柄
    :param msg: 要发送的内容
    :param side: 默认server端
    :param do_encode: 是否需要encode,默认True
    :param do_print_info: 是否需要打印socket信息,默认True
    :return:
    """
    if do_encode:
        handle.send(msg.encode())
    else:
        handle.send(msg)

    if do_print_info:
        current_time = time.strftime('%Y-%m-%d %H:%M:%S')
        if side == 'server':
            print(f'Server send --> {current_time} - {msg}')
        else:
            print(f'Client send --> {current_time} - {msg}')


def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):
    """
    循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息
    :param handle: socket句柄
    :param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组
    :param side: 默认server端
    :param do_decode: 是否需要decode,默认True
    :param do_print_info: 是否需要打印socket信息,默认True
    :return:
    """
    while True:
        if do_decode:
            socket_data = handle.recv(BUFFER_SIZE).decode()
        else:
            socket_data = handle.recv(BUFFER_SIZE)

        if do_print_info:
            current_time = time.strftime('%Y-%m-%d %H:%M:%S')
            if side == 'server':
                print(f'Server received ==> {current_time} - {socket_data}')
            else:
                print(f'Client received ==> {current_time} - {socket_data}')

        # 如果expected_msg为空,跳出循环
        if not expected_msg:
            break

        if isinstance(expected_msg, (list, tuple)):
            flag = False
            for expect in expected_msg:  # 循环判断每个期待字符是否在返回结果中
                if expect in socket_data:  # 如果有任意一个存在,跳出循环
                    flag = True
                    break
            if flag:
                break
        else:
            if expected_msg in socket_data:
                break
        time.sleep(3)  # 每隔3秒接收一次socket
    return socket_data


def start_server_socket():
    """
    启动服务端TCP Socket
    :return:
    """
    ip, port = SOCKET_IP
    server = socket.socket()  # 使用TCP方式传输
    server.bind((ip, port))  # 绑定IP与端口
    server.listen(5)  # 设置最大连接数为5
    print(f'服务端 {ip}:{port} 开启')

    # 不断循环,接受客户端请求
    while True:
        print('等待客户端连接...')
        conn, address = server.accept()  # 使用accept阻塞式等待客户端请求,如果多个客户端同时访问,排队一个一个进
        print(f'当前连接客户端:{address}')
        conn.settimeout(SOCKET_TIMEOUT_TIME)  # 设置服务端超时时间

        # 与客户端握手,达成一致
        receive_socket_info(handle=conn, expected_msg='客户端已就绪')
        send_socket_info(handle=conn, msg='服务端已就绪')

        # 不断接收客户端发来的消息
        while True:
            socket_data = receive_socket_info(handle=conn, expected_msg='')
            if 'quit' in socket_data:
                send_socket_info(handle=conn, msg='quit')
                break

            answer = input('请回复客户端的信息:')
            send_socket_info(handle=conn, msg=answer)

        # 断开socket连接
        conn.close()
        print(f'与客户端 {ip}:{port} 断开连接')


if __name__ == '__main__':
    start_server_socket()  # 启动服务端socket

3. 服务端配置(非阻塞式TCP连接)

代码如下:

# -*- coding:utf-8 -*-
"""
非阻塞式TCP连接
"""
import time
import socketserver

__author__ = 'Evan'


SOCKET_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
SOCKET_TIMEOUT_TIME = 60


class UnblockSocketServer(socketserver.BaseRequestHandler):
    # 继承socketserver.BaseRequestHandler类
    # 首先执行setup方法,然后执行handle方法,最后执行finish方法
    # 如果handle方法报错,则会跳过
    # setup与finish无论如何都会执行
    # 一般只定义handle方法即可

    def setup(self):
        print('开启非阻塞式连接...')

    @staticmethod
    def send_socket_info(handle, msg, side='server', do_encode=True, do_print_info=True):
        """
        发送socket info,并根据side打印不同的前缀信息
        :param handle: socket句柄
        :param msg: 要发送的内容
        :param side: 默认server端
        :param do_encode: 是否需要encode,默认True
        :param do_print_info: 是否需要打印socket信息,默认True
        :return:
        """
        if do_encode:
            handle.send(msg.encode())
        else:
            handle.send(msg)

        if do_print_info:
            current_time = time.strftime('%Y-%m-%d %H:%M:%S')
            if side == 'server':
                print(f'Server send --> {current_time} - {msg}')
            else:
                print(f'Client send --> {current_time} - {msg}')

    @staticmethod
    def receive_socket_info(handle, expected_msg, side='server', do_decode=True, do_print_info=True):
        """
        循环接收socket info,判断其返回值,直到指定的值出现为止,防止socket信息粘连,并根据side打印不同的前缀信息
        :param handle: socket句柄
        :param expected_msg: 期待接受的内容,如果接受内容不在返回结果中,一直循环等待,期待内容可以为字符串,也可以为多个字符串组成的列表或元组
        :param side: 默认server端
        :param do_decode: 是否需要decode,默认True
        :param do_print_info: 是否需要打印socket信息,默认True
        :return:
        """
        while True:
            if do_decode:
                socket_data = handle.recv(BUFFER_SIZE).decode()
            else:
                socket_data = handle.recv(BUFFER_SIZE)

            if do_print_info:
                current_time = time.strftime('%Y-%m-%d %H:%M:%S')
                if side == 'server':
                    print(f'Server received ==> {current_time} - {socket_data}')
                else:
                    print(f'Client received ==> {current_time} - {socket_data}')

            # 如果expected_msg为空,跳出循环
            if not expected_msg:
                break

            if isinstance(expected_msg, (list, tuple)):
                flag = False
                for expect in expected_msg:  # 循环判断每个期待字符是否在返回结果中
                    if expect in socket_data:  # 如果有任意一个存在,跳出循环
                        flag = True
                        break
                if flag:
                    break
            else:
                if expected_msg in socket_data:
                    break
            time.sleep(3)  # 每隔3秒接收一次socket
        return socket_data

    def handle(self):
        """
        所有和客户端交互的操作写在这里
        :return:
        """
        conn = self.request  # 获取socket句柄

        # 与客户端握手,达成一致
        self.receive_socket_info(handle=conn, expected_msg='客户端已就绪')
        self.send_socket_info(handle=conn, msg='服务端已就绪')

        # 不断接收客户端发来的消息
        while True:
            socket_data = self.receive_socket_info(handle=conn, expected_msg='')
            if 'quit' in socket_data:
                self.send_socket_info(handle=conn, msg='quit')
                break

            answer = input('请回复客户端的信息:')
            self.send_socket_info(handle=conn, msg=answer)

        # 断开socket连接
        conn.close()

    def finish(self):
        print('连接关闭')


def main():
    # 创建多线程实例
    server = socketserver.ThreadingTCPServer(SOCKET_IP, UnblockSocketServer)
    # 开启异步多线程,等待连接
    server.timeout = SOCKET_TIMEOUT_TIME  # 设置服务端超时时间
    print(f'服务端 {SOCKET_IP[0]}:{SOCKET_IP[1]} 开启')
    server.serve_forever()  # 永久运行


if __name__ == '__main__':
    main()

4. 交互过程demo

客户端:
在这里插入图片描述
服务端:
在这里插入图片描述

四、构建Socket-UDP传输

1. 客户端配置

代码如下:

# -*- coding:utf-8 -*-
import socket

__author__ = 'Evan'


REMOTE_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024
SOCKET_TIMEOUT_TIME = 60


def start_client_socket():
    """
    启动客户端UDP Socket
    :return:
    """
    ip, port = REMOTE_IP
    client = socket.socket(type=socket.SOCK_DGRAM)  # 使用TCP方式传输
    print(f'开始连接服务端 {ip}:{port} ...')
    client.connect((ip, port))  # 连接远程服务端
    print(f'连接服务端 {ip}:{port} 成功')

    # 与服务端交互
    while True:
        answer = input('请输入要发送给服务端的信息:')
        client.sendto(answer.encode(), REMOTE_IP)  # 使用sendto发送UDP消息,address填入服务端IP和端口
        if 'quit' in answer:
            break

    # 断开socket连接
    client.close()
    print(f'与服务端 {ip}:{port} 断开连接')


if __name__ == '__main__':
    start_client_socket()  # 启动客户端socket

2. 服务端配置

代码如下:

# -*- coding:utf-8 -*-
import socket

__author__ = 'Evan'


SOCKET_IP = ('127.0.0.1', 6666)
BUFFER_SIZE = 1024


def start_server_socket():
    """
    启动服务端UDP Socket
    :return:
    """
    ip, port = SOCKET_IP
    server = socket.socket(type=socket.SOCK_DGRAM)  # 使用UDP方式传输
    server.bind((ip, port))  # 绑定IP与端口
    print(f'服务端 {ip}:{port} 开启')

    # 不断循环,接受客户端发来的消息
    while True:
        socket_data, address = server.recvfrom(BUFFER_SIZE)
        print('收到客户端 -> {} 发来的消息: {}'.format(address, socket_data.decode()))


if __name__ == '__main__':
    start_server_socket()

3. 交互过程demo

客户端:
在这里插入图片描述
服务端:
在这里插入图片描述
在这里插入图片描述
Finish!

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐