import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import getpass
|
|
from collections import defaultdict
|
|
import requests
|
|
from datetime import datetime, timedelta
|
|
from todoist_api_python.api import TodoistAPI
|
|
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
|
|
|
# Directory (relative to the working directory) where task/comment file
# attachments are mirrored locally.
ATTACHMENTS_DIR = "attachments"

# Bearer token used by download_attachment() for the Authorization header;
# assigned once in main() after the key is obtained.
TODOIST_API_TOKEN: str | None = None

# Rolling local archive of completed tasks, merged across runs so completions
# older than the fetch window are not lost.
COMPLETED_HISTORY_FILE = "Todoist-Completed-History.json"
def json_serial(obj):
    """JSON serializer fallback: ISO-format datetimes, stringify everything else."""
    return obj.isoformat() if isinstance(obj, datetime) else str(obj)
def usage():
    """Print command-line help for the exporter to stdout."""
    print("""
Todoist Export Script
---------------------
Exports all active and completed tasks from the Todoist API to a JSON file, including attachments and comments, and generates a human-readable HTML backup using Jinja2.

Usage:
    python export_todoist.py export
        - Exports all data and generates JSON and HTML files.
    python export_todoist.py [any other argument or none]
        - Shows this help message.
""")
def get_api_key():
    """Return the Todoist API token.

    Prefers the TODOIST_KEY environment variable; otherwise prompts on the
    terminal with hidden input. Exits with status 1 when no key can be
    obtained, and caches the key back into the environment for this process.
    """
    key = os.environ.get("TODOIST_KEY")
    if not key:
        try:
            key = getpass.getpass(
                "The TODOIST_KEY environment variable is not set. "
                "Enter TODOIST API key to continue: "
            ).strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-C / Ctrl-D during the prompt counts as "no key supplied".
            print("\nError: TODOIST API key is required.")
            sys.exit(1)
    if not key:
        print("Error: TODOIST API key is required.")
        sys.exit(1)
    os.environ["TODOIST_KEY"] = key
    return key
def ensure_attachments_dir():
    """Make sure the local attachments directory exists (no-op when present)."""
    if os.path.exists(ATTACHMENTS_DIR):
        return
    os.makedirs(ATTACHMENTS_DIR)
def load_completed_history():
    """Load previously saved completed tasks, keyed by project id.

    Returns {} for a missing or unreadable file. Accepts either the current
    dict-of-lists layout or a legacy flat list of task dicts, which is then
    regrouped by each entry's project_id.
    """
    if not os.path.exists(COMPLETED_HISTORY_FILE):
        return {}
    try:
        with open(COMPLETED_HISTORY_FILE, "r", encoding="utf-8") as handle:
            raw = json.load(handle)
    except (OSError, json.JSONDecodeError) as exc:  # pylint: disable=broad-except
        print(f"Warning: failed to load completed history ({exc}). Starting fresh.")
        return {}
    if isinstance(raw, dict):
        # Current layout: keep only well-formed entries (list values),
        # coercing keys to str for uniform lookup.
        return {str(key): value for key, value in raw.items() if isinstance(value, list)}
    if isinstance(raw, list):
        # Legacy layout: flat list of completed tasks; group by project.
        grouped = defaultdict(list)
        for entry in raw:
            if isinstance(entry, dict):
                pid = str(entry.get("project_id", ""))
                if pid:
                    grouped[pid].append(entry)
        return dict(grouped)
    return {}
def save_completed_history(history):
    """Persist the completed-task history dict; warn (never raise) on I/O errors."""
    try:
        with open(COMPLETED_HISTORY_FILE, "w", encoding="utf-8") as handle:
            # json_serial stringifies datetimes and any other non-JSON types.
            json.dump(history, handle, ensure_ascii=False, indent=2, default=json_serial)
    except OSError as exc:  # pylint: disable=broad-except
        print(f"Warning: failed to write completed history ({exc}).")
def merge_completed_lists(history_tasks, new_tasks):
    """Merge historical and freshly fetched completed task dicts.

    De-duplicates on (task id, completion timestamp) — the timestamp is part
    of the key because a recurring task reappears with a new completion time.
    Fresh entries win over historical duplicates. Returns the merged list
    sorted newest-first.
    """
    def identity(task):
        return (
            str(task.get('id', '')),
            task.get('completed_at') or task.get('updated_at') or "",
        )

    merged = []
    seen = set()
    # New tasks are visited first so they take precedence over history.
    for task in list(new_tasks) + list(history_tasks):
        key = identity(task)
        if key not in seen:
            seen.add(key)
            merged.append(task)

    merged.sort(
        key=lambda t: (t.get('completed_at') or "", t.get('updated_at') or ""),
        reverse=True,
    )
    return merged
def _file_looks_like_html(path):
|
|
try:
|
|
with open(path, 'rb') as handle:
|
|
prefix = handle.read(256)
|
|
except OSError:
|
|
return False
|
|
if not prefix:
|
|
return True
|
|
snippet = prefix.lstrip().lower()
|
|
return snippet.startswith(b"<!doctype") or snippet.startswith(b"<html")
|
|
|
|
|
|
def download_attachment(url, filename):
    """Download *url* into ATTACHMENTS_DIR/<filename> and return the local path.

    Returns the cached path when the file already exists and looks valid.
    Returns None on network errors, non-200 responses, or when the server
    answers with an HTML page (e.g. a login/expiry page) instead of the file.
    """
    local_path = os.path.join(ATTACHMENTS_DIR, filename)
    if os.path.exists(local_path):
        # A cached file that is secretly an HTML error page must be re-fetched.
        if _file_looks_like_html(local_path) and not filename.lower().endswith(('.htm', '.html')):
            try:
                os.remove(local_path)
            except OSError:
                pass
        else:
            return local_path
    print(f"Downloading attachment {url}")
    headers = {}
    if TODOIST_API_TOKEN:
        headers["Authorization"] = f"Bearer {TODOIST_API_TOKEN}"
    try:
        response = requests.get(url, stream=True, headers=headers, timeout=30)
    except requests.RequestException as exc:  # pylint: disable=broad-except
        print(f"Failed to download attachment {url}: {exc}")
        return None
    # Bug fix: the streamed response was never closed, leaking the pooled
    # connection on the non-200 and HTML-skip paths. `with response:` releases
    # it on every exit path.
    with response:
        if response.status_code != 200:
            print(f"Failed to download attachment {url}: HTTP {response.status_code}")
            return None
        content_type = (response.headers.get("Content-Type") or "").lower()
        first_chunk = b""
        try:
            with open(local_path, 'wb') as handle:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    if not first_chunk:
                        first_chunk = chunk
                    handle.write(chunk)
        except OSError as exc:  # pylint: disable=broad-except
            print(f"Failed to save attachment (unknown): {exc}")
            return None
    # Detect HTML masquerading as a file either via the declared Content-Type
    # or by sniffing the bytes actually written to disk.
    looks_like_html = (
        "text/html" in content_type
        or (first_chunk and _file_looks_like_html(local_path))
    )
    if looks_like_html and not filename.lower().endswith(('.htm', '.html')):
        try:
            os.remove(local_path)
        except OSError:
            pass
        print(f"Skipped attachment {url}: received HTML response instead of file")
        return None
    print(f"Downloaded attachment {url}")
    return local_path
def _get_retry_delay(response, attempt, base_delay=5, max_delay=120):
|
|
if response is not None:
|
|
headers = getattr(response, "headers", {}) or {}
|
|
retry_after = headers.get("Retry-After") or headers.get("retry-after")
|
|
if retry_after:
|
|
try:
|
|
return max(1, int(float(retry_after)))
|
|
except (TypeError, ValueError):
|
|
pass
|
|
reset_header = headers.get("X-RateLimit-Reset") or headers.get("x-rate-limit-reset")
|
|
if reset_header:
|
|
try:
|
|
reset_timestamp = float(reset_header)
|
|
return max(1, int(reset_timestamp - time.time()))
|
|
except (TypeError, ValueError):
|
|
pass
|
|
return min(max_delay, base_delay * (2 ** attempt))
|
|
|
|
|
|
def execute_with_rate_limit(func, *args, **kwargs):
    """Call *func*(*args, **kwargs), retrying on HTTP 429 rate-limit errors.

    The status code is read from the exception itself or from its attached
    response. Up to 5 retries are attempted with a delay chosen by
    _get_retry_delay(); any other exception — or a 429 past the retry
    budget — propagates unchanged.
    """
    max_attempts = 5
    attempt = 0
    while True:
        try:
            return func(*args, **kwargs)
        except Exception as error:  # pylint: disable=broad-except
            response = getattr(error, "response", None)
            status = getattr(error, "status_code", None)
            if status is None and response is not None:
                status = getattr(response, "status_code", None)
            if status != 429 or attempt >= max_attempts:
                raise
            delay = _get_retry_delay(response, attempt)
            attempt += 1
            print(f"Rate limit hit for {func.__name__}. Waiting {delay} seconds before retry {attempt}/{max_attempts}...")
            time.sleep(delay)
def fetch_all_projects(api):
    """Fetch every Todoist project, de-duplicated by id; returns a list.

    Errors are logged and swallowed so a partial export can still proceed.
    """
    projects_by_id = {}
    try:
        # get_projects yields paginated batches of project objects.
        for batch in execute_with_rate_limit(api.get_projects):
            for project in batch:
                projects_by_id[str(getattr(project, "id", ""))] = project
    except Exception as error:  # pylint: disable=broad-except
        print(f"Error fetching projects: {error}")
    return list(projects_by_id.values())
def fetch_active_tasks_by_project(api):
    """Fetch all active tasks, grouped by project id (as str).

    Errors are logged and swallowed; an empty grouping is returned instead.
    """
    tasks_by_project = defaultdict(list)
    try:
        # get_tasks yields paginated batches of task objects.
        for batch in execute_with_rate_limit(api.get_tasks):
            for task in batch:
                tasks_by_project[str(getattr(task, "project_id", ""))].append(task)
    except Exception as error:  # pylint: disable=broad-except
        print(f"Error fetching active tasks: {error}")
    print(f"Fetched active tasks for {len(tasks_by_project)} projects")
    return tasks_by_project
def fetch_completed_tasks_by_project(api, since, until):
    """Fetch tasks completed between *since* and *until*, grouped by project id.

    Errors are logged and swallowed; an empty grouping is returned instead.
    """
    tasks_by_project = defaultdict(list)
    try:
        pages = execute_with_rate_limit(
            api.get_completed_tasks_by_completion_date,
            since=since,
            until=until,
        )
        for batch in pages:
            for task in batch:
                tasks_by_project[str(getattr(task, "project_id", ""))].append(task)
    except Exception as error:  # pylint: disable=broad-except
        print(f"Error fetching completed tasks between {since} and {until}: {error}")
    print(f"Fetched completed tasks for {len(tasks_by_project)} projects")
    return tasks_by_project
def fetch_comments_by_task(api, project_ids, task_ids):
    """Collect comments grouped by task id.

    First sweeps each project (bulk call), then falls back to per-task
    fetches for any requested task id that yielded no comments. Per-call
    errors are logged and skipped.
    """
    comments_by_task = defaultdict(list)
    total_comments = 0

    def collect(pages):
        # Fold paginated comment batches into the shared grouping.
        nonlocal total_comments
        for batch in pages:
            for comment in batch:
                owner = str(getattr(comment, "task_id", ""))
                if owner:
                    comments_by_task[owner].append(comment)
                    total_comments += 1

    for project_id in project_ids:
        try:
            collect(execute_with_rate_limit(api.get_comments, project_id=project_id))
        except Exception as error:  # pylint: disable=broad-except
            print(f"Error fetching comments for project {project_id}: {error}")

    for task_id in [tid for tid in task_ids if tid not in comments_by_task]:
        try:
            collect(execute_with_rate_limit(api.get_comments, task_id=task_id))
        except Exception as error:  # pylint: disable=broad-except
            print(f"Error fetching comments for task {task_id}: {error}")

    print(
        f"Fetched {total_comments} comments mapped to {len(comments_by_task)} tasks"
    )
    return comments_by_task
def process_task(task, comments_lookup):
    """Serialize a task object into a plain dict with attachments and comments.

    File attachments — on the task itself or on its comments — are downloaded
    into the local attachments directory and referenced under 'local_file'.
    *comments_lookup* maps task id (str) to a list of comment objects.
    """
    task_dict = dict(task.__dict__)
    task_id = getattr(task, "id", None) or getattr(task, "task_id", None)
    if task_id is not None:
        task_dict.setdefault("id", task_id)

    def serialize_attachment(att):
        # Copy the attachment and, when it points at a file, mirror it locally.
        att_dict = dict(att.__dict__)
        file_url = att_dict.get('file_url')
        if file_url:
            filename = att_dict.get('file_name') or os.path.basename(file_url)
            local_path = download_attachment(file_url, filename)
            if local_path:
                att_dict['local_file'] = os.path.relpath(local_path)
        return att_dict

    # Task-level attachments (if the object carries any).
    attachments = [
        serialize_attachment(att)
        for att in (getattr(task, 'attachments', None) or [])
    ]
    if attachments:
        task_dict['attachments'] = attachments

    # Comments, each with its (optional) single attachment serialized the same way.
    comment_key = str(task_id) if task_id is not None else None
    if comment_key and comment_key in comments_lookup:
        serialized_comments = []
        for comment in comments_lookup[comment_key]:
            comment_dict = dict(comment.__dict__)
            attachment = getattr(comment, "attachment", None)
            if attachment:
                comment_dict['attachment'] = serialize_attachment(attachment)
            serialized_comments.append(comment_dict)
        task_dict['comments'] = serialized_comments
    return task_dict
def build_task_hierarchy(task_dicts):
    """Nest flat task dicts into a tree via their 'parent_id' links.

    Children are attached under their parent's 'subtasks' list; tasks with no
    parent, a missing parent, or no id become roots. The original input order
    is preserved at every level, and empty 'subtasks' lists are stripped from
    leaves before returning the list of roots.
    """
    task_lookup = {}
    order_lookup = {}
    for index, task in enumerate(task_dicts):
        task_id = task.get('id')
        if task_id is None:
            continue
        key = str(task_id)
        task_lookup[key] = task
        order_lookup[key] = index
        task.setdefault('subtasks', [])

    roots = []
    for task in task_dicts:
        parent = None
        if task.get('id') is not None and task.get('parent_id'):
            parent = task_lookup.get(str(task.get('parent_id')))
        if parent is not None:
            parent.setdefault('subtasks', [])
            parent['subtasks'].append(task)
        else:
            roots.append(task)

    def order_children(children):
        # Restore input order at this level, then recurse.
        children.sort(key=lambda item: order_lookup.get(str(item.get('id')), 0))
        for child in children:
            if child.get('subtasks'):
                order_children(child['subtasks'])

    order_children(roots)

    def strip_empty(task):
        # Drop 'subtasks' from leaves for a cleaner serialized output.
        subtasks = task.get('subtasks')
        if subtasks:
            for sub in subtasks:
                strip_empty(sub)
        else:
            task.pop('subtasks', None)

    for root in roots:
        strip_empty(root)

    return roots
def main():
    """Entry point: export all Todoist data to a JSON file and an HTML report."""
    if len(sys.argv) != 2 or sys.argv[1] != "export":
        usage()
        return

    ensure_attachments_dir()
    token = get_api_key()
    global TODOIST_API_TOKEN  # pylint: disable=global-statement
    TODOIST_API_TOKEN = token  # download_attachment() reads this for auth headers

    api = TodoistAPI(token)
    projects = fetch_all_projects(api)

    # Completed tasks are fetched for a 90-day window starting at midnight;
    # older completions survive via the merged local history file.
    window_start = (datetime.now() - timedelta(days=90)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    window_end = datetime.now()

    active_tasks_by_project = fetch_active_tasks_by_project(api)
    completed_tasks_by_project = fetch_completed_tasks_by_project(
        api, since=window_start, until=window_end
    )

    # Every project that has any task gets a bulk comment sweep.
    comment_project_ids = sorted(
        pid
        for pid in (set(active_tasks_by_project.keys()) | set(completed_tasks_by_project.keys()))
        if pid
    )

    # Every known task id is a candidate for the per-task comment fallback.
    task_ids_for_comments: set[str] = set()
    for grouping in (active_tasks_by_project, completed_tasks_by_project):
        for task_list in grouping.values():
            for task in task_list:
                task_id = getattr(task, "id", None)
                if task_id:
                    task_ids_for_comments.add(str(task_id))

    comments_by_task = fetch_comments_by_task(
        api, comment_project_ids, sorted(task_ids_for_comments)
    )

    completed_history = load_completed_history()
    updated_history = {}
    data = []
    for project in projects:
        project_dict = dict(project.__dict__)
        project_id = str(getattr(project, "id", ""))
        active_tasks = active_tasks_by_project.get(project_id, [])
        completed_tasks = completed_tasks_by_project.get(project_id, [])

        processed_active = [process_task(t, comments_by_task) for t in active_tasks]
        processed_completed = [process_task(t, comments_by_task) for t in completed_tasks]

        # Active tasks are nested under their parents for the report.
        project_dict['tasks'] = build_task_hierarchy(processed_active)

        # Completed tasks stay flat but get a readable parent reference,
        # resolved against both active and completed task names.
        name_lookup = {}
        for task in active_tasks + completed_tasks:
            task_id = getattr(task, "id", None)
            if task_id:
                name_lookup[str(task_id)] = getattr(task, "content", "")
        for task in processed_completed:
            parent_id = task.get('parent_id')
            if parent_id:
                parent_name = name_lookup.get(str(parent_id))
                if parent_name:
                    task['parent_task'] = {
                        "id": str(parent_id),
                        "content": parent_name,
                    }

        merged_completed = merge_completed_lists(
            completed_history.get(project_id, []), processed_completed
        )
        project_dict['completed_tasks'] = merged_completed
        updated_history[project_id] = merged_completed
        data.append(project_dict)

    # Projects that vanished (deleted/archived) keep their saved history.
    for project_id, tasks in completed_history.items():
        updated_history.setdefault(project_id, tasks)
    save_completed_history(updated_history)

    # Write JSON backup.
    today = datetime.now().strftime("%Y-%m-%d")
    json_filename = f"Todoist-Actual-Backup-{today}.json"
    with open(json_filename, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2, default=json_serial)
    print(f"Exported data to {json_filename}")

    # Write HTML backup via Jinja2, with templates next to this script.
    env = Environment(
        loader=FileSystemLoader(os.path.dirname(__file__)),
        autoescape=select_autoescape(['html', 'xml'])
    )
    # Render task text as Markdown when the optional dependency is available;
    # otherwise fall back to plain pass-through.
    try:
        import markdown
        env.filters['markdown'] = lambda text: markdown.markdown(text or "")
    except ImportError:
        env.filters['markdown'] = lambda text: text or ""

    template = env.get_template("todoist_backup_template.html")
    html_filename = f"Todoist-Actual-Backup-{today}.html"
    with open(html_filename, "w", encoding="utf-8") as f:
        f.write(template.render(projects=data, date=today))
    print(f"Generated HTML backup at {html_filename}")
# Script entry point: run the exporter only when executed directly.
if __name__ == "__main__":
    main()