| 文件大小 | 需要cookies | 上传显示进度条 | 占用账号网盘空间 | 支持秒传 |
| --- | --- | --- | --- | --- |
| 小于200MB | 否 | 否 | 否 | 否 |
| 大于200MB(目前没发现上限) | 是 | 是 | 否(但是空间不够会失败) | 是 |
- # -*- coding: utf-8 -*-
- import requests,json
- from ftplib import FTP
- import os, time, sys
# Login cookies for the cloud drive: fill in the "UID" and "uf" cookie values
# between the quotes. Only needed when uploading files larger than 200MB.
cookie = {
    "UID": "",
    "uf": "",
}
class Chaopan(object):
    """Client for the pan-yz.chaoxing.com cloud-drive HTTP API.

    An instance wraps a ``requests`` session carrying the user's cookies plus
    the ``_token`` returned by the ``uservalid`` endpoint. File content is
    transferred over FTP to a host reported by the ``/api/info`` endpoint and
    then registered through the HTTP API.
    """

    def __init__(self, cookies):
        """Log in to the drive with the given cookies.

        Args:
            cookies: dict of cookie name -> value; must contain "UID", which
                is used as the ``puid`` in later requests.

        Raises:
            Exception: if the server reports a falsy "result" for the
                validation request (cookies rejected / session expired).
        """
        session = requests.session()
        requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
        response = session.get('https://pan-yz.chaoxing.com/api/token/uservalid')
        retobj = json.loads(response.text)
        if not retobj["result"]:
            raise Exception("参数验证失败,登录状态失效")
        self.__token = retobj["_token"]
        self.__id = cookies["UID"]
        self.__session = session

    def __get_info(self):
        """Return the "data" member of the ``/api/info`` response."""
        url = f'https://pan-yz.chaoxing.com/api/info?puid={self.__id}&_token={self.__token}'
        return self.__session.get(url).json()["data"]

    def get_disk_capacity(self):
        """Return the decoded ``getUserDiskCapacity`` response (total/used space)."""
        url = f'https://pan-yz.chaoxing.com/api/getUserDiskCapacity?puid={self.__id}&_token={self.__token}'
        response = self.__session.get(url)
        return json.loads(response.text)

    def list_dir(self, fldid='', orderby='d', order='desc', page=1, size=100, addrec=False, showCollect=1):
        """List files and folders in a directory.

        All arguments are interpolated verbatim into the query string of the
        ``getMyDirAndFiles`` endpoint; the decoded JSON response is returned.
        """
        url = f'https://pan-yz.chaoxing.com/api/getMyDirAndFiles?puid={self.__id}&fldid={fldid}&orderby={orderby}&order={order}&page={page}&size={size}&_token={self.__token}&addrec={addrec}&showCollect={showCollect}'
        response = self.__session.get(url)
        return json.loads(response.text)

    def __create_file_new(self, file, fldid=""):
        """Announce a file to the ``createfilenew`` endpoint.

        Sends the file size, its base name and two 512 KiB samples (the first
        and the last 512 KiB of the file) and returns the decoded response.
        The caller only uses this for files larger than 2 MiB, so the tail
        offset ``size - 512 KiB`` is always non-negative.

        Args:
            file: local file object opened in binary mode (needs ``name``).
            fldid: optional target folder id.
        """
        url = 'https://pan-yz.chaoxing.com/opt/createfilenew'
        BYTES_PER_CHUNK = 512 * 1024
        LIMIT = 1024 * 1024
        saved_pos = file.tell()
        try:
            size = file.seek(0, 2)
            file.seek(0, 0)
            head = file.read(BYTES_PER_CHUNK)
            file.seek(BYTES_PER_CHUNK + size - LIMIT, 0)
            tail = file.read(BYTES_PER_CHUNK)
        finally:
            # Restore the caller's position. The offset goes first and the
            # whence second; the previous code had them swapped.
            file.seek(saved_pos, 0)
        name = os.path.basename(file.name)
        files = {
            "file0": head,
            "file1": tail,
        }
        post_data = {
            "size": size,
            "fn": name,
            "puid": 0,
        }
        if fldid:
            post_data["fldid"] = fldid
        response = self.__session.post(url, data=post_data, files=files)
        return json.loads(response.text)

    def __ftp_upload_file(self, file, timemil, callback=None):
        """Upload the file over FTP into ``/<froot>/<timemil>/<basename>``.

        Args:
            file: local file object opened in binary mode (needs ``name``).
            timemil: name of the remote sub-directory to create.
            callback: optional callable receiving the fraction sent so far
                (a float below 1, then exactly 1 once everything is sent).

        Returns:
            True if the server's reply to STOR contains "226".
        """
        total = file.seek(0, 2)
        sent = 0

        def report(block):
            nonlocal sent
            # Count the real block length so the last, shorter block does not
            # overshoot the total.
            sent += len(block)
            callback(sent / total if sent < total else 1)

        info = self.__get_info()
        upath = info["froot"]
        host = info["host"]
        name = os.path.basename(file.name)
        # The context manager closes the connection even when a step fails.
        with FTP() as ftp:
            ftp.encoding = 'utf-8'
            ftp.connect(host, 21)
            ftp.login("usertemp", "0GYF0hBAbsXVBZCUPaSOVS")
            ftp.set_pasv(True)
            ftp.mkd(f'/{upath}/{timemil}')
            file.seek(0, 0)
            res = ftp.storbinary(
                f'STOR /{upath}/{timemil}/{name}',
                file,
                blocksize=8192,
                callback=report if callback else None,
            )
        return '226' in res

    def __sync_upload(self, timemil, pntid=""):
        """Notify the ``rsyncsucss`` endpoint that the FTP upload finished.

        Args:
            timemil: the remote sub-directory name used for the FTP upload.
            pntid: optional parent folder id.
        """
        url = 'https://pan-yz.chaoxing.com/api/notification/rsyncsucss'
        post_data = {
            "puid": self.__id,
            "rf": timemil,
            "_token": self.__token,
        }
        if pntid:
            post_data["pntid"] = pntid
        response = self.__session.post(url, data=post_data)
        return json.loads(response.text)

    def __crcstatus(self, crc):
        """Return the decoded ``crcstatus`` response for the given crc."""
        url = f'https://pan-yz.chaoxing.com/api/crcstatus?puid={self.__id}&crc={crc}&_token={self.__token}'
        # Use the authenticated session like every other API call here.
        response = self.__session.get(url)
        return json.loads(response.text)

    def upload_file(self, file, fldid="", callback=None):
        """Upload a local file to the drive.

        Files larger than 2 MiB are first announced via ``createfilenew``; if
        that response has a truthy "result" its "data" is returned right away
        without any transfer (presumably the server already has the content).
        Otherwise the file is sent over FTP, synced, and the ``crcstatus``
        response is returned. Smaller files are sent over FTP directly and the
        sync response is returned.

        Args:
            file: local file object opened in binary mode.
            fldid: optional target folder id.
            callback: optional progress callable, see ``__ftp_upload_file``.

        Returns:
            The decoded server response, or None if the FTP transfer of a
            large file was not acknowledged.
        """
        size = Chaopan.__getsize(file)
        if size > 1024 * 1024 + 1024 * 1024:
            retobj = self.__create_file_new(file, fldid)
            if retobj["result"]:
                return retobj["data"]
            crc = retobj["crc"]
            timemil = retobj["timemil"]
            if self.__ftp_upload_file(file, timemil, callback):
                self.__sync_upload(timemil, fldid)
                return self.__crcstatus(crc)
            return None
        timemil = int(time.time() * 1000)
        self.__ftp_upload_file(file, timemil, callback)
        return self.__sync_upload(timemil, fldid)

    def del_file(self, id):
        """Delete files from the drive.

        Args:
            id: file id; pass several ids as one comma-separated string.
        """
        url = 'https://pan-yz.chaoxing.com/api/delete'
        post_data = {
            "puid": self.__id,
            "resids": id,
            "_token": self.__token,
        }
        response = self.__session.post(url, data=post_data)
        return json.loads(response.text)

    @staticmethod
    def upload_share_file(file):
        """Upload a file anonymously and return the decoded server response.

        Args:
            file: a binary file object (``io.BufferedReader``) or ``bytes``,
                non-empty and at most 200000000 bytes.

        Returns:
            The decoded response, or a ``{"status": False, "msg": ...}`` dict
            when the size check fails (no request is made in that case).
        """
        size = Chaopan.__getsize(file)
        if size == 0 or size > 200000000:
            return {"status": False, "msg": "文件大小必须在0-200MB之间"}
        url = 'http://notice.chaoxing.com/pc/files/uploadNoticeFile'
        file_data = {
            'attrFile': file,
        }
        response = requests.post(url, files=file_data)
        return json.loads(response.text)

    @staticmethod
    def __getsize(file):
        """Return the size in bytes of a buffered binary reader or bytes object.

        The reader's current position is preserved. Any other type yields 0.
        """
        import io
        if isinstance(file, io.BufferedReader):
            saved_pos = file.tell()
            size = file.seek(0, 2)
            # Offset first, whence second; the previous code had them swapped
            # and only worked when the position happened to be 0.
            file.seek(saved_pos, 0)
            return size
        if isinstance(file, bytes):
            return len(file)
        return 0
def callback(per):
    """Redraw a 30-column text progress bar on stdout.

    Args:
        per: fraction completed. Values outside [0, 1] are clamped so the
            bar never grows past 30 columns or shows a negative percentage.
    """
    per = min(max(per, 0), 1)
    hashes = '#' * int(per * 30)
    spaces = ' ' * (30 - len(hashes))
    # The leading "\r" returns to the line start so each call overwrites the
    # previous bar instead of printing a new line.
    sys.stdout.write("\rPercent: [%s] %d%%" % (hashes + spaces, per * 100))
    sys.stdout.flush()
# Interactive entry point: ask for a path, upload the file, print its links.
filepath = input("请输入文件路径: ")
try:
    with open(filepath, 'rb') as f:
        size = f.seek(0, 2)
        f.seek(0, 0)
        if size <= 200000000:
            # Up to 200000000 bytes: anonymous upload, no cookies needed.
            print("正在上传中,请等待...")
            ret = Chaopan.upload_share_file(f)
            if ret["status"]:
                print(f'下载直链为(http替换为https仍然可用): {ret["att_file"]["att_clouddisk"]["downPath"]}')
                print(f'分享链接为: {ret["att_file"]["att_clouddisk"]["shareUrl"]}')
            else:
                print(f'转直链失败,原因为{ret["msg"]}')
        else:
            # Larger files go through the logged-in drive API.
            print("您的文件大于200M, 正在以登录状态上传,请等待...")
            cp = Chaopan(cookie)
            retobj = cp.upload_file(f, callback=callback)
            print("")
            # upload_file returns None when the FTP transfer is not
            # acknowledged; report that clearly instead of failing with an
            # opaque TypeError/KeyError message below.
            if not isinstance(retobj, dict) or "objectId" not in retobj:
                raise Exception(f'上传失败,服务器返回: {retobj}')
            print(f'下载直链为(http替换为https仍然可用): http://d0.ananas.chaoxing.com/download/{retobj["objectId"]}')
            print(f'分享链接为: http://cloud.ananas.chaoxing.com/view/fileview?objectid={retobj["objectId"]}')
            print('正在清理网盘空间(清理后上传的文件不会占用您的网盘空间)')
            time.sleep(3)
            # Delete the drive entry using whichever id key the response has.
            if "id" in retobj:
                cp.del_file(retobj["id"])
            elif "resid" in retobj:
                cp.del_file(retobj["resid"])
            print('清理完成')
except Exception as e:
    # Top-level boundary of an interactive script: report and exit normally.
    print(f'出现错误,原因为:{str(e)}')