Implement storage layer for MVP-1 Personal Tracker CLI

Add storage layer with FileStorage, MarkdownReader, and MarkdownWriter classes.
Add data models (Project, Session, Note, Change).
This commit is contained in:
2026-03-23 08:54:00 -03:00
parent 525996f60c
commit 4547c492da
16 changed files with 1013 additions and 0 deletions

View File

@@ -0,0 +1,45 @@
"""Services layer for business logic."""
from .session_service import (
get_active_session,
set_active_session,
clear_active_session,
get_active_session_path,
validate_no_other_active_session,
)
from .project_service import (
create_project,
get_project,
update_project,
list_projects,
get_projects_root,
ensure_project_structure,
)
from .note_service import (
add_note,
consolidate_notes,
)
from .heuristics_service import (
suggest_next_steps,
)
from .summary_service import (
generate_summary,
)
__all__ = [
"get_active_session",
"set_active_session",
"clear_active_session",
"get_active_session_path",
"validate_no_other_active_session",
"create_project",
"get_project",
"update_project",
"list_projects",
"get_projects_root",
"ensure_project_structure",
"add_note",
"consolidate_notes",
"suggest_next_steps",
"generate_summary",
]

View File

@@ -0,0 +1,47 @@
"""Heuristics service for suggestions based on rules."""
from datetime import datetime
from ..models import Session, Project
def suggest_next_steps(session: Session, project: Project) -> list[str]:
"""
Generate suggestions based on session state and project context.
Rules:
- si hay blockers abiertos, sugerir "Destrabar: [bloqueos]"
- si hay changes sin references, sugerir "Validar cambios recientes"
- si work_done está vacío y session > 30 min, sugerir "Revisar progreso del objetivo"
- si no hay next_steps definidos, sugerir "Definir próximos pasos"
"""
suggestions = []
# Rule: blockers open
if session.blockers:
for blocker in session.blockers:
suggestions.append(f"Destrabar: {blocker}")
# Rule: changes without references
changes_without_refs = []
for change in session.changes:
# Simple heuristic: if change doesn't reference anything specific
if change and not any(ref in change.lower() for ref in ["#", "commit", "pr", "issue"]):
changes_without_refs.append(change)
if changes_without_refs:
suggestions.append("Validar cambios recientes")
# Rule: work_done empty and session > 30 minutes
if not session.work_done:
duration = session.duration_minutes
if duration == 0 and session.ended_at and session.started_at:
duration = int((session.ended_at - session.started_at).total_seconds() / 60)
if duration > 30:
suggestions.append("Revisar progreso del objetivo")
# Rule: no next_steps defined
if not session.next_steps:
suggestions.append("Definir próximos pasos")
return suggestions

View File

@@ -0,0 +1,65 @@
"""Note service for note management."""
from datetime import datetime
from ..models import Session, NoteType, Note
def add_note(session: Session, note_type: str, text: str) -> dict:
"""
Add a note to the session and return the note dict.
Valid note types: work, change, blocker, decision, idea, reference
"""
try:
note_type_enum = NoteType(note_type)
except ValueError:
raise ValueError(f"Invalid note type: {note_type}. Valid types are: {[t.value for t in NoteType]}")
note = Note(type=note_type_enum, text=text)
session.raw_notes.append(note.model_dump(mode="json"))
return {
"type": note.type.value,
"text": note.text,
"created_at": note.created_at.isoformat(),
}
def consolidate_notes(raw_notes: list[dict]) -> dict:
"""
Consolidate raw notes into categorized sections.
Returns dict with keys: work_done, changes, decisions, blockers, references
"""
result = {
"work_done": [],
"changes": [],
"decisions": [],
"blockers": [],
"references": [],
}
for note in raw_notes:
if isinstance(note, dict):
note_type = note.get("type", "")
text = note.get("text", "")
else:
# Handle string format like "[type] text"
parts = note.split("]", 1)
if len(parts) == 2:
note_type = parts[0][1:]
text = parts[1].strip()
else:
continue
if note_type == NoteType.WORK.value:
result["work_done"].append(text)
elif note_type == NoteType.CHANGE.value:
result["changes"].append(text)
elif note_type == NoteType.DECISION.value:
result["decisions"].append(text)
elif note_type == NoteType.BLOCKER.value:
result["blockers"].append(text)
elif note_type == NoteType.REFERENCE.value:
result["references"].append(text)
return result

View File

@@ -0,0 +1,118 @@
"""Project service for project management."""
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from ..models import Project
_PROJECTS_ROOT = Path("projects")
def get_projects_root() -> Path:
"""Return the root directory for all projects."""
return _PROJECTS_ROOT
def _get_project_meta_path(slug: str) -> Path:
"""Return the path to the project's meta/project.yaml file."""
return _PROJECTS_ROOT / slug / "meta" / "project.yaml"
def _get_project_readme_path(slug: str) -> Path:
"""Return the path to the project's README.md file."""
return _PROJECTS_ROOT / slug / "README.md"
def create_project(
name: str,
slug: str,
description: str = "",
type: str = "misc",
tags: Optional[list[str]] = None,
repo_path: Optional[Path] = None,
) -> Project:
"""
Create a new project and return the Project instance.
Note: This does not write any files - that is handled by storage.
"""
if tags is None:
tags = []
project = Project(
id=str(uuid.uuid4()),
name=name,
slug=slug,
description=description,
type=type,
status="inbox",
tags=tags,
root_path=_PROJECTS_ROOT / slug,
repo_path=repo_path,
created_at=datetime.now(),
updated_at=datetime.now(),
)
return project
def get_project(slug: str) -> Optional[Project]:
"""
Get a project by slug.
Note: This reads from file system - placeholder for storage integration.
"""
meta_path = _get_project_meta_path(slug)
if not meta_path.exists():
return None
# TODO: Load from storage (YAML)
return None
def update_project(slug: str, **kwargs) -> Optional[Project]:
"""
Update a project's attributes.
Note: This does not persist - that is handled by storage.
"""
project = get_project(slug)
if project is None:
return None
for key, value in kwargs.items():
if hasattr(project, key):
setattr(project, key, value)
project.updated_at = datetime.now()
return project
def list_projects() -> list[Project]:
"""
List all projects.
Note: This reads from file system - placeholder for storage integration.
"""
projects_root = get_projects_root()
if not projects_root.exists():
return []
projects = []
for item in projects_root.iterdir():
if item.is_dir() and not item.name.startswith("."):
project = get_project(item.name)
if project is not None:
projects.append(project)
return projects
def ensure_project_structure(slug: str) -> None:
"""
Ensure the project directory structure exists.
Creates: sessions/, docs/, assets/, meta/
Note: This creates directories only - actual file writing is storage's job.
"""
project_root = _PROJECTS_ROOT / slug
directories = ["sessions", "docs", "assets", "meta"]
for directory in directories:
(project_root / directory).mkdir(parents=True, exist_ok=True)

View File

@@ -0,0 +1,67 @@
"""Session service for active session management."""
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from ..models import Session
_ACTIVE_SESSION_FILE = ".active_session.json"
def get_active_session_path() -> Path:
"""Return the path to the active session file in projects/ directory."""
return Path("projects") / _ACTIVE_SESSION_FILE
def get_active_session() -> Optional[Session]:
"""Load and return the currently active session, or None if none exists."""
path = get_active_session_path()
if not path.exists():
return None
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
# Convert started_at string back to datetime
data["started_at"] = datetime.fromisoformat(data["started_at"])
if data.get("ended_at"):
data["ended_at"] = datetime.fromisoformat(data["ended_at"])
return Session(**data)
def set_active_session(session: Session) -> None:
"""Save the given session as the active session."""
path = get_active_session_path()
path.parent.mkdir(parents=True, exist_ok=True)
data = session.model_dump(mode="json")
# Serialize datetime objects to ISO format
data["started_at"] = session.started_at.isoformat()
if session.ended_at:
data["ended_at"] = session.ended_at.isoformat()
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def clear_active_session() -> None:
"""Remove the active session file."""
path = get_active_session_path()
if path.exists():
path.unlink()
def validate_no_other_active_session(project_slug: str) -> bool:
"""
Check if there is an active session for a different project.
Returns True if no conflict exists (i.e., either no active session
or the active session belongs to the same project).
"""
active = get_active_session()
if active is None:
return True
return active.project_slug == project_slug

View File

@@ -0,0 +1,42 @@
"""Summary service for heuristic summary generation."""
from ..models import Session
from .note_service import consolidate_notes
def generate_summary(session: Session) -> str:
"""
Generate a heuristic summary from the session.
Uses consolidate_notes to extract work_done, decisions, blockers.
"""
# Consolidate raw notes into categorized sections
consolidated = consolidate_notes(session.raw_notes)
lines = []
# Work done section
if consolidated["work_done"]:
lines.append("Trabajo realizado:")
for item in consolidated["work_done"]:
lines.append(f" - {item}")
lines.append("")
# Decisions section
if consolidated["decisions"]:
lines.append("Decisiones:")
for item in consolidated["decisions"]:
lines.append(f" - {item}")
lines.append("")
# Blockers section
if consolidated["blockers"]:
lines.append("Bloqueos:")
for item in consolidated["blockers"]:
lines.append(f" - {item}")
lines.append("")
# If no content, provide a minimal summary
if not lines:
return f"Session de {session.duration_minutes} minutos sin progreso registrado."
return "\n".join(lines)