zzw
examples
3a8e465
import os
import requests
import json
import time
from qiniu import put_file
import tempfile
import shutil
import random
API_KEY = os.environ['API_KEY']
headers = {
"User-Agent": "PexelDance Sdk/1.0",
'Authorization': 'Bearer %s' % API_KEY,
'Content-Type': 'application/json'
}
def get_endpoint():
hosts = [
"https://api-01.pexeldance.com",
"https://api-02.pexeldance.com",
"https://api-03.pexeldance.com",
"https://api-04.pexeldance.com",
"https://api-05.pexeldance.com",
"https://api-06.pexeldance.com",
"https://api-07.pexeldance.com",
"https://api-08.pexeldance.com",
"https://api-09.pexeldance.com",
"https://api-10.pexeldance.com",
# "https://api-11.pexeldance.com",
# "https://api-12.pexeldance.com",
# "https://api-13.pexeldance.com",
# "https://api-14.pexeldance.com",
# "https://api-15.pexeldance.com",
# "https://api-16.pexeldance.com",
# "https://api-17.pexeldance.com",
# "https://api-18.pexeldance.com",
# "https://api-19.pexeldance.com",
# "https://api-20.pexeldance.com",
]
return random.choice(hosts)
def get_upload_token(extension="mp4"):
endpoint = get_endpoint()
url = endpoint + "/api/upload/token"
payload = json.dumps({
"file_ext": extension,
})
response = requests.request("POST", url, headers=headers, data=bytes(payload, "utf-8"))
if 200<= response.status_code < 300:
return response.json()
return None
def upload_file(local_file, file_key, token):
ret, info = put_file(token, file_key, local_file)
if info.status_code == 200:
return ret
else:
return None
def confirm_upload(key, name, hash_, fsize, bucket):
endpoint = get_endpoint()
url = endpoint + "/api/upload/confirm"
payload = json.dumps({
"key": key,
"name": name,
"hash": hash_,
"fsize": fsize,
"bucket": bucket,
})
response = requests.request("POST", url, headers=headers, data=payload)
if 200<= response.status_code < 300:
return response.json()
return None
def upload_video(file):
ext = file.split(".")[-1]
if not os.path.exists(file):
return False
token_res = get_upload_token(ext)
upload_data = token_res.get('data')
if not upload_data:
return False
upload_token = upload_data.get('token')
if not upload_token:
return False
bucket_key = upload_data.get('bucket_key')
if not bucket_key:
return False
ret = upload_file(file, bucket_key, upload_token)
if not ret:
return False
key, name, hash_, fsize, bucket = ret.get('key'), upload_data.get('c_string'), ret.get('hash'), ret.get('fsize'), ret.get('bucket')
upload_res = confirm_upload(key, name, hash_, fsize, bucket)
if not upload_res:
return False
return upload_res.get('data')
def generate(item_id, style_id, width, height, prompt):
endpoint = get_endpoint()
url = endpoint + "/api/job/common"
payload = json.dumps({
"user_content": {
"item_id": item_id,
"seed": 11111,
"width": width,
"height": height,
"prompt": prompt
},
"unit": 3,
"use_credit_type": "credit",
"name": style_id
})
response = requests.request("POST", url, headers=headers, data=payload)
if 200<= response.status_code < 300:
return response.json()
else:
return None
def get_job_status(job_id):
endpoint = get_endpoint()
url = endpoint + "/api/job"
payload = json.dumps({
"id": job_id
})
response = requests.request("GET", url, headers=headers, data=payload)
if 200<= response.status_code < 300:
return response.json()
return None
def get_item_url(_id):
endpoint = get_endpoint()
url = endpoint + "/api/download_item"
payload = json.dumps({
"key_id": [
_id
]
})
response = requests.request("POST", url, headers=headers, data=payload)
if 200<= response.status_code < 300:
return response.json()
return None
def get_result(job_id):
endpoint = get_endpoint()
url = endpoint + "/api/item/job_id"
payload = json.dumps({
"job_id": job_id
})
response = requests.request("GET", url, headers=headers, data=payload)
if 200<= response.status_code < 300:
return response.json()
return None
def download_file(url):
response = requests.get(url, stream=True)
if response.status_code == 200:
tmpfile = tempfile.NamedTemporaryFile(delete=False)
tmpfile.close()
with open(tmpfile.name, 'wb') as f:
shutil.copyfileobj(response.raw, f)
return tmpfile.name
else:
return None
def processing(in_video, width, height, style_id,transfer_duration, prompt):
videoRes = upload_video(in_video)
if not videoRes:
return None
videoId = videoRes.get('id')
if not videoId:
return None
req_access = False
try_times = 1000
for i in range(try_times):
generate_result = generate(videoId, style_id, width, height, prompt)
if not generate_result:
time.sleep(8)
continue
jobInfo = generate_result.get('data')
if not jobInfo:
time.sleep(8)
continue
job_id = jobInfo.get("job_id")
if job_id:
req_access = True
break
if not req_access:
raise "too many requests"
while True:
jobStatusInfo = get_job_status(job_id)
js = jobStatusInfo.get('data')
if js is None:
return None
if js.get('status') == "finished":
break
elif js.get('status') == "failed":
return None
time.sleep(10)
result = get_result(job_id)
if not result:
return None
res_data = result.get('data')
if not res_data:
return None
li = res_data.get('list')
if len(li) == 0:
return None
jobRes = li[0]
item_id = jobRes.get('id')
if not item_id:
return None
itemDes = get_item_url(item_id)
itemData = itemDes.get('data')
if not itemData:
return None
signed_urls = itemData.get('signed_urls')
if len(signed_urls) == 0:
return None
item_url = signed_urls[0]
file_path = download_file(item_url)
if not file_path:
return None
return file_path