mirror of
https://github.com/CHERWING/CHERWIN_SCRIPTS.git
synced 2024-11-22 09:04:11 +08:00
1238 lines
51 KiB
Python
Executable File
1238 lines
51 KiB
Python
Executable File
# !/usr/bin/python3
|
||
# -- coding: utf-8 --
|
||
# -------------------------------
|
||
# @Author : cherwin
|
||
# -------------------------------
|
||
# cron "23 7-23/2 * * * " script-path=xxx.py,tag=匹配cron用
|
||
# const $ = new Env('统一快乐星球小程序-茄皇的家')
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import random
|
||
import string
|
||
import time
|
||
from datetime import datetime
|
||
from os import environ, path
|
||
|
||
import requests
|
||
from requests.packages.urllib3.exceptions import InsecureRequestWarning
|
||
|
||
# 禁用安全请求警告
|
||
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||
|
||
# Optional developer overrides: imported only for its side effects when present.
if os.path.isfile('DEV_ENV.py'):
    import DEV_ENV

# Optional notification backend: `send` is only bound when notify.py exists.
if not os.path.isfile('notify.py'):
    print("加载通知服务失败!")
else:
    from notify import send
    print("加载通知服务成功!")
|
||
# Accumulated log text for the whole run / for the current account.
send_msg = ''
one_msg = ''


def Log(cont=''):
    """Print *cont* and append it (plus a newline) to both message buffers.

    Falsy content is still printed but is not appended to ``one_msg`` or
    ``send_msg``.
    """
    global send_msg, one_msg
    print(cont)
    if not cont:
        return
    line = f'{cont}\n'
    one_msg += line
    send_msg += line
|
||
|
||
USER_INFO = {}
|
||
|
||
class RUN:
|
||
def __init__(self, info, index):
    """Parse one account string, prepare the HTTP state and log in.

    ``info`` is split on '@': field 0 becomes ``third_id`` and field 1
    ``wid``; if the last field contains "UID_" it is kept as ``send_UID``.
    ``index`` is the zero-based position of the account.
    The login result is stored in ``self.Login_res``.
    """
    global one_msg
    one_msg = ''  # reset the per-account log buffer

    fields = info.split('@')
    self.third_id = fields[0]
    self.wid = fields[1]
    tail = fields[-1]
    self.send_UID = tail if "UID_" in tail else None

    self.user_index = index + 1
    print(f"\n---------开始执行第{self.user_index}个账号>>>>>")

    session = requests.session()
    session.verify = False
    self.s = session

    self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f) XWEB/8555'
    self.headers = {
        'User-Agent': self.UA,
        'Sec-Fetch-Site': 'cross-site',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Dest': 'empty',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Origin': 'https://thekingoftomato.ioutu.cn',
        'Referer': 'https://thekingoftomato.ioutu.cn/',
    }
    self.base_url = 'https://qiehuang-apig.xiaoyisz.com/qiehuangsecond/ga'

    # Resources and land bookkeeping.
    self.sun = 0
    self.land = {}
    self.refresh_land_step = False

    # "Does this account still need help" flags.
    self.need_help_unlock = self.need_help_task = self.need_help_risk = True
    # "May this account still perform the action" flags.
    self.can_go_risk = self.can_add_friend = True
    self.can_help_task = self.can_help_risk = self.can_help_unlock = True
    self.all_land_unlock = self.all_role_unlock = True

    self.help_task_config = {}
    self.help_role_config = {}
    self.group_step = ['发育期', '幼苗期', '开花期', '结果期', '收获期']

    self.Login_res = self.login()
|
||
|
||
def load_json(self):
    """Load ``INVITE_CODE/{ENV_NAME}_INVITE_CODE.json``.

    Returns the parsed JSON content, or an empty dict when the file is
    missing or any other error occurs while reading it.
    """
    try:
        with open(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", 'r', encoding='utf-8') as handle:
            return json.load(handle)
    except FileNotFoundError:
        print("未找到文件,返回空字典")
    except Exception as err:
        print(f"发生错误:{err}")
    return {}
|
||
|
||
def make_request(self, url, method='post', headers=None, params=None):
    """Send a GET or POST request and return the decoded JSON body.

    Args:
        url: Target URL.
        method: 'get' or 'post' (case-insensitive).
        headers: Request headers; ``self.headers`` is used when empty.
        params: JSON body for POST; ``self.params`` is used when empty and
            the instance has such an attribute. Not sent for GET.

    Returns:
        The parsed JSON response, or ``None`` when the method is not
        supported, the request fails or the body is not valid JSON.
    """
    if not headers:
        headers = self.headers
    if not params:
        # The instance is not guaranteed to carry a `params` attribute.
        params = getattr(self, 'params', None)

    verb = method.lower()
    if verb not in ('get', 'post'):
        # Reject unsupported verbs before touching the network.
        print("值错误或不支持的请求方法:", ValueError("不支持的请求方法: " + method))
        return None

    try:
        if verb == 'get':
            response = requests.get(url, headers=headers, verify=False)
        else:
            response = requests.post(url, headers=headers, json=params, verify=False)
        return response.json()
    except requests.exceptions.RequestException as e:
        print("请求异常:", e)
    except ValueError as e:
        print("值错误或不支持的请求方法:", e)
    except Exception as e:
        print("发生了未知错误:", e)
    return None
|
||
|
||
def gen_sign(self, parameters={}, body=None):
    """Build the signed headers for one API call.

    The query parameters (sorted by key) and the JSON-encoded body are
    combined with a timestamp-salted secret into an upper-case MD5
    signature. ``client_id``, ``timestamp``, ``nonstr`` and ``sign`` are
    merged into ``self.headers``, which is also what is returned.
    """
    query_parts = []
    for name in sorted(parameters.keys()):
        value = parameters[name]
        rendered = json.dumps(value) if isinstance(value, dict) else value
        query_parts.append(f"{name}={rendered}")

    now_ms = int(datetime.now().timestamp() * 1000)

    # Salt the base secret: each of the last three timestamp digits is
    # inserted at the index equal to its own numeric value.
    salted = list('BxzTx45uIGT25TTHIIBU2')
    for ch in str(now_ms)[-3:]:
        salted.insert(int(ch), ch)
    secret = hashlib.md5(''.join(salted).encode()).hexdigest()

    alphabet = string.ascii_letters + string.digits
    nonce = ''.join(random.choices(alphabet, k=16))

    body_text = json.dumps(body) if body else ''
    query_text = '&'.join(query_parts) if query_parts else ''
    # Joined in this order: client_id|nonstr|timestamp|body|query|secret
    pieces = ['game', nonce, now_ms, body_text, query_text, secret]
    digest = hashlib.md5('|'.join(str(p) for p in pieces).encode()).hexdigest().upper()

    self.headers.update({
        'client_id': 'game',
        'timestamp': str(now_ms),
        'nonstr': nonce,
        'sign': digest,
    })
    return self.headers
|
||
|
||
def login(self):
    """Log in with ``third_id``/``wid`` and store the token in the headers.

    On success the token is added to ``self.headers`` as ``Authorization``.
    Ordinary errors are printed and reported as a failed login; unlike a
    ``return`` inside ``finally``, interrupts such as ``KeyboardInterrupt``
    are allowed to propagate.

    Returns:
        bool: True when a non-empty token was obtained, otherwise False.
    """
    login_successful = False
    try:
        login_params = {
            'thirdId': self.third_id,
            'wid': self.wid
        }
        sign_header = self.gen_sign({}, login_params)
        response = self.s.post(f'{self.base_url}/public/api/login', json=login_params, headers=sign_header)
        if response.status_code == 200:
            response_data = response.json()
            code = response_data.get('code', -1)
            if code == 0:
                auth = (response_data.get('data') or {}).get('token') or ''
                if auth:
                    # Store the token first so a logging problem cannot lose it.
                    self.headers.update({'Authorization': auth})
                    login_successful = True
                    Log(f'账号【{self.user_index}】登录成功')
                else:
                    Log(f'账号【{self.user_index}】登录获取auth失败')
            else:
                print(f"登录获取auth失败[{code}]: {response_data.get('message', '')}")
        elif response.status_code == 403:
            print('登录失败[403]: 黑IP了, 换个IP试试吧')
        else:
            # Any other status used to fail without a trace.
            print(f'登录失败[{response.status_code}]')
    except Exception as e:
        print(e)
    return login_successful
|
||
|
||
def userInfo_get(self, END=False):
    """Fetch the account profile and cache its fields on the instance.

    Sets ``userId``, ``gold``, ``score``, ``sun``, ``sunMax`` and
    ``nickName``, records the user in ``USER_INFO`` and in the invite-code
    file, then reports a summary.

    Args:
        END: when true the summary goes through ``Log`` (so it is appended
            to the message buffers); otherwise it is only printed.

    Returns:
        None. Ordinary errors are printed; interrupts propagate.
    """
    print(f'获取用户[{self.user_index}]信息--->>>')
    try:
        sign_header = self.gen_sign()
        url = f'{self.base_url}/userInfo/get'
        response = self.s.get(url, headers=sign_header, verify=False)
        data = response.json()
        code = data.get('code', -1)
        if code != 0:
            error_message = data.get('message', '')
            print(f"获取账号信息失败[{str(code)}]: {error_message}")
            return

        data = data.get('data', {})
        self.userId = data['userId']
        self.gold = int(data['gold'])
        self.score = int(data['score'])
        self.sun = int(data['sun'])
        self.sunMax = int(data['sunMax'])
        self.nickName = data['nickName']
        if self.nickName is None:
            self.nickName = '未命名'

        new_data = {self.userId: {'name': self.nickName}}
        USER_INFO.update(new_data)
        CHERWIN_TOOLS.SAVE_INVITE_CODE(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", new_data)

        summary = f"-----用户[{self.user_index}]信息-----\n用户Id:{self.userId}\n用户名:{self.nickName}\n调料🧂x{self.gold}\n番茄🍅x{self.score}\n阳光☀x{self.sun}\n------用户[{self.user_index}]信息END------"
        if END:
            Log(summary)
        else:
            print(summary)
    except Exception as e:
        print(e)
|
||
|
||
def userInfo_autoSun(self):
    """Call the auto-sun endpoint and print how much sunshine was collected.

    On failure the server code and message are printed. The code falls
    back to -1 when the response has no ``code`` field.
    """
    print('收集阳光--->>>')
    sign_header = self.gen_sign()
    url = f'{self.base_url}/userInfo/autoSun'
    response = self.s.get(url, headers=sign_header)
    data = response.json()
    code = data.get('code', -1)
    if code == 0:
        sun = (data.get('data') or {}).get('sun', 0)
        print(f"刷新收集到【{sun}】阳光---√")
    else:
        # Report the server message; the code used to be printed twice.
        print(f"刷新收集阳光失败[{code}]: {data.get('message', '')}")
|
||
|
||
def task_get(self):
    """Fetch the task list, then run or claim each task.

    For every task the id, title and progress are stored on the instance,
    because ``task_doTask`` and ``task_reward`` read them from there.
    The two invitation tasks are skipped; unfinished tasks (status 0) are
    executed and finished ones (status 1) have their reward claimed.
    """
    print(f'获取任务列表--->>>')
    sign_header = self.gen_sign()
    url = f'{self.base_url}/task/get'
    response = self.s.get(url, headers=sign_header)
    data = response.json()
    code = data.get('code', -1)
    if code != 0:
        # Report the server message; the code used to be printed twice.
        print(f"获取任务列表失败[{code}]: {data.get('message', '')}")
        return

    for task in data.get('data') or []:
        self.task_id = task['id']
        self.task_title = task['title']
        task_status = task['status']
        self.task_progress = task.get('currentProgress', 0)
        self.task_max_progress = task.get('progress', 1)

        print(f"任务:【{self.task_title}】进度:【{self.task_progress}/{self.task_max_progress}】")
        if self.task_title == '邀请好友助力':
            if self.task_progress == self.task_max_progress:
                self.need_help_task = False
            # NOTE(review): indentation was lost in this copy; the skip is
            # assumed to apply to this task regardless of its progress,
            # mirroring the sibling branch below — confirm upstream.
            print(f"任务 '{self.task_title}' 【跳过】---》")
            continue
        if self.task_title == '邀请新人助力':
            print(f"任务 '{self.task_title}' 【跳过】---》")
            continue

        if task_status == 0 and self.task_progress < self.task_max_progress:
            print(f"任务【{self.task_title}】【未完成】执行任务---》")
            self.task_doTask()
        elif task_status == 1:
            print(f"任务【{self.task_title}】【已完成】领取奖励---》")
            self.task_reward()
|
||
|
||
def task_doHelpTask(self, help_data):
    """Perform the help task identified by ``help_data['task_id']``.

    Prints the outcome for the helped account and clears
    ``self.can_help_task`` when the server message contains '已超出' or
    '助力次数'.
    """
    query = {'id': help_data["task_id"]}
    headers = self.gen_sign(query)
    reply = self.s.get(f'{self.base_url}/task/doTask', headers=headers, params=query).json()
    code = reply.get('code', -1)
    if code == 0:
        print(f"助力账号[{help_data['index']}][{help_data['name']}]成功---√")
        return

    error_message = reply.get('message', '')
    exceeded = '已超出' in error_message
    print(f"助力账号[{help_data['index']}][{help_data['name']}]失败[{code}]: {error_message}")
    if exceeded or '助力次数' in error_message:
        self.can_help_task = False
|
||
|
||
def task_doTask(self):
    """Ask the server to perform the task stored in ``self.task_id`` and print the result."""
    query = {'id': self.task_id}
    headers = self.gen_sign({'id': self.task_id})
    reply = self.s.get(f'{self.base_url}/task/doTask', headers=headers, params=query).json()
    code = reply.get('code', -1)
    if code != 0:
        print(f"完成任务[{self.task_title}]失败[{str(code)}]: {reply.get('message', '')}")
        return
    print(f"完成任务[{self.task_title}]成功---√")
|
||
|
||
def task_reward(self):
    """Claim the reward of the task stored in ``self.task_id`` and print the result."""
    query = {'id': self.task_id}
    headers = self.gen_sign({'id': self.task_id})
    reply = self.s.get(f'{self.base_url}/task/reward', headers=headers, params=query).json()
    code = reply.get('code', -1)
    if code != 0:
        print(f"领取任务[{self.task_title}]奖励失败[{str(code)}]: {reply.get('message', '')}")
        return
    print(f"领取任务[{self.task_title}]奖励成功---√")
|
||
|
||
def user_role_get(self):
    """Query the role list, claim the timed reward and try to unlock roles.

    Locked roles with ``unlockType`` 1 are unlocked with gold when
    ``self.gold`` suffices; for ``unlockType`` 2 the friend-help progress
    is queried. A response without a ``data`` payload is treated as
    "nothing to do". Errors are printed, not raised.
    """
    print('\n查询角色信息--->>>')
    try:
        sign_header = self.gen_sign()
        url = f'{self.base_url}/user-role/get'
        response = self.s.get(url, headers=sign_header)
        data = response.json()
        code = data.get('code', -1)
        if code != 0:
            message = data.get('message', '')
            print(f'角色信息失败【{code}】: {message}')
            return

        # `data` may be null in the response; normalise to an empty mapping.
        info = data.get('data') or {}
        if info.get('isReward'):
            print('可领取定时奖励,开始领取---》')
            self.user_role_reward()

        for role in info.get('roleList') or []:
            if role['status'] > 0:
                print(f"角色【{role['name']}】已解锁---√")
                continue
            if role['unlockType'] == 1:
                if self.gold >= role['unlockNum']:
                    print(f"调料包充足,开始解锁角色【{role['name']}】---》")
                    self.user_role_goldUnlock(role)
                else:
                    print(f"调料包不足解锁角色【{role['name']}】---!")
            elif role['unlockType'] == 2:
                self.user_role_findFriendHelpInfo(role)
    except Exception as e:
        print(e)
|
||
|
||
def user_role_findFriendHelpInfo(self, role):
    """Query how many friends have helped to unlock ``role``.

    Stores the progress in ``help_role_config``, ``role_id``,
    ``role_progress`` and ``role_max``, and clears ``need_help_unlock``
    once the progress equals ``role['unlockNum']``. Errors are printed,
    not raised.
    """
    print(f'查询角色邀请进度--->>>')
    try:
        params = {'userRoleId': role['id']}
        sign_header = self.gen_sign(params)
        url = f'{self.base_url}/user-role/findFriendHelpInfo'
        response = self.s.get(url, headers=sign_header, params=params)
        data = response.json()
        code = data.get('code', -1)
        if code == 0:
            helpers = data.get('data')
            progress = len(helpers) if helpers else 0
            self.help_role_config = {
                'id': role['id'],
                'progress': progress,
                'max': role['unlockNum']
            }
            self.role_id = role['id']
            self.role_progress = progress
            self.role_max = role['unlockNum']
            # Compare against the computed progress. The earlier form
            # `a == len(d) if d else 0` parsed as `(a == len(d)) if d else 0`.
            if role['unlockNum'] == progress:
                self.need_help_unlock = False
            print(f'查询角色【{role["name"]}】邀请进度成功---√')
        else:
            message = data.get('message', '')
            print(f'查询角色【{role["name"]}】邀请进度失败[{code}]: {message}')
    except Exception as e:
        print(e)
|
||
|
||
def user_role_friendHelpUnlock(self, help_data):
    """Help the account described by ``help_data`` to unlock its role.

    On success ``help_data['role_progress']`` is incremented in place.
    Request errors are not caught here.
    """
    print('开始角色解锁互助--->>>')
    query = {'userRoleId': help_data['role_id']}
    headers = self.gen_sign(query)
    reply = self.s.get(f'{self.base_url}/user-role/friendHelpUnlock', headers=headers, params=query).json()
    code = reply.get('code', -1)
    if code != 0:
        message = reply.get('message', '')
        print(f'助力账号[{help_data["index"]}][{help_data["name"]}]解锁角色失败[{code}]: {message}')
        return
    help_data['role_progress'] += 1
    print(f'助力账号[{help_data["index"]}][{help_data["name"]}]解锁角色成功---√')
|
||
|
||
def user_role_goldUnlock(self, role):
    """Unlock ``role`` with gold.

    On success ``role['unlockNum']`` is deducted from ``self.gold`` and the
    role reward is claimed. Errors are printed, not raised.
    """
    print(f'解锁角色开始--->>>')
    try:
        query = {'roleId': role['roleId']}
        headers = self.gen_sign(query)
        reply = self.s.get(f'{self.base_url}/user-role/goldUnlock', headers=headers, params=query).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f'解锁角色[{role["name"]}]失败[{code}]: {message}')
            return
        self.gold -= role['unlockNum']
        print(f'解锁角色[{role["name"]}]成功')
        self.user_role_reward()
    except Exception as err:
        print(err)
|
||
|
||
def user_role_reward(self):
    """Claim the role gift and print its name ('unknown' when no payload is returned)."""
    print(f'领取伴手礼--->>>')
    try:
        headers = self.gen_sign()
        reply = self.s.get(f'{self.base_url}/user-role/reward', headers=headers).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f'领取伴手礼失败[{code}]: {message}')
            return
        gift = reply.get('data')
        name = gift.get('name') if gift else 'unknown'
        print(f'领取伴手礼【{name}】成功')
    except Exception as err:
        print(err)
|
||
|
||
def user_land_get(self):
    """Refresh ``self.land`` from the server and unlock lands when affordable.

    Every land in the response is recorded under its ``no`` in
    ``self.land``. Then, for each land that is still locked (status 0):
    if ``self.gold`` covers its ``unlockGold`` an unlock is attempted,
    otherwise ``self.all_land_unlock`` is set to False. Errors are printed,
    not raised.
    """
    print(f'\n刷新土地信息--->>>')
    try:
        sign_header = self.gen_sign()
        url = f'{self.base_url}/user-land/get'
        response = self.s.get(url, headers=sign_header)
        data = response.json()
        code = data.get('code', -1)
        if code != 0:
            message = data.get('message', '')
            print('获取账号信息失败[{}]: {}'.format(str(code), message))
            return

        print('土地信息刷新成功---√')
        lands = (data.get('data') or {}).get('gaUserLandList') or []
        tracked = ('id', 'no', 'status', 'step', 'leftSunCount', 'sumSunCount',
                   'sunTime', 'sunTimestamp', 'needSun', 'useSunCount', 'unlockGold')

        # Pass 1: record every land exactly once. The list used to be walked
        # in two nested loops, which paired one land's status with another
        # land's record when deciding what to unlock.
        for land_info in lands:
            self.land.setdefault(land_info['no'], {}).update(
                {key: land_info[key] for key in tracked})

        # Pass 2: decide per land from the current record, so a refresh
        # triggered by a successful unlock is taken into account.
        for land_info in lands:
            current = self.land[land_info['no']]
            if current['status'] != 0:
                continue
            if self.gold >= current['unlockGold']:
                print('开始解锁新土地---》')
                self.user_land_unlock(current)
            else:
                self.all_land_unlock = False
    except Exception as e:
        print(e)
|
||
|
||
def user_land_unlock(self, land_info):
    """Call the land-unlock endpoint.

    On success ``land_info['unlockGold']`` is deducted from ``self.gold``
    and the land list is refreshed. Errors are printed, not raised.
    """
    print(f'解锁土地--->>>')
    try:
        headers = self.gen_sign()
        reply = self.s.get(f'{self.base_url}/user-land/unlock', headers=headers).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f"[{land_info['no']}号土地]解锁失败[{str(code)}]:{message}")
            return
        print(f"[{land_info['no']}号土地]解锁成功---√")
        self.gold -= land_info['unlockGold']
        self.user_land_get()
    except Exception as err:
        print(err)
|
||
|
||
def user_land_result(self, land_info):
    """Harvest the plot ``land_info['no']``.

    On success ``refresh_land_step`` is set and the land list is
    refreshed. Errors are printed, not raised.
    """
    print(f'收获番茄--->>>')
    try:
        query = {'no': land_info['no']}
        headers = self.gen_sign(query)
        reply = self.s.get(f'{self.base_url}/user-land/result', headers=headers, params=query).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f"[{land_info['no']}号土地]收获失败[{str(code)}]: {message}")
            return
        self.refresh_land_step = True
        print(f'[{land_info["no"]}号土地]收获成功: 番茄x{reply.get("data", 0)}---√')
        self.user_land_get()
    except Exception as err:
        print(err)
|
||
|
||
def user_land_sow(self, land_info):
    """Sow the plot ``land_info['no']``.

    On success ``refresh_land_step`` is set and the land list is
    refreshed. Errors are printed, not raised.
    """
    print(f'播种--->>>')
    try:
        query = {'no': land_info['no']}
        headers = self.gen_sign(query)
        reply = self.s.get(f'{self.base_url}/user-land/sow', headers=headers, params=query).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f"[{land_info['no']}号土地]种植失败[{str(code)}]: {message}")
            return
        self.refresh_land_step = True
        print(f"[{land_info['no']}号土地]种植成功---√")
        self.user_land_get()
    except Exception as err:
        print(err)
|
||
|
||
def user_land_sun(self, land_info):
    """Spend sunshine on the plot ``land_info['no']``.

    On success the method waits one second, deducts
    ``land_info['needSun']`` from ``self.sun`` and refreshes the land
    list. Errors are printed, not raised.
    """
    print(f'撒阳光--->>>')
    try:
        query = {'no': land_info['no']}
        headers = self.gen_sign(query)
        reply = self.s.get(f'{self.base_url}/user-land/sun', headers=headers, params=query).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f"[{land_info['no']}号土地]撒阳光失败[{str(code)}]:{message}")
            return
        print(f"[{land_info['no']}号土地]撒阳光成功---√")
        time.sleep(1)
        self.sun -= land_info['needSun']
        self.user_land_get()
    except Exception as err:
        print(err)
|
||
|
||
def user_land_level(self, land_info):
    """Water/upgrade the plot ``land_info['no']``.

    On success ``refresh_land_step`` is set and the land list is
    refreshed. Errors are printed, not raised.
    """
    print(f'浇水升级--->>>')
    try:
        query = {'no': land_info['no']}
        headers = self.gen_sign(query)
        reply = self.s.get(f'{self.base_url}/user-land/level', headers=headers, params=query).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f"[{land_info['no']}号土地]浇水升级失败[{str(code)}]: {message}")
            return
        self.refresh_land_step = True
        print(f"[{land_info['no']}号土地]浇水升级成功---√")
        self.user_land_get()
    except Exception as err:
        print(err)
|
||
|
||
def take_risk_online(self):
    """Open the adventure page and clear ``need_help_risk`` when the response reports ``end``.

    Errors are printed, not raised.
    """
    print(f'进入冒险页--->>>')
    try:
        headers = self.gen_sign()
        reply = self.s.get(f'{self.base_url}/take-risk/online', headers=headers).json()
        code = reply.get('code', -1)
        if code == 0:
            print(f'进入冒险页成功---√')
            # NOTE(review): indentation was lost in this copy; the `end`
            # check is assumed to belong to the success branch — confirm.
            finished = reply.get('data', {}).get('end', True)
            if finished == True:
                self.need_help_risk = False
        else:
            message = reply.get('message', '')
            print(f'进入冒险页失败{str(code)}: {message}')
    except Exception as err:
        print(err)
|
||
|
||
def take_risk_get(self):
    """Query the remaining adventure attempts into ``self.risk_num``.

    If the response reports an unfinished event (``complete`` is False),
    it is resolved through ``take_risk_up``. ``self.risk_num`` is
    guaranteed to exist afterwards, even when the query fails, because
    ``risk_task`` reads it unconditionally. Errors are printed, not raised.
    """
    print(f'查询冒险次数--->>>')
    # Keep a previous value if there is one, otherwise start from zero.
    self.risk_num = getattr(self, 'risk_num', 0)
    try:
        sign_header = self.gen_sign()
        url = f'{self.base_url}/common/take-risk/get'
        response = self.s.get(url, headers=sign_header)
        data = response.json()
        code = data.get('code', -1)
        if code == 0:
            info = data.get('data') or {}
            self.risk_num = info.get('num', 0)
            if info.get('complete', False) == False:
                self.take_risk_up(info.get('gameMapEvent'))
            print(f"剩余冒险【{self.risk_num}】次")
        else:
            message = data.get('message', '')
            print(f'查询冒险次数失败: {message}')
    except Exception as e:
        print(e)
|
||
|
||
def take_risk_go(self):
    """Perform one adventure step.

    * code 0: update ``self.risk_num``; print the rewards of a completed
      step, or resolve the pending event through ``take_risk_up``.
    * code 4000: a captcha is required or the verification limit is
      reached; ``self.can_go_risk`` is cleared either way.
    * any other code: print the failure and clear ``self.can_go_risk``
      when the message contains '冒险暂停中'.

    Errors are printed, not raised.
    """
    print(f'开始冒险--->>>')
    try:
        sign_header = self.gen_sign()
        url = f'{self.base_url}/common/take-risk/go'
        response = self.s.get(url, headers=sign_header)
        data = response.json()
        code = data.get('code', -1)
        if code == 0:
            info = data.get('data') or {}
            self.risk_num = info.get('num', 0)
            if info.get('complete'):
                answers = (info.get('gameMapEvent') or {}).get('gameMapEventAnswerList') or []
                rewards = []
                for answer in answers:
                    drop = answer.get('dropReward') or {}
                    if drop.get('finalNum'):
                        rewards.append(f"{drop['name']}x{drop['finalNum']}")
                if rewards:
                    print(f'冒险奖励: {rewards}---√')
                else:
                    print('触发冒险事件没有获取奖励')
            else:
                self.take_risk_up(info.get('gameMapEvent'))
        elif code == 4000:
            # `data` may be null here; the flag must still be cleared so the
            # caller's loop stops.
            info = data.get('data') or {}
            if info.get('slideImgInfo') and info.get('validateCount'):
                print('本次冒险需要验证码')
            else:
                print(f"验证次数上限")
            self.can_go_risk = False
        else:
            message = data.get('message', '')
            print(f'冒险失败[{code}]: {message}')
            # code 4000 is handled above, so only the message matters here.
            if message and '冒险暂停中' in message:
                self.can_go_risk = False
    except Exception as e:
        print(e)
|
||
|
||
def take_risk_up(self, game_map_event):
    """Resolve an adventure event by submitting a randomly chosen answer.

    Args:
        game_map_event: event mapping with a ``gameMapEventAnswerList``;
            ``None`` or an event without answers is skipped without a
            request.

    On success ``self.risk_num`` is updated from the response and any
    rewards are printed. Request errors are not caught here.
    """
    print(f'触发冒险事件--->>>')
    answers = (game_map_event or {}).get('gameMapEventAnswerList') or []
    if not answers:
        # Nothing to choose from: indexing a missing or empty list would raise.
        print('冒险事件没有可选项,跳过')
        return

    json_id = random.choice(answers)['jsonId']
    params = {'jsonId': json_id}
    sign_header = self.gen_sign(params)
    url = f'{self.base_url}/common/take-risk/up'
    response = self.s.get(url, headers=sign_header, params=params)
    data = response.json()
    code = data.get('code', -1)
    if code == 0:
        info = data.get('data') or {}
        self.risk_num = info.get('num', 0)
        print(f'剩余次数:【{self.risk_num}】')

        results = (info.get('gameMapEvent') or {}).get('gameMapEventAnswerList') or []
        reward_list = []
        for event in results:
            drop = event.get('dropReward') or {}
            if drop.get('finalNum'):
                reward_list.append(drop['name'] + 'x' + str(drop['finalNum']))
        if reward_list:
            print(f'冒险奖励: {reward_list}---√')
        else:
            print('触发冒险事件没有获取奖励')
    else:
        message = data.get('message', '')
        print(f'触发冒险事件[ {json_id} ]失败[{code}]: {message}')
|
||
|
||
def take_risk_reward(self):
    """Claim the timed adventure reward and print the outcome.

    A multi-line failure message is collapsed into one comma-separated
    line. Errors are printed, not raised.
    """
    print(f'领取冒险定时奖励--->>>')
    try:
        headers = self.gen_sign()
        reply = self.s.get(f'{self.base_url}/take-risk/reward', headers=headers).json()
        code = reply.get('code', -1)
        if code == 0:
            print('领取冒险定时奖励成功---√')
            return
        lines = reply.get('message', '').replace('\r', '').split('\n')
        message = ','.join(part for part in lines if part)
        print(f'领取冒险定时奖励失败{code}]: {message}')
    except Exception as err:
        print(err)
|
||
|
||
def randomString(self, length, chars='abcdef0123456789'):
    """Return ``length`` characters, each drawn at random from ``chars``."""
    picked = []
    for _ in range(length):
        picked.append(random.choice(chars))
    return ''.join(picked)
|
||
|
||
def friend_help_task_risk(self, friend_info):
    """Send an adventure help (type 1) to the account in ``friend_info``.

    Clears ``self.can_help_risk`` after a successful help or when the
    server message contains '助力次数'; sets
    ``friend_info['need_help_risk']`` to False when the message contains
    '挂机时间已完成'. Request errors are not caught here.
    """
    print('开始冒险互助--->>>')
    query = {
        'userId': friend_info['userId'],
        'type': 0x1,
        'randomId': self.randomString(32, '0123456789qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM'),
    }
    headers = self.gen_sign(query)
    reply = self.s.get(f'{self.base_url}/friend-help/help', headers=headers, params=query).json()
    code = reply.get('code', -1)
    if code == 0:
        # NOTE(review): indentation was lost in this copy; the success
        # message is assumed to sit inside the `data` check — confirm.
        if reply.get('data'):
            self.can_help_risk = False
            print(f"冒险助力账号[{friend_info['index']}][{friend_info['name']}]成功---√")
        return

    lines = reply.get('message', '').replace('\r', '').split('\n')
    message = ','.join(part for part in lines if part)
    print(f"冒险助力账号[{friend_info['index']}][{friend_info['name']}]失败{message}")
    if '助力次数' in message:
        self.can_help_risk = False
    elif '挂机时间已完成' in message:
        friend_info['need_help_risk'] = False
|
||
|
||
def friend_help(self, friend_info):
    """Send a sunshine help (type 0) to the account in ``friend_info``.

    When the server accepts the request with a truthy payload, the help
    task is performed through ``task_doHelpTask``. ``self.can_help_task``
    is cleared when the failure message contains '助力次数'. Request errors
    are not caught here.
    """
    print(f'开始阳光互助--->>>')
    query = {
        'userId': friend_info['userId'],
        'type': 0x0,
        'randomId': self.randomString(32, '0123456789qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM'),
    }
    headers = self.gen_sign(query)
    reply = self.s.get(f'{self.base_url}/friend-help/help', headers=headers, params=query).json()
    code = reply.get('code', -1)
    if code == 0:
        if reply.get('data'):
            self.task_doHelpTask(friend_info)
        return

    lines = reply.get('message', '').replace('\r', '').split('\n')
    message = ','.join(part for part in lines if part)
    print(f"请求助力账号[{friend_info['index']}][{friend_info['name']}] 失败: {message}")
    if '助力次数' in message:
        self.can_help_task = False
|
||
|
||
def friend_findRecommend(self):
    """Fetch recommended users and add them as friends until the list is full.

    Stops as soon as ``self.can_add_friend`` is False. Errors are printed,
    not raised.
    """
    print(f'查询添加好友列表--->>>')
    try:
        headers = self.gen_sign()
        reply = self.s.get(f'{self.base_url}/friend/findRecommend', headers=headers).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f'查询添加好友列表失败{code}]: {message}')
            return
        print(f'查询添加好友列表成功---√')
        for candidate in reply.get('data', []):
            if not self.can_add_friend:
                print('好友已满')
                break
            self.friend_addFriend(candidate)
    except Exception as err:
        print(err)
|
||
|
||
def friend_addFriend(self, user_data):
    """Send a friend request to ``user_data['userId']``.

    Clears ``self.can_add_friend`` when the failure message contains
    '达到好友上限'. Errors are printed, not raised.
    """
    print(f'添加好友--->>>')
    try:
        query = {'friendUserId': user_data['userId']}
        headers = self.gen_sign(query)
        reply = self.s.get(f'{self.base_url}/friend/addFriend', headers=headers, params=query).json()
        code = reply.get('code', -1)
        if code == 0:
            print(f"添加好友[{user_data['nickName']}]成功---√")
            return
        message = reply.get('message', '')
        print(f'添加好友{user_data["nickName"]}失败{code}]: {message}')
        if message and '达到好友上限' in message:
            self.can_add_friend = False
    except Exception as err:
        print(err)
|
||
|
||
def friend_deleteFriend(self, user_data):
    """Remove ``user_data['userId']`` from the friend list and print the outcome.

    Errors are printed, not raised.
    """
    print(f'删除好友--->>>')
    try:
        query = {'friendUserId': user_data['userId']}
        headers = self.gen_sign(query)
        reply = self.s.get(f'{self.base_url}/friend/delFriend', headers=headers, params=query).json()
        code = reply.get('code', -1)
        if code == 0:
            print(f"删除好友[{user_data['nickName']}]成功---√")
            return
        message = reply.get('message', '')
        print(f'删除好友{user_data["nickName"]}失败{code}]: {message}')
    except Exception as err:
        print(err)
|
||
|
||
def friend_findFriend(self):
    """List friends and steal from those flagged ``stealFlag``, richest first.

    Stops at the first steal that ``friend_stealGold`` reports as failed.
    Errors are printed, not raised.
    """
    print(f'\n查询好友列表--->>>')
    try:
        headers = self.gen_sign()
        reply = self.s.get(f'{self.base_url}/friend/findFriend', headers=headers).json()
        code = reply.get('code', -1)
        if code != 0:
            message = reply.get('message', '')
            print(f'查询好友列表失败[{str(code)}]:{message}')
            return
        print(f'查询好友列表成功---√')
        friends = reply.get('data', {}).get('friendList', [])
        for friend in sorted(friends, key=lambda item: item['gold'], reverse=True):
            if not friend.get('stealFlag', False):
                continue
            if not self.friend_stealGold(friend):
                break
    except Exception as err:
        print(err)
|
||
def get_CapCode(self, slideImgInfo):
    """Solve a slider captcha and store the answer in ``self.capcode``.

    Returns True when both ``slidingImage`` and ``backImage`` are present
    (the result of ``CHERWIN_TOOLS.OCR_API`` is stored), otherwise False.
    """
    sliding = slideImgInfo.get('slidingImage', None)
    background = slideImgInfo.get('backImage', None)
    if not (sliding and background):
        return False
    self.capcode = CHERWIN_TOOLS.OCR_API(sliding, background)
    return True
|
||
def friend_stealGold(self, user_data):
    """Steal from the friend ``user_data['userId']``.

    Returns True on success. On code 4000 (captcha required or limit
    reached) ``self.can_stealGold`` is set to False; every non-success
    outcome returns False. Request errors are not caught here.
    """
    print(f'偷取好友--->>>')
    query = {'friendUserId': user_data['userId']}
    headers = self.gen_sign(query)
    reply = self.s.get(f'{self.base_url}/friend/stealGold', headers=headers, params=query).json()
    code = reply.get('code', -1)
    if code == 0:
        print(f"偷取好友[{user_data['nickName']}]: 阳光x{reply.get('data', 0)}")
        return True

    if code == 4000:
        slide_info = reply.get('data', {}).get('slideImgInfo', None)
        validate_count = reply.get('data', {}).get('validateCount', None)
        if slide_info and validate_count:
            print(f"偷取好友[{user_data['nickName']}],需要滑块验证")
        else:
            print(f"偷取好友[{user_data['nickName']}]验证码上限")
        self.can_stealGold = False
    else:
        message = reply.get('message', '')
        print(f"偷取好友[{user_data['nickName']}]阳光失败[{str(code)}]: {message}")
    return False
|
||
|
||
def checkUserCapCode(self):
    """Submit the captcha answer held in ``self.capcode``.

    POSTs ``{'xpos': self.capcode}`` as JSON to ``/checkUserCapCode`` with
    headers produced by ``self.gen_sign`` and prints whether the server
    accepted it (code 0) or rejected it.
    """
    print(f'提交验证码--->>>')
    print(f'验证码:{self.capcode}')

    payload = {'xpos':self.capcode}
    print(payload)

    headers = self.gen_sign(body=payload)
    endpoint = f'{self.base_url}/checkUserCapCode'
    result = self.s.post(endpoint, headers=headers, json=payload).json()

    status = result.get('code', -1)
    if status != 0:
        reason = result.get('message', '')
        print(f"验证码错误,偷取失败[{reason}]")
        return

    reward = result.get('data', 0)
    print(f"验证码正确,获取到[{reward}]")
|
||
def exchange_reward(self, reward_id):
    """Redeem the reward identified by ``reward_id``.

    Sends a signed GET to ``/exchange/reward`` and prints the reward name
    on success (code 0) or the server message otherwise. Any exception is
    caught and printed so that one failed exchange does not stop the run.

    Args:
        reward_id: Value sent as the ``id`` query parameter.
    """
    print(f'兑换--->>>')
    try:
        params = {'id': reward_id}
        sign_header = self.gen_sign(params)
        url = f'{self.base_url}/exchange/reward'
        response = self.s.get(url, headers=sign_header, params=params)
        data = response.json()
        code = data.get('code', -1)
        if code == 0:
            # "data" may be present but null; dict.get's default does not
            # cover that case, so check the type before reading the name.
            reward = data.get('data')
            reward_name = (reward.get('name') if isinstance(reward, dict) else '') or ''
            print(f"兑换[{reward_name}]成功")
        else:
            message = data.get('message', '')
            print(f'兑换[id={reward_id}]: {message}')
    except Exception as e:
        print(e)
|
||
|
||
def risk_task(self):
    """Run the adventure ("risk") routine for this account.

    Calls ``take_risk_get``, ``take_risk_online`` and ``take_risk_reward``
    in that order, prints ``self.risk_num``, then keeps calling
    ``take_risk_go`` (pausing two seconds after each call) for as long as
    ``self.risk_num`` is positive and ``self.can_go_risk`` equals True.
    """
    self.take_risk_get()
    self.take_risk_online()
    self.take_risk_reward()
    print(f'可以冒险【{self.risk_num}】次')

    while True:
        # The equality test against True is kept as-is so that only
        # True/1-like flag values keep the loop running.
        keep_going = self.risk_num > 0 and self.can_go_risk == True
        if not keep_going:
            break
        self.take_risk_go()
        time.sleep(2)
|
||
|
||
def land_task(self) -> None:
    """Walk every plot in ``self.land`` and perform the next farming action.

    ``self.user_land_get()`` is called first. For each plot whose ``status``
    is not 0 the method prints its progress and then, depending on ``step``
    and ``leftSunCount``, calls one of ``user_land_sow``,
    ``user_land_result``, ``user_land_level`` or ``user_land_sun``.

    NOTE(review): the nesting below was reconstructed from the statement
    order; confirm the pairing of the ``else`` branches against the
    original file.
    """
    self.user_land_get()
    # No-op placeholder: both flags are read but nothing is done with them.
    if self.all_land_unlock and self.all_role_unlock:
        pass
    for land_number, land_info in self.land.items():
        # print(land_number)
        # print(land_info)
        # Plots with status 0 are skipped (presumably locked - TODO confirm).
        if land_info['status'] == 0:
            continue
        step, use_sun_count, left_sun_count = land_info['step'], land_info['useSunCount'], land_info['leftSunCount']

        # Falsy counts (None/0) are treated as 0 when summing.
        total_sun_count = (use_sun_count if use_sun_count else 0) + (left_sun_count if left_sun_count else 0)

        # NOTE(review): with step == 0 the index step-1 is -1, which selects
        # the last entry if group_step is a sequence - confirm this is intended.
        print(f"\n---[{land_info['no']}号土地]---\n{self.group_step[step-1]},阶段{step}, 进度{use_sun_count}/{total_sun_count}")

        # The loop body runs once and repeats only if something sets
        # self.refresh_land_step back to True during the iteration.
        # NOTE(review): step, left_sun_count and land_info are not re-read
        # inside this loop.
        self.refresh_land_step = True
        while self.refresh_land_step:
            self.refresh_land_step = False
            if land_info['status'] == 0:
                break
            if step == 0:
                self.user_land_sow(land_info)
            else:
                if left_sun_count == 0:
                    if step == 5:
                        self.user_land_result(land_info)
                    else:
                        current_time = int(time.time() * 1000)
                        # Compared in milliseconds; assumes sunTime and
                        # sunTimestamp are in seconds - TODO confirm.
                        if current_time >= (land_info['sunTime'] + land_info['sunTimestamp']) * 1000:
                            self.user_land_level(land_info)
                else:
                    # Only spend sunshine when the balance covers needSun.
                    if self.sun >= land_info['needSun']:
                        current_time = int(time.time() * 1000)
                        if current_time >= (land_info['sunTime'] + land_info['sunTimestamp']) * 1000:
                            self.user_land_sun(land_info)
            # time.sleep(2)
|
||
|
||
def userTask(self):
    """Run the daily routine for this account and persist its help state.

    When the account is logged in, calls in order ``userInfo_autoSun``,
    ``userInfo_get``, ``task_get``, ``user_role_get``, ``risk_task``,
    ``land_task``, ``steal_task`` and ``userInfo_get(END=True)``. A snapshot
    of the help-related attributes is then merged into the module-level
    ``USER_INFO`` dict, written through ``CHERWIN_TOOLS.SAVE_INVITE_CODE``,
    and ``sendMsg`` is called.

    Returns:
        False when ``self.Login_res`` is falsy, otherwise True.
    """
    print('\n--------------- 开始日常任务 ---------------')
    if not self.Login_res:
        return False

    self.userInfo_autoSun()
    self.userInfo_get()
    self.task_get()
    self.user_role_get()
    self.risk_task()
    self.land_task()
    self.steal_task()
    self.userInfo_get(END=True)

    # The risk entry previously reused the *task* flag. Prefer a dedicated
    # need_help_risk attribute when the instance has one and fall back to
    # the old value otherwise, so this stays safe if it does not exist.
    risk_need_help = getattr(self, 'need_help_risk', self.need_help_task)

    new_data = {
        self.userId: {
            'userId': self.userId,
            'task_id': self.task_id,
            'task_can_help': self.can_help_task,
            'task_need_help': self.need_help_task,
            'task_progress': self.task_progress,
            'task_max_progress': self.task_max_progress,
            'role_id': self.role_id,
            'role_can_help': self.can_help_unlock,
            'role_need_help': self.need_help_unlock,
            'role_progress': self.role_progress,
            'role_max': self.role_max,
            'risk_can_help': self.can_help_risk,
            'risk_need_help': risk_need_help,
            'can_go_risk': self.can_go_risk,
            'can_add_friend': self.can_add_friend,
        }
    }
    USER_INFO.update(new_data)
    CHERWIN_TOOLS.SAVE_INVITE_CODE(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", new_data)
    self.sendMsg()
    return True
|
||
|
||
def helpEachOther(self):
    """Spend this account's help actions on other accounts.

    The first account (``self.user_index == 1``) iterates the module-level
    ``AuthorCode`` list; every other account iterates the records returned
    by ``self.load_json()``. For each record that is not this account, up
    to three helpers are called when this account can still help and the
    record says help is needed: ``friend_help_task_risk``,
    ``user_role_friendHelpUnlock`` and ``friend_help``. Afterwards the
    remaining ``*_can_help`` flags are written through
    ``CHERWIN_TOOLS.SAVE_INVITE_CODE`` and ``sendMsg(True)`` is called.

    Returns:
        False when ``self.Login_res`` is falsy, otherwise True.
    """
    print('--------------- 开始互助 ---------------')

    if not self.Login_res:
        return False

    self.userInfo_get()

    if self.user_index == 1:
        print('第一个账号助力作者--->>>')
        data_li = AuthorCode
        help_type = '作者'
    else:
        print('其余账号互助--->>>')
        json_data = self.load_json()
        # Tolerate a missing/empty store instead of failing on .values().
        data_li = list((json_data or {}).values())
        help_type = '本地'

    # `or []` keeps the loop safe when no code list is available.
    for index, code_li in enumerate(data_li or []):
        if not isinstance(code_li, dict):
            continue
        if code_li.get('userId','') == self.userId:
            continue
        task_need_help = code_li.get("task_need_help", '')
        role_need_help = code_li.get("role_need_help", '')
        # Read the risk flag from its own key (the one userTask writes)
        # rather than from the unrelated "task_can_help" entry.
        risk_need_help = code_li.get("risk_need_help", '')
        code_li["index"] = index+1
        print(f'\n------助力{help_type}账号【{index+1}】开始------')

        # Adventure (risk) help
        if self.can_help_risk and risk_need_help:
            self.friend_help_task_risk(code_li)
        else:
            print(f'好友冒险助力已完成或冒险助力次数已耗尽')

        # Role-unlock help
        if self.can_help_unlock and role_need_help:
            self.user_role_friendHelpUnlock(code_li)
        else:
            print(f'好友解锁角色助力已完成或解锁角色助力次数已耗尽')

        # Sunshine (task) help
        if self.can_help_task and task_need_help:
            self.friend_help(code_li)
        else:
            print(f'阳光助力次数已耗尽')

        print(f'------助力{help_type}账号【{index+1}】结束------')
        time.sleep(1)

    new_data = {
        self.userId: {
            'task_can_help': self.can_help_task,
            'role_can_help': self.can_help_unlock,
            'risk_can_help': self.can_help_risk,
        }
    }
    CHERWIN_TOOLS.SAVE_INVITE_CODE(f"INVITE_CODE/{ENV_NAME}_INVITE_CODE.json", new_data)
    self.sendMsg(True)
    return True
|
||
|
||
def steal_task(self, count=5):
    """Optionally add recommended friends, then run ``friend_findFriend``.

    Args:
        count: Number of loop iterations in which ``friend_findRecommend``
            may be called.

    Returns:
        False when ``self.Login_res`` is falsy, otherwise True.
    """
    if not self.Login_res:
        return False
    json_data = self.load_json()
    # Restore the flag from this user's stored record; falls back to ''
    # (falsy) when the user or the key is absent.
    self.can_add_friend = json_data.get(self.userId, {}).get('can_add_friend', '')
    for i in range(count):
        # The flag is re-checked on every pass, so a callee that clears it
        # stops further recommendations.
        if self.can_add_friend:
            self.friend_findRecommend()
            time.sleep(2)
    # NOTE(review): placed after the loop (runs once); confirm against the
    # original indentation that it is not meant to run on every iteration.
    self.friend_findFriend()
    return True
|
||
|
||
def sendMsg(self, help=False):
    """Push this account's log text when a push UID is configured.

    Does nothing when ``self.send_UID`` is falsy. Otherwise passes the UID,
    the module-level ``one_msg`` buffer, ``APP_NAME`` and ``help`` to
    ``CHERWIN_TOOLS.wxpusher`` and prints whatever it returns.

    Args:
        help: Forwarded unchanged as the last argument of ``wxpusher``.
    """
    if not self.send_UID:
        return
    print(CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help))
|
||
|
||
|
||
def down_file(filename, file_url):
    """Download ``file_url`` and install it as ``filename``.

    The payload is first written to ``filename + '.tmp'`` and then moved
    over the target with ``os.replace``, so an existing file is only
    replaced once the download has been written completely. On any failure
    the temporary file is removed again.

    Args:
        filename: Destination path.
        file_url: URL to fetch.

    Returns:
        True when the file was downloaded and moved into place, False on
        any error (the error is printed, not raised).
    """
    print(f'开始下载:{filename},下载地址:{file_url}')
    temp_filename = filename + '.tmp'
    try:
        # NOTE(security): verify=False disables TLS certificate validation,
        # and the downloaded file is later imported as code. Left unchanged
        # to avoid breaking existing deployments; consider enabling
        # verification.
        response = requests.get(file_url, verify=False, timeout=10)
        response.raise_for_status()
        with open(temp_filename, 'wb') as f:
            f.write(response.content)
        print(f'【{filename}】下载完成!')

        # os.replace overwrites an existing target in a single step, so
        # there is no window in which the old file is already deleted but
        # the new one is not yet in place.
        os.replace(temp_filename, filename)
        print(f'【{filename}】重命名成功!')
        return True
    except Exception as e:
        print(f'【{filename}】下载失败:{str(e)}')
        # Do not leave a partial download behind.
        try:
            os.remove(temp_filename)
        except OSError:
            pass
        return False
|
||
|
||
def import_Tools():
    """Import ``CHERWIN_TOOLS`` and publish its start-up data as globals.

    Binds the module itself to the global name ``CHERWIN_TOOLS`` and unpacks
    the five values returned by ``CHERWIN_TOOLS.main`` into the globals
    ``ENV``, ``APP_INFO``, ``TIPS``, ``TIPS_HTML`` and ``AuthorCode``.
    """
    global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
    import CHERWIN_TOOLS
    startup_values = CHERWIN_TOOLS.main(
        APP_NAME,
        local_script_name,
        ENV_NAME,
        local_version,
    )
    ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = startup_values
|
||
|
||
|
||
if __name__ == '__main__':
    # Script entry point: print the usage banner, make sure the
    # CHERWIN_TOOLS helper module is available, read the account list and
    # run the daily routine followed by the mutual-help round for every
    # account.
    APP_NAME = '统一茄皇'
    ENV_NAME = 'TYQH'
    CK_NAME = 'thirdId@wid'
    CK_URL = '.../public/api/login'
    print(f'''
✨✨✨ {APP_NAME}脚本 ✨✨✨
✨ 功能:
日常任务
互助任务
✨ 抓包步骤:
统一快乐星球小程序-活动
开始抓包-茄皇的家第三期
抓{CK_URL}取{CK_NAME}
✨ 设置青龙变量:
export {ENV_NAME}= '{CK_NAME}'多账号#分割或&
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
✨ 推荐定时:25 7-23/2 * * *
✨ 第一个账号助力作者,其余互助
✨✨✨ @Author CHERWIN✨✨✨
''')
    local_script_name = os.path.basename(__file__)
    local_version = '2024.04.06'
    if os.path.isfile('CHERWIN_TOOLS.py'):
        import_Tools()
    else:
        if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
            print('脚本依赖下载完成请重新运行脚本')
            import_Tools()
        else:
            print('脚本依赖下载失败,请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
            # Same effect as exit() (status 0) without relying on the
            # site-provided helper.
            raise SystemExit
    print(TIPS)
    # Accounts may be hard-coded here; the environment value wins when set.
    token = ''
    token = ENV or token
    if not token:
        print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
        raise SystemExit
    tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
    if tokens:
        print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")
        # First pass: daily tasks for every account.
        for index, infos in enumerate(tokens):
            RUN(infos, index).userTask()

        # Second pass: mutual help, run after all accounts have stored
        # their state.
        for index, infos in enumerate(tokens):
            RUN(infos, index).helpEachOther()

        # `send` is only imported when notify.py exists, so look it up
        # instead of referencing a name that may be undefined.
        notify_send = globals().get('send')
        if notify_send:
            notify_send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)
|