Moves recorded MP4 and MP3 files from the Douyin recorder output directory to Google Drive via rclone. Separates raw video and audio assets into dedicated archive folders.
Daily cron job that pushes CLAUDE.md context files and project learnings/error logs to Google Drive via rclone. Runs at 03:00, keeps all VPS project docs in sync with BashekVault.
Full VPS backup via rclone to Google Drive. Dumps PostgreSQL, syncs Docker project directories (nexus, openclaw-core, stream-intel), excludes node_modules. Runs as a scheduled cron job.
Converts Douyin stream recordings to 60-second MP3 chunks and uploads them to Google Cloud Storage for Speech-to-Text processing. Accepts multiple dates as arguments, processes all MP4s found in each day's folder.
Polls the Douyin live API to detect stream state transitions (online/offline). Fires n8n webhooks on state change with a 35-minute cooldown. Designed to run every 5 minutes via cron, persists state to a JSON file.
PYTHON · WEBHOOK · API
★ 4.7 · ↓ 923
READ ONLY
#!/usr/bin/env python3
import json, os, time, urllib.request, urllib.error
# --- Douyin live-room endpoint -------------------------------------------
# The query string carries a placeholder room id (web_rid=YOUR_ROOM_ID) that
# has to be replaced before the script is deployed.
_ROOM_ENTER_ENDPOINT = "https://live.douyin.com/webcast/room/web/enter/"
_ROOM_ENTER_QUERY = (
    "?aid=6383&live_id=1&device_platform=web&language=zh-CN&web_rid=YOUR_ROOM_ID"
)
DOUYIN_API_URL = _ROOM_ENTER_ENDPOINT + _ROOM_ENTER_QUERY

# --- Request headers sent with every poll --------------------------------
# The Cookie value is a placeholder as well.
_DESKTOP_CHROME_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
HEADERS = {
    "Cookie": "YOUR_DOUYIN_COOKIE_HERE",
    "User-Agent": _DESKTOP_CHROME_UA,
    "Referer": "https://live.douyin.com/",
}

# --- Notification target and local bookkeeping ---------------------------
# Webhook that receives {"event": ...} POSTs (placeholder URL).
WEBHOOK_URL = "https://n8n.your-domain.com/webhook/your-webhook-path"

# JSON file holding wasLive / lastOnlineSent / lastOfflineSent between runs.
STATE_FILE = "/tmp/douyin_state.json"

# Minimum gap between two webhooks of the same kind: 35 minutes, in seconds.
COOLDOWN_SEC = 35 * 60
def load_state(path=None):
    """Load the persisted poll state, falling back to defaults.

    Args:
        path: JSON file to read. Defaults to the module-level STATE_FILE.

    Returns:
        The dict stored in the file, or a fresh default state
        ({"wasLive": False, "lastOnlineSent": 0, "lastOfflineSent": 0})
        when the file is missing, unreadable, not valid JSON, or does not
        contain a JSON object.
    """
    if path is None:
        path = STATE_FILE
    defaults = {"wasLive": False, "lastOnlineSent": 0, "lastOfflineSent": 0}

    # Open directly instead of checking os.path.exists first: one syscall
    # fewer and no window in which the file can vanish between the two.
    try:
        with open(path, encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run (or state wiped): silently start from defaults.
        return defaults
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"State file unreadable ({e}) — using defaults")
        return defaults

    # Callers use dict methods on the result; valid JSON that is not an
    # object (e.g. a list or a bare number) is treated as corrupt state.
    if not isinstance(loaded, dict):
        print("State file does not contain a JSON object — using defaults")
        return defaults
    return loaded
def save_state(state, path=None):
    """Persist the poll state as JSON, replacing the file atomically.

    The state is written to a temporary sibling file and then moved over
    the target with os.replace, so an interrupted or failed write never
    leaves a truncated state file behind.

    Args:
        state: JSON-serializable object to store.
        path: Destination file. Defaults to the module-level STATE_FILE.

    Raises:
        TypeError, ValueError: if ``state`` cannot be serialized by json.dump.
        OSError: if the temporary file cannot be written or moved.
    """
    if path is None:
        path = STATE_FILE
    # The PID keeps two overlapping runs from sharing one temporary file.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a partial temporary file behind; the previous state
        # file (if any) is still intact at this point.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def check_is_live(url=None, headers=None):
    """Query the room endpoint once and report whether the stream is live.

    Args:
        url: Endpoint to request. Defaults to the module-level DOUYIN_API_URL.
        headers: Request headers. Defaults to the module-level HEADERS.

    Returns:
        True if any of the three live signals below matches, False if none
        does, and None when the request fails, the body is not JSON, or the
        payload does not have the expected object shape.
    """
    if url is None:
        url = DOUYIN_API_URL
    if headers is None:
        headers = HEADERS

    req = urllib.request.Request(url, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read().decode())
    except Exception as e:
        # Best-effort poll: any transport or decode problem means "unknown".
        print(f"API error: {e}")
        return None

    # A non-object payload or a null/non-object "data" member cannot be
    # interpreted; report "unknown" instead of raising AttributeError.
    if not isinstance(data, dict):
        print("API error: unexpected payload shape")
        return None
    d = data.get("data", {})
    if not isinstance(d, dict):
        print("API error: unexpected payload shape")
        return None

    # Third signal: status of the room nested in the first list entry.
    # Every level is type-checked so a malformed entry is ignored rather
    # than crashing the run.
    rooms = d.get("data")
    first = rooms[0] if isinstance(rooms, list) and rooms else None
    room = first.get("room") if isinstance(first, dict) else None
    nested_status = room.get("status") if isinstance(room, dict) else None

    # NOTE(review): the meaning of the numeric codes is not visible in this
    # file — confirm against the API that 2 means "live" for both
    # room_status and the nested room status.
    return (
        d.get("is_living") == 1
        or d.get("room_status") == 2
        or nested_status == 2
    )
def post_webhook(event, url=None):
    """POST ``{"event": event}`` as JSON to the webhook and report delivery.

    Args:
        event: Value sent under the "event" key of the JSON body.
        url: Webhook endpoint. Defaults to the module-level WEBHOOK_URL.

    Returns:
        True when the request completed without raising, False when
        building or sending the request failed. Failures are printed,
        never raised.
    """
    if url is None:
        url = WEBHOOK_URL
    payload = json.dumps({"event": event}).encode()
    try:
        # Request() itself raises ValueError for a malformed URL, so it is
        # built inside the try block to keep this function best-effort.
        req = urllib.request.Request(
            url,
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            print(f"Webhook {event}: {resp.status}")
    except Exception as e:
        print(f"Webhook error ({event}): {e}")
        return False
    return True
def main():
    """Run one poll cycle: detect a live/offline transition and notify.

    Loads the persisted state, polls the live status, and on a change of
    state fires the matching webhook ("online" or "offline") unless a
    webhook of the same kind was sent within COOLDOWN_SEC. The new status
    is then saved for the next run.

    Nothing is saved when the poll yields no valid answer, or when the
    webhook call explicitly returns False: in both cases the stored state
    stays as it was, so the next run sees the same transition again. Any
    other webhook return value (including None) counts as sent.
    """
    state = load_state()
    is_live = check_is_live()
    if is_live is None:
        print("No valid response — skipping state update")
        return

    was_live = state.get("wasLive", False)
    now = time.time()

    if bool(is_live) == bool(was_live):
        print(f"no transition (isLive={is_live}, wasLive={was_live})")
    else:
        if is_live:
            event, stamp_key = "online", "lastOnlineSent"
        else:
            event, stamp_key = "offline", "lastOfflineSent"

        if (now - state.get(stamp_key, 0)) > COOLDOWN_SEC:
            print(f"{event} transition → firing webhook")
            if post_webhook(event) is False:
                # Recording the transition now would drop the notification
                # for good; leave the state untouched so it is retried.
                print(f"{event} webhook not delivered — state left unchanged for retry")
                return
            state[stamp_key] = now
        else:
            # Transition is still recorded below, only the webhook is skipped.
            print(f"{event} transition inside cooldown — webhook suppressed")

    state["wasLive"] = is_live
    save_state(state)


if __name__ == "__main__":
    main()
// PYTHON
CREATE_DAILY_SHEET.PY
Creates a new dated tab in the stream analytics Google Sheet with all required column headers and conditional formatting rules. Skips creation if the tab already exists. Run once per day before stream processing begins.