"""Crawl asset-pack metadata from kenney.nl, download the asset archives, and import everything into MySQL."""

import json
import os
import random
import re
import time
from datetime import datetime

import requests
from bs4 import BeautifulSoup
from tqdm import tqdm, trange

from mysql_helper import MySQLHelper

KENNEY_ASSET_URL = "https://www.kenney.nl/assets/"


def get_headers():
    """Build request headers with a randomly chosen User-Agent."""
    # A few common desktop browser UA strings (extend as needed)
    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/126.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/17.3 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) "
        "Gecko/20100101 Firefox/128.0",
    ]
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": (
            "text/html,application/xhtml+xml,application/xml;"
            "q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8"
        ),
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Referer": "https://www.google.com/",
        "Upgrade-Insecure-Requests": "1",
    }


class SmartCrawler:
    """Thin wrapper around requests.Session that sleeps a random delay before each request."""

    def __init__(self, delay_range=(1, 3)):
        self.session = requests.Session()
        self.delay_range = delay_range
        self.headers = get_headers()

    def get(self, url):
        time.sleep(random.uniform(*self.delay_range))
        return self.session.get(url, headers=self.headers)


def parse_date(date_str):
    """Convert a dd/mm/yyyy date string to yyyy-mm-dd."""
    return datetime.strptime(date_str, "%d/%m/%Y").strftime("%Y-%m-%d")


def get_total_pages() -> int:
    """Return the total number of asset listing pages."""
    crawler = SmartCrawler()
    resp = crawler.get(KENNEY_ASSET_URL)
    soup = BeautifulSoup(resp.text, "lxml")
    result = 0
    last_page_button_selector = (
        "#content > section > div > div.row.text-center.margin-top"
        " > div > ul > li:last-child > a"
    )
    last_page_button = soup.select_one(last_page_button_selector)
    # Should obtain an element like this:
    # ...
    # Obtain the href attribute and extract the number after "page:"
    if last_page_button and "href" in last_page_button.attrs:
        page_link = last_page_button["href"]
        splitted_page_link = page_link.split(":")
        if splitted_page_link[-1].isdigit():
            result = int(splitted_page_link[-1])
        else:
            raise ValueError(f"Expected int in page number, got {splitted_page_link[-1]}")
    else:
        raise RuntimeError("Last page button or its href attribute not found")
    return result


def get_all_asset_infos(total_pages: int) -> list[dict[str, str]]:
    """Collect name, category, series and page link for every asset pack in the listing."""
    result = []
    for page in trange(1, total_pages + 1, desc="Fetching all assets' page links"):
        asset_page_url = KENNEY_ASSET_URL + f"page:{page}?search=&sort=release"
        crawler = SmartCrawler(delay_range=(1, 3))
        resp = crawler.get(asset_page_url)
        soup = BeautifulSoup(resp.text, "lxml")
        contents_selector = "#content > section > div > div:nth-of-type(1)"
        contents_div = soup.select_one(contents_selector)
        if contents_div:
            item_divs = contents_div.find_all("div", recursive=False)
            for item_div in item_divs:
                # Info we can read directly from the asset grid
                asset_info = {
                    "name": "",
                    "category": "",
                    "series": "",
                    "page_link": "",
                }
                h2_tag = item_div.find("h2")
                asset_info["name"] = h2_tag.text if h2_tag else None
                a_tags = item_div.find_all("a")
                asset_info["category"] = a_tags[2].text if len(a_tags) > 2 else None
                asset_info["series"] = a_tags[3].text if len(a_tags) > 3 else None
                asset_info["page_link"] = (
                    a_tags[1]["href"]
                    if len(a_tags) > 1 and "href" in a_tags[1].attrs
                    else None
                )
                result.append(asset_info)
    return result


def get_asset_pack_info(asset: dict) -> None:
    """Fetch an asset pack's detail page and enrich the dict in place."""
    crawler = SmartCrawler()
    resp = crawler.get(asset["page_link"])
    soup = BeautifulSoup(resp.text, "lxml")

    # Properties table (Tags, Files, Features, ...)
    properties = {}
    prop_table = soup.select_one(
        "#content > section > div > div > div.col-md-6.text-left"
        " > table:nth-of-type(1) > tbody"
    )
    if prop_table:
        for row in prop_table.find_all("tr"):
            cols = row.find_all("td")
            if len(cols) == 2:
                key = cols[0].text.strip().rstrip(":")
                value_links = cols[1].find_all("a")
                if value_links:
                    value = [a.text.strip() for a in value_links]
                else:
                    value = cols[1].text.strip()
                properties[key] = value
    asset["tags"] = properties.get("Tags", [])

    # Download link for the latest version
    zip_link = None
    for a_tag in soup.find_all("a", href=True):
        href = a_tag["href"]
        if href.endswith(".zip"):
            zip_link = ("https://www.kenney.nl" + href) if href.startswith("/") else href
            break

    # Changelog table (newest entry first on the page)
    changelog = []
    update_table = soup.select_one(
        "#content > section > div > div > div:nth-of-type(1)"
        " > table:nth-of-type(2) > tbody"
    )
    if update_table:
        for idx, row in enumerate(update_table.find_all("tr")):
            cols = row.find_all("td")
            if len(cols) == 2:
                date = cols[0].text.strip()
                spans = cols[1].find_all("span")
                version = spans[0].text.strip() if len(spans) >= 1 else ""
                description = spans[1].text.strip() if len(spans) >= 2 else ""
                version_info = {
                    "date": parse_date(date),
                    "version": version,
                    "description": description,
                    "files": 0,
                    "feat_animation": False,
                    "feat_variation": False,
                    "orig_file_link": None,
                }
                if idx == 0:
                    # Latest version: attach file count, features and download link
                    digits = "".join(ch for ch in properties.get("Files", "") if ch.isdigit())
                    version_info["files"] = int(digits) if digits else 0
                    features = properties.get("Features", [])
                    version_info["feat_animation"] = "Animation" in features
                    version_info["feat_variation"] = "Variation" in features
                    version_info["orig_file_link"] = zip_link if zip_link else None
                changelog.append(version_info)
    changelog.reverse()  # oldest first
    asset["changelog"] = changelog
    asset["released_at"] = changelog[0]["date"]
    asset["updated_at"] = changelog[-1]["date"]

    images = []
    # Cover image
    cover_img = soup.select_one(
        "#content > section > div > div > div:nth-of-type(2) > a > img"
    )
    if cover_img and cover_img.get("src"):
        cover_url = cover_img["src"]
        if cover_url.startswith("/"):
            cover_url = "https://www.kenney.nl" + cover_url
        images.append(cover_url)
    # Gallery images
    gallery_divs = soup.select(
        "#content > section > div > div > div:nth-of-type(2) > div > div"
    )
    for div in gallery_divs:
        img_tag = div.select_one("a > img")
        if img_tag and img_tag.get("src"):
            img_url = img_tag["src"]
            if img_url.startswith("/"):
                img_url = "https://www.kenney.nl" + img_url
            images.append(img_url)
    asset["images"] = images


def sync_table(
    db,
    table_name: str,
    column_name: str,
    items: set[str],
) -> dict[str, int]:
    """Sync unique values into the given table and return a name -> id mapping."""
    # Read the records that already exist in the database
    saved_records = db.fetch_all(f"SELECT * FROM {table_name}")
    saved_names = {r[column_name] for r in saved_records}
    # Find the missing values
    missing_items = sorted(items - saved_names)
    # Insert the missing values (id is assumed to be auto-increment, no manual bookkeeping)
    if missing_items:
        insert_sql = f"INSERT INTO {table_name} ({column_name}) VALUES (%s)"
        with db.get_conn() as conn:
            cursor = conn.cursor()
            cursor.executemany(insert_sql, [(name,) for name in missing_items])
            conn.commit()
            cursor.close()
    # Re-read the table so the returned ids are authoritative
    final_records = db.fetch_all(f"SELECT * FROM {table_name}")
    return {r[column_name]: r["id"] for r in final_records}


def build_id_map(db, table, name_field, values):
    """Deduplicate values (dropping empties) and sync them into a lookup table."""
    return sync_table(db, table, name_field, {v for v in values if v})


def build_insert_sql(table: str, columns: list[str]):
    cols_str = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return f"INSERT INTO {table} ({cols_str}) VALUES ({placeholders})"


# Matches every character that is NOT allowed in a path component
disallowed_path_chars = re.compile(r"[^a-zA-Z0-9._-]")


def sanitize_path(path: str):
    """Lowercase, strip disallowed characters, and join words with underscores."""
    if not path:
        return None
    return "_".join(disallowed_path_chars.sub("", word.lower()) for word in path.split())


def main() -> None:
    # total_pages = get_total_pages()
    # all_asset_infos = get_all_asset_infos(total_pages)
    # # Put the oldest asset first so it is inserted into the database first
    # all_asset_infos.reverse()
    # for asset in tqdm(all_asset_infos, "Fetching asset pack info"):
    #     get_asset_pack_info(asset)
    with open("all_asset_infos_detailed.json") as f:
        all_asset_infos: list[dict] = json.load(f)
    # for asset in all_asset_infos:
    #     asset["name"] = None if asset["name"] == "" else asset["name"]
    #     asset["category"] = None if asset["category"] == "" else asset["category"]
    #     asset["series"] = None if asset["series"] == "" else asset["series"]
    #     asset["page_link"] = None if asset["page_link"] == "" else asset["page_link"]
    #     # for log in asset["changelog"]:
    #     #     log['files'] = 0 if "files" not in log else log["files"]
    #     #     log['feat_animation'] = False if "feat_animation" not in log else log["feat_animation"]
    #     #     log['feat_variation'] = False if "feat_variation" not in log else log["feat_variation"]
    #     #     log['orig_file_link'] = asset["download"] if log["files"] != 0 else None
    #     # asset.pop('download')
    # json.dump(all_asset_infos, open("all_asset_infos_detailed_fix.json", "w"))
    # exit()

    # Download the asset archive files
    output_dir = "media"
    for asset in tqdm(all_asset_infos, "Downloading assets and images"):
        asset_name = sanitize_path(asset["name"])
        asset_category = sanitize_path(asset["category"])
        asset_version = asset["changelog"][-1]["version"]
        asset_dir_path = os.path.join(output_dir, asset_category, asset_name)
        asset["base_asset_path"] = asset_dir_path
        os.makedirs(asset_dir_path, exist_ok=True)
        filename = f"{asset_name} V{asset_version}.zip"
        filepath = os.path.join(asset_dir_path, filename)
        if os.path.exists(filepath):
            print(f"✅ {filename} exists, skipping...")
        else:
            try:
                download_url = asset["changelog"][-1]["orig_file_link"]
                with requests.get(download_url, headers=get_headers(), stream=True) as resp:
                    resp.raise_for_status()
                    with open(filepath, "wb") as f:
                        for chunk in resp.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                print(f"✅ {filename} download completed.")
            except Exception as e:
                print(f"❌ Download failed: {filename} - {e}")
        time.sleep(random.uniform(1.5, 4.0))

    # Stop here for now; the database import below is not run yet
    exit()

    # Initialize database
    db = MySQLHelper(
        "10.147.20.103",
        "kenney-assets",
        "9a77caa2a5c705db7e8a93c6a3fbc46a",
        "kenney_assets"
    )

    # Build mapping tables
    category_id_map = build_id_map(
        db, "category", "name", (a["category"] for a in all_asset_infos)
    )
    series_id_map = build_id_map(
        db, "series", "name", (a["series"] for a in all_asset_infos)
    )
    tags_id_map = build_id_map(
        db, "tag", "name",
        (tag for a in all_asset_infos for tag in (a.get("tags") or [])),
    )

    asset_pack_sql = build_insert_sql(
        "asset_pack",
        ["name", "category_id", "series_id", "released_at", "updated_at",
         "base_asset_path", "orig_page_link"],
    )
    asset_pack_values: list[tuple] = []
    # Attach category_id and series_id to every asset
    for asset in all_asset_infos:
        asset["category_id"] = category_id_map.get(asset.get("category"))
        asset["series_id"] = series_id_map.get(asset.get("series"))
        asset_pack_values.append((
            asset["name"],
            asset["category_id"],
            asset["series_id"],
            asset["released_at"],
            asset["updated_at"],
            asset["base_asset_path"],
            asset["page_link"],
        ))
    db.bulk_insert(asset_pack_sql, asset_pack_values)

    # Get asset_pack id map
    asset_pack_records = db.fetch_all("SELECT id, name FROM asset_pack")
    asset_pack_id_map = {r["name"]: r["id"] for r in asset_pack_records}

    asset_pack_tag_sql = build_insert_sql("asset_pack_tag", ["asset_pack_id", "tag_id"])
    asset_pack_tag_values: list[tuple] = []
    update_log_sql = build_insert_sql(
        "update_log",
        ["asset_pack_id", "released_date", "version", "description", "files_count",
         "feat_animations", "feat_variations", "orig_download_link"],
    )
    update_log_values: list[tuple] = []
    asset_pack_image_sql = build_insert_sql("asset_pack_image", ["asset_pack_id", "orig_file_link"])
    asset_pack_image_values: list[tuple] = []

    for asset in tqdm(all_asset_infos, "Preparing data to database"):
        asset_pack_id = asset_pack_id_map.get(asset.get("name"))
        for tag in asset.get("tags") or []:
            asset_pack_tag_values.append((asset_pack_id, tags_id_map[tag]))
        for log in asset.get("changelog") or []:
            update_log_values.append((
                asset_pack_id,
                log["date"],
                log["version"],
                log["description"],
                log["files"],
                log["feat_animation"],
                log["feat_variation"],
                log["orig_file_link"],
            ))
        for image in asset.get("images") or []:
            asset_pack_image_values.append((asset_pack_id, image))

    with db.get_conn() as conn:
        cursor = conn.cursor()
        cursor.executemany(asset_pack_tag_sql, asset_pack_tag_values)
        cursor.executemany(update_log_sql, update_log_values)
        cursor.executemany(asset_pack_image_sql, asset_pack_image_values)
        conn.commit()
        cursor.close()


if __name__ == "__main__":
    main()