dm/dm.py
2023-10-06 12:56:00 +08:00

291 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import logging
import random
import socket
import sys
import threading
import time
from urllib.parse import urlparse
import pymysql
from PySide6 import QtWidgets
from PySide6.QtWidgets import QApplication, QMainWindow, QTableWidget, QTableWidgetItem, QVBoxLayout, QWidget, \
QTextEdit, QPushButton, QHBoxLayout, QLineEdit, QMessageBox, QSizePolicy
def generate_random_hex_color():
# 生成随机的R、G、B分量
r = random.randint(0, 255)
g = random.randint(0, 255)
b = random.randint(0, 255)
# 将R、G、B分量转换为16进制并保证输出的字符串长度为2
hex_r = format(r, '02x')
hex_g = format(g, '02x')
hex_b = format(b, '02x')
# 拼接16进制颜色代码
hex_color = "#" + hex_r + hex_g + hex_b
return hex_color
def show_completion_message():
msg = QMessageBox()
msg.setIcon(QMessageBox.Information)
msg.setWindowTitle("采集完成")
msg.setText("没有需要采集的任务。")
msg.exec()
class TableWidgetExample(QMainWindow):
def __init__(self):
super().__init__()
self.logger = None
self.table = None
self.button_connection = None
self.connection_pwd = None
self.connection_user = None
self.connection_dbname = None
self.connection_port = None
self.connection_host = None
self.input_text = None
self.connection = None
self.cursor = None
self.is_connect = False
self.initLogger()
self.initUI()
def initUI(self):
self.setWindowTitle("采集")
self.setGeometry(100, 100, 800, 600)
central_widget = QWidget()
center_layout = QVBoxLayout(central_widget)
self.setCentralWidget(central_widget)
top_widget = QWidget()
top_layout = QVBoxLayout(top_widget)
self.table = QTableWidget()
self.table.setColumnCount(2)
# 设置水平表头标签
self.table.setHorizontalHeaderLabels(["URL", "状态"])
self.table.setEditTriggers(QTableWidget.NoEditTriggers)
header = self.table.horizontalHeader()
# header.setSectionResizeMode(QtWidgets.QHeaderView.ResizeToContents)
header.setSectionResizeMode(QtWidgets.QHeaderView.Stretch)
top_layout.addWidget(self.table)
center_layout.addWidget(top_widget)
# 下部分布局
bottom_widget = QWidget()
bottom_widget.setFixedHeight(200)
bottom_layout = QVBoxLayout(bottom_widget)
self.input_text = QTextEdit()
self.input_text.setPlaceholderText("请输入链接,一行一个")
bottom_layout.addWidget(self.input_text)
connection_layout = QHBoxLayout()
self.connection_host = QLineEdit()
self.connection_host.setPlaceholderText("地址")
self.connection_port = QLineEdit()
self.connection_port.setPlaceholderText("端口")
self.connection_dbname = QLineEdit()
self.connection_dbname.setPlaceholderText("数据库")
self.connection_user = QLineEdit()
self.connection_user.setPlaceholderText("用户名")
self.connection_pwd = QLineEdit()
self.connection_pwd.setPlaceholderText("密码")
self.connection_pwd.setEchoMode(QLineEdit.Password)
self.button_connection = QPushButton("连接")
self.button_connection.clicked.connect(self.connect_to_database)
connection_layout.addWidget(self.connection_host)
connection_layout.addWidget(self.connection_port)
connection_layout.addWidget(self.connection_dbname)
connection_layout.addWidget(self.connection_user)
connection_layout.addWidget(self.connection_pwd)
connection_layout.addWidget(self.button_connection)
bottom_layout.addLayout(connection_layout)
action_layout = QHBoxLayout()
button_add = QPushButton("添加链接")
button_add.clicked.connect(self.add_data_to_table)
action_layout.addWidget(button_add)
button_execute = QPushButton("开始采集")
button_execute.clicked.connect(self.start_collection)
action_layout.addWidget(button_execute)
bottom_layout.addLayout(action_layout)
center_layout.addWidget(bottom_widget)
def initLogger(self):
logging.basicConfig(level=logging.DEBUG)
self.logger = logging.getLogger("dmuk")
def connect_to_database(self):
# 数据库基本信息
host = self.connection_host.text()
port = self.connection_port.text()
user = self.connection_user.text()
pwd = self.connection_pwd.text()
dbname = self.connection_dbname.text()
if not host or not port or not dbname or not user or not pwd:
self.statusBar().showMessage("主机、端口、数据库名、用户名和密码不能为空!")
QMessageBox.critical(self, "Error", "主机、端口、数据库名、用户名和密码不能为空!")
return
try:
socket.gethostbyname(host)
except socket.error:
self.statusBar().showMessage("无效的主机!")
QMessageBox.critical(self, "Error", "无效的主机!")
return
else:
# 主机有效
print("有效的主机")
# 校验端口
try:
port = int(port)
if 0 < port <= 65535:
print("有效的端口")
else:
QMessageBox.critical(self, "Error", "端口超出范围!")
self.statusBar().showMessage("端口超出范围!")
return
except ValueError:
QMessageBox.critical(self, "Error", "无效的端口!")
self.statusBar().showMessage("无效的端口!")
return
try:
# 连接数据库
self.connection = pymysql.connect(host=host, port=port, user=user, password=pwd, db=dbname)
self.cursor = self.connection.cursor()
# 链接成功
self.is_connect = True
self.statusBar().showMessage("Connection succees!")
except pymysql.Error as e:
# 异常 连接失败
self.is_connect = False
QMessageBox.critical(self, "Error", str(e))
self.statusBar().showMessage("Connection failed")
def closeEvent(self, event):
if self.connection:
self.connection.close()
event.accept()
def add_data_to_table(self):
self.logger.debug("Add button clicked")
# 从input_text中获取数据添加到表格中
text = self.input_text.toPlainText()
if not text.strip():
self.logger.debug("Content is empty")
return
lines = text.split('\n')
for row, line in enumerate(lines):
url = QTableWidgetItem(line)
# 校验url是否合法
if not self.is_valid_url(line):
self.logger.debug("url is invalid")
self.statusBar().showMessage("存在无效的url")
continue
row_position = self.table.rowCount() # 获取下一个可用的行索引
self.table.insertRow(row_position)
status = QTableWidgetItem("等待中")
self.table.setItem(row_position, 0, url)
self.table.setItem(row_position, 1, status)
def is_valid_url(self, url):
try:
parsed_url = urlparse(url)
# 检查是否有有效的协议和主机
if parsed_url.scheme and parsed_url.netloc:
return True
else:
return False
except Exception as e:
return False
def start_collection(self):
# 遍历表格并查找状态为 "队列中" 的数据行
rows_to_collect = []
for row in range(self.table.rowCount()):
status_item = self.table.item(row, 1)
if status_item is not None and status_item.text() == "等待中":
rows_to_collect.append(row)
if not rows_to_collect:
show_completion_message()
return # 没有需要采集的数据
# 启动多线程来处理队列中的 URL 数据
for row in rows_to_collect:
url_item = self.table.item(row, 0)
url = url_item.text()
thread = threading.Thread(target=self.collect_data, args=(row, url))
thread.start()
def collect_data(self, row, url):
# 模拟数据采集
self.update_status(row, "采集中")
time.sleep(3) # 模拟耗时操作
self.get_video_dm_data(url)
# 更新状态为采集完成
self.update_status(row, "采集完成")
def update_status(self, row, status):
item = QTableWidgetItem(status)
self.table.setItem(row, 1, item)
def get_video_dm_data(self, url):
md5_hash = hashlib.md5()
md5_hash.update(url.encode('utf-8'))
encrypted_text = md5_hash.hexdigest()
# 取出MD5加密后的后20位数
last_20_chars = encrypted_text[-20:]
video_id = last_20_chars + " P"
# 弹幕池id
self.logger.debug("video_id: %s", video_id)
# 检查是否存在 "v.qq.com"
if "v.qq.com" in url:
# 调用 v.qq.com 对应的接口
print("调用 v.qq.com 的接口")
# 检查是否存在 "rainss.cn"
elif "rainss.cn" in url:
# 调用 rainss.cn 对应的接口
print("调用 rainss.cn 的接口")
# 检查是否存在 "allms.cn"
elif "allms.cn" in url:
# 调用 allms.cn 对应的接口
print("调用 allms.cn 的接口")
# 如果都不匹配
else:
print("未匹配到任何子字符串")
def generate_insert_sql(self, data):
sql = "INSERT INTO `danmaku_list` (`id`, `cid`, `type`, `text`, `color`, `size`, `videotime`, `ip`, `time`) VALUES ('de9d5e3f5b7f9ea2197d35f383530643', NULL, 'right', '不错哦', '#FFFFFF', '27.5px', '48.275', '58.62.32.191', '1666319003')"
return sql
if __name__ == "__main__":
app = QApplication(sys.argv)
window = TableWidgetExample()
window.show()
sys.exit(app.exec())