花生壳的内网穿透应该是利用的端口转发,所以才会有带宽等等限制,优点就是可以使用DDNS来访问,打洞效率高。P2P的优点就是点对点带宽只局限于链路而不是服务器限制,缺点就是打洞成功率一般。

实现

一、P2P的实现

分别基于UDP和TCP打洞实现P2P连接-Python

转载出处


受限制于NAT网关的特性,处于不同局域网内的客户端无法直接连接。即使知道了网关的公网ip和映射端口,任何“不请自来”的数据包都会被NAT网关丢弃,从而无法建立连接。因此需要依赖第三方服务器辅助双方“打洞”建立连接,一旦连接成功建立,即开启了真正的P2P通信,再无需服务器介入。

打洞原理为:首先client A和client B(为一对peer)分别向服务器发起请求,服务器记录下双方的公网ip和端口,将对方的地址返回给各自客户端,这样client A和client B都获得了对方的公网地址,接下来就可以尝试通过打洞互相连接了。之前说过,任何“不请自来”的请求都不被允许,因此client A和client B都应异步的向对方发送请求。假设client B首先向client A发送了一次请求,由于之前双方没有任何通信,显然该请求不会成功,但client B已经在它的网关上留下出站数据包(即打了一个洞),此时若client A向client B发送请求,该请求便可以通过client B的网关并成功与client B建立连接。

*注意:打洞能否成功依旧取决于NAT网关是否支持此类操作,并非100%成功。

具体实现方面,UDP比TCP要简单可靠一些,因为UDP通信并不基于实际的连接,即不留下确定的session,只要在发送数据包时指定接收者即可。我这里在客户端连接服务器时需要输入相同的口令以确定身份,在建立P2P连接时也通过服务器给的特定随机数签名验证身份,打洞成功后实现了一个简单的异步聊天功能。

UDP服务器端

import socket
import random

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', 11111))

#记录口令信息
peers = {}

while True:
    message, address = s.recvfrom(2048)
    message = message.decode()
    if not message.startswith('#connectChain*'):
        continue
    chain = message.replace('#connectChain*', '')
    if chain not in peers:
        peers[chain] = address
    else:
        print('matchedPeers: ', peers[chain], address)
        verifySignature = random.randint(10000, 99999)  #签名验证,用于peers双方验证身份
        #给双方发送peer地址信息和签名
        s.sendto(str([peers[chain], verifySignature]).encode(), address)
        s.sendto(str([address,verifySignature]).encode(), peers[chain])
        peers.pop(chain)

UDP客户端(client A和client B均使用一样的代码互相通信)

import socket
import threading
import time

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serverAddress = ('ryanxin.cn', 11111)

#连接服务器
chain = input('连接口令:')
send = ('#connectChain*'+chain).encode()
s.sendto(send, serverAddress)
message = eval(s.recvfrom(2048)[0].decode())
myPeer = message[0]
signature = str(message[1])
print('got myPeer: ', myPeer)

peerConnected = False
#先连接myPeer,再互发消息
def sendToMyPeer():
    #发送包含签名的连接请求
    global peerConnected
    while True:
        s.sendto(signature.encode(), myPeer)
        if peerConnected:
            break
        time.sleep(1)
    
    #发送聊天信息
    while True:
        send_text = input("我方发送:")
        s.sendto(send_text.encode(), myPeer)

def recFromMyPeer():
    #接收请求并验证签名or接收聊天信息
    global peerConnected
    while True:
        message = s.recvfrom(2048)[0].decode()
        if message == signature:
            if not peerConnected:
                print('connected successfully')
            peerConnected = True
        elif peerConnected:
            print('\r对方回复:'+message+'\n我方发送:', end='')

sen_thread = threading.Thread(target=sendToMyPeer)
rec_thread = threading.Thread(target=recFromMyPeer)

sen_thread.start()
rec_thread.start()

sen_thread.join()
rec_thread.join()

TCP部分原理也是类似的,只不过需要先建立连接,这里我把双方的代码分开来写方便实现异步请求打洞。具体的,首先client B向client A发起连接请求(必定失败)尝试打洞,client A等待几秒后向client B正常发起连接请求(成功与否取决于NAT硬件)。

TCP服务器端

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 11111))

#记录口令信息
peers = {}

sock.listen(5)
while True:
    s, address = sock.accept()
    message = s.recv(2048).decode()
    if not message.startswith('#connectChain*'):
        continue
    chain = message.replace('#connectChain*', '')
    if chain not in peers:
        peers[chain] = (s, address)
    else:
        print('matchedPeers: ', peers[chain][1], address)
        #给双方发送peer地址信息和签名
        peers[chain][0].sendall(str(address).encode())
        s.sendall(str(peers[chain][1]).encode())
        s.close()
        peers.pop(chain)

client A

import socket
import threading
import time

#连接服务器
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serverAddress = ('ryanxin.cn', 11111)

s.connect(serverAddress)
myAddress = s.getsockname() #本机ip端口
chain = input('连接口令:')
send = ('#connectChain*'+chain).encode()
s.sendall(send)
myPeer = eval(s.recv(2048).decode())  # peer ip端口
print('myAddress: ', myAddress)
print('got myPeer: ', myPeer)
s.close()

#等待对方打洞
time.sleep(3)

#发起TCP连接
print('正在发起连接请求')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(myAddress)
s.connect(myPeer)

#聊天
def sendToMyPeer():
    while True:
        send_text = input("我方发送:")
        s.sendall(send_text.encode())

def recFromMyPeer():
    while True:
        message = s.recv(2048).decode()
        print('\r对方回复:'+message+'\n我方发送:', end='')

sen_thread = threading.Thread(target=sendToMyPeer)
rec_thread = threading.Thread(target=recFromMyPeer)

rec_thread.start()
sen_thread.start()


sen_thread.join()
rec_thread.join()

client B

import socket
import threading
import time

#连接服务器
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serverAddress = ('ryanxin.cn', 11111)

s.connect(serverAddress)
myAddress = s.getsockname() #本机ip端口
chain = input('连接口令:')
send = ('#connectChain*'+chain).encode()
s.sendall(send)
myPeer = eval(s.recv(2048).decode())  # peer ip端口
print('myAddress: ', myAddress)
print('got myPeer: ', myPeer)
s.close()

#发送一个TCP连接,用于打洞,无需对方接收
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(myAddress)
try:
    s.connect(myPeer)
except ConnectionRefusedError:
    print('已尝试打洞')
s.close()

#监听TCP连接
sc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sc.bind(myAddress)
sc.listen(1)
s, address = sc.accept()
sc.close()

#聊天
def sendToMyPeer():
    while True:
        send_text = input("我方发送:")
        s.sendall(send_text.encode())


def recFromMyPeer():
    while True:
        message = s.recv(2048).decode()
        print('\r对方回复:'+message+'\n我方发送:', end='')

sen_thread = threading.Thread(target=sendToMyPeer)
rec_thread = threading.Thread(target=recFromMyPeer)

rec_thread.start()
sen_thread.start()


sen_thread.join()
rec_thread.join()

二、端口转发的实现

一个lcx的python实现

转载出处

#!/usr/local/bin/python3
#coding:utf-8
import sys
import socket
import time
import multiprocessing
import threading
import select

def usage():
    print('AWADA port forward tools')
    print('-h: help')
    print('-v: verbose')
    print('-listen portA,portB: listen two ports and transmit data')
    print('-tran localport,targetip,targetport: listen a local port and transmit data from localport to target:targetport')
    print('-slave reverseip,reverseport,targetip,targetport: connect reverseip:reverseport with targetip:targetport')

def subTransmit(recvier,sender,stopflag):
    theRecvier = recvier[0]
    theSender = sender[0]
    verbose = False
    i = 0
    recvierData = b""
    senderData = b""
    if '-v' in sys.argv:
        verbose = True
    while not stopflag['flag']:
        data = b""
        try:
            rlist, wlist, elist = select.select([theRecvier,theSender],[theRecvier,theSender],[],0.1)
            if len(rlist) != 0:
                for socketer in rlist:
                    data = socketer.recv(20480)
                    if len(data) == 0:
                        raise Exception('连接已断开')
                    if socketer == theRecvier:
                        senderData += data
                        address = recvier[1]
                    else:
                        recvierData += data
                        address = sender[1]
                    bytes = len(data)
                    if verbose:
                        print("Recv From %s:%d" % address," %d bytes" % bytes)
            if len(senderData) != 0:
                bytes = len(senderData)
                if verbose:
                    print("Send to %s:%d" % sender[1]," %d bytes" % bytes)
                theSender.send(senderData)
                senderData = b""
            if len(recvierData) != 0:
                bytes = len(recvierData)
                if verbose:
                    print("Send to %s:%d", recvier[1], " %d bytes" % bytes)
                theRecvier.send(recvierData)
                recvierData = b""
        except Exception as e:
            stopflag['flag'] = True
            try:
                theRecvier.shutdown(socket.SHUT_RDWR)
                theRecvier.close()
            except:
                pass
            try:
                theSender.shutdown(socket.SHUT_RDWR)
                theSender.close()
            except:
                pass
            print("Closed Two Connections")

def transmit(conns,lock=None):
    stopFlag = {'flag':False}
    connA, addressA, connB, addressB = conns
    threading.Thread(target=subTransmit,args=((connA,addressA),(connB,addressB), stopFlag)).start()
    while not stopFlag['flag']:
        time.sleep(1)
    print("%s:%d" % addressA,"<->","%s:%d" % addressB," Closed")

def bindToBind(portA,portB):
    socketA = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    socketA.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    socketB = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socketB.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        print("Listen Port %d." % portA)
        socketA.bind(('0.0.0.0',portA))
        socketA.listen(10)
        print("Listen Port Ok!")
    except:
        print("Listen Port Failed!")
        exit()

    try:
        print("Listen Port %d." % portB)
        socketB.bind(('0.0.0.0',portB))
        socketB.listen(10)
        print("Listen Port Ok!")
    except:
        print("Listen port Failed!")
        exit()

    while(True):
        print("Wait For Connection At Port %d" % portA)
        connA, addressA = socketA.accept()
        print("Accept Connection From %s:%d" % addressA)
        print("Wait For Another Connection At Port %d" % portB)
        connB, addressB = socketB.accept()
        print("Accept Connecton From %s:%d" % addressB)
        multiprocessing.Process(target=transmit,args=((connA,addressA,connB,addressB),)).start()
        time.sleep(1)
        print("Create Thread Ok!")

def bindToConn(port,target,targetPort):
    socketA = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    socketA.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    localAddress = ('0.0.0.0',port)
    targetAddress = (target,targetPort)

    try:
        print("Listen Port %d." % port)
        socketA.bind(localAddress)
        socketA.listen(10)
        print("Listen Port Ok!")
    except:
        print("Listen Port Failed!")
        exit()

    while True:
        print("Wait For Connection At Port %d" % localAddress[1])
        connA, addressA = socketA.accept()
        print("Accept Connection From %s:%d" % addressA)
        targetConn = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        targetConn.settimeout(5)
        try:
            targetConn.connect(targetAddress)
            multiprocessing.Process(target=transmit,args=((connA,addressA,targetConn,targetAddress),)).start()
            time.sleep(1)
            print("Create Thread Ok!")
        except TimeoutError:
            print("Connect To %s:%d Failed!" % targetAddress)
            connA.close()
            exit()
        except:
            print("Something wrong!")
            connA.close()
            exit()

def connToConn(reverseIp,reversePort,targetIp,targetPort):
    reverseAddress = (reverseIp, reversePort)
    targetAddress = (targetIp, targetPort)
    while True:
        data = b""
        reverseSocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        targetSocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        try:
            print("Connect To %s:%d" % reverseAddress)
            reverseSocket.connect(reverseAddress)
            print("Connect Ok!")
        except:
            print("Connect Failed!")
            exit()
        while True:
            try:
                if select.select([reverseSocket],[],[]) == ([reverseSocket],[],[]):
                    data = reverseSocket.recv(20480)
                    if len(data) != 0:
                        break
            except:
                continue

        while True:
            try:
                print("Connect ot ",targetAddress)
                targetSocket.connect(targetAddress)
                print("Connect ok!")
            except:
                print("TargetPort Is Not Open")
                reverseSocket.close()
                exit()
            try:
                targetSocket.send(data)
            except:
                continue
            break
        print("All Connect Ok!")

        try:
            multiprocessing.Process(target=transmit,args=((reverseSocket,reverseAddress,targetSocket,targetAddress),)).start()
            print("Create Thread Success!")
            #time.sleep(1)
        except:
            print("Create Thread Failed!")
            exit()

def main():
    global verbose
    if '-h' in sys.argv:
        usage()
        exit()
    if '-listen' in sys.argv:
        index = sys.argv.index('-listen')
        try:
            portA = int(sys.argv[index+1])
            portB = int(sys.argv[index+2])
            assert portA != 0 and portB != 0
            bindToBind(portA,portB)
        except:
            print("Something Wrong")
        exit()

    elif '-tran' in sys.argv:
        index = sys.argv.index('-tran')
        try:
            port = int(sys.argv[index+1])
            target = sys.argv[index+2]
            targetPort = int(sys.argv[index+3])
            assert port!=0 and targetPort!=0
            bindToConn(port,target,targetPort)
        except:
            print("Something Wrong")
        exit()
    elif '-slave' in sys.argv:
        index = sys.argv.index('-slave')
        try:
            reverseIp = sys.argv[index+1]
            reversePort = int(sys.argv[index+2])
            targetIp = sys.argv[index+3]
            targetPort = int(sys.argv[index+4])
            connToConn(reverseIp,reversePort,targetIp,targetPort)
        except:
            print("Something Wrong")
        exit()
    usage()

if __name__ == '__main__':
    main()

用法

usage:python3 awada.py -h
chmod +x awada.py
用法1:awada.py -listen 8000 8001,此时本机端口8000与8001可以相互通信
用法2:awada.py -tran 80 8.8.8.8 80,此时访问本机端口80的数据会重定向到8.8.8.8:80端口
用法3:awada.py -slave 10.1.1.1 8000 127.0.0.1 3389,此时把127.0.0.1:3389反弹到10.1.1.1:8000上
例如,现在遇到一台内网的主机10.1.1.100,开放了端口3389,但是无法直接访问,需要反弹。假设有一台带公网ip:112.112.112.112的主机,
可以在公网主机运行awada.py -listen 8000 8001。然后在内网主机运行awada.py -slave 112.112.112.112 8000 127.0.0.1 3389。
成功运行之后,则可以连接112.112.112.112:8001,数据会重定向到内网主机的3389端口。
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐