diff --git a/export_todoist.py b/export_todoist.py
index 1d39e7e..5938a85 100644
--- a/export_todoist.py
+++ b/export_todoist.py
@@ -1,6 +1,8 @@
 import os
 import sys
 import json
+import time
+from collections import defaultdict
 import requests
 from datetime import datetime, timedelta
 from todoist_api_python.api import TodoistAPI
@@ -51,64 +53,108 @@ def download_attachment(url, filename):
         return None
 
 
-def fetch_all_projects(api):
-    ret = []
-    projects_iter = api.get_projects()
-    for projects in projects_iter:
-        for project in projects:
-            name = getattr(project, 'name', None)
-            id = getattr(project, 'id', None)
-            print(f"Found project {name} with ID {id}")
-            ret.append(project)
-    return ret
+def _get_retry_delay(response, attempt, base_delay=5, max_delay=120):
+    if response is not None:
+        headers = getattr(response, "headers", {}) or {}
+        retry_after = headers.get("Retry-After") or headers.get("retry-after")
+        if retry_after:
+            try:
+                return max(1, int(float(retry_after)))
+            except (TypeError, ValueError):
+                pass
+        reset_header = headers.get("X-RateLimit-Reset") or headers.get("x-rate-limit-reset")
+        if reset_header:
+            try:
+                reset_timestamp = float(reset_header)
+                return max(1, int(reset_timestamp - time.time()))
+            except (TypeError, ValueError):
+                pass
+    return min(max_delay, base_delay * (2 ** attempt))
 
 
-def fetch_all_completed_tasks(api, project_id):
-    # Fetch all completed tasks for a project using get_completed_tasks_by_completion_date
-    # The API only allows up to 3 months per call, so we fetch just once for the last 3 months
-    all_completed = []
-    since = (datetime.now() - timedelta(days=90)).replace(hour=0, minute=0, second=0, microsecond=0)
-    until = datetime.now()
-    try:
-        completed_iter = api.get_completed_tasks_by_completion_date(since=since, until=until)
-        for completed_list in completed_iter:
-            for task in completed_list:
-                if hasattr(task, 'project_id') and str(task.project_id) == str(project_id):
-                    all_completed.append(task)
-    except Exception as e:
-        print(f"Error fetching completed tasks for {since} to {until}: {e}")
-    print(f"Found {len(all_completed)} completed tasks for project {project_id}")
-    return all_completed
-def fetch_all_tasks(api, project_id, completed=False):
-    if completed:
-        return fetch_all_completed_tasks(api, project_id)
-    else:
-        tasks = []
+def execute_with_rate_limit(func, *args, **kwargs):
+    attempts = 0
+    max_attempts = 5
+    while True:
         try:
-            tasks_iter = api.get_tasks(project_id=project_id)
-            for batch in tasks_iter:
-                for task in batch:
-                    tasks.append(task)
-        except Exception as e:
-            print(f"Error fetching active tasks for project {project_id}: {e}")
-        print(f"Found {len(tasks)} completed tasks for project {project_id}")
-        return tasks
+            return func(*args, **kwargs)
+        except Exception as error:  # pylint: disable=broad-except
+            status_code = getattr(error, "status_code", None)
+            response = getattr(error, "response", None)
+            if status_code is None and response is not None:
+                status_code = getattr(response, "status_code", None)
+            if status_code == 429 and attempts < max_attempts:
+                delay = _get_retry_delay(response, attempts)
+                attempts += 1
+                print(f"Rate limit hit for {func.__name__}. Waiting {delay} seconds before retry {attempts}/{max_attempts}...")
+                time.sleep(delay)
+                continue
+            raise
 
 
-def fetch_comments(api, task_id):
-    comments = []
+def fetch_all_projects(api):
+    projects_by_id = {}
     try:
-        comments_iter = api.get_comments(task_id=task_id)
-        for batch in comments_iter:
-            for comment in batch:
-                comments.append(comment)
-    except Exception:
-        return []
-    return comments
+        projects_iter = execute_with_rate_limit(api.get_projects)
+        for batch in projects_iter:
+            for project in batch:
+                projects_by_id[str(getattr(project, "id", ""))] = project
+    except Exception as error:  # pylint: disable=broad-except
+        print(f"Error fetching projects: {error}")
+    return list(projects_by_id.values())
 
 
-def process_task(api, task, completed=False):
+def fetch_active_tasks_by_project(api):
+    tasks_by_project = defaultdict(list)
+    try:
+        tasks_iter = execute_with_rate_limit(api.get_tasks)
+        for batch in tasks_iter:
+            for task in batch:
+                tasks_by_project[str(getattr(task, "project_id", ""))].append(task)
+    except Exception as error:  # pylint: disable=broad-except
+        print(f"Error fetching active tasks: {error}")
+    print(f"Fetched active tasks for {len(tasks_by_project)} projects")
+    return tasks_by_project
+
+
+def fetch_completed_tasks_by_project(api, since, until):
+    tasks_by_project = defaultdict(list)
+    try:
+        completed_iter = execute_with_rate_limit(
+            api.get_completed_tasks_by_completion_date,
+            since=since,
+            until=until,
+        )
+        for batch in completed_iter:
+            for task in batch:
+                tasks_by_project[str(getattr(task, "project_id", ""))].append(task)
+    except Exception as error:  # pylint: disable=broad-except
+        print(f"Error fetching completed tasks between {since} and {until}: {error}")
+    print(f"Fetched completed tasks for {len(tasks_by_project)} projects")
+    return tasks_by_project
+
+
+def fetch_comments_by_task(api, project_ids):
+    comments_by_task = defaultdict(list)
+    for project_id in project_ids:
+        try:
+            comments_iter = execute_with_rate_limit(api.get_comments, project_id=project_id)
+            for batch in comments_iter:
+                for comment in batch:
+                    task_id = str(getattr(comment, "task_id", ""))
+                    if task_id:
+                        comments_by_task[task_id].append(comment)
+        except Exception as error:  # pylint: disable=broad-except
+            print(f"Error fetching comments for project {project_id}: {error}")
+    print(f"Fetched comments for {len(comments_by_task)} tasks")
+    return comments_by_task
+
+
+def process_task(task, comments_lookup):
     task_dict = task.__dict__.copy()
+    task_id = getattr(task, "id", None) or getattr(task, "task_id", None)
+    if task_id is not None:
+        task_dict.setdefault("id", task_id)
     # Attachments (if any)
     attachments = []
     if hasattr(task, 'attachments') and task.attachments:
@@ -123,9 +169,9 @@ def process_task(api, task, completed=False):
     if attachments:
         task_dict['attachments'] = attachments
     # Comments
-    comments = fetch_comments(api, task.id)
-    if comments:
-        task_dict['comments'] = [c.__dict__ for c in comments]
+    comment_key = str(task_id) if task_id is not None else None
+    if comment_key and comment_key in comments_lookup:
+        task_dict['comments'] = [c.__dict__ for c in comments_lookup[comment_key]]
     return task_dict
 
 
@@ -136,16 +182,22 @@ def main():
     ensure_attachments_dir()
     api = TodoistAPI(get_api_key())
     projects = fetch_all_projects(api)
+    since = (datetime.now() - timedelta(days=90)).replace(hour=0, minute=0, second=0, microsecond=0)
+    until = datetime.now()
+    active_tasks_by_project = fetch_active_tasks_by_project(api)
+    completed_tasks_by_project = fetch_completed_tasks_by_project(api, since=since, until=until)
+    comment_project_ids = sorted(
+        pid for pid in (set(active_tasks_by_project.keys()) | set(completed_tasks_by_project.keys())) if pid
+    )
+    comments_by_task = fetch_comments_by_task(api, comment_project_ids)
     data = []
     for project in projects:
         project_dict = project.__dict__.copy()
-        project_id = project.id
-        # Active tasks
-        active_tasks = fetch_all_tasks(api, project_id, completed=False)
-        # Completed tasks
-        completed_tasks = fetch_all_tasks(api, project_id, completed=True)
-        project_dict['tasks'] = [process_task(api, t, completed=False) for t in active_tasks]
-        project_dict['completed_tasks'] = [process_task(api, t, completed=True) for t in completed_tasks]
+        project_id = str(getattr(project, "id", ""))
+        active_tasks = active_tasks_by_project.get(project_id, [])
+        completed_tasks = completed_tasks_by_project.get(project_id, [])
+        project_dict['tasks'] = [process_task(t, comments_by_task) for t in active_tasks]
+        project_dict['completed_tasks'] = [process_task(t, comments_by_task) for t in completed_tasks]
         data.append(project_dict)
     # Write JSON
     today = datetime.now().strftime("%Y-%m-%d")