# File: Todoist-Actual-Backup/export_todoist.py
# (231 lines, 8.7 KiB, Python)
import os
import sys
import json
import time
from collections import defaultdict
import requests
from datetime import datetime, timedelta
from todoist_api_python.api import TodoistAPI
from jinja2 import Environment, FileSystemLoader, select_autoescape
ATTACHMENTS_DIR = "attachments"
def usage():
    """Print the command-line help text for this script."""
    help_text = """
Todoist Export Script
---------------------
Exports all active and completed tasks from the Todoist API to a JSON file, including attachments and comments, and generates a human-readable HTML backup using Jinja2.
Usage:
python export_todoist.py export
- Exports all data and generates JSON and HTML files.
python export_todoist.py [any other argument or none]
- Shows this help message.
"""
    print(help_text)
def get_api_key():
    """Return the Todoist API token from the TODOIST_KEY env var.

    Exits the process with status 1 when the variable is missing or empty.
    """
    token = os.environ.get("TODOIST_KEY")
    if token:
        return token
    print("Error: TODOIST_KEY environment variable not set.")
    sys.exit(1)
def ensure_attachments_dir():
    """Create the attachments directory if it does not already exist.

    Uses makedirs(exist_ok=True) instead of a separate exists() check, which
    closes the check-then-create race where another process creates the
    directory between the test and the call and makedirs() raises.
    """
    os.makedirs(ATTACHMENTS_DIR, exist_ok=True)
def download_attachment(url, filename):
    """Download *url* into ATTACHMENTS_DIR and return the local file path.

    Returns the existing path without re-downloading when the file is already
    present. Returns None when the server answers with a non-200 status or the
    request fails, so callers can treat missing attachments as best-effort.
    """
    # basename() strips any directory components, so an attachment whose
    # file_name contains path separators cannot escape ATTACHMENTS_DIR
    # (path traversal).
    local_path = os.path.join(ATTACHMENTS_DIR, os.path.basename(filename))
    if os.path.exists(local_path):
        return local_path
    print(f"Downloading attachment {url}")
    try:
        # A timeout keeps a stalled server from hanging the whole export;
        # the context manager releases the connection on every path.
        with requests.get(url, stream=True, timeout=60) as r:
            if r.status_code != 200:
                return None
            with open(local_path, 'wb') as f:
                for chunk in r.iter_content(1024):
                    f.write(chunk)
    except requests.RequestException as error:
        # Network failures should not abort the export; mirror the
        # non-200 behavior and report None.
        print(f"Error downloading attachment {url}: {error}")
        return None
    return local_path
def _get_retry_delay(response, attempt, base_delay=5, max_delay=120):
if response is not None:
headers = getattr(response, "headers", {}) or {}
retry_after = headers.get("Retry-After") or headers.get("retry-after")
if retry_after:
try:
return max(1, int(float(retry_after)))
except (TypeError, ValueError):
pass
reset_header = headers.get("X-RateLimit-Reset") or headers.get("x-rate-limit-reset")
if reset_header:
try:
reset_timestamp = float(reset_header)
return max(1, int(reset_timestamp - time.time()))
except (TypeError, ValueError):
pass
return min(max_delay, base_delay * (2 ** attempt))
def execute_with_rate_limit(func, *args, **kwargs):
    """Call func(*args, **kwargs), retrying when the API rate-limits us.

    An exception carrying HTTP status 429 (directly or via its attached
    response) triggers a sleep-and-retry, up to five retries. Any other
    exception, or a 429 after the retry budget is exhausted, propagates.
    """
    retries = 0
    limit = 5
    while True:
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # pylint: disable=broad-except
            code = getattr(exc, "status_code", None)
            resp = getattr(exc, "response", None)
            if code is None and resp is not None:
                code = getattr(resp, "status_code", None)
            # Anything that is not a retryable 429 is re-raised untouched.
            if code != 429 or retries >= limit:
                raise
            wait = _get_retry_delay(resp, retries)
            retries += 1
            print(f"Rate limit hit for {func.__name__}. Waiting {wait} seconds before retry {retries}/{limit}...")
            time.sleep(wait)
def fetch_all_projects(api):
    """Return every Todoist project, de-duplicated by stringified id.

    Flattens the paginated batches from api.get_projects; a later project
    with the same id replaces an earlier one. On error, logs and returns
    whatever was collected so far (possibly an empty list).
    """
    unique = {}
    try:
        for page in execute_with_rate_limit(api.get_projects):
            for proj in page:
                unique[str(getattr(proj, "id", ""))] = proj
    except Exception as error:  # pylint: disable=broad-except
        print(f"Error fetching projects: {error}")
    return list(unique.values())
def fetch_active_tasks_by_project(api):
    """Group all active tasks by their stringified project_id.

    Returns a defaultdict(list); on error, logs and returns whatever was
    collected before the failure.
    """
    grouped = defaultdict(list)
    try:
        for page in execute_with_rate_limit(api.get_tasks):
            for item in page:
                grouped[str(getattr(item, "project_id", ""))].append(item)
    except Exception as error:  # pylint: disable=broad-except
        print(f"Error fetching active tasks: {error}")
    print(f"Fetched active tasks for {len(grouped)} projects")
    return grouped
def fetch_completed_tasks_by_project(api, since, until):
    """Group tasks completed between *since* and *until* by project_id.

    Returns a defaultdict(list); on error, logs and returns whatever was
    collected before the failure.
    """
    grouped = defaultdict(list)
    try:
        pages = execute_with_rate_limit(
            api.get_completed_tasks_by_completion_date,
            since=since,
            until=until,
        )
        for page in pages:
            for item in page:
                grouped[str(getattr(item, "project_id", ""))].append(item)
    except Exception as error:  # pylint: disable=broad-except
        print(f"Error fetching completed tasks between {since} and {until}: {error}")
    print(f"Fetched completed tasks for {len(grouped)} projects")
    return grouped
def fetch_comments_by_task(api, project_ids):
    """Collect comments for the given projects, grouped by task id.

    Comments with no task_id are dropped. A failure on one project is
    logged and the remaining projects are still processed.
    """
    grouped = defaultdict(list)
    for project_id in project_ids:
        try:
            for page in execute_with_rate_limit(api.get_comments, project_id=project_id):
                for comment in page:
                    tid = str(getattr(comment, "task_id", ""))
                    if tid:
                        grouped[tid].append(comment)
        except Exception as error:  # pylint: disable=broad-except
            print(f"Error fetching comments for project {project_id}: {error}")
    print(f"Fetched comments for {len(grouped)} tasks")
    return grouped
def process_task(task, comments_lookup):
    """Serialize a task object into a plain dict for JSON/HTML export.

    Attachments with a file_url are mirrored to disk via download_attachment
    and recorded under 'local_file'; comments found in comments_lookup
    (keyed by stringified task id) are serialized under 'comments'.
    """
    record = task.__dict__.copy()
    task_id = getattr(task, "id", None) or getattr(task, "task_id", None)
    if task_id is not None:
        record.setdefault("id", task_id)

    # Mirror any file attachments locally and note where the copy lives.
    processed = []
    for att in (getattr(task, 'attachments', None) or []):
        info = att.__dict__.copy()
        url = info.get('file_url')
        if url:
            name = info.get('file_name') or os.path.basename(url)
            saved = download_attachment(url, name)
            if saved:
                info['local_file'] = os.path.relpath(saved)
        processed.append(info)
    if processed:
        record['attachments'] = processed

    # Attach serialized comments, if any were fetched for this task.
    key = str(task_id) if task_id is not None else None
    if key and key in comments_lookup:
        record['comments'] = [c.__dict__ for c in comments_lookup[key]]
    return record
def main():
    """Entry point: export Todoist projects, tasks, and comments to dated JSON and HTML files."""
    # Only the exact invocation `export_todoist.py export` triggers an export.
    if len(sys.argv) != 2 or sys.argv[1] != "export":
        usage()
        return
    ensure_attachments_dir()
    api = TodoistAPI(get_api_key())
    projects = fetch_all_projects(api)
    # Completed tasks are limited to the last 90 days, starting at midnight.
    # NOTE(review): naive local datetimes — confirm the API interprets these as intended.
    since = (datetime.now() - timedelta(days=90)).replace(hour=0, minute=0, second=0, microsecond=0)
    until = datetime.now()
    active_tasks_by_project = fetch_active_tasks_by_project(api)
    completed_tasks_by_project = fetch_completed_tasks_by_project(api, since=since, until=until)
    # Only query comments for projects that actually have tasks (empty ids dropped).
    comment_project_ids = sorted(
        pid for pid in (set(active_tasks_by_project.keys()) | set(completed_tasks_by_project.keys())) if pid
    )
    comments_by_task = fetch_comments_by_task(api, comment_project_ids)
    # Assemble one dict per project, with its active and completed tasks inlined.
    data = []
    for project in projects:
        project_dict = project.__dict__.copy()
        project_id = str(getattr(project, "id", ""))
        active_tasks = active_tasks_by_project.get(project_id, [])
        completed_tasks = completed_tasks_by_project.get(project_id, [])
        project_dict['tasks'] = [process_task(t, comments_by_task) for t in active_tasks]
        project_dict['completed_tasks'] = [process_task(t, comments_by_task) for t in completed_tasks]
        data.append(project_dict)
    # Write JSON
    today = datetime.now().strftime("%Y-%m-%d")
    json_filename = f"Todoist-Actual-Backup-{today}.json"
    def json_serial(obj):
        # Fallback serializer: datetimes become ISO strings, everything else str().
        if isinstance(obj, datetime):
            return obj.isoformat()
        return str(obj)
    with open(json_filename, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2, default=json_serial)
    print(f"Exported data to {json_filename}")
    # Write HTML
    # Templates are loaded from the script's own directory.
    env = Environment(
        loader=FileSystemLoader(os.path.dirname(__file__)),
        autoescape=select_autoescape(['html', 'xml'])
    )
    # Add markdown filter; degrade to plain text when markdown isn't installed.
    try:
        import markdown
        env.filters['markdown'] = lambda text: markdown.markdown(text or "")
    except ImportError:
        env.filters['markdown'] = lambda text: text or ""
    template = env.get_template("todoist_backup_template.html")
    html_filename = f"Todoist-Actual-Backup-{today}.html"
    with open(html_filename, "w", encoding="utf-8") as f:
        f.write(template.render(projects=data, date=today))
    print(f"Generated HTML backup at {html_filename}")
# Run the exporter only when executed as a script, not on import.
if __name__ == "__main__":
    main()