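Core Patterns in Python Socket Programming

A socket is the basic abstraction for network communication. Python's socket module is a thin wrapper around the Berkeley sockets API.

Creating a TCP server

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 8888))
server.listen(5)

def handle_client(conn):
    data = conn.recv(1024)
    conn.sendall(data)  # echo the bytes back in full
    conn.close()

while True:
    conn, addr = server.accept()
    handle_client(conn)
```

Creating a TCP server follows the sequence socket -> bind -> listen -> accept loop. This version handles one client at a time: the next accept does not run until handle_client returns.

Creating a TCP client

```python
import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("localhost", 8888))
client.send(b"Hello")
response = client.recv(1024)
client.close()
```

A TCP client follows the sequence socket -> connect -> send/recv -> close.

UDP server

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(("localhost", 9999))
data, addr = server.recvfrom(1024)
server.sendto(b"Reply", addr)
```

UDP is connectionless. recvfrom returns the datagram together with the sender's address, and sendto sends to the address you specify.

Non-blocking I/O

```python
server.setblocking(False)  # non-blocking: same as settimeout(0.0)
server.settimeout(5.0)     # or set a timeout instead; the later call wins

try:
    conn, addr = server.accept()
except socket.timeout:
    print("No connection within 5 seconds")
```

setblocking(False) makes every operation return immediately, raising BlockingIOError when it cannot complete right away. settimeout sets a timeout in seconds, after which a blocked operation raises socket.timeout (an alias of TimeoutError since Python 3.10). The two calls are alternatives: each one replaces the mode set by the other.

Multiplexing with the selectors module

```python
import selectors
import socket

sel = selectors.DefaultSelector()
server = socket.socket()
server.bind(("localhost", 8888))
server.listen(100)
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, data="accept")

while True:
    events = sel.select(timeout=None)
    for key, mask in events:
        if key.data == "accept":
            # The listening socket is readable: a new client is waiting.
            conn, addr = key.fileobj.accept()
            conn.setblocking(False)
            sel.register(conn, selectors.EVENT_READ, data="echo")
        else:
            data = key.fileobj.recv(1024)
            if data:
                # send may write only part of data on a non-blocking socket
                key.fileobj.send(data)
            else:
                # Empty read: the client closed the connection.
                sel.unregister(key.fileobj)
                key.fileobj.close()
```

The selectors module hides the differences between select, poll, and epoll behind one interface for handling I/O events.

sendall guarantees a complete send

```python
data = b"x" * 100000
sent = client.send(data)  # may send only part of the data
client.sendall(data)      # returns only after everything has been sent
```

sendall calls send in a loop until all the data has been sent or an error occurs. send returns the number of bytes actually written, so a caller that uses send directly has to resend the remainder itself.

Socket options

```python
import socket

s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
```

SO_REUSEADDR allows a local address to be reused, which lets a restarted server rebind a port that still has connections in TIME_WAIT. TCP_NODELAY disables Nagle's algorithm. SO_KEEPALIVE enables TCP keepalive probes. SO_SNDBUF and SO_RCVBUF request send and receive buffer sizes in bytes.

Reading the current option value with getsockopt

```python
bufsize = s.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
print(f"Send buffer: {bufsize}")
```

The reported value may differ from the one requested. Linux, for example, doubles the requested size to leave room for bookkeeping overhead.

Handling socket timeouts and exceptions

```python
import socket

s = socket.socket()
s.settimeout(2.0)
try:
    s.connect(("192.168.1.1", 80))
except socket.timeout:
    print("Connection timed out")
except ConnectionRefusedError:
    print("Connection refused")
except OSError as e:
    print(f"Socket error: {e}")
finally:
    s.close()
```

socket.timeout and ConnectionRefusedError are both subclasses of OSError, so the more specific handlers have to come before the generic OSError handler.

Name resolution with socket.gethostbyname

```python
import socket

ip = socket.gethostbyname("example.com")
print(f"IP: {ip}")

hostname, aliases, addresses = socket.gethostbyaddr(ip)
print(f"Hostname: {hostname}")
```

getaddrinfo returns a list of address entries

```python
import socket

info = socket.getaddrinfo("example.com", 80, socket.AF_INET, socket.SOCK_STREAM)
for family, socktype, proto, canonname, sockaddr in info:
    print(f"Family: {family}, Type: {socktype}, Addr: {sockaddr}")
```

Creating a raw socket requires root privileges

```python
import socket

try:
    raw = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)
    raw.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
except PermissionError:
    print("Raw sockets require root/admin")
```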
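
The TCP server, TCP client, and sendall patterns described in this article can be exercised together in one process. The following is a minimal sketch, not a production design: the helper names `recv_exact`, `echo_once`, and `echo_round_trip` are made up for illustration, and binding to port 0 (so the OS picks a free port) is an assumption chosen to keep the example from colliding with anything on port 8888.

```python
import socket
import threading


def recv_exact(sock, n):
    """Read up to n bytes, stopping early only if the peer closes."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(min(remaining, 4096))
        if not chunk:  # empty bytes: the peer closed the connection
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def echo_once(server):
    """Accept one connection and echo bytes back until the client closes."""
    conn, _addr = server.accept()
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:
                break
            conn.sendall(data)


def echo_round_trip(payload):
    """Run a one-shot echo server on loopback and send payload through it."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
    server.listen(1)
    port = server.getsockname()[1]

    worker = threading.Thread(target=echo_once, args=(server,), daemon=True)
    worker.start()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.settimeout(5.0)  # fail with a timeout instead of hanging
        client.connect(("127.0.0.1", port))
        client.sendall(payload)
        reply = recv_exact(client, len(payload))

    worker.join(timeout=5.0)
    server.close()
    return reply
```

A call such as `echo_round_trip(b"Hello")` should return the same bytes it was given. The client here sends everything before it starts reading, which is fine for small payloads; for payloads larger than the kernel buffers, a client would need to read while it writes to avoid both sides blocking on a full buffer.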
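
To check that the socket options discussed in this article actually took effect, each setsockopt call can be paired with a getsockopt read. The sketch below assumes a TCP socket; `apply_tcp_options` is a hypothetical helper name, and the exact integers returned are platform-dependent, so the code treats any non-zero value as "enabled".

```python
import socket


def apply_tcp_options(sock):
    """Set common TCP options and return the values the OS reports back."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    return {
        # Boolean options: non-zero means enabled (exact value varies by OS).
        "SO_REUSEADDR": sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR),
        "TCP_NODELAY": sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY),
        "SO_KEEPALIVE": sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE),
        # Buffer size in bytes as chosen by the kernel.
        "SO_SNDBUF": sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF),
    }


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    options = apply_tcp_options(s)
    for name, value in options.items():
        print(f"{name}: {value}")
```

Reading the values back is also a cheap way to see what the kernel did with a buffer size request, since the OS is free to round or cap it.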
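
The tuples returned by getaddrinfo can be passed straight to socket.socket and connect, which combines the name resolution and timeout handling patterns covered in this article. The following is a rough sketch under that assumption; `connect_first` is a hypothetical helper, and the standard library's socket.create_connection offers similar behaviour out of the box.

```python
import socket


def connect_first(host, port, timeout=2.0):
    """Try each address from getaddrinfo and return the first connected socket."""
    last_error = None
    candidates = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    for family, socktype, proto, _canonname, sockaddr in candidates:
        sock = None
        try:
            sock = socket.socket(family, socktype, proto)
            sock.settimeout(timeout)
            sock.connect(sockaddr)
            return sock
        except OSError as exc:  # covers timeouts and refused connections
            if sock is not None:
                sock.close()
            last_error = exc
    if last_error is None:
        last_error = OSError(f"no addresses found for {host!r}")
    raise last_error
```

A caller would use it as `sock = connect_first("example.com", 80)` and close the socket when done. Trying every candidate matters for hosts that resolve to both IPv6 and IPv4 addresses, where the first entry may not be reachable.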