# DSS Export/Import - Integration Guide for Implementation Teams

## Quick Reference

| Need | Document | Location |
|------|----------|----------|
| **30-second overview** | QUICK_REFERENCE.md | Root directory |
| **Complete feature guide** | DSS_EXPORT_IMPORT_GUIDE.md | Root directory |
| **Architecture overview** | IMPLEMENTATION_SUMMARY.md | Root directory |
| **Production hardening details** | PRODUCTION_READINESS.md | Root directory |
| **Hardening summary** | PRODUCTION_HARDENING_SUMMARY.md | Root directory |
| **API integration** | This file (INTEGRATION_GUIDE.md) | Root directory |
| **Working code examples** | dss/export_import/examples.py | Package |
| **Security utilities** | dss/export_import/security.py | Package |
| **Service layer API** | dss/export_import/service.py | Package |

---

## For Your Implementation Team

### Phase 1: Understanding the System (30 minutes)

```
1. Read: QUICK_REFERENCE.md (5 min)
2. Run: python -m dss.export_import.examples (5 min)
3. Read: PRODUCTION_HARDENING_SUMMARY.md (10 min)
4. Skim: PRODUCTION_READINESS.md (10 min)
```

**Result**: You'll understand what the system does, how to use it, and what production considerations exist.

### Phase 2: API Integration Planning (1 hour)

```
1. Review: dss/export_import/service.py
   - Read DSSProjectService docstring and method signatures
   - Understand return types: ExportSummary, ImportSummary, MergeSummary

2. Review: dss/export_import/security.py
   - Understand what each security class does
   - Note configuration options

3. Plan: Where to integrate
   - API endpoints for export/import?
   - Background job handler (Celery/RQ)?
   - CLI commands?
   - Web UI buttons?
```

**Deliverable**: Integration plan document with:

- [ ] List of API endpoints needed
- [ ] Error handling strategy
- [ ] Background job approach
- [ ] Monitoring/alerting plan

### Phase 3: API Development (2-4 hours)

Follow the code examples below for your framework.

### Phase 4: Testing (1-2 hours)

```
1. Run examples with real project data
2. Test error scenarios
3. Load test with large projects
4. Test background job handling
```

### Phase 5: Deployment (30 minutes)

Follow the production checklist in PRODUCTION_READINESS.md.
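---

## Calling the Service Layer Directly

Before wiring up HTTP endpoints, it can be useful to exercise the service layer from a script or REPL. The sketch below is a minimal example assuming the `DSSProjectService` methods and summary fields described above (`success`, `error`, `archive_path`, `project_name`, `duration_seconds`, `requires_background_job`); check `dss/export_import/service.py` for the exact signatures.

```python
from pathlib import Path

from dss.export_import import DSSProjectService

service = DSSProjectService(busy_timeout_ms=5000)

def smoke_test_round_trip(project, workdir: Path) -> None:
    """Export a project and re-import the archive, printing the summaries.

    `project` is whatever object your data layer hands to the service
    (the same object the framework examples below pass in).
    """
    export_result = service.export_project(project, workdir / "demo_export.dss")
    if not export_result.success:
        print(f"Export failed: {export_result.error}")
        return
    print(f"Exported to {export_result.archive_path} "
          f"in {export_result.duration_seconds:.2f}s")

    import_result = service.import_project(export_result.archive_path)
    if import_result.requires_background_job:
        print("Archive is large - schedule a background job instead")
    elif import_result.success:
        print(f"Imported project '{import_result.project_name}'")
    else:
        print(f"Import failed: {import_result.error}")
```

Running this against a small test project is a quick way to confirm the package is installed and importable before starting Phase 3.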
---

## API Integration Examples

### Flask

```python
from flask import Flask, request, send_file
from pathlib import Path
from dss.export_import import DSSProjectService

app = Flask(__name__)
service = DSSProjectService(busy_timeout_ms=5000)

@app.route('/api/projects/<int:project_id>/export', methods=['POST'])
def export_project(project_id):
    """Export project to .dss archive"""
    try:
        # Get project from database
        project = db.session.query(Project).get(project_id)
        if not project:
            return {'error': 'Project not found'}, 404

        # Export
        output_path = Path(f'/tmp/export_{project_id}.dss')
        result = service.export_project(project, output_path)

        if not result.success:
            return {'error': result.error}, 500

        # Return file
        return send_file(
            result.archive_path,
            as_attachment=True,
            download_name=f'{project.name}.dss',
            mimetype='application/zip'
        )
    except Exception as e:
        app.logger.error(f"Export failed: {e}")
        return {'error': 'Export failed'}, 500

@app.route('/api/projects/import', methods=['POST'])
def import_project():
    """Import project from .dss archive"""
    try:
        if 'file' not in request.files:
            return {'error': 'No file provided'}, 400

        file = request.files['file']
        if not file.filename.endswith('.dss'):
            return {'error': 'File must be .dss archive'}, 400

        # Save uploaded file
        archive_path = Path(f'/tmp/{file.filename}')
        file.save(archive_path)

        # Import
        result = service.import_project(archive_path)

        if result.requires_background_job:
            # Schedule background import
            task = import_project_async.delay(str(archive_path))
            return {
                'status': 'queued',
                'job_id': task.id,
                'estimated_items': (
                    result.item_counts.get('tokens', 0) +
                    result.item_counts.get('components', 0)
                )
            }, 202

        if not result.success:
            return {'error': result.error}, 500

        # Store in database
        new_project = Project(
            name=result.project_name,
            # ... other fields
        )
        db.session.add(new_project)
        db.session.commit()

        return {
            'success': True,
            'project_name': result.project_name,
            'project_id': new_project.id,
            'duration_seconds': result.duration_seconds
        }, 201
    except Exception as e:
        app.logger.error(f"Import failed: {e}")
        return {'error': 'Import failed'}, 500

@app.route('/api/projects/<int:project_id>/merge', methods=['POST'])
def merge_projects(project_id):
    """Merge imported project with local"""
    try:
        if 'file' not in request.files:
            return {'error': 'No file provided'}, 400

        file = request.files['file']
        archive_path = Path(f'/tmp/{file.filename}')
        file.save(archive_path)

        # Get local project
        local = db.session.query(Project).get(project_id)
        if not local:
            return {'error': 'Project not found'}, 404

        # Analyze merge
        merge_analysis = service.analyze_merge(local, archive_path)

        # Perform merge (strategy comes from the multipart form data)
        strategy = request.form.get('strategy', 'keep_local')
        result = service.merge_project(local, archive_path, strategy)

        if not result.success:
            return {'error': result.error}, 500

        # Update database
        db.session.commit()

        return {
            'success': True,
            'new_items': result.new_items_count,
            'updated_items': result.updated_items_count,
            'conflicts': result.conflicts_count,
            'duration_seconds': result.duration_seconds
        }
    except Exception as e:
        app.logger.error(f"Merge failed: {e}")
        return {'error': 'Merge failed'}, 500
```

### FastAPI

```python
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from pathlib import Path
from dss.export_import import DSSProjectService

app = FastAPI()
service = DSSProjectService(busy_timeout_ms=5000)

@app.post("/api/projects/{project_id}/export")
async def export_project(project_id: int):
    """Export project to .dss archive"""
    try:
        project = db.get_project(project_id)
        if not project:
            raise HTTPException(status_code=404, detail="Project not found")

        output_path = Path(f"/tmp/export_{project_id}.dss")
        result = service.export_project(project, output_path)

        if not result.success:
            raise HTTPException(status_code=500, detail=result.error)

        return FileResponse(
            result.archive_path,
            media_type="application/zip",
            filename=f"{project.name}.dss"
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/projects/import")
async def import_project(file: UploadFile = File(...)):
    """Import project from .dss archive"""
    try:
        if not file.filename.endswith('.dss'):
            raise HTTPException(status_code=400, detail="File must be .dss")

        # Save uploaded file
        archive_path = Path(f"/tmp/{file.filename}")
        with open(archive_path, "wb") as f:
            f.write(await file.read())

        # Import
        result = service.import_project(archive_path)

        if result.requires_background_job:
            task = import_project_async.delay(str(archive_path))
            return {
                "status": "queued",
                "job_id": task.id,
                "estimated_items": (
                    result.item_counts.get('tokens', 0) +
                    result.item_counts.get('components', 0)
                )
            }

        if not result.success:
            raise HTTPException(status_code=500, detail=result.error)

        return {
            "success": True,
            "project_name": result.project_name,
            "duration_seconds": result.duration_seconds
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```

### Django

```python
from django.http import JsonResponse, FileResponse
from django.views.decorators.http import require_http_methods
from pathlib import Path
from dss.export_import import DSSProjectService

# Project is your Django model; import it from your app's models module.

service = DSSProjectService(busy_timeout_ms=5000)

@require_http_methods(["POST"])
def export_project(request, project_id):
    """Export project to .dss archive"""
    try:
        project = Project.objects.get(pk=project_id)
        output_path = Path(f"/tmp/export_{project_id}.dss")
        result = service.export_project(project, output_path)

        if not result.success:
            return JsonResponse({'error': result.error}, status=500)

        response = FileResponse(
            open(result.archive_path, 'rb'),
            content_type='application/zip'
        )
        response['Content-Disposition'] = f'attachment; filename="{project.name}.dss"'
        return response
    except Project.DoesNotExist:
        return JsonResponse({'error': 'Project not found'}, status=404)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

@require_http_methods(["POST"])
def import_project(request):
    """Import project from .dss archive"""
    try:
        if 'file' not in request.FILES:
            return JsonResponse({'error': 'No file provided'}, status=400)

        file = request.FILES['file']
        if not file.name.endswith('.dss'):
            return JsonResponse({'error': 'File must be .dss'}, status=400)

        # Save uploaded file
        archive_path = Path(f"/tmp/{file.name}")
        with open(archive_path, 'wb') as f:
            for chunk in file.chunks():
                f.write(chunk)

        # Import
        result = service.import_project(archive_path)

        if result.requires_background_job:
            task = import_project_async.delay(str(archive_path))
            return JsonResponse({
                'status': 'queued',
                'job_id': task.id
            }, status=202)

        if not result.success:
            return JsonResponse({'error': result.error}, status=500)

        return JsonResponse({
            'success': True,
            'project_name': result.project_name
        }, status=201)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
```
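### A Note on Upload Paths

The examples above write uploads to `/tmp/` under the client-supplied filename to keep the code short. If you prefer not to trust that filename (it is attacker-controlled), a server-generated temp path is a safer drop-in. The helper below is a sketch using only the standard library; it is not part of the DSS package.

```python
import tempfile
from pathlib import Path

def save_upload_to_temp(file_bytes: bytes) -> Path:
    """Write an uploaded archive to a server-generated temp path.

    Using a generated name (instead of the client-supplied filename)
    avoids path tricks such as a filename containing '../'.
    """
    fd, tmp_name = tempfile.mkstemp(suffix=".dss")
    tmp_path = Path(tmp_name)
    with open(fd, "wb") as f:
        f.write(file_bytes)
    return tmp_path
```

In the FastAPI handler, for example, `archive_path = save_upload_to_temp(await file.read())` replaces the manual `open(...)`/`write(...)` pair.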
---

## Background Job Integration

### Celery

```python
# celery_tasks.py
from celery import shared_task
from dss.export_import import DSSProjectService
from django.core.cache import cache

@shared_task(bind=True, time_limit=600)
def import_project_async(self, archive_path):
    """Background task for large imports"""
    try:
        service = DSSProjectService()
        result = service.import_project(archive_path)

        # Store result
        cache.set(
            f"import_job:{self.request.id}",
            {
                'status': 'completed' if result.success else 'failed',
                'success': result.success,
                'project_name': result.project_name,
                'error': result.error,
                'duration_seconds': result.duration_seconds,
            },
            timeout=3600  # 1 hour
        )

        if result.success:
            # Trigger webhook
            notify_user_import_complete(
                self.request.id,
                result.project_name
            )

        return {
            'job_id': self.request.id,
            'success': result.success
        }
    except Exception as e:
        cache.set(
            f"import_job:{self.request.id}",
            {'status': 'failed', 'error': str(e)},
            timeout=3600
        )
        raise

# In route
@app.post("/api/projects/import/background")
async def import_background(file: UploadFile):
    """Start background import"""
    archive_path = Path(f"/tmp/{file.filename}")
    with open(archive_path, "wb") as f:
        f.write(await file.read())

    task = import_project_async.delay(str(archive_path))
    return {"job_id": task.id}

@app.get("/api/import/status/{job_id}")
async def import_status(job_id: str):
    """Check background import status"""
    result = cache.get(f"import_job:{job_id}")
    if not result:
        return {"status": "processing"}
    return result
```
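### RQ (Alternative to Celery)

Phase 2 lists RQ as an alternative to Celery. The sketch below shows the same enqueue/status pattern with RQ; the queue name, the `run_import` helper, and the returned dict shape are illustrative assumptions, not part of the DSS package.

```python
# rq_tasks.py - minimal sketch, assuming Redis is reachable and this module
# is importable by your RQ workers.
from pathlib import Path

from redis import Redis
from rq import Queue

from dss.export_import import DSSProjectService

redis_conn = Redis()
import_queue = Queue("dss-imports", connection=redis_conn)  # queue name is arbitrary

def run_import(archive_path: str) -> dict:
    """Executed by an RQ worker process."""
    service = DSSProjectService()
    result = service.import_project(Path(archive_path))
    return {
        "success": result.success,
        "project_name": result.project_name,
        "error": result.error,
    }

# In your upload handler, enqueue instead of importing inline:
#   job = import_queue.enqueue(run_import, str(archive_path), job_timeout=600)
#   return {"job_id": job.get_id()}

# In a status endpoint:
#   from rq.job import Job
#   job = Job.fetch(job_id, connection=redis_conn)
#   return {"status": job.get_status()}  # job.result holds run_import's dict when finished
```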
---

## Error Handling

### Common Error Scenarios

```python
from dss.export_import import DSSArchiveImporter

def handle_import_error(archive_path):
    """Proper error handling with diagnostics"""
    # Analyze archive to get detailed errors
    importer = DSSArchiveImporter(archive_path)
    analysis = importer.analyze()

    if not analysis.is_valid:
        for error in analysis.errors:
            if error.stage == "archive":
                if "Zip Slip" in error.message:
                    # Security alert!
                    alert_security_team(error.message)
                    return 403, "Malicious archive rejected"
                elif "unsafe paths" in error.message:
                    return 400, "Invalid archive structure"
                else:
                    return 400, f"Archive error: {error.message}"

            elif error.stage == "manifest":
                return 400, f"Invalid manifest: {error.message}"

            elif error.stage == "schema":
                if "newer than app" in error.message:
                    return 400, "DSS version too old, please update"
                else:
                    return 400, f"Schema error: {error.message}"

            elif error.stage == "structure":
                return 400, f"Invalid JSON structure: {error.message}"

            elif error.stage == "referential":
                return 400, f"Invalid references: {error.message}"

    # If we got here, archive is valid
    return 200, "Archive is valid"
```

---

## Monitoring & Observability

### Metrics to Track

```python
from prometheus_client import Counter, Histogram

# Metrics
export_duration = Histogram(
    'dss_export_duration_seconds',
    'Time to export project'
)
import_duration = Histogram(
    'dss_import_duration_seconds',
    'Time to import project'
)
validation_errors = Counter(
    'dss_validation_errors_total',
    'Validation errors',
    ['stage']
)
security_alerts = Counter(
    'dss_security_alerts_total',
    'Security alerts',
    ['type']
)

# Usage
with export_duration.time():
    result = service.export_project(project, path)

if not result.success:
    if "Zip Slip" in result.error:
        security_alerts.labels(type='zip_slip').inc()

for error in analysis.errors:
    validation_errors.labels(stage=error.stage).inc()
```

---

## Testing Strategy

### Unit Tests

```python
import pytest
from pathlib import Path

from dss.export_import import DSSArchiveExporter, DSSArchiveImporter

def test_round_trip():
    """Test export → import = identical"""
    # Create test project
    project = create_test_project()

    # Export
    exporter = DSSArchiveExporter(project)
    archive_path = exporter.export_to_file(Path("/tmp/test.dss"))

    # Import
    importer = DSSArchiveImporter(archive_path)
    imported = importer.import_replace()

    # Verify
    assert imported.name == project.name
    assert len(imported.theme.tokens) == len(project.theme.tokens)

def test_security_zip_slip():
    """Test Zip Slip protection"""
    from dss.export_import.security import ZipSlipValidator

    # Malicious paths
    unsafe_paths = [
        "../../etc/passwd",
        "../../../root/.ssh/id_rsa",
        "normal_file.json",
    ]

    is_safe, unsafe = ZipSlipValidator.validate_archive_members(unsafe_paths)
    assert not is_safe
    assert len(unsafe) == 2  # Two unsafe paths

def test_memory_limits():
    """Test memory limit enforcement"""
    from dss.export_import.security import MemoryLimitManager

    mgr = MemoryLimitManager(max_tokens=100)
    ok, error = mgr.check_token_count(101)
    assert not ok
    assert error is not None
```

### Integration Tests

```python
def test_import_with_large_archive():
    """Test import doesn't OOM on large archive"""
    large_archive = create_large_archive(10000)  # 10k tokens
    result = service.import_project(large_archive)
    assert result.success

def test_background_job_scheduling():
    """Test background job detection"""
    huge_archive = create_huge_archive(50000)  # 50k tokens
    result = service.import_project(huge_archive)
    assert result.requires_background_job
```
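### Security Regression Test

Phase 4 calls for testing error scenarios; one worth automating end-to-end is the Zip Slip rejection already unit-tested above. The sketch below builds a hostile archive with the standard library's `zipfile`; the placeholder `manifest.json` and the assumption that `service.import_project` reports failure via `result.success` mirror the examples above rather than the package's exact behavior.

```python
import zipfile
from pathlib import Path

def build_zip_slip_archive(tmp_path: Path) -> Path:
    """Create a .dss-style zip containing a path-traversal member."""
    archive = tmp_path / "malicious.dss"
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("manifest.json", "{}")          # minimal placeholder manifest
        zf.writestr("../../outside.txt", "escape")  # member that tries to escape
    return archive

def test_import_rejects_zip_slip(tmp_path):
    """A traversal member should cause the import to fail, not be extracted."""
    archive = build_zip_slip_archive(tmp_path)
    result = service.import_project(archive)
    assert not result.success
```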
---

## Troubleshooting Guide

### Import Fails with "Archive validation failed"

```python
# Debug:
from dss.export_import import DSSArchiveImporter

importer = DSSArchiveImporter(archive_path)
analysis = importer.analyze()

for error in analysis.errors:
    print(f"[{error.stage}] {error.message}")
    print(f"Details: {error.details}")
```

### Memory limit exceeded on large archive

```python
# Solution 1: Increase limits
from dss.export_import.security import MemoryLimitManager

memory_mgr = MemoryLimitManager(
    max_file_size=500_000_000,  # 500MB
    max_tokens=50000
)

# Solution 2: Use background job
result = service.import_project(archive, background=True)
if result.requires_background_job:
    task_id = celery.send_task('import_project', args=[archive])
```

### Clock skew warnings during merge

```python
# These are informational - system is working correctly
# Warnings indicate clocks are >1 hour apart between systems
# To silence: Sync system clocks
# Or: Increase tolerance in TimestampConflictResolver
from dss.export_import.security import TimestampConflictResolver
from datetime import timedelta

resolver = TimestampConflictResolver(
    clock_skew_tolerance=timedelta(hours=2)
)
```

---

## Summary

You now have everything needed to integrate DSS Export/Import:

1. ✅ Code examples for your framework
2. ✅ Background job integration
3. ✅ Error handling patterns
4. ✅ Monitoring setup
5. ✅ Testing strategy
6. ✅ Troubleshooting guide

**Next Steps:**

1. Pick your framework (Flask/FastAPI/Django)
2. Copy the example code
3. Adapt it to your database models
4. Add your authentication/authorization
5. Follow the production checklist in PRODUCTION_READINESS.md

**Questions?** Refer to the detailed documentation in the files listed at the top of this guide.

---

*Integration Guide v1.0*
*For DSS Export/Import v1.0.1*