我问了 DeepSeek、豆包、Gemini、ChatGPT,它们的回答都不正确。
我一开始问它们如何在 uvicorn + FastAPI 生态下获取客户端的 TLS 指纹,但是 request.scope.get('ssl') 的结果为空:
import uvicorn
from loggers import logger
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from middleware.loggers import RequestLogMiddleware
from middleware.correlations import CorrelationIdMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
# Single application instance; the middleware and route below attach to it.
app = FastAPI()

# Permissive CORS policy: every origin, method and header is accepted and
# credentialed requests are allowed.
_cors_options = {
    "allow_origins": ['*'],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Registration order is kept: request logging first, then correlation IDs.
# Each dispatcher is instantiated right before it is registered.
for _dispatcher_cls in (RequestLogMiddleware, CorrelationIdMiddleware):
    app.add_middleware(BaseHTTPMiddleware, dispatch=_dispatcher_cls())
@app.get('/')
@logger.catch
async def root(request: Request):
    """Return the caller's IP address and log any TLS details in the ASGI scope.

    Args:
        request: Incoming request; its ``scope`` and ``client`` are inspected.

    Returns:
        A dict of the form ``{"ip": <client host>}``. The value is ``None``
        when the server supplied no client address.
    """
    # NOTE(review): the author reports that this key comes back empty under
    # uvicorn, so in practice only the "No SSL information" branch is hit.
    ssl_info = request.scope.get('ssl')
    if not ssl_info:
        logger.info("No SSL information available.")
    else:
        # Try to read the TLS fingerprint; availability may differ between
        # Python versions and environments.
        tls_fingerprint = ssl_info.get('peer_cert_fingerprint')
        if tls_fingerprint:
            logger.info(f"Client TLS fingerprint: {tls_fingerprint}")
        else:
            logger.info("Could not get client TLS fingerprint.")

    # request.client may be None, in which case ``.host`` would raise and
    # logger.catch would turn the whole response into ``null``.
    client = request.client
    response_body = {
        "ip": client.host if client is not None else None
    }
    logger.debug(response_body)
    return response_body
if __name__ == "__main__":
    # Start uvicorn on all interfaces with one worker, using the TLS key and
    # certificate files listed below.
    server_options = {
        "host": "0.0.0.0",
        "port": 8086,
        "workers": 1,
        "ssl_keyfile": "/Users/ponponon/Downloads/xxxx.cn_nginx/xxxx.cn.key",
        "ssl_certfile": "/Users/ponponon/Downloads/xxxx.cn_nginx/xxxx.cn.pem",
    }
    uvicorn.run(app, **server_options)
后来它们又让我直接用 socket 写服务端,但还是读取不到客户端请求中的 TLS 指纹:
import ssl
import socket
from loguru import logger
from pyja3 import extract_ja3_from_client_hello
from threading import Thread
# Certificate chain and private key that main() loads into its SSLContext.
CERT_FILE = "/home/pon/code/me/ssl/xxxx.cn_nginx/xxxx.cn.pem"
KEY_FILE = "/home/pon/code/me/ssl/xxxx.cn_nginx/xxxx.cn.key"
def _recv_exact(sock, size):
    """Read *size* bytes from *sock*, returning fewer only if the peer closes."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def handle_client(client_socket, addr):
    """Log the JA3 fingerprint of the first TLS record a client sends.

    A single ``recv`` only returns what has already arrived, so a ClientHello
    that spans several TCP segments used to be handed to the parser truncated.
    The record header is read first and then exactly the announced number of
    bytes. The bytes are consumed rather than peeked; that is not observable
    because the socket is closed here and never passed to a TLS layer.

    Args:
        client_socket: Accepted, still unencrypted TCP socket. Always closed
            before returning.
        addr: Peer address, used only in log messages.
    """
    try:
        # TLS record header: content type (1 byte), version (2), length (2).
        header = _recv_exact(client_socket, 5)
        if len(header) < 5:
            raise ValueError("connection closed before a TLS record header arrived")
        # 22 is the handshake content type. Rejecting anything else avoids
        # treating bytes of a plaintext request as a record length.
        if header[0] != 22:
            raise ValueError("first bytes are not a TLS handshake record")
        record_length = int.from_bytes(header[3:5], "big")
        raw_data = header + _recv_exact(client_socket, record_length)
        if len(raw_data) < 5 + record_length:
            raise ValueError("connection closed before the full TLS record arrived")
        # NOTE(review): a ClientHello split over several TLS records is not
        # reassembled here; only the first record is parsed.
        ja3_str, ja3_hash = extract_ja3_from_client_hello(raw_data)
        logger.info(f"[{addr}] JA3: {ja3_str}, MD5: {ja3_hash}")
    except Exception as e:
        logger.warning(f"[{addr}] Failed to get JA3: {e}")
    finally:
        client_socket.close()
def main():
    """Listen on 0.0.0.0:8086 and pass each connection to ``handle_client``.

    Every accepted socket is handled on its own thread. The listening socket
    is managed by a ``with`` block so it is closed when the accept loop ends
    through an exception (for example Ctrl-C) instead of being leaked.
    """
    # NOTE(review): this context is configured but never applied to the
    # accepted sockets in this function. It is kept because load_cert_chain
    # still fails at startup when the certificate or key cannot be loaded.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)

    with socket.socket() as bindsocket:
        bindsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        bindsocket.bind(("0.0.0.0", 8086))
        bindsocket.listen(5)
        logger.info("Server listening on 0.0.0.0:8086")
        while True:
            client_socket, fromaddr = bindsocket.accept()
            Thread(target=handle_client, args=(client_socket, fromaddr)).start()
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
import socket
import ssl
import threading
from loguru import logger
# Placeholder certificate and private-key paths loaded by get_client_hello().
CERT_FILE = "/path/to/cert.pem"
KEY_FILE = "/path/to/key.pem"
def get_client_hello(sock):
    """Run a server-side TLS handshake on *sock* and return the ClientHello.

    The handshake is best effort: if it fails, whatever ClientHello was seen
    before the failure is still returned.

    Args:
        sock: Connected TCP socket. ``wrap_socket`` takes over its file
            descriptor, and the resulting TLS socket is closed before this
            function returns.

    Returns:
        The most recent ClientHello handshake message as ``bytes``, or
        ``None`` if none was observed.
    """
    client_hello = None

    def on_tls_message(conn, direction, version, content_type, msg_type, data):
        nonlocal client_hello
        # ClientHello is handshake (content type 22) message type 1. The
        # callback reports direction as the string "read" or "write", so a
        # comparison against 0 never matches.
        if direction == "read" and content_type == 22 and msg_type == 1:
            client_hello = data
            logger.info(f"捕获到ClientHello: {len(data)}字节")

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
    # SSLContext has no set_msg_callback(); CPython exposes the hook as the
    # private ``_msg_callback`` attribute.
    # NOTE(review): private, undocumented API - confirm it exists on the
    # target interpreter.
    context._msg_callback = on_tls_message
    ssl_sock = context.wrap_socket(sock, server_side=True, do_handshake_on_connect=False)
    try:
        ssl_sock.do_handshake()
    except Exception as exc:
        # Still best effort, but KeyboardInterrupt/SystemExit now propagate
        # and the failure reason is logged instead of being discarded.
        logger.debug(f"TLS握手未完成: {exc}")
    finally:
        # wrap_socket detached the original socket, so closing ssl_sock is
        # what actually releases the descriptor.
        ssl_sock.close()
    return client_hello
def handle_client(client_sock, addr):
    """Capture one client's ClientHello, log its first 20 bytes, then close.

    Args:
        client_sock: Accepted TCP socket; always closed before returning.
        addr: Peer address, used only in log messages.
    """
    try:
        client_hello = get_client_hello(client_sock)
        if not client_hello:
            return
        logger.info(f"客户端 {addr} 的TLS握手数据: {client_hello[:20]}...")
    except Exception as e:
        logger.error(f"处理客户端 {addr} 出错: {e}")
    finally:
        client_sock.close()
def main():
    """Listen on 0.0.0.0:8443 and serve each connection on its own thread.

    The accept loop runs until interrupted with Ctrl-C; the listening socket
    is closed on the way out.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('0.0.0.0', 8443))
    listener.listen(5)
    logger.info("服务器启动在 0.0.0.0:8443")

    try:
        while True:
            conn, addr = listener.accept()
            logger.info(f"接受连接: {addr}")
            threading.Thread(target=handle_client, args=(conn, addr)).start()
    except KeyboardInterrupt:
        logger.info("服务器关闭")
    finally:
        listener.close()
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
最好是用 Nginx 和 Lua 脚本来实现。