python asyncio 异步获取子域名小demo
96106 字典用了228秒可能是我写的方式不对我之前用多线程的方式 只要 118秒 (https://github.com/Ciyfly/y_subdomain)#!/usr/bin/python# coding=utf-8'''Date: 2021-06-24 16:32:54LastEditors: recarLastEditTime: 2021-06-24 17:33:30'''impor
·
96106 字典用了228秒
可能是我写的方式不对
我之前用多线程的方式 只要 118秒 (https://github.com/Ciyfly/y_subdomain)
#!/usr/bin/python
# coding=utf-8
'''
Date: 2021-06-24 16:32:54
LastEditors: recar
LastEditTime: 2021-06-24 17:33:30
'''
import asyncio
import aiodns
import time
loop = asyncio.get_event_loop()
resolver = aiodns.DNSResolver(loop=loop)
task_list = list()
tasks_all_count = 0
task_surplus_count = 0
task_success_count = 0
result_list = list()
start = time.perf_counter()
def print_progress(currne_size, all_size, start, find_size):
"""输出进度条
:param currne_size 当前队列大小
:param all_size 总体队列大小
:param start 开启时间
:param find_size 找到多少条
"""
out_u = int(currne_size/all_size*50) # ##
out_l = 50 - out_u
percentage = 100-(currne_size/all_size*100)
use_time = time.perf_counter() - start
print(
'\r'+'[' + '>' * out_l + '-' * out_u +']'
+ f'{percentage:.2f}%'
+ f'|size: {currne_size}'
+ f'|use time: {use_time:.2f}s'
+ f'|find: {find_size} ', end="")
async def query(name, query_type):
ip_list = list()
try:
response = await resolver.query(name, query_type)
except:
return {}
for res in response:
ip_list.append(res.host)
return {
"domain": name,
"ip_list": ip_list
}
def update_data(future):
global task_list
global tasks_all_count
global task_surplus_count
global task_success_count
global start
data = future.result()
if data.get("ip_list"):
result_list.append(data)
task_success_count +=1
task_surplus_count -=1
print_progress(task_surplus_count, tasks_all_count, start, task_success_count)
def gen_task(base_domain):
global task_list
global tasks_all_count
global task_surplus_count
with open("subdomains.txt", "r") as f:
for line in f:
domain = "{0}.{1}".format(line.strip(), base_domain)
task = loop.create_task(query(domain, 'A'))
task.add_done_callback(update_data)
task_list.append(task)
tasks_all_count +=1
task_surplus_count +=1
base_domain = "baidu.com"
gen_task(base_domain)
print("all tasks: {0}".format(tasks_all_count))
wait_tasks = asyncio.wait(task_list)
loop.run_until_complete(wait_tasks)
loop.close()
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献2条内容
所有评论(0)