import copy
import getpass
import json
import os
import shutil
import sys
import time
from collections import defaultdict
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import requests
from jinja2 import Environment, FileSystemLoader, select_autoescape
from todoist_api_python.api import TodoistAPI

OUTPUT_DIR = "output"
ATTACHMENTS_DIR = os.path.join(OUTPUT_DIR, "attachments")
LEGACY_ATTACHMENTS_DIR = "attachments"
TODOIST_API_TOKEN: str | None = None
COMPLETED_HISTORY_FILE = "Todoist-Completed-History.json"
COMMENT_REQUEST_MIN_INTERVAL = 0.5  # seconds
COMMENT_MAX_ATTEMPTS = 8
PROJECTS_URL = "https://api.todoist.com/rest/v2/projects"
TASKS_URL = "https://api.todoist.com/rest/v2/tasks"
COMPLETED_TASKS_URL = "https://api.todoist.com/api/v1/tasks/completed/by_completion_date"
COMMENTS_URL = "https://api.todoist.com/api/v1/comments"


def json_serial(obj):
    """JSON serializer for objects json.dump cannot handle natively."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    return str(obj)


def usage():
    print("""
Todoist Export Script
---------------------
Exports all active and completed tasks from the Todoist API to a JSON file,
including attachments and comments, and generates a human-readable HTML
backup using Jinja2.

Usage:
  python export_todoist.py export
      Exports all data and generates the JSON and HTML files.
  python export_todoist.py [any other argument, or none]
      Shows this help message.
""")


def get_api_key():
    key = os.environ.get("TODOIST_KEY")
    if not key:
        try:
            key = getpass.getpass(
                "The TODOIST_KEY environment variable is not set. "
                "Enter TODOIST API key to continue: "
            ).strip()
        except (EOFError, KeyboardInterrupt):
            print("\nError: TODOIST API key is required.")
            sys.exit(1)
    if not key:
        print("Error: TODOIST API key is required.")
        sys.exit(1)
    os.environ["TODOIST_KEY"] = key
    return key


def ensure_output_dir():
    os.makedirs(OUTPUT_DIR, exist_ok=True)


def ensure_attachments_dir():
    ensure_output_dir()
    # Migrate files from the legacy top-level attachments directory, if any.
    if os.path.isdir(LEGACY_ATTACHMENTS_DIR) and LEGACY_ATTACHMENTS_DIR != ATTACHMENTS_DIR:
        try:
            if not os.path.exists(ATTACHMENTS_DIR):
                shutil.move(LEGACY_ATTACHMENTS_DIR, ATTACHMENTS_DIR)
            else:
                for name in os.listdir(LEGACY_ATTACHMENTS_DIR):
                    shutil.move(
                        os.path.join(LEGACY_ATTACHMENTS_DIR, name),
                        os.path.join(ATTACHMENTS_DIR, name),
                    )
                os.rmdir(LEGACY_ATTACHMENTS_DIR)
            print(f"Moved legacy attachments into {ATTACHMENTS_DIR}")
        except (OSError, shutil.Error) as exc:
            print(f"Warning: failed to migrate legacy attachments: {exc}")
    os.makedirs(ATTACHMENTS_DIR, exist_ok=True)


def load_completed_history():
    if not os.path.exists(COMPLETED_HISTORY_FILE):
        return {}
    try:
        with open(COMPLETED_HISTORY_FILE, "r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, json.JSONDecodeError) as exc:
        print(f"Warning: failed to load completed history ({exc}). Starting fresh.")
        return {}
    if isinstance(data, dict):
        history = {}
        for key, value in data.items():
            if isinstance(value, list):
                history[str(key)] = value
        return history
    if isinstance(data, list):
        # Legacy format: a flat task list; regroup it by project id.
        history = defaultdict(list)
        for item in data:
            if isinstance(item, dict):
                project_id = str(item.get("project_id", ""))
                if project_id:
                    history[project_id].append(item)
        return dict(history)
    return {}
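# Illustrative shape of Todoist-Completed-History.json (hypothetical IDs and
# content): a mapping of project id -> list of completed-task dicts. Older
# exports stored a flat task list instead; load_completed_history() accepts
# either and regroups the legacy list by each item's project_id.
#
#   {
#     "220474322": [
#       {"id": "2995104339", "content": "Buy milk",
#        "project_id": "220474322",
#        "completed_at": "2024-05-01T12:00:00Z"}
#     ]
#   }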
Starting fresh.") return {} if isinstance(data, dict): history = {} for key, value in data.items(): if isinstance(value, list): history[str(key)] = value return history if isinstance(data, list): history = defaultdict(list) for item in data: if isinstance(item, dict): project_id = str(item.get("project_id", "")) if project_id: history[project_id].append(item) return {key: value for key, value in history.items()} return {} def save_completed_history(history): try: with open(COMPLETED_HISTORY_FILE, "w", encoding="utf-8") as handle: json.dump(history, handle, ensure_ascii=False, indent=2, default=json_serial) except OSError as exc: # pylint: disable=broad-except print(f"Warning: failed to write completed history ({exc}).") def normalize_timestamp(value): if not value: return "" if isinstance(value, datetime): return value.isoformat() return str(value) def make_completed_task_key_from_dict(task): task_id = str(task.get('id', '')) if isinstance(task, dict) else "" if not task_id: return None completed_at = normalize_timestamp(task.get('completed_at')) if not completed_at: completed_at = normalize_timestamp(task.get('updated_at')) return (task_id, completed_at) def make_completed_task_key_from_api(task): task_id = getattr(task, "id", None) if not task_id: return None completed_at = normalize_timestamp(getattr(task, "completed_at", None)) if not completed_at: completed_at = normalize_timestamp(getattr(task, "updated_at", None)) return (str(task_id), completed_at) def merge_completed_lists(history_tasks, new_tasks): merged = [] index_by_key = {} def merge_task_dicts(primary, secondary, prefer_primary=True): for key, value in secondary.items(): if key == 'comments': if (not primary.get('comments')) and value: primary['comments'] = value continue if key == 'attachments': if (not primary.get('attachments')) and value: primary['attachments'] = value continue if key not in primary or primary[key] in (None, "", [], {}): primary[key] = value continue if not prefer_primary: primary[key] = value return primary def add_or_merge(task, prefer_existing=True): key = make_completed_task_key_from_dict(task) if key is None: merged.append(task) return if key in index_by_key: idx = index_by_key[key] merge_task_dicts(merged[idx], task, prefer_primary=prefer_existing) else: merged.append(task) index_by_key[key] = len(merged) - 1 for item in new_tasks: add_or_merge(item, prefer_existing=True) for item in history_tasks: add_or_merge(item, prefer_existing=True) def sort_key(task): completed_at = normalize_timestamp(task.get('completed_at')) updated_at = normalize_timestamp(task.get('updated_at')) return (completed_at, updated_at) merged.sort(key=sort_key, reverse=True) return merged def _file_looks_like_html(path): try: with open(path, 'rb') as handle: prefix = handle.read(256) except OSError: return False if not prefix: return True snippet = prefix.lstrip().lower() return snippet.startswith(b" 1: print(f" Waiting {delay} seconds due to rate limiting") time.sleep(delay) continue raise def fetch_all_projects(api): projects_by_id = {} try: projects_iter = execute_with_rate_limit( api.get_projects, request_desc=f"GET {PROJECTS_URL}" ) for batch in projects_iter: for project in batch: projects_by_id[str(getattr(project, "id", ""))] = project except Exception as error: # pylint: disable=broad-except print(f"Error fetching projects: {error}") return list(projects_by_id.values()) def fetch_active_tasks_by_project(api): tasks_by_project = defaultdict(list) try: tasks_iter = execute_with_rate_limit( api.get_tasks, 
request_desc=f"GET {TASKS_URL}" ) for batch in tasks_iter: for task in batch: tasks_by_project[str(getattr(task, "project_id", ""))].append(task) except Exception as error: # pylint: disable=broad-except print(f"Error fetching active tasks: {error}") print(f"Fetched active tasks for {len(tasks_by_project)} projects") return tasks_by_project def fetch_completed_tasks_by_project(api, since, until): tasks_by_project = defaultdict(list) try: query = f"?since={since.isoformat()}&until={until.isoformat()}" completed_iter = execute_with_rate_limit( api.get_completed_tasks_by_completion_date, request_desc=f"GET {COMPLETED_TASKS_URL}{query}", since=since, until=until, ) for batch in completed_iter: for task in batch: tasks_by_project[str(getattr(task, "project_id", ""))].append(task) except Exception as error: # pylint: disable=broad-except print(f"Error fetching completed tasks between {since} and {until}: {error}") print(f"Fetched completed tasks for {len(tasks_by_project)} projects") return tasks_by_project def fetch_comments_by_task(api, project_ids, task_ids): comments_by_task = defaultdict(list) total_comments = 0 last_comment_call = 0.0 def throttled_get_comments(**kwargs): nonlocal last_comment_call elapsed = time.time() - last_comment_call if elapsed < COMMENT_REQUEST_MIN_INTERVAL: time.sleep(COMMENT_REQUEST_MIN_INTERVAL - elapsed) params = [] for key, value in kwargs.items(): if value is None: continue params.append(f"{key}={quote_plus(str(value))}") query = "&".join(params) desc = f"GET {COMMENTS_URL}{('?' + query) if query else ''}" result = execute_with_rate_limit( api.get_comments, max_attempts=COMMENT_MAX_ATTEMPTS, request_desc=desc, **kwargs, ) last_comment_call = time.time() return result def handle_comment_error(scope, identifier, error): status_code = getattr(error, "status_code", None) response = getattr(error, "response", None) if status_code is None and response is not None: status_code = getattr(response, "status_code", None) if status_code == 404: print(f" Comments not found for {scope} {identifier} (404). Skipping.") return False if status_code == 429: delay = _get_retry_delay(response, COMMENT_MAX_ATTEMPTS) print( f" Rate limit while fetching comments for {scope} {identifier} after retries; waiting {delay} seconds before continuing." 
def fetch_comments_by_task(api, project_ids, task_ids):
    comments_by_task = defaultdict(list)
    total_comments = 0
    last_comment_call = 0.0

    def throttled_get_comments(**kwargs):
        nonlocal last_comment_call
        elapsed = time.time() - last_comment_call
        if elapsed < COMMENT_REQUEST_MIN_INTERVAL:
            time.sleep(COMMENT_REQUEST_MIN_INTERVAL - elapsed)
        params = []
        for key, value in kwargs.items():
            if value is None:
                continue
            params.append(f"{key}={quote_plus(str(value))}")
        query = "&".join(params)
        desc = f"GET {COMMENTS_URL}{('?' + query) if query else ''}"
        result = execute_with_rate_limit(
            api.get_comments,
            max_attempts=COMMENT_MAX_ATTEMPTS,
            request_desc=desc,
            **kwargs,
        )
        last_comment_call = time.time()
        return result

    def handle_comment_error(scope, identifier, error):
        status_code = getattr(error, "status_code", None)
        response = getattr(error, "response", None)
        if status_code is None and response is not None:
            status_code = getattr(response, "status_code", None)
        if status_code == 404:
            print(f"  Comments not found for {scope} {identifier} (404). Skipping.")
            return False
        if status_code == 429:
            delay = _get_retry_delay(response, COMMENT_MAX_ATTEMPTS)
            print(
                f"  Rate limit while fetching comments for {scope} {identifier} "
                f"after retries; waiting {delay} seconds before continuing."
            )
            if delay > 1:
                print(f"  Waiting {delay} seconds due to rate limiting")
            time.sleep(delay)
            return True
        print(f"  Error fetching comments for {scope} {identifier}: {error}")
        return False

    for project_id in project_ids:
        while True:
            try:
                comments_iter = throttled_get_comments(project_id=project_id)
                for batch in comments_iter:
                    for comment in batch:
                        task_id = str(getattr(comment, "task_id", ""))
                        if task_id:
                            comments_by_task[task_id].append(comment)
                            total_comments += 1
                break
            except Exception as error:  # pylint: disable=broad-except
                if not handle_comment_error("project", project_id, error):
                    break

    missing_task_ids = [task_id for task_id in task_ids if task_id not in comments_by_task]
    for task_id in missing_task_ids:
        while True:
            try:
                comments_iter = throttled_get_comments(task_id=task_id)
                for batch in comments_iter:
                    for comment in batch:
                        key = str(getattr(comment, "task_id", ""))
                        if key:
                            comments_by_task[key].append(comment)
                            total_comments += 1
                break
            except Exception as error:  # pylint: disable=broad-except
                if not handle_comment_error("task", task_id, error):
                    break

    print(f"Fetched {total_comments} comments mapped to {len(comments_by_task)} tasks")
    return comments_by_task


def process_task(task, comments_lookup):
    task_dict = task.__dict__.copy()
    task_id = getattr(task, "id", None) or getattr(task, "task_id", None)
    if task_id is not None:
        task_dict.setdefault("id", task_id)

    # Attachments (if any)
    attachments = []
    if hasattr(task, 'attachments') and task.attachments:
        for att in task.attachments:
            att_dict = att.__dict__.copy()
            if 'file_url' in att_dict and att_dict['file_url']:
                filename = att_dict.get('file_name') or os.path.basename(att_dict['file_url'])
                local_path = download_attachment(att_dict['file_url'], filename)
                if local_path:
                    att_dict['local_file'] = os.path.relpath(local_path, OUTPUT_DIR)
            attachments.append(att_dict)
    if attachments:
        task_dict['attachments'] = attachments

    # Comments
    comment_key = str(task_id) if task_id is not None else None
    if comment_key and comment_key in comments_lookup:
        serialized_comments = []
        for comment in comments_lookup[comment_key]:
            comment_dict = comment.__dict__.copy()
            attachment = getattr(comment, "attachment", None)
            if attachment:
                attachment_dict = attachment.__dict__.copy()
                file_url = attachment_dict.get("file_url")
                if file_url:
                    filename = attachment_dict.get("file_name") or os.path.basename(file_url)
                    local_path = download_attachment(file_url, filename)
                    if local_path:
                        attachment_dict['local_file'] = os.path.relpath(local_path, OUTPUT_DIR)
                comment_dict['attachment'] = attachment_dict
            serialized_comments.append(comment_dict)
        task_dict['comments'] = serialized_comments
    return task_dict


def build_task_hierarchy(task_dicts):
    task_lookup = {}
    order_lookup = {}
    for index, task in enumerate(task_dicts):
        task_id = task.get('id')
        if task_id is None:
            continue
        task_lookup[str(task_id)] = task
        order_lookup[str(task_id)] = index
        task.setdefault('subtasks', [])

    roots = []
    for task in task_dicts:
        task_id = task.get('id')
        if task_id is None:
            roots.append(task)
            continue
        parent_id = task.get('parent_id')
        if parent_id:
            parent = task_lookup.get(str(parent_id))
            if parent:
                parent.setdefault('subtasks', [])
                parent['subtasks'].append(task)
                continue
        roots.append(task)

    def sort_children(children):
        children.sort(key=lambda item: order_lookup.get(str(item.get('id')), 0))
        for child in children:
            child_children = child.get('subtasks') or []
            if child_children:
                sort_children(child_children)

    sort_children(roots)

    # Remove empty subtask lists for cleanliness
    def prune(task):
        subtasks = task.get('subtasks')
        if subtasks:
            for sub in subtasks:
                prune(sub)
        else:
            task.pop('subtasks', None)

    for root in roots:
        prune(root)
    return roots
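# Worked example (hypothetical data): build_task_hierarchy nests subtasks
# under their parents via parent_id and prunes empty 'subtasks' lists.
#
#   flat = [
#       {"id": "1", "content": "Parent"},
#       {"id": "2", "content": "Child", "parent_id": "1"},
#   ]
#   build_task_hierarchy(flat)
#   # -> [{"id": "1", "content": "Parent",
#   #      "subtasks": [{"id": "2", "content": "Child", "parent_id": "1"}]}]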
def main():
    if len(sys.argv) != 2 or sys.argv[1] != "export":
        usage()
        return

    ensure_attachments_dir()
    token = get_api_key()
    global TODOIST_API_TOKEN  # pylint: disable=global-statement
    TODOIST_API_TOKEN = token
    api = TodoistAPI(token)

    projects = fetch_all_projects(api)
    since = (datetime.now() - timedelta(days=90)).replace(hour=0, minute=0, second=0, microsecond=0)
    until = datetime.now()
    active_tasks_by_project = fetch_active_tasks_by_project(api)
    completed_tasks_by_project = fetch_completed_tasks_by_project(api, since=since, until=until)

    completed_history = load_completed_history()
    history_by_key = {}
    for task_list in completed_history.values():
        for stored_task in task_list:
            key = make_completed_task_key_from_dict(stored_task)
            if key:
                history_by_key[key] = stored_task

    active_comment_project_ids = sorted(
        pid for pid, tasks in active_tasks_by_project.items() if pid and tasks
    )
    completed_task_ids_for_comments: set[str] = set()
    skipped_completed_history = {}
    for task_list in completed_tasks_by_project.values():
        for task in task_list:
            key = make_completed_task_key_from_api(task)
            if key is None:
                continue
            history_entry = history_by_key.get(key)
            if history_entry:
                skipped_completed_history[key] = history_entry
            else:
                completed_task_ids_for_comments.add(key[0])

    comments_by_task = fetch_comments_by_task(
        api,
        active_comment_project_ids,
        sorted(completed_task_ids_for_comments),
    )

    updated_history = {}
    data = []
    for project in projects:
        project_dict = project.__dict__.copy()
        project_id = str(getattr(project, "id", ""))
        active_tasks = active_tasks_by_project.get(project_id, [])
        completed_tasks = completed_tasks_by_project.get(project_id, [])
        processed_active = [process_task(t, comments_by_task) for t in active_tasks]
        processed_completed = [process_task(t, comments_by_task) for t in completed_tasks]

        for task in processed_completed:
            key = make_completed_task_key_from_dict(task)
            history_entry = skipped_completed_history.get(key) if key else None
            if history_entry:
                if (not task.get('comments')) and history_entry.get('comments'):
                    task['comments'] = copy.deepcopy(history_entry['comments'])
                if (not task.get('attachments')) and history_entry.get('attachments'):
                    task['attachments'] = copy.deepcopy(history_entry['attachments'])

        # Build hierarchy for active tasks
        project_dict['tasks'] = build_task_hierarchy(processed_active)

        # Map task IDs to names for parent lookups
        name_lookup = {}
        for task in active_tasks + completed_tasks:
            task_id = getattr(task, "id", None)
            if task_id:
                name_lookup[str(task_id)] = getattr(task, "content", "")
        for task in processed_completed:
            parent_id = task.get('parent_id')
            if parent_id:
                parent_name = name_lookup.get(str(parent_id))
                if parent_name:
                    task['parent_task'] = {
                        "id": str(parent_id),
                        "content": parent_name,
                    }

        historical = completed_history.get(project_id, [])
        merged_completed = merge_completed_lists(historical, processed_completed)
        project_dict['completed_tasks'] = merged_completed
        updated_history[project_id] = merged_completed
        data.append(project_dict)

    for project_id, tasks in completed_history.items():
        if project_id not in updated_history:
            updated_history[project_id] = tasks
    save_completed_history(updated_history)

    # Write JSON
    today = datetime.now().strftime("%Y-%m-%d")
    json_filename = f"Todoist-Actual-Backup-{today}.json"
    json_output_path = os.path.join(OUTPUT_DIR, json_filename)
    with open(json_output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2, default=json_serial)
    print(f"Exported data to {json_output_path}")

    # Write HTML
    env = Environment(
        loader=FileSystemLoader(os.path.dirname(__file__)),
        autoescape=select_autoescape(['html', 'xml']),
    )
    # Register a markdown filter when the optional dependency is available;
    # otherwise fall back to passing the text through unchanged.
    try:
        import markdown
        env.filters['markdown'] = lambda text: markdown.markdown(text or "")
    except ImportError:
        env.filters['markdown'] = lambda text: text or ""
    template = env.get_template("todoist_backup_template.html")
    html_filename = f"Todoist-Actual-Backup-{today}.html"
    html_output_path = os.path.join(OUTPUT_DIR, html_filename)
    with open(html_output_path, "w", encoding="utf-8") as f:
        f.write(template.render(projects=data, date=today))
    print(f"Generated HTML backup at {html_output_path}")


if __name__ == "__main__":
    main()
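# Example invocation (assumes TODOIST_KEY holds a valid API token):
#
#   TODOIST_KEY=xxxx python export_todoist.py export
#
# Expected outputs, relative to the working directory:
#   output/Todoist-Actual-Backup-YYYY-MM-DD.json
#   output/Todoist-Actual-Backup-YYYY-MM-DD.html
#   output/attachments/...           (downloaded attachment files)
#   Todoist-Completed-History.json   (rolling completed-task history)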