- Python 3.11+
- Docker & Docker Compose
- Poetry for dependency management
- Git
- Fork and Clone
git clone https://github.com/yourusername/contentful.git
cd contentful
- Install Dependencies
# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -
# Install Python dependencies
poetry install
# Install pre-commit hooks
poetry run pre-commit install
- Set Up Environment
cp .env.example .env
# Add your API keys to .env
- Start Services
# Start infrastructure services
docker-compose up -d mongodb redis
# Start development servers
poetry run python -m apps.orchestrator.orchestrator.server
# In another terminal:
poetry run python -m apps.renderer.renderer.server
contentful/
├── apps/ # Microservices
│ ├── orchestrator/ # Job orchestration service
│ │ ├── orchestrator/
│ │ │ ├── server.py # FastAPI application
│ │ │ ├── models/ # Data models
│ │ │ ├── pipeline/ # Pipeline stages
│ │ │ └── database/ # Database layer
│ │ └── tests/
│ └── renderer/ # Video rendering service
│ ├── renderer/
│ │ ├── server.py # FastAPI application
│ │ ├── composer.py # Video composition
│ │ └── templates/ # Video templates
│ └── tests/
├── packages/ # Shared packages
│ ├── providers/ # External service providers
│ │ ├── llm/ # Language models
│ │ ├── tts/ # Text-to-speech
│ │ ├── media/ # Media sources
│ │ └── base.py # Base interfaces
│ └── timeline/ # Timeline schema
├── cli/ # Command-line interface
├── tests/ # Test suite
│ ├── unit/ # Unit tests
│ ├── integration/ # Integration tests
│ └── e2e/ # End-to-end tests
└── docs/ # Documentation
git checkout -b feature/your-feature-name
Follow the coding standards and ensure all tests pass.
Every new feature must include tests:
# tests/unit/test_your_feature.py
import pytest
def test_your_feature():
"""Test description"""
# Arrange
# Act
# Assert
# Run all tests
poetry run pytest
# Run specific test file
poetry run pytest tests/unit/test_your_feature.py
# Run with coverage
poetry run pytest --cov=apps --cov=packages
# Format with Black
poetry run black apps/ packages/ tests/
# Sort imports
poetry run isort apps/ packages/ tests/
# Lint
poetry run flake8 apps/ packages/ tests/
# Type check
poetry run mypy apps/ packages/
git add .
git commit -m "feat: add your feature description"
Follow conventional commits:
feat: New feature
fix: Bug fix
docs: Documentation
style: Formatting
refactor: Code restructuring
test: Tests
chore: Maintenance
git push origin feature/your-feature-name
Follow PEP 8 with these additions:
# Good: Type hints for all functions
def process_job(job_id: str, timeout: int = 30) -> dict:
"""Process a job with timeout.
Args:
job_id: The job identifier
timeout: Timeout in seconds
Returns:
Job result dictionary
Raises:
TimeoutError: If processing exceeds timeout
"""
pass
# Good: Async/await for I/O operations
async def fetch_data(url: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# Good: Context managers for resources
with tempfile.NamedTemporaryFile() as tmp:
process_file(tmp.name)
# Good: Descriptive variable names
user_preferences = get_user_preferences()  # Not: prefs = get_prefs()
# Good: Specific exception handling
try:
result = external_api_call()
except ConnectionError as e:
logger.error(f"API connection failed: {e}")
raise ServiceUnavailableError("External service unavailable")
except JSONDecodeError as e:
logger.error(f"Invalid response format: {e}")
raise ValidationError("Invalid API response")
# Good: Custom exceptions
class PipelineError(Exception):
"""Base exception for pipeline errors"""
pass
class StageTimeoutError(PipelineError):
"""Raised when a pipeline stage times out"""
pass
# Good: Descriptive test names
def test_job_creation_with_valid_parameters_succeeds():
pass
def test_job_creation_with_invalid_topic_raises_validation_error():
pass
# Good: Arrange-Act-Assert pattern
def test_pipeline_processes_job():
# Arrange
job = create_test_job()
pipeline = Pipeline()
# Act
result = pipeline.process(job)
# Assert
assert result.status == "completed"
assert result.output is not None
# Good: Fixtures for reusable test data
@pytest.fixture
def sample_job():
return Job(
id="test-123",
topic="Test Topic",
status="pending"
)
# Good: Mocking external dependencies
@patch('external_service.api_call')
def test_with_mock(mock_api):
mock_api.return_value = {"data": "test"}
result = function_under_test()
assert result == expected
- Create Provider Class
# packages/providers/llm/claude_provider.py
from packages.providers.base import LLMProvider
class Claude(LLMProvider):
"""Claude LLM provider"""
def __init__(self, api_key: str):
self.api_key = api_key
self.name = "claude"
async def generate(self, prompt: str, **kwargs) -> str:
# Implementation
pass
- Add Tests
# tests/unit/test_claude_provider.py
def test_claude_initialization():
provider = Claude(api_key="test-key")
assert provider.name == "claude"
@patch('anthropic.Client')
def test_claude_generate(mock_client):
provider = Claude(api_key="test-key")
result = provider.generate("Test prompt")
assert result is not None
- Register Provider
# apps/orchestrator/orchestrator/pipeline/orchestrator.py
PROVIDERS = {
"llm": {
"openai": OpenAI,
"claude": Claude, # Add new provider
}
}
- Define Template Structure
# apps/renderer/renderer/templates/tutorial.py
class TutorialTemplate:
"""Tutorial video template"""
def get_beat_structure(self, duration: int) -> list:
return [
{"role": "intro", "duration_percent": 10},
{"role": "overview", "duration_percent": 15},
{"role": "steps", "duration_percent": 60},
{"role": "summary", "duration_percent": 10},
{"role": "outro", "duration_percent": 5}
]
def get_transitions(self) -> list:
return ["wipe", "slide", "fade"]
- Add Prompt Engineering
# apps/orchestrator/orchestrator/pipeline/scripting.py
def get_tutorial_prompt(self) -> str:
return """
Create a step-by-step tutorial video script.
Include clear instructions and visual cues.
Break down complex tasks into simple steps.
"""
- Create Stage Class
# apps/orchestrator/orchestrator/pipeline/optimization.py
class OptimizationService:
"""Optimize generated content"""
async def optimize(self, content: dict) -> dict:
# Implementation
return optimized_content
- Integrate into Pipeline
# apps/orchestrator/orchestrator/pipeline/orchestrator.py
async def run_pipeline(self, job: Job):
# Existing stages...
# Add new stage
optimization = OptimizationService()
optimized = await optimization.optimize(timeline)
- Minimum 80% coverage for new code
- 100% coverage for critical paths
- All edge cases must be tested
tests/
├── unit/ # Fast, isolated tests
├── integration/ # Service integration tests
├── e2e/ # Full pipeline tests
├── fixtures/ # Test data
└── conftest.py # Shared fixtures
# Unit tests only
poetry run pytest tests/unit/ -v
# Integration tests
poetry run pytest tests/integration/ -v
# Specific test
poetry run pytest tests/unit/test_file.py::test_function
# With coverage
poetry run pytest --cov=apps --cov-report=html
# Parallel execution
poetry run pytest -n auto
def complex_function(
param1: str,
param2: Optional[int] = None
) -> dict:
"""Brief description of function.
Longer description explaining the purpose,
algorithm, or important details.
Args:
param1: Description of param1
param2: Description of param2
Returns:
Description of return value
Raises:
ValueError: When param1 is invalid
Example:
>>> result = complex_function("test", 42)
>>> print(result["status"])
"success"
"""
Update OpenAPI schemas:
@app.post(
"/endpoint",
response_model=ResponseModel,
summary="Brief summary",
description="Detailed description"
)
async def endpoint(
request: RequestModel = Body(..., example={
"field": "value"
})
):
"""Internal documentation"""
pass
# Good: Concurrent execution
results = await asyncio.gather(
fetch_data(url1),
fetch_data(url2),
fetch_data(url3)
)
# Good: Async context manager
async with aiofiles.open(path) as f:
content = await f.read()
# Good: Streaming large files
async def stream_file(path: Path):
async with aiofiles.open(path, 'rb') as f:
chunk = await f.read(1024)
while chunk:
yield chunk
chunk = await f.read(1024)
# Good: Generator for large datasets
def process_large_dataset(data):
for item in data:
yield process_item(item)
# Good: Cleanup resources
try:
resource = acquire_resource()
process(resource)
finally:
release_resource(resource)
# Use debugger
import pdb; pdb.set_trace()
# Or with ipdb
import ipdb; ipdb.set_trace()
# VS Code launch.json
{
"name": "Debug Orchestrator",
"type": "python",
"request": "launch",
"module": "apps.orchestrator.orchestrator.server",
"env": {
"PYTHONPATH": "${workspaceFolder}"
}
}
import logging
logger = logging.getLogger(__name__)
# Good: Structured logging
logger.info(
"Processing job",
extra={
"job_id": job.id,
"status": job.status,
"duration": elapsed_time
}
)
# Good: Log levels
logger.debug("Detailed debug info")
logger.info("General info")
logger.warning("Warning message")
logger.error("Error occurred", exc_info=True)
Before submitting a PR, ensure:
- Code follows style guidelines
- All tests pass
- New tests added for new features
- Documentation updated
- Commit messages follow convention
- No sensitive data in code
- Performance impact considered
- Breaking changes documented
- GitHub Issues: Bug reports and feature requests
- Discussions: Questions and ideas
- Discord: Real-time chat (coming soon)
- Lead: @yourusername
- Core Team: @team-member1, @team-member2
By contributing, you agree that your contributions will be licensed under the MIT License.