Nest subtasks, Fix attachment handling

This commit is contained in:
2025-10-18 11:53:42 -04:00
parent 43ad7ff17e
commit 406f8cef0b
2 changed files with 328 additions and 95 deletions

View File

@ -9,6 +9,7 @@ from todoist_api_python.api import TodoistAPI
from jinja2 import Environment, FileSystemLoader, select_autoescape
ATTACHMENTS_DIR = "attachments"
TODOIST_API_TOKEN: str | None = None
def usage():
@ -38,19 +39,66 @@ def ensure_attachments_dir():
os.makedirs(ATTACHMENTS_DIR)
def _file_looks_like_html(path):
try:
with open(path, 'rb') as handle:
prefix = handle.read(256)
except OSError:
return False
if not prefix:
return True
snippet = prefix.lstrip().lower()
return snippet.startswith(b"<!doctype") or snippet.startswith(b"<html")
def download_attachment(url, filename):
    """Download *url* into ATTACHMENTS_DIR/<filename> and return the local path.

    This span mashed together the old and new versions of the function (the
    diff's +/- markers were stripped); this body is the reconstructed new
    version. Behavior:
    - If the file already exists and does not look like a stray HTML error
      page, reuse it (no re-download). A cached HTML page under a non-HTML
      filename is deleted and re-fetched.
    - Sends a Bearer token when TODOIST_API_TOKEN is set; 30 s timeout.
    - Streams the body to disk in 8 KiB chunks.
    - If the response is HTML (by Content-Type or by sniffing the first
      chunk) but the filename is not .htm/.html, the file is discarded and
      None is returned.
    Returns the local path on success, None on any failure.
    """
    local_path = os.path.join(ATTACHMENTS_DIR, filename)
    if os.path.exists(local_path):
        if _file_looks_like_html(local_path) and not filename.lower().endswith(('.htm', '.html')):
            # Stale HTML error page cached under a non-HTML name: purge and re-download.
            try:
                os.remove(local_path)
            except OSError:
                pass
        else:
            return local_path
    headers = {}
    if TODOIST_API_TOKEN:
        headers["Authorization"] = f"Bearer {TODOIST_API_TOKEN}"
    try:
        response = requests.get(url, stream=True, headers=headers, timeout=30)
    except requests.RequestException as exc:  # pylint: disable=broad-except
        print(f"Failed to download attachment {url}: {exc}")
        return None
    if response.status_code != 200:
        print(f"Failed to download attachment {url}: HTTP {response.status_code}")
        return None
    content_type = (response.headers.get("Content-Type") or "").lower()
    first_chunk = b""
    try:
        with open(local_path, 'wb') as handle:
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                if not first_chunk:
                    # Keep the first chunk so we can sniff for HTML below.
                    first_chunk = chunk
                handle.write(chunk)
    except OSError as exc:  # pylint: disable=broad-except
        print(f"Failed to save attachment (unknown): {exc}")
        return None
    looks_like_html = (
        "text/html" in content_type
        or (first_chunk and _file_looks_like_html(local_path))
    )
    if looks_like_html and not filename.lower().endswith(('.htm', '.html')):
        try:
            os.remove(local_path)
        except OSError:
            pass
        print(f"Skipped attachment {url}: received HTML response instead of file")
        return None
    print(f"Downloaded attachment {url}")
    return local_path
def _get_retry_delay(response, attempt, base_delay=5, max_delay=120):
@ -134,8 +182,9 @@ def fetch_completed_tasks_by_project(api, since, until):
return tasks_by_project
def fetch_comments_by_task(api, project_ids, task_ids):
    """Fetch all comments, keyed by task id (as str).

    First pulls comments project-by-project, then falls back to per-task
    fetches for any task in *task_ids* that got no comments from the project
    pass (Todoist's project-level comment listing can miss task comments).
    Both the stale old signature and the new one appeared in this diff span;
    this is the reconstructed new version. Errors are logged and skipped so
    one bad project/task does not abort the export.
    """
    comments_by_task = defaultdict(list)
    total_comments = 0
    for project_id in project_ids:
        try:
            comments_iter = execute_with_rate_limit(api.get_comments, project_id=project_id)
            # NOTE(review): the diff elided the iteration lines of this loop;
            # assumed to mirror the per-task loop below — confirm against the full file.
            for batch in comments_iter:
                for comment in batch:
                    task_id = str(getattr(comment, "task_id", ""))
                    if task_id:
                        comments_by_task[task_id].append(comment)
                        total_comments += 1
        except Exception as error:  # pylint: disable=broad-except
            print(f"Error fetching comments for project {project_id}: {error}")
    # Second pass: tasks the project-level listing missed entirely.
    missing_task_ids = [task_id for task_id in task_ids if task_id not in comments_by_task]
    for task_id in missing_task_ids:
        try:
            comments_iter = execute_with_rate_limit(api.get_comments, task_id=task_id)
            for batch in comments_iter:
                for comment in batch:
                    key = str(getattr(comment, "task_id", ""))
                    if key:
                        comments_by_task[key].append(comment)
                        total_comments += 1
        except Exception as error:  # pylint: disable=broad-except
            print(f"Error fetching comments for task {task_id}: {error}")
    print(
        f"Fetched {total_comments} comments mapped to {len(comments_by_task)} tasks"
    )
    return comments_by_task
@ -171,33 +235,138 @@ def process_task(task, comments_lookup):
# Comments
comment_key = str(task_id) if task_id is not None else None
if comment_key and comment_key in comments_lookup:
task_dict['comments'] = [c.__dict__ for c in comments_lookup[comment_key]]
serialized_comments = []
for comment in comments_lookup[comment_key]:
comment_dict = comment.__dict__.copy()
attachment = getattr(comment, "attachment", None)
if attachment:
attachment_dict = attachment.__dict__.copy()
file_url = attachment_dict.get("file_url")
if file_url:
filename = attachment_dict.get("file_name") or os.path.basename(file_url)
local_path = download_attachment(file_url, filename)
if local_path:
attachment_dict['local_file'] = os.path.relpath(local_path)
comment_dict['attachment'] = attachment_dict
serialized_comments.append(comment_dict)
task_dict['comments'] = serialized_comments
return task_dict
def build_task_hierarchy(task_dicts):
    """Arrange flat task dicts into a parent/child tree.

    A task whose 'parent_id' matches another task's 'id' is attached under
    that parent's 'subtasks' list; everything else (no parent_id, orphaned
    parent_id, or missing id) becomes a root. Sibling order follows the
    original input order at every level. Mutates the dicts in place and
    returns the list of roots; leaf tasks end up with no 'subtasks' key.
    (Indentation was destroyed by the diff extraction; structure restored,
    logic unchanged.)
    """
    task_lookup = {}
    order_lookup = {}
    for index, task in enumerate(task_dicts):
        task_id = task.get('id')
        if task_id is None:
            continue
        task_lookup[str(task_id)] = task
        order_lookup[str(task_id)] = index
        task.setdefault('subtasks', [])
    roots = []
    for task in task_dicts:
        task_id = task.get('id')
        if task_id is None:
            # No id: cannot participate in parent links; keep at root level.
            roots.append(task)
            continue
        parent_id = task.get('parent_id')
        if parent_id:
            parent = task_lookup.get(str(parent_id))
            if parent:
                parent.setdefault('subtasks', [])
                parent['subtasks'].append(task)
                continue
        # No parent, or parent not in this export: treat as a root task.
        roots.append(task)

    def sort_children(children):
        # Restore original input order among siblings, recursively.
        children.sort(key=lambda item: order_lookup.get(str(item.get('id')), 0))
        for child in children:
            child_children = child.get('subtasks') or []
            if child_children:
                sort_children(child_children)

    sort_children(roots)

    # Remove empty subtasks lists for cleanliness
    def prune(task):
        subtasks = task.get('subtasks')
        if subtasks:
            for sub in subtasks:
                prune(sub)
        else:
            task.pop('subtasks', None)

    for root in roots:
        prune(root)
    return roots
def main():
if len(sys.argv) != 2 or sys.argv[1] != "export":
usage()
return
ensure_attachments_dir()
api = TodoistAPI(get_api_key())
token = get_api_key()
global TODOIST_API_TOKEN # pylint: disable=global-statement
TODOIST_API_TOKEN = token
api = TodoistAPI(token)
projects = fetch_all_projects(api)
since = (datetime.now() - timedelta(days=90)).replace(hour=0, minute=0, second=0, microsecond=0)
until = datetime.now()
active_tasks_by_project = fetch_active_tasks_by_project(api)
completed_tasks_by_project = fetch_completed_tasks_by_project(api, since=since, until=until)
comment_project_ids = sorted(
pid for pid in (set(active_tasks_by_project.keys()) | set(completed_tasks_by_project.keys())) if pid
pid
for pid in (set(active_tasks_by_project.keys()) | set(completed_tasks_by_project.keys()))
if pid
)
task_ids_for_comments: set[str] = set()
for task_list in active_tasks_by_project.values():
for task in task_list:
task_id = getattr(task, "id", None)
if task_id:
task_ids_for_comments.add(str(task_id))
for task_list in completed_tasks_by_project.values():
for task in task_list:
task_id = getattr(task, "id", None)
if task_id:
task_ids_for_comments.add(str(task_id))
comments_by_task = fetch_comments_by_task(
api, comment_project_ids, sorted(task_ids_for_comments)
)
comments_by_task = fetch_comments_by_task(api, comment_project_ids)
data = []
for project in projects:
project_dict = project.__dict__.copy()
project_id = str(getattr(project, "id", ""))
active_tasks = active_tasks_by_project.get(project_id, [])
completed_tasks = completed_tasks_by_project.get(project_id, [])
project_dict['tasks'] = [process_task(t, comments_by_task) for t in active_tasks]
project_dict['completed_tasks'] = [process_task(t, comments_by_task) for t in completed_tasks]
processed_active = [process_task(t, comments_by_task) for t in active_tasks]
processed_completed = [process_task(t, comments_by_task) for t in completed_tasks]
# Build hierarchy for active tasks
project_dict['tasks'] = build_task_hierarchy(processed_active)
# Map task IDs to names for parent lookups
name_lookup = {}
for task in active_tasks + completed_tasks:
task_id = getattr(task, "id", None)
if task_id:
name_lookup[str(task_id)] = getattr(task, "content", "")
for task in processed_completed:
parent_id = task.get('parent_id')
if parent_id:
parent_name = name_lookup.get(str(parent_id))
if parent_name:
task['parent_task'] = {
"id": str(parent_id),
"content": parent_name,
}
project_dict['completed_tasks'] = processed_completed
data.append(project_dict)
# Write JSON
today = datetime.now().strftime("%Y-%m-%d")