import hashlib
import logging
import random
import socket
import sys
import threading
import time
from urllib.parse import urlparse

import pymysql
from PySide6 import QtWidgets
from PySide6.QtCore import Signal, Slot
from PySide6.QtWidgets import (
    QApplication,
    QHBoxLayout,
    QLineEdit,
    QMainWindow,
    QMessageBox,
    QPushButton,
    QSizePolicy,
    QTableWidget,
    QTableWidgetItem,
    QTextEdit,
    QVBoxLayout,
    QWidget,
)


def generate_random_hex_color():
    """Return a random colour as a lowercase ``#rrggbb`` hex string."""
    # The three channels are drawn independently, in R, G, B order.
    red = random.randint(0, 255)
    green = random.randint(0, 255)
    blue = random.randint(0, 255)
    # ``02x`` zero-pads every channel to exactly two hex digits.
    return f"#{red:02x}{green:02x}{blue:02x}"


def show_completion_message():
    """Show a modal information box saying there is nothing to collect."""
    msg = QMessageBox()
    msg.setIcon(QMessageBox.Information)
    msg.setWindowTitle("采集完成")
    msg.setText("没有需要采集的任务。")
    msg.exec()


class TableWidgetExample(QMainWindow):
    """Main window: a URL/status table, a database form and a log pane.

    URLs typed into the input box are appended to the table with a
    "waiting" status; "start collection" processes every waiting row on a
    worker thread and reports progress back through ``status_changed``.
    """

    # Status texts shown in the second table column.
    STATUS_WAITING = "等待中"
    STATUS_RUNNING = "采集中"
    STATUS_DONE = "采集完成"
    STATUS_FAILED = "采集失败"

    # (row, status text). Worker threads emit this instead of touching the
    # table, so the widget is only ever modified by the slot on the window.
    status_changed = Signal(int, str)

    def __init__(self):
        super().__init__()
        self.logger = None
        self.table = None
        self.button_connection = None
        self.connection_pwd = None
        self.connection_user = None
        self.connection_dbname = None
        self.connection_port = None
        self.connection_host = None
        self.input_text = None
        self.connection = None
        self.cursor = None
        self.is_connect = False
        self.logger_area = None
        self.status_changed.connect(self._apply_status)
        self.initLogger()
        self.initUI()

    def initUI(self):
        """Build the widget tree: table and input form on the left, log on the right."""
        self.setWindowTitle("采集")
        self.setGeometry(100, 100, 1000, 600)

        central_widget = QWidget()
        central_layout = QHBoxLayout(central_widget)
        self.setCentralWidget(central_widget)

        # Left column (stretch factor 3): table on top, input form below.
        left_widget = QWidget()
        left_layout = QVBoxLayout(left_widget)
        left_layout.setContentsMargins(0, 0, 0, 0)
        central_layout.addWidget(left_widget, 3)

        top_widget = QWidget()
        top_layout = QVBoxLayout(top_widget)
        self.table = QTableWidget()
        self.table.setColumnCount(2)
        # Horizontal header labels: URL and status.
        self.table.setHorizontalHeaderLabels(["URL", "状态"])
        self.table.setEditTriggers(QTableWidget.NoEditTriggers)
        header = self.table.horizontalHeader()
        header.setSectionResizeMode(QtWidgets.QHeaderView.Stretch)
        top_layout.addWidget(self.table)
        left_layout.addWidget(top_widget)

        # Lower part of the left column: URL input, DB form, action buttons.
        bottom_widget = QWidget()
        bottom_widget.setFixedHeight(200)
        bottom_layout = QVBoxLayout(bottom_widget)
        self.input_text = QTextEdit()
        self.input_text.setPlaceholderText("请输入链接,一行一个")
        bottom_layout.addWidget(self.input_text)

        connection_layout = QHBoxLayout()
        self.connection_host = QLineEdit()
        self.connection_host.setPlaceholderText("地址")
        self.connection_port = QLineEdit()
        self.connection_port.setPlaceholderText("端口")
        self.connection_dbname = QLineEdit()
        self.connection_dbname.setPlaceholderText("数据库")
        self.connection_user = QLineEdit()
        self.connection_user.setPlaceholderText("用户名")
        self.connection_pwd = QLineEdit()
        self.connection_pwd.setPlaceholderText("密码")
        self.connection_pwd.setEchoMode(QLineEdit.Password)
        self.button_connection = QPushButton("连接")
        self.button_connection.clicked.connect(self.connect_to_database)
        connection_layout.addWidget(self.connection_host)
        connection_layout.addWidget(self.connection_port)
        connection_layout.addWidget(self.connection_dbname)
        connection_layout.addWidget(self.connection_user)
        connection_layout.addWidget(self.connection_pwd)
        connection_layout.addWidget(self.button_connection)
        bottom_layout.addLayout(connection_layout)

        action_layout = QHBoxLayout()
        button_add = QPushButton("添加链接")
        button_add.clicked.connect(self.add_data_to_table)
        action_layout.addWidget(button_add)
        button_execute = QPushButton("开始采集")
        button_execute.clicked.connect(self.start_collection)
        action_layout.addWidget(button_execute)
        bottom_layout.addLayout(action_layout)
        left_layout.addWidget(bottom_widget)

        # Right column (stretch factor 1): read-only log pane.
        right_widget = QWidget()
        right_layout = QVBoxLayout(right_widget)
        right_layout.setContentsMargins(0, 0, 0, 0)
        central_layout.addWidget(right_widget, 1)

        logger_widget = QWidget()
        logger_layout = QVBoxLayout(logger_widget)
        # NOTE(review): nothing in this file writes to logger_area yet; log
        # records only go to the handler installed by logging.basicConfig.
        self.logger_area = QTextEdit()
        self.logger_area.setPlaceholderText("日志...")
        self.logger_area.setReadOnly(True)
        logger_layout.addWidget(self.logger_area)
        right_layout.addWidget(logger_widget)

    def initLogger(self):
        """Configure root logging at DEBUG level and create the "dmuk" logger."""
        logging.basicConfig(level=logging.DEBUG)
        self.logger = logging.getLogger("dmuk")

    def _report_error(self, status_message, dialog_message=None):
        """Show an error in the status bar and in a modal critical dialog.

        ``dialog_message`` defaults to ``status_message`` when omitted.
        """
        self.statusBar().showMessage(status_message)
        if dialog_message is None:
            dialog_message = status_message
        QMessageBox.critical(self, "Error", dialog_message)

    def _close_connection(self):
        """Close and forget the current database connection, if there is one."""
        connection, self.connection = self.connection, None
        self.cursor = None
        self.is_connect = False
        if connection is None:
            return
        try:
            connection.close()
        except pymysql.Error:
            # close() raises when the connection is already closed; there is
            # nothing left to release in that case.
            self.logger.debug("Database connection was already closed", exc_info=True)

    def connect_to_database(self):
        """Validate the connection form and open a MySQL connection.

        On success ``self.connection``, ``self.cursor`` and ``self.is_connect``
        are set; on any validation or connection failure the user is told why
        and ``self.is_connect`` stays False.
        """
        # Basic connection details. The password is taken verbatim because
        # surrounding whitespace may be part of it.
        host = self.connection_host.text().strip()
        port_text = self.connection_port.text().strip()
        user = self.connection_user.text().strip()
        pwd = self.connection_pwd.text()
        dbname = self.connection_dbname.text().strip()

        if not host or not port_text or not dbname or not user or not pwd:
            self._report_error("主机、端口、数据库名、用户名和密码不能为空!")
            return

        try:
            socket.gethostbyname(host)
        except (OSError, UnicodeError):
            # UnicodeError is raised for names that cannot be IDNA-encoded
            # (e.g. an empty label such as "a..b").
            self._report_error("无效的主机!")
            return
        self.logger.debug("有效的主机")

        # Validate the port: it must be an integer in 1..65535.
        try:
            port = int(port_text)
        except ValueError:
            self._report_error("无效的端口!")
            return
        if not 0 < port <= 65535:
            self._report_error("端口超出范围!")
            return
        self.logger.debug("有效的端口")

        # Drop any earlier connection first so reconnecting does not leak it.
        self._close_connection()
        try:
            self.connection = pymysql.connect(
                host=host, port=port, user=user, password=pwd, database=dbname
            )
            self.cursor = self.connection.cursor()
        except pymysql.Error as e:
            # Connection failed; make sure no half-initialised state remains.
            self._close_connection()
            self._report_error("Connection failed", str(e))
        else:
            self.is_connect = True
            self.statusBar().showMessage("Connection succees!")

    def closeEvent(self, event):
        """Close the database connection (if any) and accept the close event."""
        self._close_connection()
        event.accept()

    def add_data_to_table(self):
        """Append every valid URL from the input box to the table as "waiting".

        Blank lines are skipped; lines that are not valid URLs are skipped and
        reported in the status bar.
        """
        self.logger.debug("Add button clicked")
        text = self.input_text.toPlainText()
        if not text.strip():
            self.logger.debug("Content is empty")
            return

        for line in text.splitlines():
            url = line.strip()
            if not url:
                continue  # Ignore empty lines between URLs.
            if not self.is_valid_url(url):
                self.logger.debug("url is invalid")
                self.statusBar().showMessage("存在无效的url")
                continue
            row_position = self.table.rowCount()  # Next free row index.
            self.table.insertRow(row_position)
            self.table.setItem(row_position, 0, QTableWidgetItem(url))
            self.table.setItem(row_position, 1, QTableWidgetItem(self.STATUS_WAITING))

    def is_valid_url(self, url):
        """Return True if ``url`` parses with both a scheme and a network location."""
        try:
            parsed_url = urlparse(url)
        except (AttributeError, TypeError, ValueError):
            # Non-text input, or an authority urlparse rejects (for example a
            # malformed IPv6 literal).
            return False
        return bool(parsed_url.scheme and parsed_url.netloc)

    def start_collection(self):
        """Start one worker thread per table row whose status is "waiting".

        Shows the "nothing to collect" dialog when no such row exists.
        """
        rows_to_collect = []
        for row in range(self.table.rowCount()):
            url_item = self.table.item(row, 0)
            status_item = self.table.item(row, 1)
            if url_item is None or status_item is None:
                continue
            if status_item.text() == self.STATUS_WAITING:
                rows_to_collect.append((row, url_item.text()))

        if not rows_to_collect:
            show_completion_message()
            return  # Nothing to collect.

        for row, url in rows_to_collect:
            # Mark the row here, synchronously, so a second click cannot pick
            # up the same row before its worker has started running.
            self._apply_status(row, self.STATUS_RUNNING)
            # Daemon threads do not keep the process alive after the window
            # has been closed.
            thread = threading.Thread(
                target=self.collect_data, args=(row, url), daemon=True
            )
            thread.start()

    def collect_data(self, row, url):
        """Worker body: collect data for ``url`` and report status for ``row``."""
        self.update_status(row, self.STATUS_RUNNING)
        try:
            time.sleep(3)  # Simulates a slow operation.
            self.get_video_dm_data(url)
        except Exception:
            # Thread boundary: log the failure and surface it in the table
            # instead of leaving the row stuck in the "running" state.
            self.logger.exception("Collection failed for %s", url)
            self.update_status(row, self.STATUS_FAILED)
            return
        self.update_status(row, self.STATUS_DONE)

    def update_status(self, row, status):
        """Request that the status cell of ``row`` show ``status``.

        Safe to call from worker threads: the table is modified by the slot
        connected to ``status_changed``, not by the caller.
        """
        self.status_changed.emit(row, status)

    @Slot(int, str)
    def _apply_status(self, row, status):
        """Write ``status`` into the status column of ``row`` if the row exists."""
        if 0 <= row < self.table.rowCount():
            self.table.setItem(row, 1, QTableWidgetItem(status))

    def get_video_dm_data(self, url):
        """Derive the danmaku pool id for ``url`` and pick the matching site API.

        The per-site branches are placeholders that only log which interface
        would be called.
        """
        digest = hashlib.md5(url.encode('utf-8')).hexdigest()
        # Danmaku pool id: the last 20 hex characters of the MD5 digest + " P".
        video_id = digest[-20:] + " P"
        self.logger.debug("video_id: %s", video_id)

        # NOTE(review): these are substring tests on the whole URL, so a
        # domain appearing in the path or query also matches - confirm intent.
        if "v.qq.com" in url:
            self.logger.debug("调用 v.qq.com 的接口")
        elif "rainss.cn" in url:
            self.logger.debug("调用 rainss.cn 的接口")
        elif "allms.cn" in url:
            self.logger.debug("调用 allms.cn 的接口")
        else:
            # None of the known sites matched.
            self.logger.debug("未匹配到任何子字符串")

    def generate_insert_sql(self, data):
        """Return an INSERT statement for the ``danmaku_list`` table.

        NOTE(review): ``data`` is currently unused; the statement is a fixed
        sample row.
        """
        sql = "INSERT INTO `danmaku_list` (`id`, `cid`, `type`, `text`, `color`, `size`, `videotime`, `ip`, `time`) VALUES ('de9d5e3f5b7f9ea2197d35f383530643', NULL, 'right', '不错哦', '#FFFFFF', '27.5px', '48.275', '58.62.32.191', '1666319003')"
        return sql


if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = TableWidgetExample()
    window.show()
    sys.exit(app.exec())