Open Source

This commit is contained in:
2025-04-22 09:11:40 +08:00
commit a4bf39a958
14 changed files with 1043 additions and 0 deletions

136
scrapper/1 main.py Normal file
View File

@@ -0,0 +1,136 @@
# Scrape asset metadata (title, properties, changelog, download link, images)
# from the paginated listing at kenney.nl/assets.
import requests
import json
from bs4 import BeautifulSoup
from tqdm import tqdm, trange

# Listing URL; the page number is appended, e.g. ".../assets/page:1".
base_url = "https://www.kenney.nl/assets/page:"
# NOTE(review): hard-coded page count — confirm the site still has 13 pages.
total_pages = 13
all_links = []  # collected asset detail-page URLs
headers = {
    "User-Agent": "Mozilla/5.0"
}
def parse_resource_page(url):
    """Scrape one kenney.nl asset detail page into a dict.

    Returns a dict with keys:
        title      -- asset name, or 'N/A' when the heading is missing
        properties -- {label: str or [str, ...]} from the first info table
        changelog  -- [{'date', 'version', 'description'}, ...] from the
                      second table, in page order
        download   -- absolute .zip URL, or 'N/A' when none is found
        images     -- absolute URLs of the cover image plus gallery images
    """
    # Fix: added a timeout so one stuck request cannot hang the whole crawl.
    response = requests.get(url, headers=headers, timeout=30)
    soup = BeautifulSoup(response.text, "lxml")
    result = {}

    # --- asset title --------------------------------------------------
    title_tag = soup.select_one(
        '#content > section > div > div > div:nth-of-type(1) > h1')
    result['title'] = title_tag.text.strip() if title_tag else 'N/A'

    # --- property table (Category, Series, ...) -----------------------
    properties = {}
    prop_table = soup.select_one(
        '#content > section > div > div > div:nth-of-type(1) > table:nth-of-type(1) > tbody')
    if prop_table:
        for row in prop_table.find_all('tr'):
            cols = row.find_all('td')
            if len(cols) == 2:
                key = cols[0].text.strip().rstrip(':')
                value_links = cols[1].find_all('a')
                if value_links:
                    # multi-valued property rendered as links -> list of str
                    value = [a.text.strip() for a in value_links]
                else:
                    value = cols[1].text.strip()
                properties[key] = value
    result['properties'] = properties

    # --- changelog table ----------------------------------------------
    changelog = []
    update_table = soup.select_one(
        '#content > section > div > div > div:nth-of-type(1) > table:nth-of-type(2) > tbody')
    if update_table:
        for row in update_table.find_all('tr'):
            cols = row.find_all('td')
            if len(cols) == 2:
                date = cols[0].text.strip()
                spans = cols[1].find_all('span')
                version = spans[0].text.strip() if len(spans) >= 1 else ''
                description = spans[1].text.strip() if len(spans) >= 2 else ''
                changelog.append({
                    'date': date,
                    'version': version,
                    'description': description
                })
    result['changelog'] = changelog

    # --- first .zip download link -------------------------------------
    zip_link = None
    for a_tag in soup.find_all("a", href=True):
        href = a_tag["href"]
        if href.endswith(".zip"):
            # site-relative hrefs are made absolute
            zip_link = "https://www.kenney.nl" + href if href.startswith("/") else href
            break
    result['download'] = zip_link if zip_link else "N/A"

    # --- images: cover first, then gallery ----------------------------
    images = []
    cover_img = soup.select_one(
        '#content > section > div > div > div:nth-of-type(2) > a > img')
    if cover_img and cover_img.get("src"):
        cover_url = cover_img["src"]
        if cover_url.startswith("/"):
            cover_url = "https://www.kenney.nl" + cover_url
        images.append(cover_url)

    gallery_divs = soup.select(
        '#content > section > div > div > div:nth-of-type(2) > div > div')
    for div in gallery_divs:
        img_tag = div.select_one("a > img")
        if img_tag and img_tag.get("src"):
            img_url = img_tag["src"]
            if img_url.startswith("/"):
                img_url = "https://www.kenney.nl" + img_url
            images.append(img_url)
    result['images'] = images
    return result
# Stage 1: walk the paginated listing and collect every detail-page link.
# for page in range(1, total_pages + 1):
for page in trange(1, total_pages + 1, desc="Fetching all assets' page links"):
    url = base_url + str(page)
    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.text, "lxml")
    # Locate //*[@id="content"]/section/div/div[1] (the listing container).
    content_div = soup.select_one(
        "#content > section > div > div:nth-of-type(1)")
    if content_div:
        item_divs = content_div.find_all("div", recursive=False)
        for item_div in item_divs:
            a_tag = item_div.find("a")
            if a_tag and "href" in a_tag.attrs:
                link = a_tag["href"]
                # NOTE(review): href is used as-is — confirm the site emits
                # absolute URLs here, otherwise a base-URL prefix is needed.
                full_link = link
                all_links.append(full_link)
print(f"总共提取到 {len(all_links)} 个链接 ✅")
# Persist the link list for later runs/debugging.
with open("kenney_links.txt", "w", encoding="utf-8") as f:
    for link in all_links:
        f.write(link + "\n")
# Stage 2: fetch and parse every collected detail page.
all_resource_data = []
# for link in all_links:
for link in tqdm(all_links, desc="Fetching all assets' data"):
    resource_data = parse_resource_page(link)
    all_resource_data.append(resource_data)
with open("kenney_data.json", "w", encoding="utf-8") as f:
    json.dump(all_resource_data, f, ensure_ascii=False, indent=4)
print("数据爬取完成 ✅")

View File

@@ -0,0 +1,72 @@
# Batch-download asset .zip archives listed in the scraped JSON.
import os
import json
import requests
import time
import random
from tqdm import tqdm

# === Paths ===
json_path = "kenney_data.json"   # scraped metadata (input)
output_dir = "kenney_assets"     # download root (output)

# === Load scraped metadata ===
with open(json_path, "r", encoding="utf-8") as f:
    resources = json.load(f)
# === Helpers ===
def sanitize_filename(name):
    """Strip characters that are unsafe in file names.

    Keeps alphanumerics plus ``._- ()`` and trims surrounding whitespace.
    """
    allowed = "._- ()"
    kept = [ch for ch in name if ch.isalnum() or ch in allowed]
    return "".join(kept).strip()
def download_zip(entry):
    """Download one asset's .zip archive into a Category[/Series] folder.

    Skips files that already exist, streams the download to disk, and
    sleeps a random 1.5-4.0 s afterwards to mimic human pacing.
    Errors are reported on stdout, not raised.
    """
    title = entry["title"]
    # First changelog entry carries the current version; default to "1.0".
    version = entry["changelog"][0]["version"] if entry["changelog"] else "1.0"
    download_url = entry.get("download")
    # Category / optional Series drive the on-disk folder layout.
    category = entry["properties"].get("Category", ["Uncategorized"])[0]
    series = entry["properties"].get("Series", [None])[0]
    folder_path = os.path.join(output_dir, sanitize_filename(category))
    if series:
        folder_path = os.path.join(folder_path, sanitize_filename(series))
    os.makedirs(folder_path, exist_ok=True)
    # File name: "<Title> V<version>.zip"
    filename = f"{sanitize_filename(title)} V{version}.zip"
    filepath = os.path.join(folder_path, filename)
    if os.path.exists(filepath):
        # Fix: log messages previously printed a literal placeholder
        # instead of interpolating the actual path/URL.
        print(f"✅ 已存在,跳过: {filepath}")
        return
    try:
        print(f"⬇️ 开始下载: {filepath}")
        with requests.get(download_url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"✅ 下载完成: {filepath}")
    except Exception as e:
        print(f"❌ 下载失败: {download_url} - {e}")
    # Mimic human behaviour: random wait between downloads.
    time.sleep(random.uniform(1.5, 4.0))
# === Start batch download ===
# HACK: idx-based skip is a manual resume marker — the first 156 resources
# were presumably handled by a previous run; adjust or remove to restart.
idx = 0
for resource in tqdm(resources, desc="处理资源"):
    if idx < 156:
        idx += 1
        continue
    # Only entries with a real .zip URL are downloaded ("N/A" is skipped).
    if "download" in resource and resource["download"].endswith(".zip"):
        download_zip(resource)
    idx += 1
print("\n✅ 所有资源处理完成")

View File

@@ -0,0 +1,61 @@
# Batch-download asset preview images listed in the scraped JSON.
import os
import json
import requests
import time
import random
from urllib.parse import urlparse, unquote
from tqdm import tqdm

# ========== Configuration ==========
json_path = "kenney_data.json"           # scraped metadata (input)
output_root = "kenney_assets_images"     # image storage root (output)
headers = {"User-Agent": "Mozilla/5.0"}  # browser-like User-Agent
# ========== Helpers ==========
def sanitize_filename(name):
    """Keep only filesystem-safe characters (alphanumerics and ``._- ()``)."""
    cleaned = filter(lambda c: c.isalnum() or c in "._- ()", name)
    return "".join(cleaned).strip()
def download_image(url, save_path):
    """Stream one image from *url* to *save_path*, skipping existing files.

    Errors are reported on stdout, not raised; a random 1.5-4.0 s pause
    after each attempt mimics human browsing.
    """
    if os.path.exists(save_path):
        print(f"✅ 已存在,跳过: {save_path}")
        return
    try:
        # Fixes: send the module-level User-Agent header (it was defined but
        # never used here), fail fast on HTTP errors instead of writing the
        # error body to disk, and close the streamed connection via `with`.
        with requests.get(url, headers=headers, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(save_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        print(f"✅ 下载完成: {save_path}")
    except Exception as e:
        print(f"❌ 下载失败: {url} - {e}")
    time.sleep(random.uniform(1.5, 4.0))  # mimic human access pattern
# ========== Load JSON ==========
with open(json_path, "r", encoding="utf-8") as f:
    resources = json.load(f)
# ========== Batch processing ==========
for entry in tqdm(resources, desc="处理资源"):
    title = entry["title"]
    # First Category / Series value drives the folder layout.
    category = entry["properties"].get("Category", ["Uncategorized"])[0]
    series = entry["properties"].get("Series", [None])[0]
    images = entry.get("images", [])
    # Build path: Category[/Series]/Title/
    path = os.path.join(output_root, sanitize_filename(category))
    if series:
        path = os.path.join(path, sanitize_filename(series))
    path = os.path.join(path, sanitize_filename(title))
    os.makedirs(path, exist_ok=True)
    for img_url in images:
        parsed_url = urlparse(img_url)
        # File name comes from the URL path's last segment.
        img_name = os.path.basename(parsed_url.path)
        img_name = unquote(img_name)  # decode URL escapes, e.g. %20 -> space
        img_path = os.path.join(path, img_name)
        download_image(img_url, img_path)
print("\n🎉 所有图片处理完成!")

View File

@@ -0,0 +1,68 @@
# Rewrite the scraped JSON so download/image URLs point at local files.
import os
import json
from urllib.parse import urlparse, unquote

# === Paths ===
input_json = "kenney_data.json"         # scraped metadata (input)
output_json = "kenney_data_local.json"  # rewritten metadata (output)
zip_root = "kenney_assets"              # root used by the .zip downloader
img_root = "kenney_assets_images"       # root used by the image downloader
# === Helpers ===
def sanitize_filename(name):
    """Drop every character that is not alphanumeric or in ``._- ()``."""
    safe_chars = (c for c in name if c.isalnum() or c in "._- ()")
    return "".join(safe_chars).strip()
def build_zip_path(entry):
    """Return the expected local .zip path for *entry*.

    Mirrors the downloader's layout:
    ``zip_root/Category[/Series]/<Title> V<version>.zip``.
    """
    changelog = entry["changelog"]
    version = changelog[0]["version"] if changelog else "1.0"
    props = entry["properties"]
    parts = [zip_root, sanitize_filename(props.get("Category", ["Uncategorized"])[0])]
    series = props.get("Series", [None])[0]
    if series:
        parts.append(sanitize_filename(series))
    parts.append(f'{sanitize_filename(entry["title"])} V{version}.zip')
    return os.path.join(*parts)
def build_image_paths(entry):
    """Return local file paths mirroring the image-download layout.

    One path per URL in ``entry["images"]``, rooted at
    ``img_root/Category[/Series]/Title/``; URL-encoded characters in the
    file name are decoded (e.g. ``%20`` -> space).
    """
    props = entry["properties"]
    segments = [img_root, sanitize_filename(props.get("Category", ["Uncategorized"])[0])]
    series = props.get("Series", [None])[0]
    if series:
        segments.append(sanitize_filename(series))
    segments.append(sanitize_filename(entry["title"]))
    folder = os.path.join(*segments)
    return [
        os.path.join(folder, unquote(os.path.basename(urlparse(u).path)))
        for u in entry.get("images", [])
    ]
# === Main processing ===
with open(input_json, "r", encoding="utf-8") as f:
    data = json.load(f)
for entry in data:
    if "download" in entry and entry["download"].endswith(".zip"):
        zip_path = build_zip_path(entry)
        # Only rewrite when the archive actually exists on disk.
        if os.path.exists(zip_path):
            entry["download"] = zip_path  # replace URL with local path
    if "images" in entry and isinstance(entry["images"], list):
        # NOTE(review): image paths are rewritten without an existence
        # check, unlike the .zip branch above — confirm this is intended.
        entry["images"] = build_image_paths(entry)
# === Save the updated JSON ===
with open(output_json, "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)
print("✅ 已更新 JSON本地路径写入完毕")