Server Actions (RPC)
Call Python functions directly from your browser. Caspian bridges your backend logic to the frontend with built-in Streaming, Rate Limiting, RBAC, File Uploads, and Auto-Serialization.
Core Features
Native Streaming Support
Just use yield in your Python function. Caspian automatically detects
generators and streams data via Server-Sent Events (SSE).
Perfect for AI Agents, LLM token streaming, and long-running jobs.
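To make the mechanism concrete, here is a minimal sketch of how a framework could detect a generator and frame its output as SSE. It is not Caspian's actual implementation: `to_sse` and `run_action` are hypothetical names, and the frames are collected in a list only to keep the example easy to inspect.

```python
import asyncio
import inspect
import json


def to_sse(chunk) -> str:
    """Frame one chunk as a Server-Sent Events message (hypothetical helper)."""
    # Non-string chunks are JSON-encoded first.
    payload = chunk if isinstance(chunk, str) else json.dumps(chunk)
    # Each payload line gets its own "data:" field; a blank line ends the event.
    return "".join(f"data: {line}\n" for line in payload.split("\n")) + "\n"


async def run_action(func, *args, **kwargs):
    """Return SSE frames for generator functions, or the plain result otherwise.

    A real server would write each frame to the response as it is produced
    instead of collecting them in a list.
    """
    if inspect.isasyncgenfunction(func):
        return [to_sse(c) async for c in func(*args, **kwargs)]
    if inspect.isgeneratorfunction(func):
        return [to_sse(c) for c in func(*args, **kwargs)]
    result = func(*args, **kwargs)
    if inspect.isawaitable(result):
        result = await result
    return result


async def count_to(n: int):
    # Example streaming action: yields one chunk per tick.
    for i in range(1, n + 1):
        yield f"tick {i}"


def get_status():
    # Example non-streaming action: returns a plain value.
    return {"status": "ok"}
```

Calling `asyncio.run(run_action(count_to, 2))` yields two framed events, while `run_action(get_status)` passes the dictionary through untouched.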
Secure & Private
Built-in CSRF protection and Origin validation. Functions stay private unless they are decorated with @rpc.
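The opt-in model can be pictured as a registry that only decorated functions enter. The sketch below is an illustration under that assumption, not Caspian's internals: `expose`, `dispatch`, and `origin_allowed` are hypothetical names, and CSRF token checks are left out.

```python
from urllib.parse import urlsplit

_REGISTRY = {}  # action name -> function (hypothetical, for illustration)


def expose(func):
    """Hypothetical stand-in for @rpc: opt a function in to remote calls."""
    _REGISTRY[func.__name__] = func
    return func


def origin_allowed(origin_header: str, host: str) -> bool:
    """Accept only requests whose Origin host matches the server host."""
    return urlsplit(origin_header).netloc == host


def dispatch(name: str, origin: str, host: str, **data):
    """Run an exposed action, rejecting cross-origin and unknown calls."""
    if not origin_allowed(origin, host):
        raise PermissionError("cross-origin call rejected")
    if name not in _REGISTRY:
        raise LookupError(f"{name!r} is not an exposed action")
    return _REGISTRY[name](**data)


@expose
def get_status():
    return {"status": "ok"}


def internal_helper():
    # Never decorated, so dispatch() cannot reach it.
    return "secret"
```

With this shape, forgetting the decorator fails closed: `internal_helper` exists in the module but is unreachable from the browser.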
Smart Rate Limiting
Define hierarchical limits (e.g., "20/hour") per function, or fall back to global environment defaults.
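As a rough illustration of what a limit string such as "20/hour" implies, the sketch below parses the spec and enforces it with a fixed-window counter. The algorithm, the `FixedWindowLimiter` class, and the accepted unit names are assumptions made for this example; Caspian's own limiter may work differently.

```python
import time

# Unit names are an assumption; only "minute" and "hour" appear in these docs.
_PERIODS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def parse_limit(spec: str) -> tuple[int, int]:
    """Turn a spec like '20/hour' into (max_calls, window_seconds)."""
    count, _, period = spec.partition("/")
    return int(count), _PERIODS[period.strip().lower()]


class FixedWindowLimiter:
    """Allow at most max_calls per key within each time window."""

    def __init__(self, spec: str, clock=time.monotonic):
        self.max_calls, self.window = parse_limit(spec)
        self.clock = clock  # injectable so tests can control time
        self._hits = {}     # key -> (window_start, count)

    def allow(self, key: str) -> bool:
        now = self.clock()
        start, count = self._hits.get(key, (now, 0))
        if now - start >= self.window:
            # The previous window has expired; start a fresh one.
            start, count = now, 0
        if count >= self.max_calls:
            return False  # the caller would respond with HTTP 429
        self._hits[key] = (start, count + 1)
        return True
```

A key would typically combine the function name with a user ID or client IP, so that one caller cannot exhaust another's quota.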
Role-Based Access
Restrict actions to specific user roles or authenticated sessions with a single argument.
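The check behind such an argument can be sketched in a few lines. The `Session` class and `authorize` function below are hypothetical and only mirror the `require_auth` and `allowed_roles` options shown later in this page; the 401 and 403 codes match the errors the frontend is said to handle.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Session:
    """Hypothetical session object: who is calling and which roles they hold."""
    user_id: Optional[int] = None
    roles: set = field(default_factory=set)


def authorize(session: Session, require_auth: bool = False, allowed_roles=None) -> int:
    """Return an HTTP-style status: 200 (ok), 401 (not logged in), 403 (wrong role)."""
    # Assumption: restricting by role implies the caller must be logged in.
    needs_login = require_auth or bool(allowed_roles)
    if needs_login and session.user_id is None:
        return 401
    # The caller needs at least one of the allowed roles.
    if allowed_roles and not (session.roles & set(allowed_roles)):
        return 403
    return 200
```

For example, an anonymous session gets 401 from an action with `require_auth=True`, and a logged-in "editor" gets 403 from an action limited to `["admin"]`.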
Auto-Serialization
Return Pydantic models, dataclasses, or Prisma objects directly. JSON conversion is automatic.
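One plausible way to implement this kind of conversion is a fallback hook passed to `json.dumps`. The sketch below uses only the standard library and duck-types Pydantic models by their `model_dump` (v2) or `dict` (v1) method; `to_jsonable` and `serialize` are hypothetical names, and how Caspian actually handles Prisma objects is not shown here.

```python
import dataclasses
import json
from datetime import date, datetime


def to_jsonable(obj):
    """Fallback called by json.dumps for objects it cannot encode natively."""
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return dataclasses.asdict(obj)
    if hasattr(obj, "model_dump"):  # Pydantic v2 models
        return obj.model_dump()
    if hasattr(obj, "dict"):        # Pydantic v1 models
        return obj.dict()
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def serialize(result) -> str:
    """Convert an action's return value to a JSON string."""
    return json.dumps(result, default=to_jsonable)


@dataclasses.dataclass
class Todo:
    id: int
    title: str
    completed: bool = False
```

Because `json.dumps` calls the fallback recursively, a list of `Todo` objects or a dictionary containing a `datetime` serializes without extra code in the action itself.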
Defining Actions (Backend)
The @rpc() decorator configures access control, limits, and behavior.
    from casp.rpc import rpc

    # 1. Basic action (JSON response)
    @rpc()
    def get_status():
        return {"status": "ok", "uptime": 99.9}

    # 2. Secure and rate-limited action
    @rpc(
        require_auth=True,
        allowed_roles=["admin"],
        limits="5/minute",
    )
    def delete_user(user_id: int):
        # `db` stands in for your own data-access layer
        return db.delete(user_id)

    # 3. Streaming action (async generator)
    @rpc()
    async def stream_ai_response(prompt: str):
        # SSE streaming is enabled automatically when 'yield' is detected.
        # `openai.stream` is a placeholder for your LLM client's streaming call.
        async for chunk in openai.stream(prompt):
            yield chunk
Calling Actions (Frontend)
Use pp.rpc(name, data, options).
For streaming, simply provide the onStream callback.
    async function handleAction() {
      try {
        // 1. Standard call (returns a Promise)
        const data = await pp.rpc('get_status');
        console.log(data.status);

        // 2. Streaming call (event-driven)
        await pp.rpc('stream_ai_response', { prompt: "Hello" }, {
          onStream: (chunk) => {
            console.log("Received:", chunk);
          },
          onStreamComplete: () => {
            console.log("Stream Finished");
          },
        });
      } catch (err) {
        // Handles 401, 403, 429 automatically
        console.error("RPC Error:", err);
      }
    }
Example 1: Secure Todo App
A classic CRUD application demonstrating database integration, serialization, and state updates.
    from casp.layout import render_page
    from casp.rpc import rpc
    from src.lib.prisma import prisma

    @rpc()
    def create_todo(title: str):
        # Returns a Prisma object, which auto-serializes to JSON
        return prisma.todo.create(
            data={"title": title, "completed": False}
        )

    @rpc()
    def toggle_todo(id: str, completed: bool):
        return prisma.todo.update(
            where={"id": id},
            data={"completed": completed}
        )

    def page():
        # Initial data load (server side)
        todos = prisma.todo.find_many()
        return render_page(__file__, {
            "todos": [t.to_dict() for t in todos]
        })
    <!-- UI Markup -->
    <div class="max-w-md space-y-4">
      <div class="flex gap-2">
        <input
          value="{newTodo}"
          oninput="setNewTodo(this.value)"
          class="flex-1 border rounded px-3 py-2"
        />
        <button onclick="add()" class="bg-primary text-white px-4 rounded">
          Add
        </button>
      </div>

      <ul class="space-y-2">
        <template pp-for="todo in todos">
          <li class="flex items-center gap-2">
            <input
              type="checkbox"
              checked="{todo.completed}"
              onchange="toggle(todo.id, this.checked)"
            />
            <span>{todo.title}</span>
          </li>
        </template>
      </ul>

      <script>
        const [todos, setTodos] = pp.state({{ todos | tojson }});
        const [newTodo, setNewTodo] = pp.state("");

        async function add() {
          if (!newTodo) return;
          const res = await pp.rpc('create_todo', { title: newTodo });
          setTodos([...todos, res]);
          setNewTodo("");
        }

        async function toggle(id, checked) {
          await pp.rpc('toggle_todo', { id, completed: checked });
          // Update local state once the call succeeds (or use the response)
          setTodos(todos.map(t => t.id === id ? { ...t, completed: checked } : t));
        }
      </script>
    </div>
Example 2: Streaming AI Chat
A real-time chat interface showing how to stream text tokens from the backend using generators.
    from casp.rpc import rpc
    import asyncio

    # Rate limited to 10 requests per minute
    @rpc(limits="10/minute")
    async def chat_stream(message: str):
        # In production, this would be an OpenAI/Anthropic call
        await asyncio.sleep(0.5)  # Simulate latency

        response = f"You said: {message}. Here is my streamed response..."

        # Yielding strings automatically sends SSE 'data' events
        for word in response.split():
            yield word + " "
            await asyncio.sleep(0.1)
    <!-- UI Markup -->
    <div class="w-full max-w-lg space-y-4">
      <div class="h-64 border rounded-lg p-4 overflow-y-auto bg-muted/50">
        <p class="text-sm whitespace-pre-wrap">{response}</p>
        <span hidden="{!isStreaming}" class="animate-pulse text-primary">...</span>
      </div>

      <div class="flex gap-2">
        <input
          value="{msg}"
          oninput="setMsg(this.value)"
          placeholder="Type a message..."
          class="flex-1 px-3 py-2 border rounded"
        />
        <button
          onclick="send()"
          disabled="{isStreaming || !msg}"
          class="px-4 py-2 bg-primary text-primary-foreground rounded disabled:opacity-50"
        >
          Send
        </button>
      </div>

      <script>
        const [msg, setMsg] = pp.state("");
        const [response, setResponse] = pp.state("");
        const [isStreaming, setIsStreaming] = pp.state(false);

        async function send() {
          if (!msg) return;
          setIsStreaming(true);
          setResponse("");

          // Pass the 'onStream' callback to enable streaming mode
          await pp.rpc('chat_stream', { message: msg }, {
            onStream: (chunk) => {
              setResponse(prev => prev + chunk);
            },
            onStreamComplete: () => {
              setIsStreaming(false);
            },
          });
        }
      </script>
    </div>
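Since the backend half of this example is an ordinary async generator, its chunking logic can be exercised without a browser. The sketch below reproduces a shortened `chat_stream` without the `@rpc` decorator (so it runs without Caspian installed) and joins the chunks the way the frontend's `onStream` callback would; `collect` is a hypothetical helper.

```python
import asyncio


async def chat_stream(message: str):
    # Undecorated copy of the action's core logic, with a shorter reply.
    response = f"You said: {message}."
    for word in response.split():
        yield word + " "
        await asyncio.sleep(0)  # yield control, as a real stream would between chunks


async def collect(agen) -> str:
    """Concatenate every chunk, mirroring setResponse(prev => prev + chunk)."""
    return "".join([chunk async for chunk in agen])


if __name__ == "__main__":
    print(asyncio.run(collect(chat_stream("hi"))))
```

Note that each word carries a trailing space, so the assembled text ends with one as well.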
Example 3: File Uploads
Secure file uploads with automatic progress tracking using the onUploadProgress callback.
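    from casp.rpc import rpc
    from fastapi import UploadFile, File
    from pathlib import Path
    import shutil

    @rpc()
    def upload_file(file: UploadFile = File(...)):
        # Keep only the final path component so a crafted filename
        # (e.g. "../../etc/passwd") cannot escape the uploads directory
        safe_name = Path(file.filename or "upload.bin").name

        # Save the file to disk
        with open(f"uploads/{safe_name}", "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        return {
            "status": "success",
            "size": file.size,
            "filename": safe_name,
        }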
    <!-- UI Markup -->
    <div class="w-full max-w-sm">
      <!-- Progress Bar -->
      <div class="h-2 w-full bg-muted rounded overflow-hidden mb-4">
        <div
          class="h-full bg-primary transition-all duration-200"
          style="width: {progress}%"
        ></div>
      </div>

      <input
        type="file"
        onchange="upload(this.files[0])"
        class="block w-full text-sm text-muted-foreground file:mr-4 file:py-2 file:px-4 file:rounded-full file:border-0 file:text-sm file:font-semibold file:bg-primary/10 file:text-primary hover:file:bg-primary/20"
      />

      <script>
        const [progress, setProgress] = pp.state(0);

        async function upload(file) {
          if (!file) return;

          await pp.rpc('upload_file', { file: file }, {
            onUploadProgress: (info) => {
              // info: { loaded, total, percent }
              if (info.percent) setProgress(Math.round(info.percent));
            },
          });
          console.log("Upload Complete");
        }
      </script>
    </div>