Python 基于队列实现 tcp socket 连接池

连接池实现

socket_pool.py

# -*- coding:utf-8 -*-
import socket
import time
import threading
import os
import logging
import traceback
from queue import Queue, Empty
_logger = logging.getLogger('mylogger')
class SocketPool:
 def __init__(self, host, port, min_connections=10, max_connections=10):
 '''
 初始化Socket连接池
 :param host: 目标主机地址
 :param port: 目标端口号
 :param min_connections: 最小连接数
 :param max_connections: 最大连接数
 '''
 self.host = host
 self.port = port
 self.min_connections = min_connections
 self.max_connections = max_connections
 self.busy_sockets_dict = {} # 存放从连接池取出的socket的id
 self._sock_lock = threading.Lock() # 线程锁保证计数正确
 self._pool = Queue(max_connections) # 基于线程安全的队列存储连接
 self._lock = threading.Lock() # 线程锁保证资源安全:
 self._init_pool() # 预创建连接
 self._start_health_check() # 启动连接健康检查线程
 def _init_pool(self):
 '''预创建连接并填充到池中'''
 
 for _ in range(self.min_connections):
 sock = self._create_socket()
 self._pool.put(sock)
 def _create_socket(self):
 '''创建新的Socket连接'''
 
 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 try:
 sock.connect((self.host, self.port))
 return sock
 except socket.error as e:
 raise ConnectionError(f'Failed to connect: {e}') # 连接失败抛出异常
 def _start_health_check(self):
 '''启动后台线程定期检查连接有效性'''
 
 def check():
 while True:
 with self._lock:
 for _ in range(self._pool.qsize()):
 sock = self._pool.get()
 self.busy_sockets_dict[sock] = 1
 try:
 sock.send(b'PING<END>') # 发送心跳包验证连接状态
 # 以下 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健康检查响应报文数据存在多余内容,不符合格式,从而导致数据解析问题
 sock.recv(11)
 self._pool.put(sock)
 self.busy_sockets_dict.pop(sock)
 except (socket.error, ConnectionResetError):
 _logger.error('socket连接健康检查出错:%s, 关闭失效连接并创建新连接替换' % traceback.format_exc())
 sock.close() # 关闭失效连接并创建新连接替换
 self.busy_sockets_dict.pop(sock)
 new_sock = self._create_socket()
 self._pool.put(new_sock)
 
 # 如果sock数量小于最小数量,则补充
 for _ in range(0, self.min_connections - self._pool.qsize()):
 new_sock = self._create_socket()
 self._pool.put(new_sock)
 time.sleep(60) # 每60秒检查一次
 threading.Thread(target=check, daemon=True).start()
 def get_connection(self):
 '''
 从池中获取一个可用连接
 :return: socket对象
 '''
 
 with self._sock_lock:
 if self._pool.empty():
 if len(self.busy_sockets_dict.keys()) < self.max_connections:
 new_sock = self._create_socket()
 self.busy_sockets_dict[new_sock] = 1
 return new_sock
 else:
 raise Empty('No available connections in pool')
 else:
 try:
 sock = self._pool.get(block=False)
 self.busy_sockets_dict[sock] = 1
 return sock
 except Exception:
 _logger.error('获取socket连接出错:%s' % traceback.format_exc())
 raise
 
 def release_connection(self, sock):
 '''
 将连接归还到池中
 :param sock: 待归还的socket对象
 '''
 if not sock._closed:
 self._pool.put(sock)
 if sock in self.busy_sockets_dict:
 self.busy_sockets_dict.pop(sock)
 def close_all(self):
 '''关闭池中所有连接'''
 
 while not self._pool.empty():
 sock = self._pool.get()
 sock.close()
 self.busy_sockets_dict.pop(sock.id)
 self.busy_sockets_dict = {} # 兜底
host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000'))
min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10'))
max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
socketPool = SocketPool(host, port, min_connections, max_connections)

使用连接池

from socket_pool import socketPool
def send_socket_msg(data):
 global socketPool
 
 try:
 sock = None
 # 获取连接(支持超时控制)
 sock = socketPool.get_connection()
 # 发送数据
 sock.sendall(data.encode('utf-8'))
 except Exception:
 error_msg = '发送消息出错:%s' % traceback.format_exc()
 _logger.error(error_msg)
 
 if sock is not None:
 sock.close()
 socketPool.release_connection(sock)
 return send_socket_msg(data)
 
 response = ''
 try:
 while True:
 chunk = sock.recv(4096)
 chunk = chunk.decode('utf-8')
 response += chunk
 if response.endswith('<END>'):
 response = response.rstrip('<END>')
 return {'success':True, 'message':response}
 except Exception:
 error_msg = '获取消息出错:%s' % traceback.format_exc()
 _logger.error(error_msg)
 return {'success':False, 'message': error_msg}
 finally:
 # 必须归还连接!
 socketPool.release_connection(sock)
作者:授客原文地址:https://www.cnblogs.com/shouke/p/18857363

%s 个评论

要回复文章请先登录注册