diff --git a/TYQH_JK.py b/TYQH_JK.py new file mode 100644 index 0000000..8fa4145 --- /dev/null +++ b/TYQH_JK.py @@ -0,0 +1,398 @@ +# !/usr/bin/python3 +# -- coding: utf-8 -- +# ------------------------------- +# @Author : cherwin +# ------------------------------- +# cron "*/10 9:58-10:01 1-31 2-6 * *" script-path=xxx.py,tag=匹配cron用 +# 2-6月每月1日9点55到10点1分,每10秒执行一次 +# const $ = new Env('统一茄皇监控') +import hashlib +import json +import os +import random +import string +import time +from datetime import datetime +from os import environ, path + +import requests +from requests.packages.urllib3.exceptions import InsecureRequestWarning + +# 禁用安全请求警告 +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + +if os.path.isfile('DEV_ENV.py'): + import DEV_ENV + +if os.path.isfile('notify.py'): + from notify import send + print("加载通知服务成功!") +else: + print("加载通知服务失败!") +send_msg = '' +one_msg='' +def Log(cont=''): + global send_msg,one_msg + print(cont) + if cont: + one_msg += f'{cont}\n' + send_msg += f'{cont}\n' + +USER_INFO = {} + +class RUN: + def __init__(self, info,index): + global one_msg + one_msg = '' + split_info = info.split('@') + self.third_id = split_info[0] + self.wid = split_info[1] + len_split_info = len(split_info) + last_info = split_info[len_split_info - 1] + self.send_UID = None + if len_split_info > 0 and "UID_" in last_info: + self.send_UID = last_info + + self.user_index = index + 1 + print(f"\n---------开始执行第{self.user_index}个账号>>>>>") + self.s = requests.session() + self.s.verify = False + + self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f) XWEB/8555' + + self.headers = { + 'User-Agent': self.UA, + 'Sec-Fetch-Site': 'cross-site', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Dest': 'empty', + 'Accept-Encoding': 'gzip, deflate, br', + 'Accept-Language': 'zh-CN,zh;q=0.9', + 'Origin': 'https://thekingoftomato.ioutu.cn', + 'Referer': 'https://thekingoftomato.ioutu.cn/' + + } + self.base_url = 'https://qiehuang-apig.xiaoyisz.com/qiehuangsecond/ga' + self.goodsid = None + self.Login_res = self.login() + + def load_json(self): + try: + with open(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", 'r', encoding='utf-8') as file: + data = json.load(file) + return data + except FileNotFoundError: + print("未找到文件,返回空字典") + return {} + except Exception as e: + print(f"发生错误:{e}") + return {} + + def make_request(self, url, method='post', headers={}, params={}): + if headers == {}: + headers = self.headers + if params == {}: + params = self.params + try: + if method.lower() == 'get': + response = requests.get(url, headers=headers, verify=False) + + elif method.lower() == 'post': + response = requests.post(url, headers=headers, json=params, verify=False) + else: + raise ValueError("不支持的请求方法: " + method) + return response.json() + except requests.exceptions.RequestException as e: + print("请求异常:", e) + except ValueError as e: + print("值错误或不支持的请求方法:", e) + except Exception as e: + print("发生了未知错误:", e) + + def gen_sign(self, parameters={}, body=None): + sorted_keys = sorted(parameters.keys()) + parameter_strings = [] + for key in sorted_keys: + if isinstance(parameters[key], dict): + parameter_strings.append(f"{key}={json.dumps(parameters[key])}") + else: + parameter_strings.append(f"{key}={parameters[key]}") + + current_time = int(datetime.now().timestamp() * 1000) + secret_chars = list('BxzTx45uIGT25TTHIIBU2') + last_three_digits = str(current_time)[-3:] + for digit in last_three_digits: + secret_chars.insert(int(digit), digit) + + secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest() + nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) + + sign_data = { + 'client_id': 'game', + 'nonstr': nonce_str, + 'timestamp': current_time, + 'body': json.dumps(body) if body else '', + 'query': '&'.join(parameter_strings) if parameter_strings else '', + 'secret': secret + } + + sign_string = '|'.join([str(v) for v in sign_data.values()]) + sign = hashlib.md5(sign_string.encode()).hexdigest().upper() + sign_header = { + 'client_id': 'game', + 'timestamp': str(current_time), + 'nonstr': sign_data['nonstr'], + 'sign': sign + } + self.headers.update(sign_header) + return self.headers + + def login(self): + login_successful = False + try: + login_params = { + 'thirdId': self.third_id, + 'wid': self.wid + } + sign_header = self.gen_sign({}, login_params) + # print(self.headers) + # Hypothetically speaking, this is how you might perform a POST request in Python with the requests library. + response = self.s.post(f'{self.base_url}/public/api/login', json=login_params, headers=sign_header) + if response.status_code == 200: + response_data = response.json() + if response_data.get('code', -1) == 0: + auth = response_data['data']['token'] or '' + if auth: + login_successful = True + print(f'账号【{self.user_index}】登录成功') + Authorization = {'Authorization': auth} + self.headers.update(Authorization) + else: + print(f'账号【{self.user_index}】登录获取auth失败') + else: + print(f"登录获取auth失败[{response_data['code']}]: {response_data['message']}") + elif response.status_code == 403: + print('登录失败[403]: 黑IP了, 换个IP试试吧') + except Exception as e: + print(e) + finally: + return login_successful + + def get_CapCode(self, slideImgInfo): + slidingImage = slideImgInfo.get('slidingImage', None) + backImage = slideImgInfo.get('backImage', None) + dddddocr_api = os.environ.get('OCR_API',False) + if not dddddocr_api: + print('未定义变量【OCR_API】\n取消验证码识别\n搭建方式:https://github.com/CHERWING/CHERWIN_SCRIPTS') + return False + if slidingImage and backImage: + data = { + "slidingImage": slidingImage, + "backImage": backImage + } + response = requests.post(f"{dddddocr_api}/capcode", data=json.dumps(data),headers={'Content-Type': 'application/json'}) + print(response.json()) + self.capcode = response.json().get('result','') + if self.capcode: + return True + else: + return False + + def get_CapCode_local(self, slideImgInfo): + slidingImage = slideImgInfo.get('slidingImage', None) + backImage = slideImgInfo.get('backImage', None) + if slidingImage and backImage: + self.capcode =CHERWIN_TOOLS.CAPCODE(slidingImage,backImage) + if self.capcode: + return True + else: + return False + + def checkUserCapCode(self): + print(f'提交验证码--->>>') + # try: + print(f'验证码:{self.capcode}') + params = {'xpos':self.capcode} + print(params) + sign_header = self.gen_sign(body=params) + url = f'{self.base_url}/checkUserCapCode' + response = self.s.post(url, headers=sign_header, json=params) + data = response.json() + code = data.get('code', -1) + if code == 0: + data = data.get('data', 0) + print(f"验证码正确,获取到[{data}]") + return True + else: + message = data.get('message', '') + print(f"验证码错误[{message}]") + return False + # except Exception as e: + # print(e) + + def exchange_find(self): + print(f'获取兑换列表--->>>') + try: + sign_header = self.gen_sign() + url = f'{self.base_url}/exchange/find' + response = self.s.get(url, headers=sign_header) + data = response.json() + code = data.get('code', -1) + if code == 0: + data = data.get('data', {})[1] + num = data.get('num',0) + if num < 0 : + Log('兑换上限,跳过') + return False + # print(f"兑换列表:{data}") + exchangePrizeVoList = data.get('exchangePrizeVoList',None) + if exchangePrizeVoList: + for goods in exchangePrizeVoList: + name = goods.get('name','') + self.goodsid = goods.get('id','') + usableStock = goods.get('usableStock','') + if usableStock >0: + Log(f'ID:【{self.goodsid}】 【{name}】 当前剩余:{usableStock}【可兑换】') + self.sendMsg() + if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML) + if TYQH_DHID and TYQH_DHID == self.goodsid: + self.exchange_reward(TYQH_DHID) + elif TYQH_DHID == '0': + self.exchange_reward(self.goodsid) + else: + continue + else: + print(f'ID:【{self.goodsid}】【{name}】 当前剩余:{usableStock}【不可兑换】') + else: + message = data.get('message', '') + print(f'{message}') + except Exception as e: + print(e) + + def exchange_reward(self, reward_id): + print(f'尝试兑换--->>>') + try: + params = {'id': reward_id} + sign_header = self.gen_sign(params) + url = f'{self.base_url}/exchange/reward' + response = self.s.get(url, headers=sign_header, params=params) + data = response.json() + code = data.get('code', -1) + if code == 0: + data = data.get('data', {}).get('name') or '' + Log(f"兑换[{data}]成功") + elif code == 4000: + slideImgInfo = data.get('data', {}).get('slideImgInfo', None) + validateCount = data.get('data', {}).get('validateCount', None) + if slideImgInfo and validateCount: + print(f"兑换需要滑块验证") + if self.get_CapCode(slideImgInfo): + if self.checkUserCapCode(): + self.exchange_reward(reward_id) + else: + print(f"兑换验证码上限") + else: + message = data.get('message', '') + print(f'兑换[id={reward_id}]: {message}') + except Exception as e: + print(e) + + def userTask(self): + print('\n--------------- 开始日常任务 ---------------') + wait_time = random.randint(1000, 3000) / 1000.0 # 转换为秒 + if not self.Login_res: + return False + self.exchange_find() + + return True + + + def sendMsg(self, help=False): + if self.send_UID: + push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help) + print(push_res) + + +def down_file(filename, file_url): + print(f'开始下载:{filename},下载地址:{file_url}') + try: + response = requests.get(file_url, verify=False, timeout=10) + response.raise_for_status() + with open(filename + '.tmp', 'wb') as f: + f.write(response.content) + print(f'【{filename}】下载完成!') + + # 检查临时文件是否存在 + temp_filename = filename + '.tmp' + if os.path.exists(temp_filename): + # 删除原有文件 + if os.path.exists(filename): + os.remove(filename) + # 重命名临时文件 + os.rename(temp_filename, filename) + print(f'【{filename}】重命名成功!') + return True + else: + print(f'【{filename}】临时文件不存在!') + return False + except Exception as e: + print(f'【{filename}】下载失败:{str(e)}') + return False + +def import_Tools(): + global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode + import CHERWIN_TOOLS + ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version) + + +if __name__ == '__main__': + APP_NAME = '统一茄皇监控' + ENV_NAME = 'TYQH_JK' + print(f''' +✨✨✨ {APP_NAME}脚本 ✨✨✨ +✨ 功能: + 奖品监控 +✨ 设置青龙变量: +export TYQH= '' 使用相同TYQH变量 +export TYQH_DHID= '112' 设置兑换ID则开启自动兑换,设置为0遍历兑换全部可兑换商品 +export OCR_API= 'http://localhost:3721' +✨ 由于青龙python版本问题无法直接使用dddocr需要自行搭建API,搭建方式:https://github.com/CHERWING/CHERWIN_OCR +✨ 如果你的环境可以安装dddocr库则可以替换代码内的【self.get_CapCode】为【self.get_CapCode_local】 +export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启 +✨ 推荐定时:*/10 9:59-10:01 1-31 2-6 * * 2-6月每月1日9点55到10点1分,每10秒执行一次 +✨ 第一个账号助力作者,其余互助 +✨✨✨ @Author CHERWIN✨✨✨ + ''') + local_script_name = os.path.basename(__file__) + local_version = '2024.05.04' + if os.path.isfile('CHERWIN_TOOLS.py'): + import_Tools() + else: + if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'): + print('脚本依赖下载完成请重新运行脚本') + import_Tools() + else: + print('脚本依赖下载失败,请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖') + exit() + print(TIPS) + token = '' + ENV = os.environ.get('TYQH','') + TYQH_DHID = os.environ.get('TYQH_DHID',None) + if TYQH_DHID == 0: + Log(f'\n当前已设置TYQH_DHID变量,将自动遍历兑换商品') + elif TYQH_DHID: + Log(f'\n当前已设置TYQH_DHID变量,将自动兑换id【{TYQH_DHID}】商品') + else: + Log('\n未定义TYQH_DHID变量,取消自动兑换') + token = ENV if ENV else token + if not token: + print(f"未填写TYQH变量\n青龙可在环境变量设置TYQH 或者在本脚本文件上方将ck填入token =''") + exit() + tokens = CHERWIN_TOOLS.ENV_SPLIT(token) + # print(tokens) + if len(tokens) > 0: + print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<") + access_token = [] + for index, infos in enumerate(tokens): + run_result = RUN(infos, index).userTask() + if not run_result: continue +