简介
在这篇文章中,我尝试逐行解释如何在 Python 中实现大部分 DNS 协议,以创建我自己可以信任的本地 DNS 服务器(是的,我就是那个偏执狂)。这只是我对这个项目的笔记,不要期待任何东西。
进口
import socket import struct import random import threading import time from collections import OrderedDict import urllib.request import os import hashlib
-
socket
:处理网络,例如发送和接收 DNS 查询。 -
struct
:帮助打包/解包二进制数据以构建/解析 DNS 数据包。 -
random
:为 DNS 查询生成随机事务 ID。 -
threading
:管理多个操作(例如,阻止列表更新)而不阻塞主服务器。 -
time
:处理基于时间的操作(例如,缓存过期)。 -
OrderedDict
:维护 DNS 缓存中的顺序,同时驱逐旧条目。 -
urllib.request
:从互联网下载阻止列表文件。 -
os
:处理文件和目录操作(例如,块列表缓存)。 -
hashlib
:为缓存文件名生成唯一的哈希值。
配置常量
DNS_SERVER = '9.9.9.9' BLOCKLIST_URLS = [ 'https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts', 'https://raw.githubusercontent.com/hagezi/dns-blocklists/main/hosts/pro-compressed.txt', ] BLOCKLIST_CACHE_DIR = 'blocklist_cache' BLOCKLIST_CACHE_TTL = 24 * 60 * 60 # 24 hours
-
DNS_SERVER
:默认上游 DNS 服务器,用于解析非阻塞查询。 -
BLOCKLIST_URLS
:包含域阻止列表的 URL 列表。 -
BLOCKLIST_CACHE_DIR
:缓存下载的阻止列表的目录。 -
BLOCKLIST_CACHE_TTL
:块列表缓存的生存时间 (TTL),设置为 24 小时。
块BlocklistCache
类
此类管理阻止列表及其缓存。
初始化
class BlocklistCache: def __init__(self): self.blocked_domains = set() self.last_update = 0 self.lock = threading.Lock() self.cache_dir = BLOCKLIST_CACHE_DIR os.makedirs(self.cache_dir, exist_ok=True)
- 初始化:
-
blocked_domains
:被阻止的域集。 -
last_update
:上次更新的时间戳。 -
lock
:防止更新期间的竞争条件。 -
cache_dir
:缓存阻止列表的目录(如果丢失则创建)。
-
辅助函数
def _get_cache_path(self, url): url_hash = hashlib.md5(url.encode()).hexdigest() return os.path.join(self.cache_dir, f"blocklist_{url_hash}.txt")
- 使用 MD5 哈希值为阻止列表 URL 生成唯一的缓存文件路径。
def _is_cache_valid(self, cache_path): if not os.path.exists(cache_path): return False cache_age = time.time() - os.path.getmtime(cache_path) return cache_age < BLOCKLIST_CACHE_TTL
- 检查缓存文件是否存在并且在其 TTL 内。
下载并解析阻止列表
def _download_and_cache_blocklist(self, url): cache_path = self._get_cache_path(url) try: if self._is_cache_valid(cache_path): with open(cache_path, 'r') as f: return f.readlines() response = urllib.request.urlopen(url) content = response.read().decode('utf-8').splitlines() with open(cache_path, 'w') as f: f.write(' '.join(content)) return content except Exception as e: print(f"Error downloading blocklist {url}: {e}") if os.path.exists(cache_path): with open(cache_path, 'r') as f: return f.readlines() return []
- 下载并缓存阻止列表数据:
- 如果缓存有效,则从缓存中读取。
- 否则,它会下载阻止列表并将其写入缓存。
def update_blocklists(self): with self.lock: new_blocked_domains = set() for url in BLOCKLIST_URLS: content = self._download_and_cache_blocklist(url) for line in content: line = line.strip() if line and not line.startswith('#'): parts = line.split() if len(parts) >= 2 and parts[0] in {'0.0.0.0', '127.0.0.1'}: new_blocked_domains.add(parts[1].lower()) self.blocked_domains = new_blocked_domains self.last_update = time.time()
- 从所有来源下载阻止列表并更新
blocked_domains
集。
def is_blocked(self, domain): current_time = time.time() if current_time - self.last_update > BLOCKLIST_CACHE_TTL: threading.Thread(target=self.update_blocklists).start() return domain.lower() in self.blocked_domains
- 检查域是否被阻止。如果阻止列表已过时,则会异步更新。
DNSCache
类
管理 DNS 响应的内存缓存。
初始化
class DNSCache: def __init__(self, max_size=200 * 1024 * 1024): self.cache = OrderedDict() self.lock = threading.Lock() self.current_size = 0 self.max_size = max_size
- 初始化:
-
cache
:用于存储 DNS 响应的有序字典。 -
max_size
:允许的最大缓存大小 (200 MB)。
-
缓存管理
def set(self, key, value, ttl): with self.lock: expiration = time.time() + ttl entry_size = self._calculate_entry_size(key, value) while self.current_size + entry_size > self.max_size: self._evict_oldest() self.cache[key] = (value, expiration) self.cache.move_to_end(key) self.current_size += entry_size
- 添加对缓存的响应,如果需要则驱逐最旧的条目。
def get(self, key): with self.lock: now = time.time() if key in self.cache: value, expiration = self.cache[key] if expiration > now: self.cache.move_to_end(key) return value del self.cache[key] self.current_size -= self._calculate_entry_size(key, value) return None
- 如果响应尚未过期,则检索响应。
DNS查询/响应功能
这些函数构造、解析和转发 DNS 数据包:
-
build_query(domain, query_type)
:构建 DNS 查询数据包。 -
parse_query(data)
:解析传入的查询。 -
parse_response(data)
:解析 DNS 响应。 -
query_upstream(domain, query_type)
:将查询转发到上游服务器。
build_query(domain, query_type)
目的:
该函数构造一个 DNS 查询数据包。 DNS 查询被发送到服务器以将域名解析为 IP 地址(或其他记录类型)。
参数:
-
domain
:要查询的域名(例如example.com
)。 -
query_type
:DNS 查询的类型(例如,1
表示 A 记录,28
表示 AAAA 记录)。
过程:
-
交易编号:
- 使用
random.randint(0, 65535)
生成随机 16 位整数。 - 这有助于将响应与请求相匹配。
- 使用
-
标志:
- 使用固定值
0x0100
,表示:- 标准查询。
- 需要递归(如果需要,请询问上游服务器)。
- 使用固定值
-
标题结构:
- 使用
struct.pack(">HHHHHH")
打包六个 16 位字段:- 交易ID。
- 旗帜。
- QDCOUNT:1(查询中的一个问题)。
- ANCOUNT、NSCOUNT、ARCOUNT:0(尚无答案、权威或其他记录)。
- 使用
-
问题部分:
- 该域被分为标签(例如,
example.com
→example
和com
)。 - 每个标签都以其长度为前缀并以字节为单位进行编码。
- 添加空字节 (
�
以表示域名的结尾。
- 该域被分为标签(例如,
-
查询类型和类别:
- 附加两个字段:
- 查询类型(例如,
1
表示 A 记录)。 - 查询类(
1
表示 Internet)。
- 查询类型(例如,
- 附加两个字段:
-
返回:
- 将标头和问题部分组合成完整的 DNS 查询数据包。
- 返回数据包和交易ID以供参考。
parse_query(data)
目的:
解析收到的DNS查询报文,提取域名和查询类型。
参数:
-
data
:包含 DNS 查询数据包的二进制字符串。
过程:
-
跳过标题:
- 前 12 个字节是 DNS 标头。此后域开始。
-
域名解析:
- 遍历域名标签:
- 读取长度字节。
- 提取相应的字节数作为标签。
- 当遇到零长度标签 (
0
) 时停止,指示域的结尾。
- 遍历域名标签:
-
提取查询类型和类:
- 读取域名后接下来的 4 个字节:
- 查询类型(2 个字节)。
- 查询类(2 个字节)。
- 读取域名后接下来的 4 个字节:
-
返回:
- 返回域名(例如
example.com
)和查询类型(例如 A 记录为1
)。 - 如果解析失败,它会打印错误并返回
None, None
。
- 返回域名(例如
parse_response(data)
目的:
解析 DNS 响应数据包以提取详细信息,例如答案、权限和附加记录。
参数:
-
data
:包含 DNS 响应数据包的二进制字符串。
过程:
-
辅助功能:
-
parse_name(data, offset)
:- 使用标签和指针解析数据包中的域名。
-
parse_soa(data, offset)
:- 提取 SOA(授权开始)记录详细信息。
-
-
标头解析:
- 提取计数:
- 问题(
QDCOUNT
)。 - 答案(
ANCOUNT
)。 - 权限记录 (
NSCOUNT
)。 - 附加记录 (
ARCOUNT
)。
- 问题(
- 提取计数:
-
问题部分:
- 跳过问题部分(域名、查询类型和类别)。
-
资源记录:
- 使用辅助函数解析答案、权威和其他部分:
- A 记录:IPv4 地址。
- AAAA 记录:IPv6 地址。
- CNAME :规范名称。
- SOA :权威服务器详细信息。
- MX :邮件交换服务器。
- TXT :文本记录。
- NS :名称服务器。
- 使用辅助函数解析答案、权威和其他部分:
-
返回:
- 返回包含解析部分(
answers
、authority
、additional
)的字典。
- 返回包含解析部分(
query_upstream(domain, query_type)
目的:
将 DNS 查询转发到上游 DNS 服务器进行解析。
参数:
-
domain
:要解析的域名。 -
query_type
:DNS 查询的类型(例如,A、AAAA)。
过程:
-
构建查询:
- 调用
build_query
构造指定域和类型的DNS查询报文。
- 调用
-
创建套接字:
- 使用
socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
创建 UDP 套接字。 - 设置 5 秒超时以避免无限期挂起。
- 使用
-
发送查询:
- 将查询数据包发送到上游 DNS 服务器(端口
53
默认为9.9.9.9
)。
- 将查询数据包发送到上游 DNS 服务器(端口
-
收到回复:
- 等待来自服务器的响应数据包(最多 512 字节)。
-
关闭套接字:
- 关闭套接字以释放资源。
-
返回:
- 返回原始响应数据包以供进一步处理。
服务器逻辑
def handle_client(server_socket): try: data, client_addr = server_socket.recvfrom(512) domain, query_type = parse_query(data) if blocklist_cache.is_blocked(domain): response = get_blocked_response(client_transaction_id) else: cached_response = dns_cache.get(f"{domain}:{query_type}") response = cached_response or query_upstream(domain, query_type) server_socket.sendto(response, client_addr) except Exception as e: print(f"Error handling client: {e}")
- 处理传入的查询:
- 阻止阻止列表中的域。
- 从缓存或上游 DNS 检索响应。
def start_server(): print("Initializing blocklist cache...") blocklist_cache.update_blocklists() server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.bind(('0.0.0.0', 853)) print("DNS server running on port 853...") while True: handle_client(server_socket)
- 在端口 853 上启动 DNS 服务器并初始化阻止列表缓存。
在此处阅读有关上游 9.9.9.9 的更多信息。
在此处下载完整的程序。