CHERWIN_SCRIPTS/PPCS.py
2024-04-08 02:14:22 +08:00

555 lines
21 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
!/usr/bin/python3
-- coding: utf-8 --
-------------------------------
✨✨✨ @Author CHERWIN✨✨✨
cron "0 9 * * *" script-path=xxx.py,tag=匹配cron用
const $ = new Env('朴朴超市APP')
'''
import json
import os
import random
import time
from sys import exit
from datetime import datetime, timedelta
from urllib.parse import urlparse
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
else:
print("加载通知服务失败!")
send_msg = ''
one_msg=''
def Log(cont=''):
global send_msg,one_msg
print(cont)
if cont:
one_msg += f'{cont}\n'
send_msg += f'{cont}\n'
inviteCode = {}
class RUN:
def __init__(self, info,index,access_token=None,user_id=None):
global one_msg
one_msg = ''
split_info = info.split('@')
self.refresh_token = split_info[0]
len_split_info = len(split_info)
last_info = split_info[len_split_info - 1]
self.send_UID = None
if len_split_info > 0 and "UID_" in last_info:
self.send_UID = last_info
self.index = index + 1
Log(f"\n---------开始执行第{self.index}个账号>>>>>")
self.s = requests.session()
self.s.verify = False
self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f) XWEB/8555'
self.headers = {
'Authorization': '',
'Accept': 'application/json, text/plain, */*',
'User-Agent': self.UA,
'Origin': 'https://ma.pupumall.com',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Referer': 'https://ma.pupumall.com/',
'Accept-Language': 'zh-CN,zh;q=0.9',
'pp-os': '0'
}
# print(self.refresh_token)
self.team_need_help = False
self.teamId = ''
self.location = {}
self.store_id = None
self.zip = None
self.lng = None
self.lat = None
self.params = {
'supplement_id': '',
'lat_y': '',
'lng_x': '',
}
if access_token:
self.access_token=access_token
self.headers['Authorization'] = access_token
self.headers['pp-userid'] = user_id
self.near_location_by_city()
self.wait_time = random.randint(1000, 3000) / 1000.0 # 转换为秒
def make_request(self, url, method='post',headers={},params='',data={}):
parsed_url = urlparse(url)
host = parsed_url.netloc
self.headers['Host'] = host
if headers == {}:
headers = self.headers
if data =={}:
data = self.params
try:
if method.lower() == 'get':
response = self.s.get(url, headers=headers, verify=False,params=params)
elif method.lower() == 'post':
response = self.s.post(url, headers=headers, json=data, verify=False)
else:
raise ValueError("不支持的请求方法: " + method)
return response.json()
except requests.exceptions.RequestException as e:
print("请求异常:", e)
except ValueError as e:
print("值错误或不支持的请求方法:", e)
except Exception as e:
print("发生了未知错误:", e)
def near_location_by_city(self):
print('>>>>>>开始随机选择位置')
url = 'https://j1.pupuapi.com/client/store/place/near_location_by_city/v2'
# 生成随机的四位数
random_digits = ''.join(random.choices('0123456789', k=4))
# 准备查询参数
search_params = {
'lng': '119.31' + random_digits,
'lat': '26.06' + random_digits
}
try:
result =self.make_request(url,method='get',params=search_params)
errcode = result.get('errcode', -1)
if errcode == 0:
data = result.get('data')
# 假设randomList是一个选择列表中随机元素的方法
self.location = self.randomList(data)
self.store_id = self.location['service_store_id']
self.zip = self.location['city_zip']
self.lng = str(self.location['lng_x'])
self.lat = str(self.location['lat_y'])
self.params['lat_y'] = self.lat
self.params['lat_x'] = self.lng
# 更新请求头部
self.headers['pp_storeid'] = self.store_id
self.headers['pp-cityzip'] = str(self.zip)
print('>选取随机地点成功')
else:
errmsg = result.get('errmsg', '')
print(f'>选取随机地点失败[{errcode}]: {errmsg}')
except Exception as e:
print(e)
def randomList(self, lst):
# 返回列表中的随机项
return random.choice(lst)
def get_AccessToken(self):
# Log('获取access_token')
url = "https://cauth.pupuapi.com/clientauth/user/refresh_token"
data = {
"refresh_token": self.refresh_token
}
headers = {
"User-Agent": "Pupumall/4.7.3;Android/11;dda37894d1b4c3ed09b6272c55b37cf2",
"Content-Type": "application/json; charset=UTF-8",
"Host": "cauth.pupuapi.com",
"Connection": "Keep-Alive",
"Accept-Encoding": "gzip"
}
try:
response = requests.put(url, headers=headers, data=json.dumps(data), verify=False)
response.raise_for_status()
json_response = response.json()
print(json_response)
data = json_response.get('data', {})
access_token = data.get('access_token', '')
if access_token:
self.access_token = f'Bearer '+access_token
self.user_id = data.get('user_id')
self.name = data.get('nick_name', '')
self.headers['Authorization'] = self.access_token
self.headers['pp-userid'] = self.user_id
append_data = {
'access_token':self.access_token,
'user_id':self.user_id
}
access_token_li.append(append_data)
# print(f'账号【{self.index}】access_token{self.access_token}')
return True
else:
return False
except requests.exceptions.HTTPError as http_err:
Log(f"HTTP请求出错: {http_err}")
except requests.exceptions.ConnectionError as conn_err:
Log(f"网络连接出错: {conn_err}")
except requests.exceptions.Timeout as timeout_err:
Log(f"请求超时: {timeout_err}")
except requests.exceptions.RequestException as req_err:
Log(f"出现了一个意外的请求错误: {req_err}")
except json.JSONDecodeError as json_err:
Log(f"JSON解码错误: {json_err}")
return False
def getUserInfo(self):
global inviteCode
Log(f'>>>>>>获取用户信息')
url = "https://cauth.pupuapi.com/clientauth/user/info"
# response = self.do_request(url, headers=headers,req_type='get')
response = self.make_request(url,method='get')
# print(response)
if 'errcode' in response and response.get('errcode', '') == 0:
data = response.get('data')
self.user_id = data.get('user_id')
phone_number = data['phone']
self.phone = phone_number[:3] + '****' + phone_number[7:]
self.invite_code = data['invite_code']
Log(f'手机号:【{self.phone}')
print(f'用户ID{self.user_id}\n邀请码:【{self.invite_code}')
# new_data = {
# self.user_id:
# {
# 'phone': self.phone,
# 'invite_code': self.invite_code
# }
# }
# CHERWIN_TOOLS.SAVE_INVITE_CODE("INVITE_CODE/PPCS_INVITE_CODE.json", new_data)
return True
else:
Log(f'>获取用户信息失败')
print(response)
return False
def signStu(self):
Log(f'>>>>>>获取签到状态')
url = 'https://j1.pupuapi.com/client/game/sign/v2/index'
try:
result = self.make_request(url, 'get')
errcode = result.get('errcode', 1)
if errcode == 0:
data = result.get('data', {})
is_signed = data.get('is_signed', 0)
if is_signed:
Log('今天已签到')
else:
self.sign()
else:
errmsg = result.get('errmsg', '')
print('查询签到信息失败[{}]: {}'.format(errcode, errmsg))
except Exception as e:
print('签到索引过程中发生错误: {}'.format(e))
def getCoinInfo(self):
url = "https://j1.pupuapi.com/client/coin"
# response = self.do_request(url, req_type='get')
response = self.make_request(url,method='get')
# print(response)
if 'errcode' in response and response.get('errcode', '') == 0:
data = response.get('data')
# 当前朴分
coin = data['balance']
Log(f'当前朴分:【{coin}')
else:
Log(f'>获取当前朴分失败')
print(response)
def sign(self):
Log(f'>>>>>>开始签到')
url = "https://j1.pupuapi.com/client/game/sign/v2?supplement_id="
# 签到
data = {'supplement_id': ''}
try:
result = self.make_request(url, 'post', data=data)
errcode = result.get('errcode', 1)
if errcode == 0:
data = result.get('data', {})
daily_sign_coin = data.get('daily_sign_coin', 0)
coupon_list = data.get('coupon_list', [])
rewards = [str(daily_sign_coin) + '积分']
for coupon in coupon_list:
condition_amount = '{:.2f}'.format(coupon['condition_amount'] / 100)
discount_amount = '{:.2f}'.format(coupon['discount_amount'] / 100)
rewards.append('{}{}'.format(condition_amount, discount_amount))
Log('签到成功: ' + ', '.join(rewards))
else:
errmsg = result.get('errmsg', '')
Log('签到失败[{}]: {}'.format(errcode, errmsg))
except Exception as e:
Log('签到过程中发生错误: {}'.format(e))
# 发起组队
def creatTeam(self):
global inviteCode
Log(f'>>>>>>开始发起组队')
url = "https://j1.pupuapi.com/client/game/coin_share/team"
response = self.make_request(url)
# print(response)
if 'errcode' in response and response.get('errcode', '') == 0:
data = response.get('data')
self.teamId = data
Log(f'>发起组队成功ID{data}')
return True
elif 'errcode' in response and response.get('errcode', '') != 0:
Log(f'>发起组队失败:【{response.get("errmsg", "")}')
else:
Log(f'>发起组队失败')
print(response)
return False
def get_myTeam(self):
Log(f'>>>>>>开始查询历史组队')
# 获取当前日期
today = datetime.now().date()
# 构建当天的23:59:59时间
end_of_day = datetime.combine(today, datetime.max.time()) + timedelta(hours=23, minutes=59, seconds=59)
# 将时间转换为时间戳(毫秒为单位)
timestamp = int(end_of_day.timestamp() * 1000)
# 将日期格式化为字符串
formatted_date = today.strftime("%Y-%m-%d")
# print(timestamp)
# print(formatted_date)
url = f'https://j1.pupuapi.com/client/game/coin_share/records?time_from=1704038400000&time_to={timestamp}&page=1&size=20'
try:
result = self.make_request(url,method='get')
errcode = result.get('errcode', -1)
isCreatTeam = False
if errcode == 0:
data = result.get('data',[{}])
for team in data:
# print(team)
record_type = team.get('record_type','')
time_create = team.get('time_create','')
# 将时间戳转换为 datetime 对象
timestamp = time_create / 1000 # 将时间戳转换为秒
dt = datetime.fromtimestamp(timestamp)
# 将日期格式化为字符串
formatted_time_create = dt.strftime("%Y-%m-%d")
team_id = team.get('team_id','')
# print(record_type)
# print(formatted_time_create)
# print(team_id)
if record_type == 0 and formatted_time_create == formatted_date:
self.teamId = team_id
isCreatTeam =True
break
if isCreatTeam:
Log(f'今日已创建ID{self.teamId}】队伍')
self.check_my_team()
else:
self.creatTeam()
else:
errmsg = result.get('errmsg', '')
print(f'查询组队历史信息失败[{errcode}]: {errmsg}')
except Exception as e:
print(str(e))
def check_my_team(self):
Log(f'>>>>>>开始查询组队详情')
url = f'https://j1.pupuapi.com/client/game/coin_share/teams/{self.teamId}'
try:
result = self.make_request(url,method='get')
errcode = result.get('errcode', -1)
if errcode == 0:
data = result.get('data')
status = data.get('status')
if status == 10:
self.team_need_help = True
self.team_max_help = data.get('target_team_member_num')
self.team_helped_count = data.get('current_team_member_num')
Log(f'组队未完成: {self.team_helped_count}/{self.team_max_help}')
elif status == 30:
self.team_need_help = False
coins = data.get('current_user_reward_coin')
Log(f'已组队成功, 获得了{coins}积分')
else:
Log(f'组队状态[{status}]')
print(f': {json.dumps(data)}')
else:
errmsg = result.get('errmsg', '')
print(f'查询组队信息失败[{errcode}]: {errmsg}')
except Exception as e:
print(str(e))
# 组队
def joinAuthorTeam(self):
Log(f'>>>>>>第1个账号开始助力作者')
if len(AuthorCode) > 0:
for code in AuthorCode:
# print(code['teamId'])
url = f"https://j1.pupuapi.com/client/game/coin_share/teams/{code['teamId']}/join"
response = self.make_request(url)
# print(response)
if 'errcode' in response and response.get('errcode', '') == 0:
print(f'>入队成功:【{code}')
break
# elif 'errcode' in response and response.get('errcode', '') != 0:
# print(f'>入队失败:【{response.get("errmsg", "")}】')
# else:
# print(f'>入队失败')
# print(response)
# 组队
def joinTeam(self):
global inviteCode
Log(f'>>>>>>开始本地组队')
with open('INVITE_CODE/PPCS_INVITE_CODE.json', 'r') as file:
data = json.load(file)
inviteCode = list(data.values())
for code in inviteCode:
teamId = code.get('teamId',False)
if not teamId:continue
url = f"https://j1.pupuapi.com/client/game/coin_share/teams/{code['teamId']}/join"
response = self.make_request(url)
# print(response)
if 'errcode' in response and response.get('errcode', '') == 0:
Log(f">入队成功:【{code['teamId']}")
elif 'errcode' in response and response.get('errcode', '') != 0:
Log(f'>入队失败:【{response.get("errmsg", "")}')
else:
Log(f'>入队失败')
print(response)
def main(self):
# print(self.refresh_token)
if self.get_AccessToken():
print('成功获取了access token.')
self.getUserInfo()
self.signStu()
self.getCoinInfo()
self.get_myTeam()
new_data = {
self.user_id:
{
'phone': self.phone,
'invite_code': self.invite_code
}
}
if self.teamId:new_data[self.user_id]['teamId']=self.teamId
CHERWIN_TOOLS.SAVE_INVITE_CODE("INVITE_CODE/PPCS_INVITE_CODE.json", new_data)
return True
else:
Log( f'账号[{self.index}] {CK_NAME}:\n{self.refresh_token}\n已失效请及时更新')
return False
self.sendMsg()
def help(self):
if self.get_AccessToken():
if self.index == 1:
Log('--------签到组队--------')
self.joinAuthorTeam()
else:
Log('--------签到组队--------')
self.joinTeam()
return True
else:
Log(f'账号[{self.index}] {CK_NAME}:\n{self.refresh_token}\n已失效请及时更新')
return False
self.sendMsg(True)
def sendMsg(self, help=False):
if self.send_UID:
push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help)
print(push_res)
def down_file(filename, file_url):
print(f'开始下载:{filename},下载地址:{file_url}')
try:
response = requests.get(file_url, verify=False, timeout=10)
response.raise_for_status()
with open(filename + '.tmp', 'wb') as f:
f.write(response.content)
print(f'{filename}】下载完成!')
# 检查临时文件是否存在
temp_filename = filename + '.tmp'
if os.path.exists(temp_filename):
# 删除原有文件
if os.path.exists(filename):
os.remove(filename)
# 重命名临时文件
os.rename(temp_filename, filename)
print(f'{filename}】重命名成功!')
return True
else:
print(f'{filename}】临时文件不存在!')
return False
except Exception as e:
print(f'{filename}】下载失败:{str(e)}')
return False
def import_Tools():
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
if __name__ == '__main__':
APP_NAME = '朴朴超市'
ENV_NAME = 'PPCS'
CK_NAME = 'refresh_token'
print(f'''
✨✨✨ 朴朴超市脚本✨✨✨
✨ 功能:
积分签到
组队互助
✨ 抓包步骤:
打开朴朴超市APP
已登录先退出
打开抓包
登陆
找https://cauth.pupuapi.com/clientauth/user/verify_login
复制返回body中的{CK_NAME}
多个账号可清理APP数据进行换号别点退出否则token失效
✨ 设置青龙变量:
export {ENV_NAME}= 'E0oXq3++6a4LG4xxxxxxxx'多账号#分割
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
✨ 多账号默认第一个账号与作者组队,其余互助
✨ 推荐定时0 9 * * *
✨✨✨ @Author CHERWIN✨✨✨
''')
local_script_name = os.path.basename(__file__)
local_version = '2024.04.06'
if os.path.isfile('CHERWIN_TOOLS.py'):
import_Tools()
else:
if down_file('CHERWIN_TOOLS.py', 'https://py.cherwin.cn/CHERWIN_TOOLS.py'):
print('脚本依赖下载完成请重新运行脚本')
import_Tools()
else:
print('脚本依赖下载失败请到https://py.cherwin.cn/CHERWIN_TOOLS.py下载最新版本依赖')
exit()
print(TIPS)
token = ''
token = ENV if ENV else token
if not token:
print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
exit()
tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
# print(tokens)
if len(tokens) > 0:
print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")
access_token_li=[]
for index, infos in enumerate(tokens):
run_result = RUN(infos, index).main()
if not run_result: continue
for index, infos in enumerate(tokens):
RUN(infos, index).help()
if not run_result: continue
if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)