Media Pipeline


A declarative, scalable media processing pipeline built on FFmpeg.

Chinese Documentation | Examples | Deployment Guide

Overview

Media Pipeline is a production-ready engine for declarative video/audio workflows. Define what you want, not how to do it.

Key Features

  • Declarative API: JSON-based job specifications
  • Extensible Operators: Built-in trim, scale + custom operator support
  • Type-Safe: Strong validation and automatic type conversion
  • Docker Ready: One-command deployment with Docker Compose
  • REST API: Complete job management endpoints
  • Real-time Progress: Track processing with live updates
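Custom operators plug in behind a small interface. A minimal sketch of what such an interface might look like (the names `Operator`, `BuildFilter`, and `TrimOp` here are illustrative, not the project's actual API):

```go
package main

import "fmt"

// Operator is a hypothetical interface a custom operator could satisfy:
// validate parameters and emit an FFmpeg filter fragment.
type Operator interface {
	Name() string
	BuildFilter() (string, error)
}

// TrimOp sketches the built-in trim operator.
type TrimOp struct {
	Start, Duration float64 // seconds
}

func (t TrimOp) Name() string { return "trim" }

func (t TrimOp) BuildFilter() (string, error) {
	if t.Duration <= 0 {
		return "", fmt.Errorf("trim: duration must be positive")
	}
	// setpts resets timestamps so downstream filters start at t=0
	return fmt.Sprintf("trim=start=%g:duration=%g,setpts=PTS-STARTPTS",
		t.Start, t.Duration), nil
}

func main() {
	f, err := TrimOp{Start: 10, Duration: 300}.BuildFilter()
	if err != nil {
		panic(err)
	}
	fmt.Println(f) // trim=start=10:duration=300,setpts=PTS-STARTPTS
}
```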

Architecture

System Architecture

graph TB
    Client[Client Application]
    API[REST API Server]
    Store[(In-Memory Store)]
    Redis[(Redis Cache)]
    Postgres[(PostgreSQL DB)]

    subgraph "Processing Pipeline"
        Prober[Media Prober<br/>FFprobe]
        Planner[Planner<br/>DAG Builder]
        Executor[Executor<br/>FFmpeg]
    end

    subgraph "Storage Layer"
        Uploads[/Uploads/]
        Outputs[/Outputs/]
        Temp[/Temp Files/]
    end

    Client -->|HTTP POST /jobs| API
    Client -->|HTTP GET /jobs/:id| API
    API --> Store
    API -.->|Future| Redis
    API -.->|Future| Postgres

    API --> Prober
    Prober --> Planner
    Planner --> Executor

    Executor --> Uploads
    Executor --> Temp
    Executor --> Outputs

    style API fill:#4CAF50,stroke:#333,stroke-width:2px,color:#fff
    style Prober fill:#2196F3,stroke:#333,stroke-width:2px,color:#fff
    style Planner fill:#2196F3,stroke:#333,stroke-width:2px,color:#fff
    style Executor fill:#2196F3,stroke:#333,stroke-width:2px,color:#fff

Job Processing Flow

sequenceDiagram
    participant C as Client
    participant A as API Server
    participant S as Store
    participant Pr as Prober
    participant Pl as Planner
    participant E as Executor

    C->>A: POST /api/v1/jobs<br/>{JobSpec}
    A->>S: CreateJob(job)
    S-->>A: job_id
    A-->>C: 201 Created<br/>{job_id, status: pending}

    Note over A: Background Processing
    A->>S: UpdateStatus(validating)
    A->>Pr: Probe(input_files)
    Pr-->>A: MediaInfo

    A->>S: UpdateStatus(planning)
    A->>Pl: Plan(JobSpec, MediaInfo)
    Pl-->>A: ProcessingPlan (DAG)

    A->>S: UpdateStatus(processing)
    A->>E: Execute(ProcessingPlan)

    loop Progress Updates
        E-->>A: Progress{frame, fps, bitrate}
        A->>S: UpdateProgress(percent)
    end

    E-->>A: Success
    A->>S: UpdateStatus(completed)

    C->>A: GET /api/v1/jobs/{id}
    A->>S: GetJob(id)
    S-->>A: JobStatus
    A-->>C: 200 OK<br/>{status, progress}

Job State Machine

stateDiagram-v2
    [*] --> Pending: Job Created

    Pending --> Validating: Start Processing
    Validating --> Planning: Validation OK
    Validating --> Failed: Validation Error

    Planning --> Processing: Plan Created
    Planning --> Failed: Planning Error

    Processing --> Completed: Success
    Processing --> Failed: Execution Error
    Processing --> Cancelled: User Cancelled

    Completed --> [*]
    Failed --> [*]
    Cancelled --> [*]

    note right of Validating
        Check JobSpec syntax,
        validate parameters
    end note

    note right of Planning
        Build DAG,
        estimate resources
    end note

    note right of Processing
        Execute FFmpeg,
        track progress
    end note
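The legal moves in the diagram can be encoded as a simple transition table. A sketch (state names follow the diagram; the Go shape is illustrative):

```go
package main

import "fmt"

// transitions encodes the legal moves from the state diagram above.
var transitions = map[string][]string{
	"pending":    {"validating"},
	"validating": {"planning", "failed"},
	"processing": {"completed", "failed", "cancelled"},
	"planning":   {"processing", "failed"},
}

// canTransition reports whether moving a job from one status to another is legal.
func canTransition(from, to string) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("processing", "completed")) // true
	fmt.Println(canTransition("completed", "processing")) // false: terminal states have no exits
}
```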

Quick Start

Docker Deployment (Recommended)

The fastest way to get started is using Docker:

# Clone the repository
git clone https://github.com/chicogong/media-pipeline.git
cd media-pipeline

# Start all services (API, Redis, PostgreSQL)
make docker-up

# Or manually:
docker-compose up -d

# Check service health
curl http://localhost:8081/health

# View logs
make docker-logs
# Or: docker-compose logs -f

See DEPLOYMENT.md for the complete deployment guide, including production setup, configuration, and troubleshooting.

Development Setup

# Install dependencies
make install

# Run tests
make test

# Build API server
make build

# Run locally
make run

Example: Trim and Scale Video

Processing DAG

graph LR
    Input[Input Video<br/>input.mp4]
    Trim[Trim Operator<br/>10s - 5min]
    Scale[Scale Operator<br/>1280x720]
    Output[Output Video<br/>output.mp4]

    Input --> Trim
    Trim --> Scale
    Scale --> Output

    style Input fill:#FFC107,stroke:#333,stroke-width:2px
    style Trim fill:#2196F3,stroke:#333,stroke-width:2px,color:#fff
    style Scale fill:#2196F3,stroke:#333,stroke-width:2px,color:#fff
    style Output fill:#4CAF50,stroke:#333,stroke-width:2px,color:#fff
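For reference, this three-node DAG compiles down to a single FFmpeg invocation along these lines (a hand-written approximation, not the Executor's verbatim output):

```shell
# Trim from 10s for 5 minutes, downscale to 720p with lanczos,
# and encode with the codec settings from the job spec below
ffmpeg -ss 00:00:10 -t 00:05:00 -i input.mp4 \
  -vf "scale=1280:720:flags=lanczos" \
  -c:v libx264 -preset medium -crf 23 \
  -c:a aac -b:a 128k \
  output.mp4
```

Placing `-ss` before `-i` uses fast input seeking, which is what a planner would typically choose for a keyframe-aligned trim.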

Job Specification

{
  "inputs": [
    {
      "id": "video",
      "source": "s3://bucket/input.mp4"
    }
  ],
  "operations": [
    {
      "op": "trim",
      "input": "video",
      "output": "trimmed",
      "params": {
        "start": "00:00:10",
        "duration": "00:05:00"
      }
    },
    {
      "op": "scale",
      "input": "trimmed",
      "output": "scaled",
      "params": {
        "width": 1280,
        "height": 720,
        "algorithm": "lanczos"
      }
    }
  ],
  "outputs": [
    {
      "id": "scaled",
      "destination": "s3://bucket/output.mp4",
      "codec": {
        "video": {
          "codec": "libx264",
          "preset": "medium",
          "crf": 23
        },
        "audio": {
          "codec": "aac",
          "bitrate": "128k"
        }
      }
    }
  ]
}
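Submitting this spec follows the endpoints shown in the sequence diagram above; a sketch (port 8081 matches the Quick Start health check, and `job.json` is assumed to hold the spec):

```shell
# Save the spec above as job.json, then create the job
curl -s -X POST http://localhost:8081/api/v1/jobs \
  -H "Content-Type: application/json" \
  -d @job.json

# Poll status and progress (replace <job_id> with the id from the response)
curl -s http://localhost:8081/api/v1/jobs/<job_id>
```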

Project Structure

media-pipeline/
├── cmd/api/              # API server entry point
├── pkg/
│   ├── schemas/          # JobSpec, ProcessingPlan, MediaInfo
│   ├── operators/        # Operator interface + built-in operators (trim, scale)
│   ├── planner/          # DAG builder and resource estimator
│   ├── executor/         # FFmpeg command builder and runner
│   ├── prober/           # FFprobe media metadata extraction
│   ├── storage/          # 🆕 Storage abstraction (local, HTTP/HTTPS, S3)
│   ├── compiler/
│   │   └── validator/    # 🆕 Input validation + SSRF protection
│   ├── auth/             # 🆕 JWT + API Key authentication
│   ├── store/            # In-memory job storage (thread-safe)
│   └── api/              # HTTP handlers and middleware
└── docs/plans/           # Design documents
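The `pkg/store` package is described as a thread-safe in-memory job store. The usual Go shape for that is a map guarded by an `RWMutex`, so status polls do not block each other; a minimal sketch (type and method names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// Job is a pared-down record; field names are illustrative.
type Job struct {
	ID     string
	Status string
}

// Store guards a job map with an RWMutex: writers take the exclusive
// lock, while concurrent status reads share the read lock.
type Store struct {
	mu   sync.RWMutex
	jobs map[string]Job
}

func NewStore() *Store { return &Store{jobs: make(map[string]Job)} }

func (s *Store) Create(j Job) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[j.ID] = j
}

func (s *Store) Get(id string) (Job, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	j, ok := s.jobs[id]
	return j, ok
}

func main() {
	st := NewStore()
	st.Create(Job{ID: "j1", Status: "pending"})
	j, _ := st.Get("j1")
	fmt.Println(j.Status) // pending
}
```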

Status

✅ MVP Complete + Security Enhancements - Production-ready with security hardening

Core Modules:

  • Schemas - JobSpec, ProcessingPlan, JobStatus with validation
  • Operators - trim, scale + extensible framework
  • Planner - DAG builder with resource estimation
  • Executor - FFmpeg command generation & execution
  • Prober - Media metadata extraction via FFprobe
  • Storage - Unified file abstraction (local, HTTP/HTTPS, S3) 🆕
  • Validator - Input validation + SSRF protection 🆕
  • Authentication - JWT + API Key with role-based access 🆕
  • Store - In-memory job storage
  • API Server - REST API with real-time progress
  • Docker - Multi-service deployment ready

Future Enhancements:

  • More Operators (loudnorm, mix, concat, overlay)
  • Cloud Storage (GCS, Azure Blob)
  • Distributed Workers with job queue
  • Advanced RBAC policies
  • Prometheus metrics & distributed tracing
  • Webhook notifications

Documentation

Testing

# Run all tests
make test

# Run specific package tests
go test ./pkg/operators/... -v

License

MIT License - see LICENSE file for details.

About

Open-source cloud media processing engine: declarative jobs → deterministic FFmpeg pipelines → progress/logs/artifacts.
