更新脚本

This commit is contained in:
CHERWIN 2024-05-15 06:21:51 +08:00
parent 5a1deccaa2
commit fc39ef5c83
22 changed files with 6311 additions and 4738 deletions

19
BWCJ.py
View File

@ -10,8 +10,11 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# import CHERWIN_TOOLS
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -190,6 +193,7 @@ class RUN:
def main(self):
if not self.personal_info() :
Log("用户信息无效请更新CK")
self.sendMsg()
return False
self.user_sign_statistics()
self.points_info()
@ -260,16 +264,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.06'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

View File

@ -1,32 +1,37 @@
import hashlib
import json
import os
import importlib.util
import random
import string
import subprocess
import sys
import time
import requests
from http import HTTPStatus
from datetime import datetime
NOW_TOOLS_VERSION = '2024.05.04'
NOW_TOOLS_VERSION = '2024.05.15'
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
else:
IS_DEV = False
# 尝试导入包
def import_or_install(package_name, import_name=None):
# 如果传入了 import_name则使用它来检查模块否则默认与包名相同
import_name = import_name or package_name
try:
# 检查模块是否已安装
package_spec = importlib.util.find_spec(import_name)
if package_spec is None:
print(f"{package_name} 模块未安装. 开始安装...")
subprocess.check_call([sys.executable, '-m', 'pip', 'install', package_name])
print(f"{package_name} 模块安装完成。")
else:
print(f"{package_name} 模块已安装。")
# 尝试导入模块检查是否安装成功
__import__(import_name)
module = importlib.import_module(import_name)
@ -51,7 +56,6 @@ def SAVE_INVITE_CODE(file_name, new_data):
if not os.path.exists(directory):
os.makedirs(directory)
data = {}
# 检查是否已存在相同的键,如果存在,合并数据
for key, value in new_data.items():
if key in data:
@ -60,7 +64,6 @@ def SAVE_INVITE_CODE(file_name, new_data):
else:
# 如果键不存在,直接插入新数据
data[key] = value
# 将更新后的数据写入JSON文件
with open(file_name, 'w', encoding='utf-8') as file:
json.dump(data, file, indent=4)
@ -79,13 +82,13 @@ def create_dict_from_string(self, data_string):
def compare_versions(local_version, server_version):
local_parts = local_version.split('.') # 将本地版本号拆分成数字部分
server_parts = server_version.split('.') # 将服务器版本号拆分成数字部分
for l, s in zip(local_parts, server_parts):
if int(l) < int(s):
return True # 当前版本低于服务器版本
return True
# 当前版本低于服务器版本
elif int(l) > int(s):
return False # 当前版本高于服务器版本
return False
# 当前版本高于服务器版本
# 如果上述循环没有返回结果,则表示当前版本与服务器版本的数字部分完全相同
if len(local_parts) < len(server_parts):
return True # 当前版本位数较短,即版本号形如 x.y 比 x.y.z 低
@ -93,60 +96,6 @@ def compare_versions(local_version, server_version):
return False # 当前版本与服务器版本相同或更高
def CHECK_UPDATE(local_version, server_version_url, server_script_url, script_filename):
"""
检查版本并更新
Args:
local_version (str): 本地版本号
server_version_url (str): 服务器版本文件地址
server_script_url (str): 服务器脚本地址
script_filename (str): 要保存的脚本文件名
Returns:
bool: 是否进行了更新操作
"""
try:
# 获取服务器版本号
response = requests.get(server_version_url, verify=False)
response.raise_for_status() # Raises an HTTPError for bad responses
# print(response.text)
server_version = response.text.strip() # 去除首尾空格
print(f'当前版本:【{local_version}')
print(f'服务器版本:【{server_version}')
if compare_versions(local_version, server_version):
# 需要更新,下载服务器脚本
AUTO_UPDATE = os.getenv("SCRIPT_UPDATE", "True").lower() != "false"
# print(AUTO_UPDATE)
if AUTO_UPDATE:
print(">>>>>>>发现新版本的脚本,默认自动更新,准备更新...")
print(">>>>>>>禁用更新请定义变量export SCRIPT_UPDATE = 'False'")
response_script = requests.get(server_script_url, verify=False, timeout=10)
response_script.raise_for_status()
with open(script_filename, 'wb') as f:
f.write(response_script.content)
print(f'{script_filename} 下载完成!')
print(f'尝试运行新脚本')
import subprocess, sys
# 使用 sys.executable 获取 Python 可执行文件的完整路径
python_executable = sys.executable
subprocess.Popen([python_executable, script_filename])
else:
print(">>>>>>>发现新版本的脚本您禁用了自动更新如需启用请删除变量SCRIPT_UPDATE")
else:
print(f'当前版本高于或等于服务器版本')
except requests.exceptions.RequestException as e:
print(f'发生网络错误:{e}')
except Exception as e:
print(f'发生未知错误:{e}')
return False # 返回 False 表示没有进行更新操作
def CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, server_version_url=None,
APP_NAME=None):
@ -176,7 +125,6 @@ def CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_fi
if not server_version: server_version = NOW_TOOLS_VERSION
print(f'本地版本:【{local_version}')
print(f'服务器版本:【{server_version}')
if compare_versions(local_version, server_version):
# 需要更新,下载服务器脚本
AUTO_UPDATE = os.getenv("SCRIPT_UPDATE", "True").lower() != "false"
@ -192,16 +140,13 @@ def CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_fi
else:
print(f'无需更新\n')
return False
except requests.exceptions.RequestException as e:
print(f'发生网络错误:{e}')
server_base_url = f"https://py.cherwin.cn/{APP_NAME}/"
server_script_url = f"{server_base_url}{script_filename}"
CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, APP_NAME=APP_NAME)
except Exception as e:
print(f'发生未知错误:{e}')
return False # 返回 False 表示没有进行更新操作
@ -213,7 +158,6 @@ def down_file(filename, file_url):
with open(filename + '.tmp', 'wb') as f:
f.write(response.content)
print(f'{filename}】下载完成!')
# 检查临时文件是否存在
temp_filename = filename + '.tmp'
if os.path.exists(temp_filename):
@ -244,57 +188,24 @@ def get_AuthorInviteCode(url):
else:
# print("无法获取文件。状态代码:", response.status_code)
return {}
except Exception as e:
print(f"An error occurred: {e}")
return {}
def GET_TIPS(server_base_url):
url = f'{server_base_url}tips.txt'
try:
response = requests.get(url, verify=False)
# 检查响应的编码
encoding = response.encoding
# print(f"编码: {encoding}")
# 设置正确的编码(根据实际情况可能需要调整)
response.encoding = 'utf-8'
# 读取内容
content = response.text
if 'code' in content:
content = None
except:
content = None
print('获取通知内容失败')
if content:
return (f'''
***********通知内容************\n
{content}
***********通知内容************\n
''')
else:
return (f'''
***********通知内容************\n
@Author Cherwin
***********通知内容************\n
''')
def CHECK_PARAMENTERS(index, input_string, required_parameters):
# required_parameters = ['deviceid', 'jysessionid', 'shopid', 'memberid', 'access_token', 'sign']
# 记录缺少的参数
missing_parameters = []
# 将输入字符串和参数列表中的所有字符都转换为小写
input_string_lower = input_string.lower()
required_parameters_lower = [param.lower() for param in required_parameters]
# 判断字符串中是否包含所有必需的参数
for param in required_parameters_lower:
if param not in input_string_lower:
missing_parameters.append(param)
if missing_parameters:
print(f"\n第【{index + 1}】个账号,缺少以下参数:【{missing_parameters}")
return False
@ -378,30 +289,6 @@ def CAPCODE(captcha_slider, captcha_bg):
return False
def BASE64_TO_IMG(base64_string, output_path):
import base64
import io
Image = import_or_install('Pillow', 'PIL.Image')
if Image:
try:
# 解码base64字符串
image_data = base64.b64decode(base64_string)
# 将字节数据转换为字节流
image_buf = Image.open(io.BytesIO(image_data))
# 转换为RGB模式
image_buf = image_buf.convert('RGB')
# 保存图片到指定路径
image_buf.save(output_path, format='JPEG')
return True
except Exception as e:
print(f'发生错误:{e}')
return False
else:
print('需要的模块[PIL]无法导入,函数无法执行。')
return False
def send_wxpusher(UID, one_msg, APP_NAME, help=False):
WXPUSHER = os.environ.get('WXPUSHER', False)
if WXPUSHER:
@ -421,17 +308,16 @@ def wxpusher(UID, msg, title, help=False):
print(f'标题:【{title}\n内容:{msg}')
webapi = 'http://wxpusher.zjiecode.com/api/send/message'
msg = msg.replace("\n", "<br>")
tips = TIPS_HTML.replace("\n", "<br>")
# tips = TIPS_HTML.replace("\n", "<br>")
data = {
"appToken": WXPUSHER,
"content": f'{title}<br>{msg}<br>{tips}',
"content": f'{title}<br>{msg}<br>{TIPS_HTML}',
# "summary": msg[:99], # 该参数可选,默认为 msg 的前10个字符
"summary": title,
"contentType": 2,
"uids": [UID],
"url": "https://gj.cherwin.cn"
}
try:
result = requests.post(url=webapi, json=data)
result.raise_for_status() # 对于非2xx状态码抛出异常
@ -474,9 +360,71 @@ def CHECK():
except:
print('获取CHERWIN_SCRIPT_CONFIG.json失败')
return False
def GJJJ_SIGN():
app_id = "667516"
app_crypto = "FH3yRrHG2RfexND8"
timestamp = int(time.time() * 1000)
# timestamp = 1715180892075
text = f"{app_id}{app_crypto}{timestamp}"
sign = hashlib.md5(text.encode()).hexdigest()
new_data = {
'timestamp': str(timestamp),
"sign": sign
}
return new_data
def KWW_SIGN(memberId):
timestamp = int(time.time() * 1000)
random_num = random.randint(0, 31)
u = [
"A", "Z", "B", "Y", "C", "X", "D", "T", "E", "S", "F", "R", "G", "Q", "H", "P", "I", "O", "J", "N", "k",
"M", "L", "a", "c", "d", "f", "h", "k", "p", "y", "n"]
r = f"{timestamp}{memberId}{u[random_num]}"
sign = hashlib.md5(r.encode()).hexdigest()
update_headers = {
"user-sign": sign,
"user-paramname": "memberId",
"user-timestamp": str(timestamp),
"user-random": str(random_num)
}
return update_headers
def TYQH_SIGN(parameters={}, body=None):
sorted_keys = sorted(parameters.keys())
parameter_strings = []
for key in sorted_keys:
if isinstance(parameters[key], dict):
parameter_strings.append(f"{key}={json.dumps(parameters[key])}")
else:
parameter_strings.append(f"{key}={parameters[key]}")
current_time = int(datetime.now().timestamp() * 1000)
secret_chars = list('BxzTx45uIGT25TTHIIBU2')
last_three_digits = str(current_time)[-3:]
for digit in last_three_digits:
secret_chars.insert(int(digit), digit)
def main(APP_NAME, local_script_name, ENV_NAME, local_version):
secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest()
nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
sign_data = {
'client_id': 'game',
'nonstr': nonce_str,
'timestamp': current_time,
'body': json.dumps(body) if body else '',
'query': '&'.join(parameter_strings) if parameter_strings else '',
'secret': secret
}
sign_string = '|'.join([str(v) for v in sign_data.values()])
sign = hashlib.md5(sign_string.encode()).hexdigest().upper()
sign_header = {
'client_id': 'game',
'timestamp': str(current_time),
'nonstr': sign_data['nonstr'],
'sign': sign
}
return sign_header
def main(APP_NAME, local_script_name, ENV_NAME, local_version,need_invite=False):
global APP_INFO, TIPS, TIPS_HTML
git_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{local_script_name}'
if CHECK():
@ -486,20 +434,19 @@ def main(APP_NAME, local_script_name, ENV_NAME, local_version):
if CHECK_UPDATE_NEW(local_version, server_version, git_url, local_script_name, APP_NAME=APP_NAME):
print('更新成功,请重新运行脚本!')
if not APP_INFO.get('ENABLE', False):
if not APP_INFO.get('ENABLE', False) and not IS_DEV:
print('当前脚本未开放')
exit()
TIPS = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC', '')
TIPS_HTML = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC_HTML',
'')
TIPS_HTML = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC_HTML','')
ENV = os.environ.get(ENV_NAME)
AuthorCode = get_AuthorInviteCode(f'https://yhsh.ziyuand.cn/{ENV_NAME}_INVITE_CODE.json')
if need_invite:
AuthorCode = get_AuthorInviteCode(f'https://yhsh.ziyuand.cn/{ENV_NAME}_INVITE_CODE.json')
else:
AuthorCode = ''
return ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
else:
exit()
if __name__ == '__main__':
pass
print(NOW_TOOLS_VERSION)

89
DBKD.py
View File

@ -3,8 +3,8 @@
# -------------------------------
# ✨✨✨ @Author CHERWIN✨✨✨
# -------------------------------
# cron "0 7 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('德邦快递小程序签到')
# cron "0 6 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('中通快递小程序签到')
import os
import requests
@ -13,9 +13,10 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -46,14 +47,12 @@ class RUN:
self.index = index + 1
self.s = requests.session()
self.s.verify = False
self.token = token
self.token = f'ECO_TOKEN={token};'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f) XWEB/9079',
'Cookie':self.token,
'Referer': 'https://servicewechat.com/wxa1ebeeb0ed47f0b2/633/page-frame.html'
}
self.baseUrl = 'https://api.ztomember.com/api/'
def do_request(self, method, url, params=None, data=None, headers=None):
if not headers:
@ -86,6 +85,26 @@ class RUN:
print(f"查询账户异常: {e}")
return False
def queryUserInfo(self):
try:
self.headers['Content-Type'] = 'application/json'
self.headers['Accept'] = '*/*'
response = self.do_request('GET', 'https://www.deppon.com/ndcc-gwapi/userService/eco/user/secure/queryUserInfo')
if response and response.get('message') == 'ok':
# print(response)
result = response.get('result', {})
phone = result.get('mobile', '')
self.mobile = phone[:3] + "*" * 4 + phone[7:]
self.userName = result.get('userName', '')
Log(f'\n用户名:【{self.userName}\n手机号:【{self.mobile}')
return True
else:
Log(f"登录验证失败: {response}")
return False
except Exception as e:
print(f"登录验证异常: {e}")
return False
def generate_tmp_token(self):
try:
response = self.do_request('GET', 'https://www.deppon.com/ndcc-gwapi/userService/eco/user/token/secure/generateTmpToken')
@ -107,11 +126,11 @@ class RUN:
if response and response.get('code') == 200:
# print(response)
Log(f"登录验证成功!")
self.phone = response['data']['mobile']
self.token = response['data']['token']
self.headers['mobile'] = self.phone
data = response.get('data', {})
self.token = data.get('token','')
self.phone = data.get('mobile','')
self.headers['token'] = self.token
#
self.headers['mobile'] = self.phone
# print(f"PHONE【{self.phone}】")
# print(f"TOKEN【{self.token}】")
return True
@ -248,7 +267,7 @@ class RUN:
"rightsClaimStatus":1,
"taskRuleId":self.taskRuleId
}
response = self.do_request('POST', 'https://mas.deppon.com/crm-api/deppon/pay/member/add/package/query',data = data)
response = self.do_request('POST', 'https://mas.deppon.com/crm-api/deppon/points/task/changeStatus',data = data)
if response and response.get('code') == 200:
Log(f'完成任务:【{self.name}】成功!')
return response['msg']
@ -276,14 +295,15 @@ class RUN:
def lottery(self):
try:
data = {"phone": self.phone,"gameId":'Zv8i7IsOPZIQrvvupKXE1w=='}
response = self.do_request('POST', 'https://mas.deppon.com/crm-api/game/draw/lottery',data = data)
data = {"mobile": self.phone,"gameId":'67'}
response = self.do_request('POST', 'https://mas.deppon.com/admin/envelops/game/lottery',data = data)
if response and response.get('code') == 200:
print('抽奖成功')
print(response)
self.pointsAvailableValue -= 1
return response['msg']
if response and response.get('code') == 0:
# print(response)
data = response.get('data','')
name = data.get('name','')
Log(f'抽奖成功,获得:{name}')
return
else:
print(f"抽奖失败: {response}")
return None
@ -318,16 +338,20 @@ class RUN:
def main(self):
Log(f"\n开始执行第{self.index}个账号--------------->>>>>")
if self.login():
# if self.login():
if self.queryUserInfo():
self.generate_tmp_token()
self.getSvipNewestInfo()
self.signIn_info()
self.points_signIn_info()
self.task_list()
self.lottery_query2()
self.lottery()
self.getSvipNewestInfo('执行后')
self.sendMsg()
return True
else:
self.sendMsg()
return False
def sendMsg(self):
@ -374,8 +398,8 @@ def import_Tools():
if __name__ == '__main__':
APP_NAME = '德邦快递小程序'
ENV_NAME = 'DBKD'
CK_NAME = 'code'
CK_URL = 'https://www.deppon.com/ndcc-gwapi/userService/eco/user/login'
CK_NAME = 'ECO_TOKEN=里面的值;'
CK_URL = 'https://www.deppon.com/ndcc-gwapi/userService/eco/user/secure/queryUserInfo'
print(f'''
{APP_NAME}签到
功能
@ -384,7 +408,7 @@ if __name__ == '__main__':
打开{APP_NAME}
授权登陆
打开抓包工具
{CK_URL}请求body里面的[{CK_NAME}]
{CK_URL}请求Cookies里面的[{CK_NAME}]
复制里面的[{CK_NAME}]参数值
参数示例0f1bjLFa1ZdedHxxxxxxxx
wxpusher一对一推送功能
@ -398,16 +422,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.18'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
@ -422,4 +449,4 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
for index, infos in enumerate(tokens):
run_result = RUN(infos, index).main()
if not run_result: continue
if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)
# if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)

274
EMS.py Executable file
View File

@ -0,0 +1,274 @@
# !/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# ✨✨✨ @Author CHERWIN✨✨✨
# -------------------------------
# cron "0 10 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('EMS邮惠中心小程序')
import os
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
else:
print("加载通知服务失败!")
send_msg = ''
one_msg=''
def Log(cont=''):
global send_msg,one_msg
print(cont)
if cont:
one_msg += f'{cont}\n'
send_msg += f'{cont}\n'
class RUN:
def __init__(self,info,index):
global one_msg
one_msg = ''
split_info = info.split('@')
token = split_info[0]
len_split_info = len(split_info)
last_info = split_info[len_split_info - 1]
self.send_UID = None
if len_split_info > 0 and "UID_" in last_info:
print('检测到设置了UID')
print(last_info)
self.send_UID = last_info
self.index = index + 1
self.s = requests.session()
self.s.verify = False
self.openId = token
self.headers = {
'Host': 'ump.ems.com.cn',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/6945',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Accept-Language': 'zh-CN,zh',
}
def do_request(self, url, method="POST",params=None, data=None, headers=None):
if not headers:
headers = self.headers
try:
response = self.s.request(method, url, params=params, json=data, headers=headers)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"请求错误: {e}")
return None
def findByOpenIdAppId(self):
Log(f"====== 获取TOKEN ======")
try:
params = {
"appId":"wx52872495fb375c4b",
"openId":self.openId,
"source":"JD"
}
response = self.do_request('https://ump.ems.com.cn/memberCenterApiV2/member/findByOpenIdAppId',data=params)
if response and response.get('code') == '000000':
# print(response)
info = response.get('info', {})
self.token = info.get('token', '')
self.memberId = info.get('memberId', '')
self.headers['MC-TOKEN'] = self.token
Log(f'>>获取token成功✅')
# print(f'>用户ID【{self.memberId}】')
return True
else:
Log(f">获取TOKEN失败❌: {response}")
return False
except Exception as e:
print(f"获取TOKEN异常❌: {e}")
return False
def details(self):
Log(f"====== 获取用户信息 ======")
try:
params = {}
response = self.do_request('https://ump.ems.com.cn/memberCenterApiV2/member/details',data=params)
if response and response.get('code') == '000000':
# print(response)
info = response.get('info', {})
phone = info.get('phone', '')
Log(f'>>获取用户信息成功✅')
Log(f'>用户ID{self.memberId}\n手机号:【{phone}')
return True
else:
Log(f">获取TOKEN失败❌: {response}")
return False
except Exception as e:
print(f"获取TOKEN异常❌: {e}")
return False
def sign(self):
Log(f"====== 开始签到 ======")
try:
params = {
"appId":"wx52872495fb375c4b",
"userId":self.memberId,
"openId":self.openId,
"activId":'d191dce0740849b1b7377e83c00475d6'
}
response = self.do_request('https://ump.ems.com.cn/activCenterApi/signActivInfo/sign',data=params)
if response and response.get('code') == '000000':
# print(response)
info = response.get('info', {})
prizeSize = info.get('prizeSize', {})
Log(f'>签到成功✅获得:【{prizeSize}】积分')
return True
elif response and response.get('code') == '600001':
msg = response.get('msg')
Log(f'>签到失败❌ 【{msg}')
else:
Log(f"签到失败❌: {response}")
return False
except Exception as e:
print(f"登录验证异常❌: {e}")
return False
def memberGoldsInfo(self):
Log(f"====== 获取积分信息 ======")
try:
params = {}
response = self.do_request('https://ump.ems.com.cn/memberCenterApiV2/golds/memberGoldsInfo',data=params)
if response and response.get('code') == '000000':
# print(response)
info = response.get('info', {})
availableGoldsTotal = info.get('availableGoldsTotal', {})
Log(f'>当前积分:【{availableGoldsTotal}')
return True
else:
Log(f"获取积分信息失败❌: {response}")
return False
except Exception as e:
print(f"获取积分信息异常❌: {e}")
return False
def main(self):
Log(f"\n开始执行第{self.index}个账号--------------->>>>>")
# if self.login():
if self.findByOpenIdAppId():
self.details()
self.sign()
self.memberGoldsInfo()
self.sendMsg()
return True
else:
self.sendMsg()
return False
def sendMsg(self):
if self.send_UID:
push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME)
print(push_res)
return True
def down_file(filename, file_url):
print(f'开始下载:{filename},下载地址:{file_url}')
try:
response = requests.get(file_url, verify=False, timeout=10)
response.raise_for_status()
with open(filename + '.tmp', 'wb') as f:
f.write(response.content)
print(f'{filename}】下载完成!')
# 检查临时文件是否存在
temp_filename = filename + '.tmp'
if os.path.exists(temp_filename):
# 删除原有文件
if os.path.exists(filename):
os.remove(filename)
# 重命名临时文件
os.rename(temp_filename, filename)
print(f'{filename}】重命名成功!')
return True
else:
print(f'{filename}】临时文件不存在!')
return False
except Exception as e:
print(f'{filename}】下载失败:{str(e)}')
return False
def import_Tools():
global CHERWIN_TOOLS, ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,
local_version)
if __name__ == '__main__':
APP_NAME = 'EMS邮惠中心小程序'
ENV_NAME = 'EMS'
CK_NAME = 'openId'
CK_URL = 'https://ump.ems.com.cn/memberCenterApiV2/member/findByOpenIdAppId'
print(f'''
{APP_NAME}签到
功能
积分签到
抓包步骤
打开{APP_NAME}
授权登陆
打开抓包工具
{CK_URL}请求body里面的[{CK_NAME}]
复制里面的[{CK_NAME}]参数值
参数示例o-7675D-prmxxxxxxxxxx
wxpusher一对一推送功能
需要定义变量export WXPUSHER=wxpusher的app_token不设置则不启用wxpusher一对一推送
需要在{ENV_NAME}变量最后添加@wxpusher的UID
设置青龙变量
export {ENV_NAME}='{CK_NAME}参数值'多账号#或&分割
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新默认开启
注意抓完CK没事儿别打开小程序重新打开小程序请重新抓包
推荐cron0 10 * * *
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
if not token:
print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
exit()
tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
# print(tokens)
if len(tokens) > 0:
print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")
access_token = []
for index, infos in enumerate(tokens):
run_result = RUN(infos, index).main()
if not run_result: continue
if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)

493
GJJJ.py Executable file
View File

@ -0,0 +1,493 @@
# !/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# ✨✨✨ @Author CHERWIN✨✨✨
# -------------------------------
# cron "1 9 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('顾家家居小程序')
import os
import random
import time
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
else:
print("加载通知服务失败!")
send_msg = ''
one_msg=''
def Log(cont=''):
global send_msg,one_msg
print(cont)
if cont:
one_msg += f'{cont}\n'
send_msg += f'{cont}\n'
class RUN:
def __init__(self,info,index):
global one_msg
one_msg = ''
split_info = info.split('@')
# self.token = json.loads(split_info[0])
len_split_info = len(split_info)
if len_split_info < 3:return False
identityValue = split_info[0]
openid = split_info[1]
unionid = split_info[2]
self.token = {
"identityType":"mobile",
"identityValue":identityValue,
"type2":"wechat-unionid",
"value2":"",
"source":"顾家小程序",
"contentName":"",
"openid":openid,
"unionid":unionid
}
# print(self.token)
last_info = split_info[len_split_info - 1]
self.send_UID = None
if len_split_info > 0 and "UID_" in last_info:
print('检测到设置了UID')
print(last_info)
self.send_UID = last_info
self.index = index + 1
self.max_try = 3
self.headers = {
"Host": "mc.kukahome.com",
"E-Opera": "",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) XWEB/8555",
"Content-Type": "application/json",
# "timestamp": "1715180892075",
"xweb_xhr": "1",
"brandCode": "K001",
# "X-Customer": "4802198",
"appid": "667516",
# "sign": "7acf4f02413bd9855047cc345be6da21",
"Accept": "*/*",
"Sec-Fetch-Site": "cross-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://servicewechat.com/wx0770280d160f09fe/179/page-frame.html",
"Accept-Language": "zh-CN,zh;q=0.9",
}
def generate_sign(self):
new_data = CHERWIN_TOOLS.GJJJ_SIGN()
self.headers.update(new_data)
def make_request(self, url, method='post', headers={}, params={}):
self.generate_sign()
if headers == {}:
headers = self.headers
# if params == {}:
# params = self.default_data
try:
if method.lower() == 'get':
response = requests.get(url, headers=headers, verify=False)
elif method.lower() == 'post':
response = requests.post(url, headers=headers, json=params, verify=False)
else:
raise ValueError("不支持的请求方法❌: " + method)
return response.json()
except requests.exceptions.RequestException as e:
print("请求异常❌:", e)
except ValueError as e:
print("值错误或不支持的请求方法❌:", e)
except Exception as e:
print("发生了未知错误❌:", e)
def automaticLogin(self):
Log('======= 刷新用户信息 =======')
url = 'https://mc.kukahome.com/club-server/member/automaticLogin'
response = self.make_request(url,params=self.token)
# print(response)
if response.get('msg')== "成功":
data = response.get('data','')
AccessToken = data.get('AccessToken','')
membership = data.get('membership', {})
point = membership.get('point', '')
# systemSource = membership.get('systemSource', '')
self.membershipId = membership.get('id', '')
self.brandCode = membership.get('brandCode', '')
self.mobile = membership.get('mobile', '')
oneId = membership.get('oneId', '')
memberLevel = membership.get('memberLevel', '')
new_headers={
"brandCode":self.brandCode,
"X-Customer": str(self.membershipId),
"AccessToken":AccessToken
}
self.headers.update(new_headers)
Log(f'>手机号:【{self.mobile}')
print(f'>ID{self.membershipId}')
print(f'>等级:【{memberLevel}')
print(f'>oneId{oneId}')
Log(f'>当前积分:【{point}')
return True
else:
Log(f'>可能token失效了❌,{response}')
return False
def personal(self):
Log('======= 查询用户信息 =======')
url = 'https://mc.kukahome.com/club-server/front/member/personal'
response = self.make_request(url,params=self.token)
# print(response)
if response:
point = response.get('point','')
Log(f'>执行后积分:【{point}')
return True
else:
Log(f'>可能token失效了❌,{response}')
return False
def selectPointTask(self):
Log('======= 获取任务列表 =======')
url = 'https://mc.kukahome.com/club-server/front/member/selectPointTask'
data={"brandCode":self.brandCode}
response = self.make_request(url,params=data)
print(response)
if response.get('code')== 0:
data = response.get('data',[{}])
for li in data:
pass
return True
else:
Log(f'>可能token失效了❌,{response}')
return False
def checkSign(self):
Log('======= 查询签到状态 =======')
url = 'https://mc.kukahome.com/club-server/front/member/calendar'
data={
"t":int(time.time()*1000),
"membershipId":self.membershipId
}
response = self.make_request(url,params=data)
if response:
isTodaySigned = response.get('isTodaySigned',False)
if not isTodaySigned:
Log('>今日未签到')
self.signIn()
else:
Log('>今日已签到✅')
return True
else:
Log(f'>可能token失效了❌,{response}')
return False
def signIn(self):
Log('======= 开始签到 =======')
url = 'https://mc.kukahome.com/club-server/front/member/signIn'
data={
"identityType":"mobile",
"identityValue":self.mobile,
"membershipId":self.membershipId
}
response = self.make_request(url,params=data)
# print(response)
if response.get('status')== 200:
data = response.get('data',[])
success = data.get('success',False)
if success:
Log('>签到成功!✅')
else:
Log(f'>可能token失效了❌,{response}')
return False
def selectPage(self):
print('======= 获取帖子列表 =======')
url = 'https://mc.kukahome.com/club-server/applet/waterfall/selectPage'
random_topicId = str(random.randint(0,27))
data={"source":1,"pageNum":1,"pageSize":5,"topicId":random_topicId}
response = self.make_request(url,params=data)
# print(response)
if response.get('code')== 0:
data = response.get('data',{})
list = data.get('list',[{}])
if list:
print('>获取帖子列表成功✅')
randomlist=random.choices(list)
# print(randomlist)
self.postLikeTask(randomlist)
self.postCollectTask(randomlist)
self.postCollectTask(randomlist)
return True
else:
if self.max_try > 0:
print('>获取帖子列表失败❌!正常重试....')
self.selectPage()
self.max_try -= 1
else:
Log(f'>可能token失效了❌,{response}')
return False
def like_post(self, post_id):
url = 'https://mc.kukahome.com/club-server/front/postOrder/like'
data = {"id": post_id}
response = self.make_request(url, params=data)
return response
def likeSendPoint(self, post_id,triggerType,content):
url = 'https://mc.kukahome.com/club-server/front/member/likeSendPoint'
data = {
"postOrderId": post_id,
"triggerType": triggerType,
"content": content
}
response = self.make_request(url, params=data)
return response
def collect_post(self, post_id):
url = 'https://mc.kukahome.com/club-server/front/postOrder/collect'
data = {"id": post_id}
response = self.make_request(url, params=data)
return response
def submit_comment(self, post_id, content, post_member_id, parent_id="", is_flag=False):
url = 'https://mc.kukahome.com/club-server/user/post/comment/insert'
data = {
"postId": post_id,
"parentId": parent_id,
"content": content,
"postMemberId": post_member_id,
"commentMemberId": self.access_token,
"isFlag": is_flag
}
response = self.make_request(url, params=data)
return response
def insert_foot_point(self, buried_point_logo, subordinate_terminal, business_name, business_code,current_page_link):
url = 'https://mc.kukahome.com/club-server/front/foot/point/insertFootPoint'
data = {
"brandCode": "K001",
"buriedPointLogo": buried_point_logo,
"subordinateTerminal": subordinate_terminal,
"businessName": business_name,
"businessCode": business_code,
"currentPageLink": current_page_link
}
response = self.make_request(url, params=data)
return response
def push_event(self, event_id, content, target_id, target_name, business_id, business_name):
url = 'https://mc.kukahome.com/club-server/front/member/pushEvent'
data = {
"eventId": event_id,
"content": content,
"targetId": target_id,
"targetName": target_name,
"businessId": business_id,
"businessName": business_name
}
response = self.make_request(url, params=data)
return response
def like_send_point(self, post_order_id, trigger_type, content):
url = 'https://mc.kukahome.com/club-server/front/member/likeSendPoint'
data = {
"postOrderId": post_order_id,
"triggerType": trigger_type,
"content": content
}
response = self.make_request(url, params=data)
return response
def resp_result(self,even,response):
if response.get('message','') == '提示:保存成功!':
print(f'{even}保存成功✅')
else:
print(f'{even}保存失败❌')
def postLikeTask(self,postlist):
# 点赞帖子
post_id = postlist[0]['id']
title = postlist[0]['title']
print("======= 进入点赞流程------>>>")
# 推送点赞事件
event_response = self.push_event(event_id="c_showhome_like", content="晒家-点赞", target_id="300001",target_name="晒家-点赞", business_id=post_id,business_name=title)
self.resp_result("推送点赞事件:", event_response)
# 点赞事件
like_response = self.like_post(post_id)
self.resp_result("点赞帖子:", like_response)
# 插入点赞足迹
foot_point_response = self.insert_foot_point(buried_point_logo="do_good_btn",subordinate_terminal="会员小程序", business_name="",business_code="", current_page_link="")
self.resp_result("插入点赞足迹:", foot_point_response)
#
# 点赞送积分
like_point_response = self.like_send_point(post_order_id=post_id, trigger_type="1",content="点赞")
self.resp_result("点赞送积分响应:", like_point_response)
print("------>>>点赞结束\n")
print("======= 进入取消点赞流程------>>>")
# 取消点赞帖子
like_response = self.like_post(post_id)
self.resp_result("取消点赞帖子:", like_response)
# 插入取消点赞足迹
foot_point_response = self.insert_foot_point(buried_point_logo="cancel_good_btn", subordinate_terminal="会员小程序",business_name="", business_code="", current_page_link="")
self.resp_result("插入取消点赞足迹:", foot_point_response)
#
# 推送取消点赞事件
event_response = self.push_event(event_id="c_showhome_unlike", content="晒家-取消点赞", target_id="300002",target_name="晒家-取消点赞", business_id=post_id, business_name=title)
self.resp_result("推送取消点赞事件:", event_response)
print("------>>>取消点赞结束\n")
def postCollectTask(self,postlist):
post_id = postlist[0]['id']
title = postlist[0]['title']
print("======= 进入收藏帖子流程------>>>")
# 收藏帖子
collect_response = self.collect_post(post_id)
self.resp_result("收藏帖子:", collect_response)
# 插入收藏足迹
foot_point_response = self.insert_foot_point(buried_point_logo="buriedPointLogo", subordinate_terminal="会员小程序", business_name="", business_code="", current_page_link="")
self.resp_result("插入收藏足迹:", foot_point_response)
#
#收藏送积分
likeSendPoint_response = self.likeSendPoint(post_id, 2, "收藏")
self.resp_result("收藏送积分:", likeSendPoint_response)
print("------>>>收藏帖子结束\n")
print("======= 进入取消收藏帖子流程------>>>")
# 取消收藏足迹
foot_point_response = self.insert_foot_point(buried_point_logo="cancel_collect_btn", subordinate_terminal="会员小程序",business_name="", business_code="", current_page_link="")
self.resp_result("取消收藏足迹:", foot_point_response)
# 取消收藏帖子
uncollect_response = self.collect_post(post_id)
self.resp_result("取消收藏帖子:", uncollect_response)
print("------>>>取消收藏帖子结束\n")
def main(self):
Log(f"\n======= 开始执行第{self.index}个账号 =======")
if self.automaticLogin():
self.checkSign()
# self.selectPointTask()
self.selectPage()
self.personal()
self.sendMsg()
return True
else:
self.sendMsg()
return False
def sendMsg(self):
if self.send_UID:
push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME)
print(push_res)
def down_file(filename, file_url):
print(f'开始下载:{filename},下载地址:{file_url}')
try:
response = requests.get(file_url, verify=False, timeout=10)
response.raise_for_status()
with open(filename + '.tmp', 'wb') as f:
f.write(response.content)
print(f'{filename}】下载完成!')
# 检查临时文件是否存在
temp_filename = filename + '.tmp'
if os.path.exists(temp_filename):
# 删除原有文件
if os.path.exists(filename):
os.remove(filename)
# 重命名临时文件
os.rename(temp_filename, filename)
print(f'{filename}】重命名成功!')
return True
else:
print(f'{filename}】临时文件不存在!')
return False
except Exception as e:
print(f'{filename}】下载失败:{str(e)}')
return False
def import_Tools():
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
if __name__ == '__main__':
APP_NAME = '顾家家居小程序'
ENV_NAME = 'GJJJ'
CK_URL = 'https://mc.kukahome.com/club-server/member/automaticLogin'
CK_NAME = 'identityValue@openid@o98mO0xrQ9Jqp0DUsGpmfHpQm_pQ'
print(f'''
{APP_NAME}签到
功能
积分签到
社区互动
抓包步骤
打开{APP_NAME}
授权登陆
打开抓包工具
{CK_URL}返回值[{CK_NAME}]
参数示例3ee9ceccccscscscscscscsc
wxpusher一对一推送功能
需要定义变量export WXPUSHER=wxpusher的app_token不设置则不启用wxpusher一对一推送
需要在{ENV_NAME}变量最后添加@wxpusher的UID
设置青龙变量
export {ENV_NAME}='{CK_NAME}参数值'多账号#或&分割
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新默认开启
注意抓完CK没事儿别打开小程序重新打开小程序请重新抓包
推荐cron1 9 * * *
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.09'
if IS_DEV:
import_Tools()
else:
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
if not token:
print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
exit()
tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
# print(tokens)
if len(tokens) > 0:
print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")
access_token = []
for index, infos in enumerate(tokens):
s = requests.session()
s.verify = False
run_result = RUN(infos, index).main()
if not run_result: continue
if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)

26
HDL.py
View File

@ -1,10 +1,11 @@
# !/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# ✨ 推荐cron0 6 * * *
# ✨✨✨ @Author CHERWIN✨✨✨
# -------------------------------
# cron "1 8 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('海底捞小程序签到')
# cron "0 6 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('中通快递小程序签到')
import os
from os import path
@ -15,8 +16,11 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -237,6 +241,7 @@ class RUN:
self.sendMsg()
return True
else:
self.sendMsg()
return False
def sendMsg(self):
if self.send_UID:
@ -299,20 +304,23 @@ if __name__ == '__main__':
export {ENV_NAME}='{CK_NAME}参数值'多账号#或&分割
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新默认开启
注意抓完CK没事儿别打开小程序重新打开小程序请重新抓包
推荐cron01 8 * * *
推荐cron0 6 * * *
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.08'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

22
JTSD.py
View File

@ -1,9 +1,8 @@
# !/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# ✨✨✨ @Author CHERWIN✨✨✨
# -------------------------------
# cron "0 6 * * *" script-path=xxx.py,tag=匹配cron用
# cron "0 5 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('极兔速递小程序签到')
import json
import os
@ -17,8 +16,11 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -133,6 +135,7 @@ class RUN:
self.sign()
self.sendMsg()
else:
self.sendMsg()
return False
def sendMsg(self):
@ -200,16 +203,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.18'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

115
KFHS.py
View File

@ -1,21 +1,23 @@
# !/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# cron "30 1 * * *" script-path=xxx.py,tag=匹配cron用
# cron "0 0,8 * * " script-path=xxx.py,tag=匹配cron用
# const $ = new Env('微信公众号:卡夫亨氏新厨艺')
import os
import random
import time
from datetime import date, datetime
from datetime import date, datetime,time as times
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# import CHERWIN_TOOLS
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -75,7 +77,7 @@ class RUN:
openId = data.get('openId','')
signTimes = data.get('signTimes',0)
memberInfo = data.get('memberInfo', {})
phone = memberInfo.get('phone', '')
self.phone = memberInfo.get('phone', '')
score = memberInfo.get('score', '')
if End :
Log(f'执行后积分:【{score}')
@ -84,22 +86,23 @@ class RUN:
# add = {"nickname":nickname,"member_id":member_id}
userid_list.append(self.member_id)
username_list.append(nickname)
Log(f'>>>当前用户:【{nickname}\nID{self.member_id}\nOpenID:【{openId}\n已连续签到【{signTimes}】天')
serialSign = data.get('serialSign', [{}])
signTimes = data.get('signTimes', 0)
Log(f'>>>当前用户:【{nickname}\nID{self.member_id}\nOpenID:【{openId}\n已连续签到【{signTimes}】天')
if serialSign :
if serialSign:
current_date = date.today()
date_string = serialSign[0].get('createdAt',current_date)
memberBalance = serialSign[0].get('memberBalance', 0)
parsed_date = datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S').date()
if parsed_date == current_date:
print(f"今日已签到,当前积分:【{memberBalance}")
else:
print("今日未签到")
wait_time = random.randint(1000, 10000) / 1000.0 # 转换为秒
time.sleep(wait_time)
print('随机延时1-10秒执行签到')
self.dailySign()
else:
print("今日未签到")
wait_time = random.randint(1000, 10000) / 1000.0 # 转换为秒
time.sleep(wait_time)
print('随机延时1-10秒执行签到')
self.dailySign()
return True
except:
print(response.text)
@ -141,12 +144,33 @@ class RUN:
id_list = resp['data']['chineseCookbook']['data']
for i in id_list:
Cookid_list.append(i['id'])
# self.creatCookbookCode(i['id'])
print(f'>获取到菜谱ID:【{Cookid_list}')
except:
print(response.text)
else:
print("API访问失败")
def creatCookbookCode(self,cookbook_id):
data = {
'cookbook_id':cookbook_id
}
response = self.s.post(
f'{self.baseUrl}createCookbookCode',
headers=self.headers, data=data
)
if response.status_code == 200:
try:
resp = response.json()
data = resp.get('data',{})
code_url = data.get('code_url','')
print(f'创建分享链接成功:[{code_url}]')
except:
print(response.text)
else:
print("API访问失败")
def recordScoreShare(self, cookbook_id, now_id):
# print('')
# self.getUserInfo()
@ -195,19 +219,31 @@ class RUN:
else:
Log('助力对象为自身,跳过')
# 预留兑换函数
def exchangeIntegralNew(self):
def exchange(self):
data = {
'value': '爱奇艺月卡',
'phone': '',
'type': '视频卡'
'phone': self.phone
}
# data = {
# 'value': '全网10元话费',
# 'phone': '',
# 'type': '话费',
# 'memberId': '',
# }
huafei_li = ['全网10元话费', '全网20元话费']
videoCard_li = ['爱奇艺', '腾讯', '优酷']
cardType = ['年卡', '季卡', '月卡', '周卡']
for card in huafei_li:
data['value'] = card
data['type'] = '话费'
data['memberId'] = self.member_id
self.exchangeIntegralNew(data)
for card in cardType:
for video in videoCard_li:
cardname = video + card
data['value'] = cardname
data['type'] = '视频卡'
self.exchangeIntegralNew(data)
print(cardname)
# 预留兑换函数
def exchangeIntegralNew(self,data):
print(f'正在尝试兑换【{data["value"]}')
response = self.s.post(
f'{self.baseUrl}exchangeIntegralNew',
headers=self.headers,
@ -224,15 +260,18 @@ class RUN:
print("API访问失败")
def main(self):
# self.getUserInfo()
# self.getCookbookIndex()
if self.getUserInfo():
if self.index == 1:
self.getCookbookIndex()
now = datetime.now().time()
start_time = times(23, 59, 59)
end_time = times(0, 5)
if start_time <= now <= end_time:
self.exchange()
self.getCookbookIndex()
self.getUserInfo(True)
self.sendMsg()
return True
else:
self.sendMsg()
return False
def help(self):
@ -245,6 +284,7 @@ class RUN:
now_id = userid_list[self.index-1]
Log(f'\n当前用于助力用户:【{username}】 ID:【{now_id}')
self.helpAuthor(662056, cookbook_id, now_id)
self.helpAuthor(662086, cookbook_id, now_id)
self.recordScoreShare(cookbook_id, now_id)
self.sendMsg(True)
return True
@ -316,29 +356,32 @@ if __name__ == '__main__':
export {ENV_NAME}='{CK_NAME}参数值'多账号#或&分割
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新默认开启
注意token有效期7天7天后重新抓
推荐cron5 8 * * *
推荐cron0 0,8 * *
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.06'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
print(token)
# print(token)
if not token:
print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
exit()
tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
print(tokens)
# print(tokens)
Cookid_list = []
userid_list = []
username_list = []

372
KGZJ.py Executable file
View File

@ -0,0 +1,372 @@
# !/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# cron "30 9 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('宽哥之家小程序')
import os
import random
import time
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# import CHERWIN_TOOLS
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
else:
print("加载通知服务失败!")
send_msg = ''
one_msg=''
def Log(cont=''):
global send_msg,one_msg
print(cont)
if cont:
one_msg += f'{cont}\n'
send_msg += f'{cont}\n'
class RUN:
def __init__(self,info,index):
global one_msg
one_msg = ''
split_info = info.split('@')
self.token = split_info[0]
len_split_info = len(split_info)
last_info = split_info[len_split_info - 1]
self.send_UID = None
if len_split_info > 0 and "UID_" in last_info:
self.send_UID = last_info
self.index = index + 1
Log(f"\n---------开始执行第{self.index}个账号>>>>>")
self.s = requests.session()
self.s.verify = False
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) XWEB/9129',
'Host': 'shop.sctobacco.com',
'Connection': 'keep-alive',
'xweb_xhr': '1',
'gray': '0',
'token': self.token,
'Accept': '*/*',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Referer': 'https://servicewechat.com/wxb6bc0796e0f0db00/224/page-frame.html',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
self.s.headers.update(self.headers)
def do_request(self, url, method='POST',params=None, data=None, headers=None):
try:
response = self.s.request(method, url, params=params, json=data, headers=headers)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"请求错误: {e}")
return None
def personal_info(self):
Log(f'======= 查询用户信息 =======')
personal_info_valid = False
try:
# 发送GET请求
response = self.do_request('https://shop.sctobacco.com/api/mc-server/mypage/simpleInfo',method='GET')
# 检查请求是否成功
if response.get('code','-1'):
personal_info_valid = True
data = response.get('data', {})
mobile_phone = data.get('phone','')
self.name = data.get('nickname','')
unionId = data.get('unionId','')
self.mobile_phone = mobile_phone[:3] + "*" * 4 + mobile_phone[7:]
# 提取个人信息
Log(f">>账号[{self.index}]登陆成功!✅\n用户名:【{self.name}\n手机号:【{self.mobile_phone}")
else:
# 如果请求不成功,则打印错误信息
message = response.get('msg', '')
Log(f'>>登录失败❌: {message}')
except Exception as e:
# 捕获任何异常并打印
print(e)
finally:
# 最终返回请求是否成功的标志
return personal_info_valid
def myTask(self):
print(f'======= 查询任务列表 =======')
try:
# 发送GET请求
response = self.do_request('https://shop.sctobacco.com/api/mc-server/mcTask/myTask',method='GET')
# 检查请求是否成功
if response.get('code','-1'):
data = response.get('data', {})
taskList = data.get('taskList',[{}])
skip_task =[50001,50002,30002]
for task in taskList:
taskName = task.get('taskName', '')
taskId = task.get('taskId', '')
isCompleted = task.get('isCompleted', '0')
print(f'>>当前任务:【{taskName}')
if taskId in skip_task:
print('>暂不支持,跳过❌')
continue
if isCompleted != '0' :
print('>已完成,跳过✅')
continue
if taskId == 30016:
self.SignSubmit()
elif taskId == 30004:
self.listForMobile()
else:
# 如果请求不成功,则打印错误信息
message = response.get('msg', '')
Log(f'查询任务列表失败❌: {message}')
except Exception as e:
# 捕获任何异常并打印
print(e)
def get_score(self):
Log(f'======= 获取积分信息 =======')
try:
timestamp = int(time.time() * 1000)
# 发送GET请求
response = self.do_request(f'https://shop.sctobacco.com/api/sc-server/log/detail?scoreTypeId=jifen001',method='GET')
# 检查请求是否成功
if response.get('code','-1'):
data = response.get('data', {})
if data:
scoreTypeName = data.get('scoreTypeName', '')
totalScore = data.get('totalScore', '')
Log(f'>当前{scoreTypeName}:【{totalScore}】✅')
else:
Log('>获取积分信息失败❌')
else:
# 如果请求不成功,则打印错误信息
message = response.get('msg', '')
Log(f'获取积分信息失败❌: {message}')
except Exception as e:
# 捕获任何异常并打印
print(e)
def SignSubmit(self):
Log(f'======= 签到 =======')
try:
timestamp = int(time.time() * 1000)
# 发送GET请求
response = self.do_request(f'https://shop.sctobacco.com/api/ac-server/manage/acSignMemberLog/SignSubmit?t={timestamp}',method='GET')
# 检查请求是否成功
if response.get('code','-1'):
data = response.get('data', '')
if data:
Log(f'>签到成功✅,获得{data}积分')
else:
Log('>签到失败❌')
else:
# 如果请求不成功,则打印错误信息
message = response.get('msg', '')
Log(f'签到失败❌: {message}')
except Exception as e:
# 捕获任何异常并打印
print(e)
def listForMobile(self):
Log(f'======= 获取文章列表 =======')
try:
timestamp = int(time.time() * 1000)
# 发送GET请求
response = self.do_request(f'https://shop.sctobacco.com/api/mc-server/mcMedia/listForMobile?t={timestamp}&offset=0&limit=10&isShow=1',method='GET')
# 检查请求是否成功
if response.get('code','-1'):
data = response.get('data', {})
rows = response.get('rows', [{}])
random_element = random.choice(rows)
if random_element:
Log(f'>>>获取文章列表成功✅')
appid = random_element['appid']
title = random_element['title']
mediaId = random_element['mediaId']
Log(f'>>前选择文章:【{title}】 appid:{appid} mediaId:{mediaId}')
self.clickMedia(mediaId,appid)
else:
Log('>>获取文章列表失败❌')
else:
# 如果请求不成功,则打印错误信息
message = response.get('msg', '')
Log(f'获取文章列表失败❌: {message}')
except Exception as e:
# 捕获任何异常并打印
print(e)
def clickMedia(self,mediaId,appid):
Log(f'======= 阅读文章 =======')
try:
parmas={
'mpMediaId':mediaId,
'mediaId':mediaId,
'appid' :appid
}
# 发送GET请求
response = self.do_request(f'https://shop.sctobacco.com/api/mc-server/mcMedia/clickMedia',method='GET',params=parmas)
# 检查请求是否成功
if response.get('code','-1'):
message = response.get('message', {})
if message == "success":
Log(f'>阅读文章成功✅')
else:
Log('>阅读文章失败❌')
else:
# 如果请求不成功,则打印错误信息
message = response.get('msg', '')
Log(f'阅读文章❌: {message}')
except Exception as e:
# 捕获任何异常并打印
print(e)
def clickMedia(self,mediaId,appid):
Log(f'======= 阅读文章 =======')
try:
parmas={
'mpMediaId':mediaId,
'mediaId':mediaId,
'appid' :appid
}
# 发送GET请求
response = self.do_request(f'https://shop.sctobacco.com/api/mc-server/mcMedia/clickMedia',method='GET',params=parmas)
# 检查请求是否成功
if response.get('code','-1'):
message = response.get('message', {})
if message == "success":
Log(f'>阅读文章成功✅')
else:
Log('>阅读文章失败❌')
else:
# 如果请求不成功,则打印错误信息
message = response.get('msg', '')
Log(f'阅读文章❌: {message}')
except Exception as e:
# 捕获任何异常并打印
print(e)
def main(self):
if not self.personal_info() :
Log("用户信息无效请更新CK")
self.sendMsg()
return False
self.myTask()
self.get_score()
self.sendMsg()
return True
def sendMsg(self, help=False):
if self.send_UID:
push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help)
print(push_res)
def down_file(filename, file_url):
print(f'开始下载:{filename},下载地址:{file_url}')
try:
response = requests.get(file_url, verify=False, timeout=10)
response.raise_for_status()
with open(filename + '.tmp', 'wb') as f:
f.write(response.content)
print(f'{filename}】下载完成!')
# 检查临时文件是否存在
temp_filename = filename + '.tmp'
if os.path.exists(temp_filename):
# 删除原有文件
if os.path.exists(filename):
os.remove(filename)
# 重命名临时文件
os.rename(temp_filename, filename)
print(f'{filename}】重命名成功!')
return True
else:
print(f'{filename}】临时文件不存在!')
return False
except Exception as e:
print(f'{filename}】下载失败:{str(e)}')
return False
def import_Tools():
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
if __name__ == '__main__':
APP_NAME = '宽哥之家小程序'
ENV_NAME = 'KGZJ'
CK_NAME = 'token'
print(f'''
{APP_NAME}签到
功能
积分签到
抓包步骤
打开{APP_NAME}
授权登陆
打开抓包工具
找请求头带{CK_NAME}的URl
复制里面的{CK_NAME}参数值
wxpusher一对一推送功能
需要定义变量export WXPUSHER=wxpusher的app_token不设置则不启用wxpusher一对一推送
需要在{ENV_NAME}变量最后添加@wxpusher的UID
参数示例1790314xxxxxx@UID_xxxxx
设置青龙变量
export {ENV_NAME}='{CK_NAME}参数值'多账号#或&分割
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新默认开启
注意抓完CK没事儿别打开小程序重新打开小程序请重新抓包
推荐cron5 8 * * *
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
if not token:
print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
exit()
tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
# print(tokens)
if len(tokens) > 0:
print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")
for index, infos in enumerate(tokens):
run_result = RUN(infos, index).main()
if not run_result: continue
if send: send(f'{APP_NAME}挂机通知', f'{send_msg}\n{TIPS_HTML}')

18
KKYP.py Normal file → Executable file
View File

@ -10,8 +10,11 @@ import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -159,16 +162,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.04'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

19
LSXDS.py Normal file → Executable file
View File

@ -14,9 +14,11 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# import CHERWIN_TOOLS
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
#
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -368,16 +370,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.08'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

18
MXBC.py Normal file → Executable file
View File

@ -20,8 +20,11 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -300,16 +303,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.15'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

18
NXDD.py
View File

@ -18,8 +18,11 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -308,16 +311,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.06'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

174
PPCS.py
View File

@ -19,8 +19,11 @@ import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -75,6 +78,15 @@ class RUN:
self.zip = None
self.lng = None
self.lat = None
self.boost_id = ''
self.boost_id_li = []
self.boost_name = ''
self.boost_type = ''
self.boost_is_enabled = ''
self.boost_is_finished = ''
self.boost_entity_id = ''
self.need_newuser = False
self.status = False
self.params = {
'supplement_id': '',
'lat_y': '',
@ -117,11 +129,11 @@ class RUN:
print('>>>>>>开始随机选择位置')
url = 'https://j1.pupuapi.com/client/store/place/near_location_by_city/v2'
# 生成随机的四位数
random_digits = ''.join(random.choices('0123456789', k=4))
random_digits = ''.join(random.choices('0123456789', k=13))
# 准备查询参数
search_params = {
'lng': '119.31' + random_digits,
'lat': '26.06' + random_digits
'lng': '119.30' + random_digits,
'lat': '26.08' + random_digits
}
try:
result =self.make_request(url,method='get',params=search_params)
@ -169,13 +181,14 @@ class RUN:
response.raise_for_status()
json_response = response.json()
print(json_response)
# print(json_response)
data = json_response.get('data', {})
access_token = data.get('access_token', '')
if access_token:
self.access_token = f'Bearer '+access_token
self.user_id = data.get('user_id')
self.name = data.get('nick_name', '')
self.is_new_user = data.get('is_new_user', '')
self.headers['Authorization'] = self.access_token
self.headers['pp-userid'] = self.user_id
append_data = {
@ -183,6 +196,7 @@ class RUN:
'user_id':self.user_id
}
access_token_li.append(append_data)
self.status = True
# print(f'账号【{self.index}】access_token{self.access_token}')
return True
else:
@ -382,18 +396,20 @@ class RUN:
print(f'查询组队信息失败[{errcode}]: {errmsg}')
except Exception as e:
print(str(e))
# 组队
def joinAuthorTeam(self):
Log(f'>>>>>>第1个账号开始助力作者')
print(f'>>>>>>第1个账号开始助力作者')
if len(AuthorCode) > 0:
for code in AuthorCode:
# print(code['teamId'])
url = f"https://j1.pupuapi.com/client/game/coin_share/teams/{code['teamId']}/join"
response = self.make_request(url)
# print(response)
if 'errcode' in response and response.get('errcode', '') == 0:
print(f'>入队成功:【{code}')
break
if code.get('status',False) and code.get('phone','') != self.phone:
url = f"https://j1.pupuapi.com/client/game/coin_share/teams/{code['teamId']}/join"
response = self.make_request(url)
# print(response)
if 'errcode' in response and response.get('errcode', '') == 0:
print(f'>入队成功:【{code}')
break
# elif 'errcode' in response and response.get('errcode', '') != 0:
# print(f'>入队失败:【{response.get("errmsg", "")}】')
# else:
@ -420,6 +436,100 @@ class RUN:
Log(f'>入队失败')
print(response)
def boost_recommend(self):
print(f'>>>>>>开始获取助力活动列表')
url = f'https://j1.pupuapi.com/client/boost/recommend'
try:
result = self.make_request(url,method='get')
errcode = result.get('errcode', -1)
if errcode == 0:
data = result.get('data',[])
for li in data:
self.boost_id = li.get('id','')
self.boost_name = li.get('name','')
self.boost_type = li.get('type','')
self.boost_is_enabled = li.get('is_enabled','')
self.boost_is_finished = li.get('is_finished','')
self.boost_entity_id = li.get('entity_id','')
self.boost_finish_condition_msg = li.get('boost_finish_condition_msg','')
if self.boost_entity_id:
print(f'当前项目:【{self.boost_name}】 已发起 ID{self.boost_entity_id}】 完成状态:{self.boost_is_finished} ')
li = {
'boost_entity_id': self.boost_entity_id,
'boost_type': self.boost_type,
'need_newuser':self.need_newuser
}
self.boost_id_li.append(li)
continue
print(f'当前项目:【{self.boost_name}】 完成状态:{self.boost_is_finished} ')
if '新人' in self.boost_finish_condition_msg:
self.need_newuser = True
continue
if not self.boost_take_in():break
print(self.boost_id_li)
else:
errmsg = result.get('errmsg', '')
print(f'查询组队信息失败[{errcode}]: {errmsg}')
except Exception as e:
print(str(e))
def boost_help(self):
print(f'>开始助力')
if len(AuthorCode) > 0:
for code in AuthorCode:
# print(code['teamId'])
if code.get('status',False) and code.get('phone','') != self.phone and code.get('boost',False) and self.is_new_user:
boost_li = code.get('boost',[])
for boost in boost_li:
boost_entity_id = boost.get('boost_entity_id','')
boost_type = boost.get('boost_type', '')
url = f'https://j1.pupuapi.com/client/boost/assist/{boost_entity_id}?group_type={boost_type}'
# try:
result = self.make_request(url)
errcode = result.get('errcode', -1)
if errcode == 0:
print('助力成功')
self.is_new_user = False
break
else:
errmsg = result.get('errmsg', '')
print(f'助力失败[{errcode}]: {errmsg}')
break
else:
continue
# except Exception as e:
# print(str(e))
def boost_take_in(self):
print(f'>开始发起助力活动')
self.lng = str(self.location['lng_x'])
self.lat = str(self.location['lat_y'])
url = f'https://j1.pupuapi.com/client/boost/take_in/{self.boost_id}?lat_y={self.lat}&lng_x={self.lng}'
# try:
result = self.make_request(url)
errcode = result.get('errcode', -1)
if errcode == 0:
self.boost_entity_id = result.get('data','')
# print(self.boost_entity_id)
if self.boost_entity_id:
li = {
'boost_entity_id':self.boost_entity_id,
'boost_type':self.boost_type,
'need_newuser':self.need_newuser
}
self.boost_id_li.append(li)
return True
else:
errmsg = result.get('errmsg', '')
print(f'发起组队失败[{errcode}]: {errmsg}')
return False
# except Exception as e:
# print(str(e))
def main(self):
# print(self.refresh_token)
if self.get_AccessToken():
@ -428,35 +538,42 @@ class RUN:
self.signStu()
self.getCoinInfo()
self.get_myTeam()
self.boost_recommend()
if self.is_new_user:
self.boost_help()
new_data = {
self.user_id:
{
'status': self.status,
'phone': self.phone,
'invite_code': self.invite_code
'invite_code': self.invite_code,
'boost':self.boost_id_li
}
}
if self.teamId:new_data[self.user_id]['teamId']=self.teamId
CHERWIN_TOOLS.SAVE_INVITE_CODE("INVITE_CODE/PPCS_INVITE_CODE.json", new_data)
self.sendMsg()
return True
else:
Log( f'账号[{self.index}] {CK_NAME}:\n{self.refresh_token}\n已失效请及时更新')
self.sendMsg()
return False
self.sendMsg()
def help(self):
if self.get_AccessToken():
if self.index == 1:
Log('--------签到组队--------')
print('--------签到组队--------')
self.joinAuthorTeam()
else:
Log('--------签到组队--------')
print('--------签到组队--------')
self.joinTeam()
return True
else:
Log(f'账号[{self.index}] {CK_NAME}:\n{self.refresh_token}\n已失效请及时更新')
self.sendMsg(True)
return False
self.sendMsg(True)
def sendMsg(self, help=False):
if self.send_UID:
@ -493,7 +610,7 @@ def down_file(filename, file_url):
def import_Tools():
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version,True)
if __name__ == '__main__':
APP_NAME = '朴朴超市'
@ -521,16 +638,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.06'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
@ -538,6 +658,12 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
exit()
tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
# with open('INVITE_CODE/PPCS_INVITE_CODE.json','r') as f:
# content = json.loads(f.read())
# print(content)
# AuthorCode = list(content.values())
# print(AuthorCode)
# print(tokens)
if len(tokens) > 0:
print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")

367
SFSY.py
View File

@ -13,28 +13,37 @@ from datetime import datetime, timedelta
from sys import exit
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
else:
print("加载通知服务失败!")
send_msg = ''
one_msg=''
one_msg = ''
def Log(cont=''):
global send_msg,one_msg
global send_msg, one_msg
print(cont)
if cont:
one_msg += f'{cont}\n'
send_msg += f'{cont}\n'
#1905 #0945 #6332 #6615 2559
inviteId=['8C3950A023D942FD93BE9218F5BFB34B','EF94619ED9C84E968C7A88CFB5E0B5DC','9C92BD3D672D4B6EBB7F4A488D020C79','803CF9D1E0734327BDF67CDAE1442B0E','00C81F67BE374041A692FA034847F503']
# 1905 #0945 #6332 #6615 2559
inviteId = [
'8C3950A023D942FD93BE9218F5BFB34B', 'EF94619ED9C84E968C7A88CFB5E0B5DC', '9C92BD3D672D4B6EBB7F4A488D020C79','803CF9D1E0734327BDF67CDAE1442B0E', '00C81F67BE374041A692FA034847F503']
class RUN:
def __init__(self,info,index):
def __init__(self, info, index):
global one_msg
one_msg = ''
split_info = info.split('@')
@ -62,9 +71,12 @@ class RUN:
}
self.anniversary_black = False
self.member_day_black = False
self.member_day_red_packet_drew_today = False
self.member_day_red_packet_map = {}
self.login_res = self.login(url)
self.today = datetime.now().strftime('%Y-%m-%d')
self.answer = APP_INFO.get('ANSWER',[]).get(self.today,False)
self.answer = APP_INFO.get('ANSWER', []).get(self.today, False)
def get_deviceId(self, characters='abcdef0123456789'):
result = ''
@ -77,7 +89,7 @@ class RUN:
result += char
return result
def login(self,sfurl):
def login(self, sfurl):
ress = self.s.get(sfurl, headers=self.headers)
# print(ress.text)
self.user_id = self.s.cookies.get_dict().get('_login_user_id_', '')
@ -89,6 +101,7 @@ class RUN:
else:
Log(f'获取用户信息失败')
return False
def getSign(self):
timestamp = str(int(round(time.time() * 1000)))
token = 'wwesldfs29aniversaryvdld29'
@ -102,7 +115,8 @@ class RUN:
}
self.headers.update(data)
return data
def do_request(self, url, data = {}, req_type = 'post'):
def do_request(self, url, data={}, req_type='post'):
self.getSign()
try:
if req_type.lower() == 'get':
@ -120,20 +134,19 @@ class RUN:
print('JSON decoding failed:', e)
return None
def sign(self):
print(f'>>>>>>开始执行签到')
json_data = {"comeFrom":"vioin","channelFrom":"WEIXIN"}
json_data = {"comeFrom": "vioin", "channelFrom": "WEIXIN"}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskSignPlusService~automaticSignFetchPackage'
response = self.do_request(url, data=json_data)
# print(response)
if response.get('success') == True:
count_day = response.get('obj',{}).get('countDay',0)
count_day = response.get('obj', {}).get('countDay', 0)
if response.get('obj') and response['obj'].get('integralTaskSignPackageVOList'):
packet_name = response["obj"]["integralTaskSignPackageVOList"][0]["packetName"]
Log(f'>>>签到成功,获得【{packet_name}】,本周累计签到【{count_day+1}】天')
Log(f'>>>签到成功,获得【{packet_name}】,本周累计签到【{count_day + 1}】天')
else:
Log(f'今日已签到,本周累计签到【{count_day+1}】天')
Log(f'今日已签到,本周累计签到【{count_day + 1}】天')
else:
print(f'签到失败!原因:{response.get("errorMessage")}')
@ -143,7 +156,7 @@ class RUN:
'channel': 'czflqdlhbxcx'
}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberActLengthy~redPacketActivityService~superWelfare~receiveRedPacket'
response = self.do_request(url,data=json_data)
response = self.do_request(url, data=json_data)
# print(response)
if response.get('success') == True:
gift_list = response.get('obj', {}).get('giftList', [])
@ -157,16 +170,16 @@ class RUN:
error_message = response.get('errorMessage') or json.dumps(response) or '无返回'
print(f'超值福利签到失败: {error_message}')
def get_SignTaskList(self,END=False):
if not END:print(f'>>>开始获取签到任务列表')
def get_SignTaskList(self, END=False):
if not END: print(f'>>>开始获取签到任务列表')
json_data = {
'channelType': '3',
'deviceId': self.get_deviceId(),
}
url='https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskStrategyService~queryPointTaskAndSignFromES'
response = self.do_request(url,data=json_data)
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskStrategyService~queryPointTaskAndSignFromES'
response = self.do_request(url, data=json_data)
# print(response)
if response.get('success') == True and response.get('obj') !=[]:
if response.get('success') == True and response.get('obj') != []:
totalPoint = response["obj"]["totalPoint"]
if END:
Log(f'当前积分:【{totalPoint}')
@ -178,7 +191,7 @@ class RUN:
self.strategyId = task["strategyId"]
self.title = task["title"]
status = task["status"]
skip_title = ['用行业模板寄件下单','去新增一个收件偏好','参与积分活动']
skip_title = ['用行业模板寄件下单', '去新增一个收件偏好', '参与积分活动']
if status == 3:
print(f'>{self.title}-已完成')
continue
@ -198,8 +211,8 @@ class RUN:
json_data = {
'taskCode': self.taskCode,
}
url='https://mcs-mimp-web.sf-express.com/mcs-mimp/commonRoutePost/memberEs/taskRecord/finishTask'
response = self.do_request(url,data=json_data)
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonRoutePost/memberEs/taskRecord/finishTask'
response = self.do_request(url, data=json_data)
if response.get('success') == True:
print(f'>【{self.title}】任务-已完成')
else:
@ -233,11 +246,11 @@ class RUN:
def receive_honeyTask(self):
print('>>>执行收取丰蜜任务')
# 收取
self.headers['syscode']='MCS-MIMP-CORE'
self.headers['channel']='wxwdsj'
self.headers['accept']='application/json, text/plain, */*'
self.headers['content-type']='application/json;charset=UTF-8'
self.headers['platform']='MINI_PROGRAM'
self.headers['syscode'] = 'MCS-MIMP-CORE'
self.headers['channel'] = 'wxwdsj'
self.headers['accept'] = 'application/json, text/plain, */*'
self.headers['content-type'] = 'application/json;charset=UTF-8'
self.headers['platform'] = 'MINI_PROGRAM'
json_data = {"taskType": self.taskType}
# print(json_data)
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~receiveExchangeIndexService~receiveHoney'
@ -295,8 +308,8 @@ class RUN:
print('>>>开始获取采蜜换大礼任务列表')
# 任务列表
json_data = {}
self.headers['channel']='wxwdsj'
url ='https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~receiveExchangeIndexService~taskDetail'
self.headers['channel'] = 'wxwdsj'
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~receiveExchangeIndexService~taskDetail'
response = self.do_request(url, data=json_data)
# print(response)
@ -323,11 +336,11 @@ class RUN:
print('>>>执行大冒险任务')
# 大冒险
gameNum = 5
for i in range(1,gameNum):
for i in range(1, gameNum):
json_data = {
'gatherHoney': 20,
}
if gameNum < 0 :break
if gameNum < 0: break
print(f'>>开始第{i}次大冒险')
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~receiveExchangeGameService~gameReport'
response = self.do_request(url, data=json_data)
@ -342,8 +355,8 @@ class RUN:
print(f'>大冒险失败!【{response.get("errorMessage")}')
break
def honey_indexData(self,END=False):
if not END:print('\n>>>>>>>开始执行采蜜换大礼任务')
def honey_indexData(self, END=False):
if not END: print('\n>>>>>>>开始执行采蜜换大礼任务')
# 邀请
random_invite = random.choice([invite for invite in inviteId if invite != self.user_id])
self.headers['channel'] = 'wxwdsj'
@ -357,7 +370,7 @@ class RUN:
return
Log(f'执行前丰蜜:【{usableHoney}')
taskDetail = response.get('obj').get('taskDetail')
activityEndTime = response.get('obj').get('activityEndTime','')
activityEndTime = response.get('obj').get('activityEndTime', '')
activity_end_time = datetime.strptime(activityEndTime, "%Y-%m-%d %H:%M:%S")
current_time = datetime.now()
@ -372,8 +385,6 @@ class RUN:
self.receive_honeyTask()
time.sleep(2)
def EAR_END_2023_TaskList(self):
print('\n>>>>>>开始年终集卡任务')
# 任务列表
@ -381,11 +392,11 @@ class RUN:
"activityCode": "YEAR_END_2023",
"channelType": "MINI_PROGRAM"
}
self.headers['channel']='xcx23nz'
self.headers['platform']='MINI_PROGRAM'
self.headers['syscode']='MCS-MIMP-CORE'
self.headers['channel'] = 'xcx23nz'
self.headers['platform'] = 'MINI_PROGRAM'
self.headers['syscode'] = 'MCS-MIMP-CORE'
url ='https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~activityTaskService~taskList'
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~activityTaskService~taskList'
response = self.do_request(url, data=json_data)
# print(response)
@ -476,7 +487,7 @@ class RUN:
print(f'>>>开始抽卡')
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~yearEnd2023GardenPartyService~getAward'
for l in range(10):
for i in range(0,3):
for i in range(0, 3):
json_data = {
"cardType": i
}
@ -497,7 +508,7 @@ class RUN:
def EAR_END_2023_GuessIdiom(self):
print(f'>>>开始猜成语')
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~yearEnd2023GuessIdiomService~win'
for i in range(1,11):
for i in range(1, 11):
json_data = {
"index": i
}
@ -545,6 +556,7 @@ class RUN:
print(f'查询每周领券失败: {error_message}')
if '系统繁忙' in error_message or '用户手机号校验未通过' in error_message:
self.anniversary_black = True
def anniversary2024_receive_weekly_gift(self):
print(f'>>>开始领取每周领券')
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~anniversary2024IndexService~receiveWeeklyGift'
@ -554,7 +566,7 @@ class RUN:
print(f'每周领券: {product_names}')
else:
error_message = response.get('errorMessage') or json.dumps(response) or '无返回'
print( f'每周领券失败: {error_message}')
print(f'每周领券失败: {error_message}')
if '系统繁忙' in error_message or '用户手机号校验未通过' in error_message:
self.anniversary_black = True
@ -581,13 +593,17 @@ class RUN:
pass
elif task['taskType'] == 'FOLLOW_SFZHUNONG_VEDIO_ID':
pass
elif task['taskType'] in ['BROWSE_VIP_CENTER', 'GUESS_GAME_TIP', 'CREATE_SFID', 'CLICK_MY_SETTING', 'CLICK_TEMPLATE', 'REAL_NAME', 'SEND_SUCCESS_RECALL', 'OPEN_SVIP', 'OPEN_FAST_CARD', 'FIRST_CHARGE_NEW_EXPRESS_CARD', 'CHARGE_NEW_EXPRESS_CARD', 'INTEGRAL_EXCHANGE']:
elif task['taskType'] in ['BROWSE_VIP_CENTER', 'GUESS_GAME_TIP', 'CREATE_SFID', 'CLICK_MY_SETTING',
'CLICK_TEMPLATE', 'REAL_NAME', 'SEND_SUCCESS_RECALL', 'OPEN_SVIP',
'OPEN_FAST_CARD', 'FIRST_CHARGE_NEW_EXPRESS_CARD', 'CHARGE_NEW_EXPRESS_CARD',
'INTEGRAL_EXCHANGE']:
pass
else:
for _ in range(task['restFinishTime']):
if self.anniversary_black:
break
self.anniversary2024_finishTask(task)
def anniversary2024_finishTask(self, task):
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonRoutePost/memberEs/taskRecord/finishTask'
data = {'taskCode': task['taskCode']}
@ -597,7 +613,8 @@ class RUN:
# 完成任务后获取任务奖励的逻辑
self.anniversary2024_fetchMixTaskReward(task)
else:
print('完成任务[%s]失败: %s' % (task['taskName'], response.get('errorMessage') or (json.dumps(response) if response else '无返回')))
print('完成任务[%s]失败: %s' % (
task['taskName'], response.get('errorMessage') or (json.dumps(response) if response else '无返回')))
def anniversary2024_fetchMixTaskReward(self, task):
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~anniversary2024TaskService~fetchMixTaskReward'
@ -885,7 +902,7 @@ class RUN:
self.cards[currency_key] = int(card.get('balance'))
card_info.append('[' + card.get('currency') + ']X' + str(card.get('balance')))
Log(f'已收集拼图: {card_info}' )
Log(f'已收集拼图: {card_info}')
cards_li.sort(key=lambda x: x.get('balance'), reverse=True)
else:
@ -896,11 +913,11 @@ class RUN:
def do_draw(self, cards):
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~anniversary2024CardService~collectDrawAward'
data = {"accountList":cards}
data = {"accountList": cards}
response = self.do_request(url, data)
if response and response.get('success'):
data = response.get('obj',{})
productName = data.get('productName','')
data = response.get('obj', {})
productName = data.get('productName', '')
Log(f'抽奖成功,获得{productName}')
return True
else:
@ -908,7 +925,7 @@ class RUN:
print(f'抽奖失败: {error_message}')
return False
def convert_common_card(self,cards, target_card):
def convert_common_card(self, cards, target_card):
# 如果共通卡(COMMON_CARD)的数量大于0转化成目标卡
if cards['COMMON_CARD'] > 0:
cards['COMMON_CARD'] -= 1
@ -916,13 +933,12 @@ class RUN:
return True
return False
def can_draw(self,cards, n):
def can_draw(self, cards, n):
# 判断是否有足够的不同卡进行抽奖
distinct_cards = sum(1 for card, amount in cards.items() if card != 'COMMON_CARD' and amount > 0)
return distinct_cards >= n
def draw(self,cards, n):
def draw(self, cards, n):
drawn_cards = []
for card, amount in sorted(cards.items(), key=lambda item: item[1]):
if card != 'COMMON_CARD' and amount > 0:
@ -937,7 +953,7 @@ class RUN:
else:
return None
def simulate_lottery(self,cards):
def simulate_lottery(self, cards):
while self.can_draw(cards, 9):
used_cards = self.draw(cards, 9)
print("进行了一次9卡抽奖消耗卡片: ", used_cards)
@ -973,15 +989,210 @@ class RUN:
else:
print('未到自动抽奖时间')
def member_day_index(self):
try:
invite_user_id = random.choice([invite for invite in inviteId if invite != self.user_id])
payload = {'inviteUserId': invite_user_id}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~memberDayIndexService~index'
response = self.do_request(url, data=payload)
if response.get('success'):
lottery_num = response.get('obj', {}).get('lotteryNum', 0)
can_receive_invite_award = response.get('obj', {}).get('canReceiveInviteAward', False)
if can_receive_invite_award:
self.member_day_receive_invite_award(invite_user_id)
self.member_day_red_packet_status()
Log(f'会员日可以抽奖{lottery_num}')
for _ in range(lottery_num):
self.member_day_lottery()
if self.member_day_black:
return
self.member_day_task_list()
if self.member_day_black:
return
self.member_day_red_packet_status()
else:
error_message = response.get('errorMessage', '无返回')
Log(f'查询会员日失败: {error_message}')
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def member_day_receive_invite_award(self, invite_user_id):
try:
payload = {'inviteUserId': invite_user_id}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~memberDayIndexService~receiveInviteAward'
response = self.do_request(url, payload)
if response.get('success'):
product_name = response.get('obj', {}).get('productName', '空气')
Log(f'会员日奖励: {product_name}')
else:
error_message = response.get('errorMessage', '无返回')
Log(f'领取会员日奖励失败: {error_message}')
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def member_day_lottery(self):
try:
payload = {}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~memberDayLotteryService~lottery'
response = self.do_request(url, payload)
if response.get('success'):
product_name = response.get('obj', {}).get('productName', '空气')
Log(f'会员日抽奖: {product_name}')
else:
error_message = response.get('errorMessage', '无返回')
Log(f'会员日抽奖失败: {error_message}')
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def member_day_task_list(self):
try:
payload = {'activityCode': 'MEMBER_DAY', 'channelType': 'MINI_PROGRAM'}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~activityTaskService~taskList'
response = self.do_request(url, payload)
if response.get('success'):
task_list = response.get('obj', [])
for task in task_list:
if task['status'] == 1:
if self.member_day_black:
return
self.member_day_fetch_mix_task_reward(task)
for task in task_list:
if task['status'] == 2:
if self.member_day_black:
return
if task['taskType'] in ['SEND_SUCCESS', 'INVITEFRIENDS_PARTAKE_ACTIVITY', 'OPEN_SVIP',
'OPEN_NEW_EXPRESS_CARD', 'OPEN_FAMILY_CARD', 'CHARGE_NEW_EXPRESS_CARD',
'INTEGRAL_EXCHANGE']:
pass
else:
for _ in range(task['restFinishTime']):
if self.member_day_black:
return
self.member_day_finish_task(task)
else:
error_message = response.get('errorMessage', '无返回')
Log('查询会员日任务失败: ' + error_message)
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def member_day_finish_task(self, task):
try:
payload = {'taskCode': task['taskCode']}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberEs~taskRecord~finishTask'
response = self.do_request(url, payload)
if response.get('success'):
Log('完成会员日任务[' + task['taskName'] + ']成功')
self.member_day_fetch_mix_task_reward(task)
else:
error_message = response.get('errorMessage', '无返回')
Log('完成会员日任务[' + task['taskName'] + ']失败: ' + error_message)
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def member_day_fetch_mix_task_reward(self, task):
try:
payload = {'taskType': task['taskType'], 'activityCode': 'MEMBER_DAY', 'channelType': 'MINI_PROGRAM'}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~activityTaskService~fetchMixTaskReward'
response = self.do_request(url, payload)
if response.get('success'):
Log('领取会员日任务[' + task['taskName'] + ']奖励成功')
else:
error_message = response.get('errorMessage', '无返回')
Log('领取会员日任务[' + task['taskName'] + ']奖励失败: ' + error_message)
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def member_day_receive_red_packet(self, hour):
try:
payload = {'receiveHour': hour}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~memberDayTaskService~receiveRedPacket'
response = self.do_request(url, payload)
if response.get('success'):
Log(f'会员日领取{hour}点红包成功')
else:
error_message = response.get('errorMessage', '无返回')
Log(f'会员日领取{hour}点红包失败: {error_message}')
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def member_day_red_packet_status(self):
try:
payload = {}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~memberDayPacketService~redPacketStatus'
response = self.do_request(url, payload)
if response.get('success'):
packet_list = response.get('obj', {}).get('packetList', [])
for packet in packet_list:
self.member_day_red_packet_map[packet['level']] = packet['count']
# Rest of the logic to process packet list
else:
error_message = response.get('errorMessage', '无返回')
Log(f'查询会员日合成失败: {error_message}')
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def member_day_red_packet_merge(self, level):
try:
payload = {'level': level, 'num': 2}
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~memberDayPacketService~redPacketMerge'
response = self.do_request(url, payload)
if response.get('success'):
Log(f'会员日合成: [{level}级]红包X2 -> [{level + 1}级]红包')
self.member_day_red_packet_map[level] -= 2
if not self.member_day_red_packet_map.get(level + 1):
self.member_day_red_packet_map[level + 1] = 0
self.member_day_red_packet_map[level + 1] += 1
else:
error_message = response.get('errorMessage', '无返回')
Log(f'会员日合成两个[{level}级]红包失败: {error_message}')
if '没有资格参与活动' in error_message:
self.member_day_black = True
Log('会员日任务风控')
except Exception as e:
print(e)
def main(self):
global one_msg
wait_time = random.randint(1000, 3000) / 1000.0 # 转换为秒
time.sleep(wait_time) # 等待
one_msg = ''
if not self.login_res:return False
# 执行签到任务
if not self.login_res: return False
执行签到任务
self.sign()
self.superWelfare_receiveRedPacket()
self.get_SignTaskList()
@ -992,17 +1203,25 @@ class RUN:
#获取任务列表并执行任务
self.get_honeyTaskListStart()
self.honey_indexData(True)
#######################################
# # 获取当前季度结束日期
# activity_end_date = get_quarter_end_date()
# if is_activity_end_date(activity_end_date):
# Log("今天采蜜活动截止兑换,请及时进行兑换")
# send('顺丰速运挂机通知', "今天采蜜活动截止兑换,请及时进行兑换")
target_time = datetime(2024, 4, 8, 19, 0)
if datetime.now() < target_time:
# self.EAR_END_2023_TaskList()
self.anniversary2024_task()
# target_time = datetime(2024, 4, 8, 19, 0)
# if datetime.now() < target_time:
# # self.EAR_END_2023_TaskList()
# self.anniversary2024_task()
# else:
# print('周年庆活动已结束')
#######################################
self.member_day_index()
current_date = datetime.now().day
if 26 <= current_date <= 28:
self.member_day_index()
else:
print('周年庆活动已结束')
print('未到指定时间不执行会员日任务')
self.sendMsg()
return True
@ -1012,7 +1231,6 @@ class RUN:
print(push_res)
def get_quarter_end_date():
current_date = datetime.now()
current_month = current_date.month
@ -1026,6 +1244,7 @@ def get_quarter_end_date():
return quarter_end_date.strftime("%Y-%m-%d")
def is_activity_end_date(end_date):
current_date = datetime.now().date()
end_date = datetime.strptime(end_date, "%Y-%m-%d").date()
@ -1033,7 +1252,6 @@ def is_activity_end_date(end_date):
return current_date == end_date
def down_file(filename, file_url):
print(f'开始下载:{filename},下载地址:{file_url}')
try:
@ -1060,10 +1278,13 @@ def down_file(filename, file_url):
print(f'{filename}】下载失败:{str(e)}')
return False
def import_Tools():
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
global CHERWIN_TOOLS, ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,
local_version)
if __name__ == '__main__':
APP_NAME = '顺丰速运'
@ -1094,16 +1315,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.16'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
@ -1118,4 +1342,3 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
run_result = RUN(infos, index).main()
if not run_result: continue
if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)

54
TBHYZX.py Normal file → Executable file
View File

@ -15,8 +15,10 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
#
if os.path.isfile('/DEV_ENV.py'):
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -36,8 +38,7 @@ class RUN:
global one_msg
one_msg = ''
split_info = info.split('@')
self.token = split_info[0]
self.token = json.loads(split_info[0])
# print(self.token)
len_split_info = len(split_info)
last_info = split_info[len_split_info - 1]
@ -69,6 +70,8 @@ class RUN:
}
#
# print(self.headers)
for key, value in self.token.items():
self.headers[key] = value
self.baseUrl = 'https://wxa-tp.ezrpro.com/myvip/'
@ -120,17 +123,30 @@ class RUN:
def WxAppOnLoginNew(self):
Log('>>>>>>登陆')
url = "https://wxa-tp.ezrpro.com/myvip/Base/User/WxAppOnLoginNew"
# data = {
# "code": self.token,
# "CommonIdType": "VipWxUnionId",
# "CommonId": "124948229",
# "ShopId": 0,
# "CommonIdSource": 47,
# "Latitude": 0,
# "Longitude": 0,
# "InviteActObj": "{\"ActId\":50726}",
# "PingId": "peLXqZCQAAD-C3PrZ36DkaE98NoLjuqQ",
# # "PingDate": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# "PingDate": '2024-03-19 22:23:01'
# }
data = {
"code": self.token,
"CommonIdType": "VipId",
"CommonId": "124948229",
"CommonIdType": "VipWxUnionId",
"CommonId": "",
"ShopId": 0,
"CommonIdSource": 98,
"CommonIdSource": 47,
"Latitude": 0,
"Longitude": 0,
"InviteActObj": "{\"ActId\":50726}",
"PingId": "peLXqZCQAA3W2G4d-pEZ8FRPEjxKdzoQ",
"PingDate": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
"code": "0c1QVUFa1bogqH0W87Ga1ZHjjG3QVUFP",
"PingId": "peLXqZCQAAD-C3PrZ36DkaE98NoLjuqQ",
"PingDate": "2024-03-19 22:23:01"
}
response = s.post(url, headers=self.headers,json=data)
response = response.json()
@ -187,8 +203,9 @@ class RUN:
def main(self):
Log(f"\n开始执行第{self.index}个账号--------------->>>>>")
if self.WxAppOnLoginNew():
self.GetVipCardInfoByVipId()
# if self.WxAppOnLoginNew():
if self.GetVipCardInfoByVipId():
# self.GetVipCardInfoByVipId()
self.GetSignInDtlInfo()
self.BonusClassify()
self.sendMsg()
@ -261,16 +278,19 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.08'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

59
TYQH.py
View File

@ -5,23 +5,21 @@
# -------------------------------
# cron "25 10-22/2 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('统一快乐星球小程序-茄皇的家')
import hashlib
import json
import os
import random
import string
import time
from datetime import datetime
from os import environ, path
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
@ -128,40 +126,7 @@ class RUN:
print("发生了未知错误:", e)
def gen_sign(self, parameters={}, body=None):
sorted_keys = sorted(parameters.keys())
parameter_strings = []
for key in sorted_keys:
if isinstance(parameters[key], dict):
parameter_strings.append(f"{key}={json.dumps(parameters[key])}")
else:
parameter_strings.append(f"{key}={parameters[key]}")
current_time = int(datetime.now().timestamp() * 1000)
secret_chars = list('BxzTx45uIGT25TTHIIBU2')
last_three_digits = str(current_time)[-3:]
for digit in last_three_digits:
secret_chars.insert(int(digit), digit)
secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest()
nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
sign_data = {
'client_id': 'game',
'nonstr': nonce_str,
'timestamp': current_time,
'body': json.dumps(body) if body else '',
'query': '&'.join(parameter_strings) if parameter_strings else '',
'secret': secret
}
sign_string = '|'.join([str(v) for v in sign_data.values()])
sign = hashlib.md5(sign_string.encode()).hexdigest().upper()
sign_header = {
'client_id': 'game',
'timestamp': str(current_time),
'nonstr': sign_data['nonstr'],
'sign': sign
}
sign_header = CHERWIN_TOOLS.TYQH_SIGN(parameters, body)
self.headers.update(sign_header)
return self.headers
@ -1212,16 +1177,20 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.04'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print(
'脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

53
TYQH_JK.py Normal file → Executable file
View File

@ -20,8 +20,10 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
@ -108,40 +110,7 @@ class RUN:
print("发生了未知错误:", e)
def gen_sign(self, parameters={}, body=None):
sorted_keys = sorted(parameters.keys())
parameter_strings = []
for key in sorted_keys:
if isinstance(parameters[key], dict):
parameter_strings.append(f"{key}={json.dumps(parameters[key])}")
else:
parameter_strings.append(f"{key}={parameters[key]}")
current_time = int(datetime.now().timestamp() * 1000)
secret_chars = list('BxzTx45uIGT25TTHIIBU2')
last_three_digits = str(current_time)[-3:]
for digit in last_three_digits:
secret_chars.insert(int(digit), digit)
secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest()
nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
sign_data = {
'client_id': 'game',
'nonstr': nonce_str,
'timestamp': current_time,
'body': json.dumps(body) if body else '',
'query': '&'.join(parameter_strings) if parameter_strings else '',
'secret': secret
}
sign_string = '|'.join([str(v) for v in sign_data.values()])
sign = hashlib.md5(sign_string.encode()).hexdigest().upper()
sign_header = {
'client_id': 'game',
'timestamp': str(current_time),
'nonstr': sign_data['nonstr'],
'sign': sign
}
sign_header=CHERWIN_TOOLS.TYQH_SIGN(parameters,body)
self.headers.update(sign_header)
return self.headers
@ -367,16 +336,20 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.04'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print(
'脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
ENV = os.environ.get('TYQH','')

18
TYYP.py
View File

@ -16,8 +16,10 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# import CHERWIN_TOOLS
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -251,16 +253,20 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.06'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print(
'脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

18
YDKD.py
View File

@ -32,8 +32,10 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# import CHERWIN_TOOLS
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -339,16 +341,20 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.06'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print(
'脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token

77
YHSH.py
View File

@ -21,8 +21,10 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
@ -946,6 +948,58 @@ class RUN:
# print(response)
else:
print(f'>申请失败,{response}')
def sixYears_getTask(self):
print(f'>>开始获取6周年任务列表')
headers = {
'Accept': '*/*'
}
url = f'https://api.yonghuivip.com/web/marketing/quick/task/loadTask?activityId=3572&{self.url_add}'
response = self.do_request(url, headers=headers, req_type='get')
if 'code' in response and response.get('code') == 0:
data = response.get('data',[{}])
for task in data:
taskId = task.get('taskId','')
taskType = task.get('taskType','')
taskStatus = task.get('taskStatus','')
taskTitle = task.get('taskTitle','')
if taskStatus == 0:
print(f'任务[{taskTitle}]已完成')
continue
self.sixYears_doTask(taskId,taskType,taskTitle)
else:
Log(f'获取6周年任务列表{response}')
def sixYears_doTask(self,taskId,taskType,taskTitle):
print(f'>>开始执行{taskTitle}任务')
headers= self.headers.copy()
headers_new = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
headers.update(headers_new)
data = {
"activityId": '3572',
"activityCode": "FJ-MD-05",
"taskId": taskId,
"taskType": taskType
}
url = f'https://api.yonghuivip.com/web/marketing/quick/task/doTask?{self.url_add}'
response = self.do_request(url, headers=headers,data=data)
if 'code' in response and response.get('code') == 0:
data = response.get('data',{})
count = data.get('count',0)
print(f'获得【{count}】次抽奖机会')
else:
Log(f'>执行{taskTitle}任务失败,{response}')
def help_fun(self):
print(f"\n>>>>>>>>>>开始互助<<<<<<<<<<")
one_msg = ''
@ -982,6 +1036,7 @@ class RUN:
self.Boostcoupon()
else:
self.Boostcoupon(GameCode)
self.sixYears_getTask()
new_data = {
self.memberId:
{
@ -1074,16 +1129,20 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.17'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print(
'脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
@ -1097,8 +1156,8 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
for index, infos in enumerate(tokens):
run_result = RUN(infos, index).main()
if not run_result:continue
for index, infos in enumerate(tokens):
RUN(infos, index).help_fun()
if not run_result: continue
# for index, infos in enumerate(tokens):
# RUN(infos, index).help_fun()
# if not run_result: continue
if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)

149
ZTKD.py
View File

@ -1,56 +1,42 @@
# !/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# ✨✨✨ 中通快递小程序签到✨✨✨
# ✨ 功能:
# 积分签到
# ✨ 抓包步骤:
# 打开中通快递小程序
# 授权登陆
# 打开抓包工具
# 找URl请求头带[x-token或者token]
# 复制里面的[x-token或者token]参数值
# 参数示例eyJhbGciOiJIUzUxMiJ9.eyJnZW5lcmF0ZVRpbWUixxxxxx
# ✨ 设置青龙变量:
# export YDKD='x-token或者token参数值'多账号#或&分割
# export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
# ✨ ✨ 注意抓完CK没事儿别打开小程序重新打开小程序请重新抓包
# ✨ 推荐cron0 6 * * *
# ✨✨✨ @Author CHERWIN✨✨✨
# -------------------------------
# cron "0 6 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('中通快递小程序签到')
import json
import os
import random
import time
from datetime import datetime, date
from os import path
import requests
import hashlib
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# import CHERWIN_TOOLS
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
#
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
else:
print("加载通知服务失败!")
send_msg = ''
one_msg=''
one_msg = ''
def Log(cont=''):
global send_msg,one_msg
global send_msg, one_msg
print(cont)
if cont:
one_msg += f'{cont}\n'
send_msg += f'{cont}\n'
class RUN:
def __init__(self,info,index):
def __init__(self, info, index):
global one_msg
one_msg = ''
split_info = info.split('@')
@ -83,18 +69,18 @@ class RUN:
self.list_index = 0
self.isFirstTask = True
def get_point(self,END=False):
def get_point(self, END=False):
Log('>>>>>>获取积分信息')
json_data = {}
response = s.post(f'{self.baseUrl}user/point/get', headers=self.headers,json=json_data)
response = s.post(f'{self.baseUrl}user/point/get', headers=self.headers, json=json_data)
point_info = response.json()
# print(point_info)
code = point_info.get('code',-1)
if point_info.get('success',False) == True and code == 10000:
data = point_info.get('data',[{}])
point=data.get('point',0)
mobile=data.get('mobile','')
mobile=mobile[:3] + "*" * 4 + mobile[7:]
code = point_info.get('code', -1)
if point_info.get('success', False) == True and code == 10000:
data = point_info.get('data', [{}])
point = data.get('point', 0)
mobile = data.get('mobile', '')
mobile = mobile[:3] + "*" * 4 + mobile[7:]
if END:
Log(f'>>执行后积分:【{point}')
else:
@ -106,43 +92,42 @@ class RUN:
return False
def Check_sign(self):
Log('>>>>>>查询签到')
json_data = {"calendarType":0}
response = s.post(f'{self.baseUrl}member/sign/v2/calendar', headers=self.headers,json=json_data)
response = response.json()
# print(point_info)
code = response.get('code', -1)
if response['success']== True and response['data'] != None and code == 10000:
data=response.get('data',{})
dayList=data.get('dayList',[{}])
signDays=data.get('signDays',0)
for day in dayList:
dates = day.get('date','')
point = day.get('point','')
signFlag = day.get('signFlag','')
current_date = date.today()
parsed_date = datetime.strptime(dates, '%Y-%m-%d').date()
if parsed_date == current_date:
if signFlag == 1:
Log(f'>>今日已签到,连续签到【{signDays}】天,获得【{point}】积分')
else:
self.sign()
else:
Log(f"查询签到失败,{response['msg']}")
Log('>>>>>>查询签到')
json_data = {"calendarType": 0}
response = s.post(f'{self.baseUrl}member/sign/v2/calendar', headers=self.headers, json=json_data)
response = response.json()
# print(point_info)
code = response.get('code', -1)
if response['success'] == True and response['data'] != None and code == 10000:
data = response.get('data', {})
dayList = data.get('dayList', [{}])
signDays = data.get('signDays', 0)
for day in dayList:
dates = day.get('date', '')
point = day.get('point', '')
signFlag = day.get('signFlag', '')
current_date = date.today()
parsed_date = datetime.strptime(dates, '%Y-%m-%d').date()
if parsed_date == current_date:
if signFlag == 1:
Log(f'>>今日已签到,连续签到【{signDays}】天,获得【{point}】积分')
else:
self.sign()
else:
Log(f"查询签到失败,{response['msg']}")
def sign(self):
Log('>>>签到')
json_data = {}
response = s.post(f'{self.baseUrl}member/sign/v2/userSignIn', headers=self.headers,json=json_data)
point_info = response.json()
# print(point_info)
code = point_info.get('code', -1)
if point_info['success']== True and point_info['data'] != None and code == 10000:
point=point_info['data']['point']
Log(f'>>签到成功获得:【{point}】积分')
else:
Log(f">>签到失败,{point_info['msg']}")
Log('>>>签到')
json_data = {}
response = s.post(f'{self.baseUrl}member/sign/v2/userSignIn', headers=self.headers, json=json_data)
point_info = response.json()
# print(point_info)
code = point_info.get('code', -1)
if point_info['success'] == True and point_info['data'] != None and code == 10000:
point = point_info['data']['point']
Log(f'>>签到成功获得:【{point}】积分')
else:
Log(f">>签到失败,{point_info['msg']}")
def main(self):
print(f"\n开始执行第{self.index}个账号--------------->>>>>")
@ -186,15 +171,17 @@ def down_file(filename, file_url):
print(f'{filename}】下载失败:{str(e)}')
return False
def import_Tools():
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
global CHERWIN_TOOLS, ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
if __name__ == '__main__':
APP_NAME = '中通快递小程序'
ENV_NAME = 'ZTKD'
CK_NAME = 'x-token或者token'
CK_NAME = 'token'
print(f'''
{APP_NAME}签到
功能
@ -203,7 +190,7 @@ if __name__ == '__main__':
打开{APP_NAME}
授权登陆
打开抓包工具
URl请求头带[{CK_NAME}]
https://api.ztomember.com/api/user/point/get请求头里的[{CK_NAME}]
复制里面的[{CK_NAME}]参数值
参数示例eyJhbGciOiJIUzUxMiJ9.eyJnZW5lcmF0ZVRpbWUixxxxxx
设置青龙变量
@ -214,16 +201,20 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
@Author CHERWIN
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.05.08'
if os.path.isfile('CHERWIN_TOOLS.py'):
local_version = '2024.05.15'
if IS_DEV:
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print(
'脚本依赖下载失败请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token