来自远方 - python https://runyf.cn/category/python/ zh-CN Thu, 25 Jul 2024 18:16:00 +0800 Thu, 25 Jul 2024 18:16:00 +0800 使用python创建多线程下载超多数量文件 https://runyf.cn/archives/459/ https://runyf.cn/archives/459/ Thu, 25 Jul 2024 18:16:00 +0800 远方 import requests import urllib.parse from tqdm import tqdm import os import concurrent.futures def check_local_file(url, destination,count,total_count): # 如果本地文件存在 if os.path.exists(destination): response = requests.head(url) # 只获取头部信息,获取文件大小等 remote_file_size = int(response.headers.get('content-length', 0)) local_file_size = os.path.getsize(destination) # 获取本地文件大小 # 如果本地文件大小与远程文件大小一致 if local_file_size == remote_file_size: print("no."+str(count)+"/"+str(total_count)+" "+destination+"文件已存在且特征一致,无需下载") return True return False def download_file(url, destination,count,total_count): if not check_local_file(url, destination,count,total_count): response = requests.get(url, stream=True) total_size = int(response.headers.get('content-length', 0)) block_size = 1024 # 每次读取的字节数 with open("down.txt", "a") as file: file.write(url + '\n') with open(destination, 'wb') as file, tqdm( desc="Downloading no."+str(count)+"/"+str(total_count), total=total_size, unit='iB', unit_scale=True, unit_divisor=1024 ) as bar: for data in response.iter_content(block_size): file.write(data) bar.update(len(data)) # 假设链接存储在一个文本文件 'links.txt' 中 with open('full-links.txt', 'r') as file: links = file.readlines() # 遍历链接下载文件 links_array = [] urls = [] for link in links: link = link.strip() # 去除换行符和空格 urls.append(link) num_threads = 32 # 根据您的计算机性能调整此值 total_len = len(urls) with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: for index, url in enumerate(urls): file_name = urllib.parse.urlparse(url).path.split('/')[-1] # 从链接中获取文件名 file_path = "./opennueroQTAB/"+file_name executor.submit(download_file, url, file_path,index,total_len) ]]> 0 https://runyf.cn/archives/459/#comments https://runyf.cn/feed/category/python/ 使用爬虫批量把html中的base64图片保存到本地 
https://runyf.cn/archives/356/ https://runyf.cn/archives/356/ Fri, 06 Oct 2023 19:45:15 +0800 远方 import re import base64 import os # 读取本地文件,也可以改成在线获取 with open("base64.html", "r", encoding='utf-8') as f: data = f.read() #读取文本 html = data # 用正则表达式查找页面中的base64格式图片 base64_imgs = re.findall('data:image/(?:jpeg|png|jpg);base64,(.*?)"', html) if not os.path.exists('images'): os.mkdir('images') # 遍历图片数据,解码保存到本地 for i, img in enumerate(base64_imgs): img_data = base64.b64decode(img) filename = 'images/' + str(i) + '.jpg' with open(filename, 'wb') as f: f.write(img_data) print('Images saved.') ]]> 0 https://runyf.cn/archives/356/#comments https://runyf.cn/feed/category/python/ 爬虫案例之js逆向有道翻译 https://runyf.cn/archives/355/ https://runyf.cn/archives/355/ Mon, 02 Oct 2023 08:55:00 +0800 远方 sign.js
var cry = require("crypto")


// Value interpolated as `client` in the string that w() signs.
d = "fanyideskweb"
// Value interpolated as `product` in the string that w() signs.
u = "webfanyi"

// Millisecond timestamp captured once when the script is evaluated; sign()
// passes it to w() as `mysticTime` and returns it next to the signature.
var e = (new Date).getTime()
// Disabled alternative: a fixed timestamp instead of the current time.
// var e = 1695965834800

/**
 * Return the MD5 digest of `e` as a lowercase hex string.
 *
 * @param {*} e - value to hash; it is converted to a string first.
 * @returns {string} 32-character hexadecimal digest.
 */
function A(e) {
    // `cry` is Node's built-in "crypto" module, which has no MD5() helper
    // (that API belongs to the third-party crypto-js package). Calling
    // cry.MD5 throws a TypeError, so hash through createHash instead.
    return cry.createHash("md5").update(String(e)).digest("hex")
}
/**
 * Build the string to be signed from the module-level client/product values,
 * the given timestamp and key, log it, and return its digest from A().
 *
 * @param {*} e - value placed in the `mysticTime` field.
 * @param {*} t - value placed in the `key` field.
 * @returns {string} result of A() applied to the assembled string.
 */
function w(e, t) {
    // Assemble the payload once so the logged text and the hashed text are
    // the same value.
    var payload = `client=${d}&mysticTime=${e}&product=${u}&key=${t}`
    console.log(payload)
    return A(payload)
}

/**
 * Compute the signature for the module-level timestamp `e`.
 *
 * @returns {Array} two-element array: [signature from w(), timestamp e].
 */
function sign(){
    var secretKey = "fsdsogkndfokasodnaso"
    var signature = w(e, secretKey)
    return [signature, e]
}

// Print the [signature, timestamp] pair whenever the script is evaluated.
console.log(sign())

main.py

import requests
import execjs


# Load sign.js and call its sign() function, which returns the signature and
# the timestamp the signature was computed for.
# An explicit encoding keeps the read independent of the platform default.
with open("sign.js", "r", encoding="utf-8") as f:
    JSCode = f.read()
docjs = execjs.compile(JSCode)
sign, time = docjs.call('sign')


text = input("请输入要翻译的内容: ")
url = "https://dict.youdao.com/webtranslate"
# Form fields sent with the request; `sign` and `mysticTime` come from the
# sign() call above, the rest are fixed values.
myData = {
    "i": text,
    "from": "auto",
    "to": "",
    "dictResult": "true",
    "keyid": "webfanyi",
    "sign": sign,
    "client": "fanyideskweb",
    "product": "webfanyi",
    "appVersion": "1.0.0",
    "vendor": "web",
    "pointParam": "client,mysticTime,product",
    "mysticTime": time,
    "keyfrom": "fanyi.web",
    "mid": "1",
    "screen": "1",
    "model": "1",
    "network": "wifi",
    "abtest": "0",
    "yduuid": "abcdefg"
}

myHeaders = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}
# requests applies no timeout by default, so without one an unresponsive
# server would block the script forever.
res = requests.post(url, headers=myHeaders, data=myData, timeout=10)
# Print the raw response body as returned by the server.
print(res.text)
]]>
1 https://runyf.cn/archives/355/#comments https://runyf.cn/feed/category/python/
爬虫案例之爬取美剧 https://runyf.cn/archives/354/ https://runyf.cn/archives/354/ Mon, 02 Oct 2023 08:51:00 +0800 远方 import requests from concurrent.futures import ThreadPoolExecutor myHeasers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", } #获取全部链接列表 def getUrlList(): res = requests.get("https://pptv.1080tg.com/202306/09/WyBkvykgLm3/video/900k_0X480_64k_25/hls/index.m3u8") tsList = res.text.split("\n") urlList = []; for i in tsList: if i.startswith("#"): continue urlList.append(i) return urlList #下载内容 def download_file(url,urls,i): if isinstance(url, str): print("下载中"+url) res = requests.get(url,headers=myHeasers) urls[i] = res.content #检验 def checkCotent(urls): i = 0 for url in urls: if isinstance(url, bytes): pass else: i += 1 return i def downStart(urls): # 线程池,程序入口 max_workers=线程个数配置 with ThreadPoolExecutor(max_workers=20) as executor: #print(urls) # 提交任务 i = 0 for url in urls: executor.submit(download_file, url, urls, i) i += 1 # 等待完成 executor.shutdown(wait=True) count = checkCotent(urls) print(count) #递归函数 if count > 0: print("有"+str(count)+"个文件没有下载成功,重新开启线程下载中") downStart(urls) else: print("文件生成中") for line in urls: with open("movies.mp4","ab+") as f: f.write(line) urls = getUrlList() downStart(urls) ]]> 0 https://runyf.cn/archives/354/#comments https://runyf.cn/feed/category/python/ python使用logging记录日志 https://runyf.cn/archives/320/ https://runyf.cn/archives/320/ Tue, 25 Apr 2023 18:06:00 +0800 远方 import logging import os import time LOG_DIR = './logs' if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR) timestamp = time.strftime("%Y%m%d") log_file = "{}/{}.log".format(LOG_DIR, timestamp) logging.basicConfig( level=logging.DEBUG, # 定义输出到文件的log级别,大于此级别的都被输出 format='%(asctime)s %(filename)s : %(levelname)s %(message)s', # 定义输出log的格式 datefmt='%Y-%m-%d %A %H:%M:%S', # 时间 filename=log_file, # log文件名 filemode='a') logging.debug("debug") a = 5 b = 0 try: c = a / b except Exception 
as e: logging.exception(e) ]]> 0 https://runyf.cn/archives/320/#comments https://runyf.cn/feed/category/python/