CHERWIN_SCRIPTS/TYQH.py
CHERWING ee348bfb67
Update TYQH.py
修复角色解锁任务,更新版本号
2024-04-16 11:47:01 +08:00

1240 lines
51 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# !/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# @Author : cherwin
# -------------------------------
# cron "23 7-23/2 * * * " script-path=xxx.py,tag=匹配cron用
# const $ = new Env('统一快乐星球小程序-茄皇的家')
import hashlib
import json
import os
import random
import string
import time
from datetime import datetime
from os import environ, path
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
else:
print("加载通知服务失败!")
send_msg = ''
one_msg=''
def Log(cont=''):
global send_msg,one_msg
print(cont)
if cont:
one_msg += f'{cont}\n'
send_msg += f'{cont}\n'
USER_INFO = {}
class RUN:
def __init__(self, info,index):
global one_msg
one_msg = ''
split_info = info.split('@')
self.third_id = split_info[0]
self.wid = split_info[1]
len_split_info = len(split_info)
last_info = split_info[len_split_info - 1]
self.send_UID = None
if len_split_info > 0 and "UID_" in last_info:
self.send_UID = last_info
self.user_index = index + 1
print(f"\n---------开始执行第{self.user_index}个账号>>>>>")
self.s = requests.session()
self.s.verify = False
self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f) XWEB/8555'
self.headers = {
'User-Agent': self.UA,
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Origin': 'https://thekingoftomato.ioutu.cn',
'Referer': 'https://thekingoftomato.ioutu.cn/'
}
self.base_url = 'https://qiehuang-apig.xiaoyisz.com/qiehuangsecond/ga'
self.sun = 0
self.land = {}
self.refresh_land_step = False
self.need_help_unlock = True
self.need_help_task = True
self.need_help_risk = True
self.can_go_risk = True
self.can_add_friend = True
self.can_help_task = True
self.can_help_risk = True
self.can_help_unlock = True
self.role_id =''
self.role_progress=''
self.role_max=''
self.all_land_unlock = True
self.all_role_unlock = True
self.help_task_config = {}
self.help_role_config = {}
self.group_step = ['发育期', '幼苗期', '开花期', '结果期', '收获期']
self.Login_res = self.login()
def load_json(self):
try:
with open(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", 'r', encoding='utf-8') as file:
data = json.load(file)
return data
except FileNotFoundError:
print("未找到文件,返回空字典")
return {}
except Exception as e:
print(f"发生错误:{e}")
return {}
def make_request(self, url, method='post', headers={}, params={}):
if headers == {}:
headers = self.headers
if params == {}:
params = self.params
try:
if method.lower() == 'get':
response = requests.get(url, headers=headers, verify=False)
elif method.lower() == 'post':
response = requests.post(url, headers=headers, json=params, verify=False)
else:
raise ValueError("不支持的请求方法: " + method)
return response.json()
except requests.exceptions.RequestException as e:
print("请求异常:", e)
except ValueError as e:
print("值错误或不支持的请求方法:", e)
except Exception as e:
print("发生了未知错误:", e)
def gen_sign(self, parameters={}, body=None):
sorted_keys = sorted(parameters.keys())
parameter_strings = []
for key in sorted_keys:
if isinstance(parameters[key], dict):
parameter_strings.append(f"{key}={json.dumps(parameters[key])}")
else:
parameter_strings.append(f"{key}={parameters[key]}")
current_time = int(datetime.now().timestamp() * 1000)
secret_chars = list('BxzTx45uIGT25TTHIIBU2')
last_three_digits = str(current_time)[-3:]
for digit in last_three_digits:
secret_chars.insert(int(digit), digit)
secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest()
nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
sign_data = {
'client_id': 'game',
'nonstr': nonce_str,
'timestamp': current_time,
'body': json.dumps(body) if body else '',
'query': '&'.join(parameter_strings) if parameter_strings else '',
'secret': secret
}
sign_string = '|'.join([str(v) for v in sign_data.values()])
sign = hashlib.md5(sign_string.encode()).hexdigest().upper()
sign_header = {
'client_id': 'game',
'timestamp': str(current_time),
'nonstr': sign_data['nonstr'],
'sign': sign
}
self.headers.update(sign_header)
return self.headers
def login(self):
login_successful = False
try:
login_params = {
'thirdId': self.third_id,
'wid': self.wid
}
sign_header = self.gen_sign({}, login_params)
# print(self.headers)
# Hypothetically speaking, this is how you might perform a POST request in Python with the requests library.
response = self.s.post(f'{self.base_url}/public/api/login', json=login_params, headers=sign_header)
if response.status_code == 200:
response_data = response.json()
if response_data.get('code', -1) == 0:
auth = response_data['data']['token'] or ''
if auth:
login_successful = True
Log(f'账号【{self.user_index}】登录成功')
Authorization = {'Authorization': auth}
self.headers.update(Authorization)
else:
Log(f'账号【{self.user_index}】登录获取auth失败')
else:
print(f"登录获取auth失败[{response_data['code']}]: {response_data['message']}")
elif response.status_code == 403:
print('登录失败[403]: 黑IP了, 换个IP试试吧')
except Exception as e:
print(e)
finally:
return login_successful
def userInfo_get(self,END=False):
print(f'获取用户[{self.user_index}]信息--->>>')
try:
sign_header = self.gen_sign()
# print(self.headers)
url = f'{self.base_url}/userInfo/get'
# 发起GET请求
response = self.s.get(url, headers=sign_header, verify=False)
data = response.json()
code = data.get('code', -1)
if code == 0:
data = data.get('data', {})
self.userId = data['userId']
self.gold = int(data['gold'])
self.score = int(data['score'])
self.sun = int(data['sun'])
self.sunMax = int(data['sunMax'])
self.nickName = data['nickName']
if self.nickName == None:self.nickName = '未命名'
new_data = {
self.userId:
{
'name': self.nickName
}
}
USER_INFO.update(new_data)
CHERWIN_TOOLS.SAVE_INVITE_CODE(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", new_data)
if END:
Log(f"-----用户[{self.user_index}]信息-----\n用户Id:{self.userId}\n用户名:{self.nickName}\n调料🧂x{self.gold}\n番茄🍅x{self.score}\n阳光☀x{self.sun}\n------用户[{self.user_index}]信息END------")
else:
print(f"-----用户[{self.user_index}]信息-----\n用户Id:{self.userId}\n用户名:{self.nickName}\n调料🧂x{self.gold}\n番茄🍅x{self.score}\n阳光☀x{self.sun}\n------用户[{self.user_index}]信息END------")
else:
error_message = data.get('message', '')
print(f"获取账号信息失败[{str(code)}]: {error_message}")
except Exception as e:
print(e)
finally:
return
def userInfo_autoSun(self):
print('收集阳光--->>>')
sign_header = self.gen_sign()
url = f'{self.base_url}/userInfo/autoSun'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
sun = data.get('data', {}).get('sun', 0)
print(f"刷新收集到【{sun}】阳光---√")
else:
print(f"刷新收集阳光失败[{data['code']}]: {str(code)}")
def task_get(self):
print(f'获取任务列表--->>>')
sign_header = self.gen_sign()
url = f'{self.base_url}/task/get'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
tasks = data.get('data', [])
for task in tasks:
self.task_id = task['id']
self.task_title = task['title']
task_status = task['status']
self.task_progress = task.get('currentProgress', 0)
self.task_max_progress = task.get('progress', 1)
print(f"任务:【{self.task_title}】进度:【{self.task_progress}/{self.task_max_progress}")
if self.task_title == '邀请好友助力':
if self.task_progress == self.task_max_progress:
self.need_help_task = False
# new_data = {
# self.userId: {
# 'task_id': self.task_id,
# 'task_stu': self.can_help_task,
# 'current_progress': self.task_progress
# }
# }
# USER_INFO.update(new_data)
# CHERWIN_TOOLS.SAVE_INVITE_CODE("INVITE_CODE/QH_INVITE_CODE.json", new_data)
print(f"任务 '{self.task_title}' 【跳过】---》")
continue
elif self.task_title == '邀请新人助力':
print(f"任务 '{self.task_title}' 【跳过】---》")
continue
if task_status == 0 and self.task_progress < self.task_max_progress:
# 此任务尚未完成,根据具体业务逻辑进行处理,如启动任务
print(f"任务【{self.task_title}】【未完成】执行任务---》")
self.task_doTask() # 取消这行注释来执行任务
elif task_status == 1:
# 任务已完成但奖励未被领取,根据业务逻辑领取奖励
print(f"任务【{self.task_title}】【已完成】领取奖励---》")
self.task_reward() # 取消这行注释来领取奖励
else:
print(f"获取任务列表失败[{data['code']}]: {str(code)}")
def task_doHelpTask(self,help_data):
# print('开始助力任务---》')
params = {'id': help_data["task_id"]}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/task/doTask'
response = self.s.get(url, headers=sign_header,params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f"助力账号[{help_data['index']}][{help_data['name']}]成功---√")
elif '已超出' in data.get('message', ''):
self.can_help_task = False
error_message = data.get('message', '')
print(f"助力账号[{help_data['index']}][{help_data['name']}]失败[{code}]: {error_message}")
else:
error_message = data.get('message', '')
print(f"助力账号[{help_data['index']}][{help_data['name']}]失败[{code}]: {error_message}")
if '助力次数' in error_message:
self.can_help_task = False
def task_doTask(self):
sign_header = self.gen_sign({'id': self.task_id})
url = f'{self.base_url}/task/doTask'
params = {'id': self.task_id}
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f"完成任务[{self.task_title}]成功---√")
else:
print(f"完成任务[{self.task_title}]失败[{str(code)}]: {data.get('message', '')}")
def task_reward(self):
sign_header = self.gen_sign({'id': self.task_id})
url = f'{self.base_url}/task/reward'
params = {'id': self.task_id}
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f"领取任务[{self.task_title}]奖励成功---√")
else:
print(f"领取任务[{self.task_title}]奖励失败[{str(code)}]: {data.get('message', '')}")
def user_role_get(self):
print('\n查询角色信息--->>>')
try:
sign_header = self.gen_sign()
url = f'{self.base_url}/user-role/get'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
data = data.get('data')
if data and data.get('isReward'):
print('可领取定时奖励,开始领取---》')
self.user_role_reward() # 此处调用 user_role_reward 函数
for role in data.get('roleList', []):
if role['status'] > 0:
print(f"角色【{role['name']}】已解锁---√")
continue
if role['unlockType'] == 1:
if self.gold >= role['unlockNum']:
print(f"调料包充足,开始解锁角色【{role['name']}】---》")
self.user_role_goldUnlock(role) # 此处调用 user_role_goldUnlock 函数
else:
print(f"调料包不足解锁角色【{role['name']}】---")
elif role['unlockType'] == 2:
self.user_role_findFriendHelpInfo(role) # 此处调用 user_role_findFriendHelpInfo 函数
else:
message = data.get('message', '')
print(f'角色信息失败【{code}】: {message}')
except Exception as e:
print(e)
def user_role_findFriendHelpInfo(self, role):
print(f'查询角色邀请进度--->>>')
try:
params = {'userRoleId': role['id']}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/user-role/findFriendHelpInfo'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
data = data.get('data')
self.help_role_config = {
'id': role['id'],
'progress': len(data) if data else 0,
'max': role['unlockNum']
}
self.role_id = role['id']
self.role_progress = len(data) if data else 0
self.role_max = role['unlockNum']
if role['unlockNum'] == len(data) if data else 0:
self.need_help_unlock=False
# new_data = {
# self.userId:
# {
# 'role_id': self.role_id,
# 'role_can_help': self.can_help_unlock,
# 'role_progress': self.role_progress,
# 'role_max': self.role_max
# }
# }
# USER_INFO.update(new_data)
# CHERWIN_TOOLS.SAVE_INVITE_CODE("INVITE_CODE/QH_INVITE_CODE.json", new_data)
print(f'查询角色【{role["name"]}】邀请进度成功---√')
else:
message = data.get('message', '')
print(f'查询角色【{role["name"]}】邀请进度失败[{code}]: {message}')
except Exception as e:
print(e)
def user_role_friendHelpUnlock(self, help_data):
print('开始角色解锁互助--->>>')
# try:
params = {'userRoleId': help_data['role_id']}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/user-role/friendHelpUnlock'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
help_data['role_progress'] += 1
print(f'助力账号[{help_data["index"]}][{help_data["name"]}]解锁角色成功---√')
else:
message = data.get('message', '')
print(f'助力账号[{help_data["index"]}][{help_data["name"]}]解锁角色失败[{code}]: {message}')
# except Exception as e:
# print(e)
def user_role_goldUnlock(self, role):
print(f'解锁角色开始--->>>')
try:
params = {'roleId': role['roleId']}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/user-role/goldUnlock'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
self.gold -= role['unlockNum']
print(f'解锁角色[{role["name"]}]成功')
self.user_role_reward() # 此处调用 user_role_reward 函数
else:
message = data.get('message', '')
print(f'解锁角色[{role["name"]}]失败[{code}]: {message}')
except Exception as e:
print(e)
def user_role_reward(self):
print(f'领取伴手礼--->>>')
try:
sign_header = self.gen_sign()
url = f'{self.base_url}/user-role/reward'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
data = data.get('data')
name = data.get('name') if data else 'unknown'
print(f'领取伴手礼【{name}】成功')
else:
message = data.get('message', '')
print(f'领取伴手礼失败[{code}]: {message}')
except Exception as e:
print(e)
def user_land_get(self):
print(f'\n刷新土地信息--->>>')
try:
sign_header = self.gen_sign()
url = f'{self.base_url}/user-land/get'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
print('土地信息刷新成功---√')
for land in data.get('data', {}).get('gaUserLandList', []):
ga_user_land_list = data.get('data', {}).get('gaUserLandList', [])
for land_info in ga_user_land_list:
land_id = land_info['id']
land_no = land_info['no']
status = land_info['status']
step = land_info['step']
left_sun_count = land_info['leftSunCount']
sum_sun_count = land_info['sumSunCount']
sun_time = land_info['sunTime']
sun_timestamp = land_info['sunTimestamp']
need_sun = land_info['needSun']
use_sun_count = land_info['useSunCount']
unlock_gold = land_info['unlockGold']
if not self.land.get(land_no):
self.land[land_no] = {}
self.land[land_no].update({
'id': land_id,
'no': land_no,
'status': status,
'step': step,
'leftSunCount': left_sun_count,
'sumSunCount': sum_sun_count,
'sunTime': sun_time,
'sunTimestamp': sun_timestamp,
'needSun': need_sun,
'useSunCount': use_sun_count,
'unlockGold': unlock_gold
})
if land['status'] == 0:
# 检查是否应该解锁土地
if self.gold >= land['unlockGold']:
print('开始解锁新土地---》')
self.user_land_unlock(self.land[land_no])
else:
# print('调料包不足以解锁新土地')
self.all_land_unlock = False
else:
message = data.get('message', '')
# 这里应该是日志记录的代码
print('获取账号信息失败[{}]: {}'.format(str(code), message))
except Exception as e:
print(e)
def user_land_unlock(self, land_info):
print(f'解锁土地--->>>')
try:
sign_header = self.gen_sign()
url = f'{self.base_url}/user-land/unlock'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f"[{land_info['no']}号土地]解锁成功---√")
self.gold -= land_info['unlockGold']
self.user_land_get()
else:
message = data.get('message', '')
print(f"[{land_info['no']}号土地]解锁失败[{str(code)}]:{message}")
except Exception as e:
print(e)
def user_land_result(self, land_info):
print(f'收获番茄--->>>')
try:
params = {'no': land_info['no']}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/user-land/result'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
self.refresh_land_step = True
print(f'[{land_info["no"]}号土地]收获成功: 番茄x{data.get("data", 0)}---√')
self.user_land_get()
else:
message = data.get('message', '')
print(f"[{land_info['no']}号土地]收获失败[{str(code)}]: {message}")
except Exception as e:
print(e)
def user_land_sow(self, land_info):
print(f'播种--->>>')
try:
params = {'no': land_info['no']}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/user-land/sow'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
self.refresh_land_step = True
print(f"[{land_info['no']}号土地]种植成功---√")
self.user_land_get()
else:
message = data.get('message', '')
print(f"[{land_info['no']}号土地]种植失败[{str(code)}]: {message}")
except Exception as e:
print(e)
def user_land_sun(self, land_info):
print(f'撒阳光--->>>')
try:
params = {'no': land_info['no']}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/user-land/sun'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f"[{land_info['no']}号土地]撒阳光成功---√")
time.sleep(1)
self.sun -= land_info['needSun']
self.user_land_get()
else:
message = data.get('message', '')
print(f"[{land_info['no']}号土地]撒阳光失败[{str(code)}]:{message}")
except Exception as e:
print(e)
def user_land_level(self, land_info):
print(f'浇水升级--->>>')
try:
params = {'no': land_info['no']}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/user-land/level'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
self.refresh_land_step = True
print(f"[{land_info['no']}号土地]浇水升级成功---√")
self.user_land_get()
else:
message = data.get('message', '')
print(f"[{land_info['no']}号土地]浇水升级失败[{str(code)}]: {message}")
except Exception as e:
print(e)
def take_risk_online(self):
print(f'进入冒险页--->>>')
try:
sign_header = self.gen_sign()
url = f'{self.base_url}/take-risk/online'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code != 0:
message = data.get('message', '')
print(f'进入冒险页失败{str(code)}: {message}')
else:
print(f'进入冒险页成功---√')
end = data.get('data', {}).get('end', True)
if end == True:
self.need_help_risk = False
except Exception as e:
print(e)
def take_risk_get(self):
print(f'查询冒险次数--->>>')
try:
sign_header = self.gen_sign()
url = f'{self.base_url}/common/take-risk/get'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
self.risk_num = data.get('data', {}).get('num', 0)
if data.get('data', {}).get('complete', False) == False:
self.take_risk_up(data.get('data', {}).get('gameMapEvent'))
print(f"剩余冒险【{self.risk_num}】次")
else:
message = data.get('message', '')
print(f'查询冒险次数失败: {message}' )
except Exception as e:
print(e)
def take_risk_go(self):
print(f'开始冒险--->>>')
try:
sign_header = self.gen_sign()
url = f'{self.base_url}/common/take-risk/go'
response = self.s.get(url, headers=sign_header)
data = response.json()
# print(data)
code = data.get('code', -1)
if code == 0:
self.risk_num = data.get('data', {}).get('num', 0)
if data.get('data', {}).get('complete'):
game_map_event_answer_list = data.get('data', {}).get('gameMapEvent', {}).get(
'gameMapEventAnswerList', [])
filtered_list = filter(lambda x: x.get('dropReward', {}).get('finalNum'),game_map_event_answer_list)
rewards = []
for item in filtered_list:
reward_name = item['dropReward']['name']
final_num = item['dropReward']['finalNum']
rewards.append(f'{reward_name}x{final_num}')
if rewards:
print(f'冒险奖励: {rewards}---√')
else:
print('触发冒险事件没有获取奖励')
else:
# print(data.get('data', {}).get('gameMapEvent'))
self.take_risk_up(data.get('data', {}).get('gameMapEvent'))
elif code == 4000:
slideImgInfo = data.get('data', {}).get('slideImgInfo', None)
validateCount = data.get('data', {}).get('validateCount', None)
if slideImgInfo and validateCount:
print('本次冒险需要验证码')
self.can_go_risk = False
# if self.get_CapCode(slideImgInfo):
# self.checkUserCapCode()
else:
print(f"验证次数上限")
self.can_go_risk = False
else:
message = data.get('message', '')
print(f'冒险失败{code}]: {message}')
if message and '冒险暂停中' in message or code == 4000:
self.can_go_risk = False
except Exception as e:
print(e)
def take_risk_up(self, game_map_event):
print(f'触发冒险事件--->>>')
# try:
gameMapEventAnswerList = game_map_event.get('gameMapEventAnswerList', [])
index = random.randint(0, len(gameMapEventAnswerList) - 1)
gameMapEventAnswer = gameMapEventAnswerList[index]
json_id = gameMapEventAnswer['jsonId']
# print(json_id)
params = {'jsonId': json_id}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/common/take-risk/up'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
self.risk_num = data.get('data', {}).get('num', 0)
print(f'剩余次数:【{self.risk_num}')
game_map_event_list = data.get('data', {}).get('gameMapEvent', {}).get('gameMapEventAnswerList', [])
reward_list = [event['dropReward']['name'] + 'x' + str(event['dropReward']['finalNum']) for event in
game_map_event_list if event.get('dropReward', {}).get('finalNum')]
if reward_list:
print(f'冒险奖励: {reward_list}---√')
else:
print('触发冒险事件没有获取奖励')
else:
message = data.get('message', '')
print(f'触发冒险事件[ {json_id} ]失败{code}]: {message}')
# except Exception as e:
# print(e)
def take_risk_reward(self):
print(f'领取冒险定时奖励--->>>')
try:
url = f'{self.base_url}/take-risk/reward'
sign_header = self.gen_sign()
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
print('领取冒险定时奖励成功---√')
else:
message = data.get('message', '').replace('\r', '').split('\n')
message = ','.join(filter(lambda x: x, message))
print(f'领取冒险定时奖励失败{code}]: {message}')
except Exception as e:
print(e)
def randomString(self,length, chars='abcdef0123456789'):
return ''.join(random.choice(chars) for _ in range(length))
def friend_help_task_risk(self, friend_info):
print('开始冒险互助--->>>')
# try:
params = {
'userId': friend_info['userId'],
'type': 0x1,
'randomId': self.randomString(32, '0123456789qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM')
}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/friend-help/help'
response = self.s.get(url, headers=sign_header,params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
if data.get('data'):
self.can_help_risk = False
print(f"冒险助力账号[{friend_info['index']}][{friend_info['name']}]成功---√")
else:
message = data.get('message', '').replace('\r', '').split('\n')
message = ','.join(filter(lambda x: x, message))
print(f"冒险助力账号[{friend_info['index']}][{friend_info['name']}]失败{message}")
if '助力次数' in message:
self.can_help_risk = False
elif '挂机时间已完成' in message:
friend_info['need_help_risk'] = False
# except Exception as e:
# print(e)
def friend_help(self, friend_info):
print(f'开始阳光互助--->>>')
# try:
params = {
'userId': friend_info['userId'],
'type': 0x0,
'randomId': self.randomString(32, '0123456789qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM')
}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/friend-help/help'
response = self.s.get(url, headers=sign_header,params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
if data.get('data'):
# print(f"请求助力账号[{friend_info['index']}][{friend_info['name']}] 成功---√")
self.task_doHelpTask(friend_info)
else:
message = data.get('message', '').replace('\r', '').split('\n')
message = ','.join(filter(lambda x: x, message))
print(f"请求助力账号[{friend_info['index']}][{friend_info['name']}] 失败: {message}")
if '助力次数' in message:
self.can_help_task = False
# except Exception as e:
# print(e)
def friend_findRecommend(self):
print(f'查询添加好友列表--->>>')
try:
url = f'{self.base_url}/friend/findRecommend'
sign_header = self.gen_sign()
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f'查询添加好友列表成功---√')
for user_info in data.get('data', []):
if not self.can_add_friend:
print('好友已满')
break
# if len(list(filter(lambda user: user['userId'] == user_info['userId'], self.user_list))) > 0:
# continue
self.friend_addFriend(user_info)
# self.friend_deleteFriend(user_info)
else:
message = data.get('message', '')
print(f'查询添加好友列表失败{code}]: {message}')
except Exception as e:
print(e)
def friend_addFriend(self, user_data):
print(f'添加好友--->>>')
try:
friend_user_id = user_data['userId']
params = {'friendUserId': friend_user_id}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/friend/addFriend'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f"添加好友[{user_data['nickName']}]成功---√")
else:
message = data.get('message', '')
print(f'添加好友{user_data["nickName"]}失败{code}]: {message}')
if message and '达到好友上限' in message:
self.can_add_friend = False
except Exception as e:
print(e)
def friend_deleteFriend(self, user_data):
print(f'删除好友--->>>')
try:
friend_user_id = user_data['userId']
params = {'friendUserId': friend_user_id}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/friend/delFriend'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f"删除好友[{user_data['nickName']}]成功---√")
else:
message = data.get('message', '')
print(f'删除好友{user_data["nickName"]}失败{code}]: {message}')
except Exception as e:
print(e)
def friend_findFriend(self):
print(f'\n查询好友列表--->>>')
try:
sign_header = self.gen_sign()
url = f'{self.base_url}/friend/findFriend'
response = self.s.get(url, headers=sign_header)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f'查询好友列表成功---√')
friend_list = data.get('data', {}).get('friendList', [])
friend_list.sort(key=lambda x: x['gold'], reverse=True)
for friend in friend_list:
if friend.get('stealFlag', False):
if not self.friend_stealGold(friend):
break
else:
message = data.get('message', '')
print(f'查询好友列表失败[{str(code)}]:{message}')
except Exception as e:
print(e)
def get_CapCode(self,slideImgInfo):
slidingImage = slideImgInfo.get('slidingImage', None)
backImage = slideImgInfo.get('backImage', None)
if slidingImage and backImage:
slidingImage_path = 'slidingImage.png'
backImage_path = 'backImage.png'
self.capcode = CHERWIN_TOOLS.OCR_API(slidingImage,backImage)
return True
# slidingImage_res = CHERWIN_TOOLS.BASE64_TO_IMG(slidingImage, slidingImage_path)
# backImage_res = CHERWIN_TOOLS.BASE64_TO_IMG(backImage, backImage_path)
# if slidingImage_res and backImage_res:
# self.capcode = CHERWIN_TOOLS.CAPCODE(slidingImage_path, backImage_path)
# return True
# else:
# return False
else:
return False
def friend_stealGold(self, user_data):
print(f'偷取好友--->>>')
# try:
friend_user_id = user_data['userId']
params = {'friendUserId': friend_user_id}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/friend/stealGold'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
print(f"偷取好友[{user_data['nickName']}]: 阳光x{data.get('data', 0)}")
return True
elif code == 4000:
slideImgInfo = data.get('data', {}).get('slideImgInfo', None)
validateCount = data.get('data', {}).get('validateCount', None)
if slideImgInfo and validateCount:
print(f"偷取好友[{user_data['nickName']}],需要滑块验证")
self.can_stealGold = False
# if self.get_CapCode(slideImgInfo):
# self.checkUserCapCode()
else:
print(f"偷取好友[{user_data['nickName']}]验证码上限")
self.can_stealGold = False
else:
message = data.get('message', '')
print(f"偷取好友[{user_data['nickName']}]阳光失败[{str(code)}]: {message}")
return False
# except Exception as e:
# print(e)
def checkUserCapCode(self):
print(f'提交验证码--->>>')
# try:
print(f'验证码:{self.capcode}')
params = {'xpos':self.capcode}
print(params)
sign_header = self.gen_sign(body=params)
url = f'{self.base_url}/checkUserCapCode'
response = self.s.post(url, headers=sign_header, json=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
data = data.get('data', 0)
print(f"验证码正确,获取到[{data}]")
else:
message = data.get('message', '')
print(f"验证码错误,偷取失败[{message}]")
# except Exception as e:
# print(e)
def exchange_reward(self, reward_id):
print(f'兑换--->>>')
try:
params = {'id': reward_id}
sign_header = self.gen_sign(params)
url = f'{self.base_url}/exchange/reward'
response = self.s.get(url, headers=sign_header, params=params)
data = response.json()
code = data.get('code', -1)
if code == 0:
data = data.get('data', {}).get('name') or ''
print(f"兑换[{data}]成功")
else:
message = data.get('message', '')
print(f'兑换[id={reward_id}]: {message}')
except Exception as e:
print(e)
def risk_task(self):
self.take_risk_get()
self.take_risk_online()
self.take_risk_reward()
print(f'可以冒险【{self.risk_num}】次')
while self.risk_num > 0 and self.can_go_risk == True:
self.take_risk_go()
time.sleep(2)
def land_task(self):
self.user_land_get()
if self.all_land_unlock and self.all_role_unlock:
pass
for land_number, land_info in self.land.items():
# print(land_number)
# print(land_info)
if land_info['status'] == 0:
continue
step, use_sun_count, left_sun_count = land_info['step'], land_info['useSunCount'], land_info['leftSunCount']
total_sun_count = (use_sun_count if use_sun_count else 0) + (left_sun_count if left_sun_count else 0)
print(f"\n---[{land_info['no']}号土地]---\n{self.group_step[step-1]},阶段{step}, 进度{use_sun_count}/{total_sun_count}")
self.refresh_land_step = True
while self.refresh_land_step:
self.refresh_land_step = False
if land_info['status'] == 0:
break
if step == 0:
self.user_land_sow(land_info)
else:
if left_sun_count == 0:
if step == 5:
self.user_land_result(land_info)
else:
current_time = int(time.time() * 1000)
if current_time >= (land_info['sunTime'] + land_info['sunTimestamp']) * 1000:
self.user_land_level(land_info)
else:
if self.sun >= land_info['needSun']:
current_time = int(time.time() * 1000)
if current_time >= (land_info['sunTime'] + land_info['sunTimestamp']) * 1000:
self.user_land_sun(land_info)
# time.sleep(2)
def userTask(self):
print('\n--------------- 开始日常任务 ---------------')
wait_time = random.randint(1000, 3000) / 1000.0 # 转换为秒
if not self.Login_res:
return False
self.userInfo_autoSun()
self.userInfo_get()
self.task_get()
self.user_role_get()
self.risk_task()
self.land_task()
self.steal_task()
self.userInfo_get(END=True)
new_data = {
self.userId: {
'userId': self.userId,
'task_id': self.task_id,
'task_can_help': self.can_help_task,
'task_need_help': self.need_help_task,
'task_progress': self.task_progress,
'task_max_progress': self.task_max_progress,
'role_id': self.role_id,
'role_can_help': self.can_help_unlock,
'role_need_help': self.need_help_unlock,
'role_progress': self.role_progress,
'role_max': self.role_max,
'risk_can_help': self.can_help_risk,
'risk_need_help': self.need_help_task,
'can_go_risk': self.can_go_risk,
'can_add_friend': self.can_add_friend
}
}
# print(new_data)
USER_INFO.update(new_data)
CHERWIN_TOOLS.SAVE_INVITE_CODE(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", new_data)
# self.steal_task()
self.sendMsg()
return True
def helpEachOther(self):
print('--------------- 开始互助 ---------------')
if not self.Login_res:
return False
self.userInfo_get()
if self.user_index == 1:
print('第一个账号助力作者--->>>')
data_li = AuthorCode
help_type = '作者'
else:
print('其余账号互助--->>>')
json_data = self.load_json()
data_li = list(json_data.values())
help_type = '本地'
# print(data_li)
for index, code_li in enumerate(data_li):
# print(f"Index: {index}")
# print(f"Data: {code_li}")
if code_li.get('userId','') == self.userId:
continue
task_need_help = code_li.get("task_need_help", '')
role_need_help = code_li.get("role_need_help", '')
risk_need_help = code_li.get("task_can_help", '')
code_li["index"] = index+1
print(f'\n------助力{help_type}账号【{index+1}】开始------')
# print(code_li)
# 冒险助力
if self.can_help_risk and risk_need_help:
self.friend_help_task_risk(code_li)
else:
print(f'好友冒险助力已完成或冒险助力次数已耗尽')
# 解锁角色助力
if self.can_help_unlock and role_need_help:
self.user_role_friendHelpUnlock(code_li)
else:
print(f'好友解锁角色助力已完成或解锁角色助力次数已耗尽')
# 阳光助力
if self.can_help_task and task_need_help:
self.friend_help(code_li)
else:
print(f'阳光助力次数已耗尽')
print(f'------助力{help_type}账号【{index+1}】结束------')
time.sleep(1)
new_data = {
self.userId: {
'task_can_help': self.can_help_task,
'role_can_help': self.can_help_unlock,
'risk_can_help': self.can_help_risk
}
}
CHERWIN_TOOLS.SAVE_INVITE_CODE(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", new_data)
self.sendMsg(True)
return True
def steal_task(self, count=5):
if not self.Login_res:
return False
json_data = self.load_json()
self.can_add_friend = json_data.get(self.userId, {}).get('can_add_friend', '')
for i in range(count):
if self.can_add_friend:
self.friend_findRecommend()
time.sleep(2)
self.friend_findFriend()
return True
def sendMsg(self, help=False):
if self.send_UID:
push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help)
print(push_res)
def down_file(filename, file_url):
print(f'开始下载:{filename},下载地址:{file_url}')
try:
response = requests.get(file_url, verify=False, timeout=10)
response.raise_for_status()
with open(filename + '.tmp', 'wb') as f:
f.write(response.content)
print(f'{filename}】下载完成!')
# 检查临时文件是否存在
temp_filename = filename + '.tmp'
if os.path.exists(temp_filename):
# 删除原有文件
if os.path.exists(filename):
os.remove(filename)
# 重命名临时文件
os.rename(temp_filename, filename)
print(f'{filename}】重命名成功!')
return True
else:
print(f'{filename}】临时文件不存在!')
return False
except Exception as e:
print(f'{filename}】下载失败:{str(e)}')
return False
def import_Tools():
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
if __name__ == '__main__':
APP_NAME = '统一茄皇'
ENV_NAME = 'TYQH'
CK_NAME = 'thirdId@wid'
CK_URL = '.../public/api/login'
print(f'''
✨✨✨ {APP_NAME}脚本 ✨✨✨
✨ 功能:
日常任务
互助任务
✨ 抓包步骤:
统一快乐星球小程序-活动
开始抓包-茄皇的家第三期
{CK_URL}{CK_NAME}
✨ 设置青龙变量:
export {ENV_NAME}= '{CK_NAME}'多账号#分割或&
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
✨ 推荐定时25 7-23/2 * * *
✨ 第一个账号助力作者,其余互助
✨✨✨ @Author CHERWIN✨✨✨
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.16'
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
if not token:
print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
exit()
tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
# print(tokens)
if len(tokens) > 0:
print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")
access_token = []
for index, infos in enumerate(tokens):
run_result = RUN(infos, index).userTask()
if not run_result: continue
for index, infos in enumerate(tokens):
run_result =RUN(infos, index).helpEachOther()
if not run_result: continue
if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)