Server Actions (RPC)
Call Python functions directly from your browser. Caspian bridges your backend logic to the frontend with built-in Streaming, Rate Limiting, RBAC, File Uploads, and Auto-Serialization.
Core Features
Native Streaming Support
Just use yield in your Python function. Caspian automatically detects
generators and streams data via Server-Sent Events (SSE).
Perfect for AI Agents, LLM token streaming, and long-running jobs.
Secure & Private
Built-in CSRF protection and Origin validation. Functions are private by default until decorated with @rpc.
Smart Rate Limiting
Define hierarchical limits (e.g., "20/hour") per function, or fallback to global environment defaults.
Role-Based Access
Restrict actions to specific user roles or authenticated sessions with a single argument.
Auto-Serialization
Returns Pydantic models, Dataclasses, or Prisma objects directly. JSON conversion is automatic.
Defining Actions (Backend)
The @rpc() decorator configures access control, limits, and behavior.
from casp.rpc import rpc import asyncio # 1. Basic Action (JSON Response) @rpc() def get_status(): return {"status": "ok", "uptime": 99.9} # 2. Secure & Rate Limited Action @rpc( require_auth=True, allowed_roles=["admin"], limits="5/minute" ) def delete_user(user_id: int): return db.delete(user_id) # 3. Streaming Action (AsyncGenerator) @rpc() async def stream_ai_response(prompt: str): # Automatic SSE streaming when 'yield' is detected async for chunk in openai.stream(prompt): yield chunk
Calling Actions (Frontend)
Use pp.rpc(name, data, options).
For streaming, simply provide the onStream callback.
async function handleAction() { try { // 1. Standard Call (Returns Promise) const data = await pp.rpc('get_status'); console.log(data.status); // 2. Streaming Call (Event Driven) await pp.rpc('stream_ai_response', { prompt: "Hello" }, { onStream: (chunk) => { console.log("Received:", chunk); }, onStreamComplete: () => { console.log("Stream Finished"); } }); } catch (err) { // Handles 401, 403, 429 automatically console.error("RPC Error:", err); } }
Example 1: Secure Todo App
A classic CRUD application demonstrating database integration, serialization, and state updates.
from casp.layout import render_page from casp.rpc import rpc from src.lib.prisma import prisma @rpc() def create_todo(title: str): # Returns a Prisma object which auto-serializes to JSON return prisma.todo.create( data={"title": title, "completed": False} ) @rpc() def toggle_todo(id: str, completed: bool): return prisma.todo.update( where={"id": id}, data={"completed": completed} ) def page(): # Initial data load (Server Side) todos = prisma.todo.find_many() return render_page(__file__, { "todos": [t.to_dict() for t in todos] })
<!-- UI Markup --> <div class="max-w-md space-y-4"> <div class="flex gap-2"> <input value="{newTodo}" oninput="setNewTodo(this.value)" class="flex-1 border rounded px-3 py-2" /> <button onclick="add()" class="bg-primary text-white px-4 rounded"> Add </button> </div> <ul class="space-y-2"> <template pp-for="todo in todos"> <li class="flex items-center gap-2"> <input type="checkbox" checked="{todo.completed}" onchange="toggle(todo.id, this.checked)" /> <span>{todo.title}</span> </li> </template> </ul> </div> <script> const [todos, setTodos] = pp.state([[todos|json]]); const [newTodo, setNewTodo] = pp.state(""); async function add() { if (!newTodo) return; const res = await pp.rpc('create_todo', { title: newTodo }); setTodos([...todos, res]); setNewTodo(""); } async function toggle(id, checked) { await pp.rpc('toggle_todo', { id, completed: checked }); // Optimistic update (or use response) setTodos(todos.map(t => t.id === id ? {...t, completed: checked} : t)); } </script>
Example 2: Streaming AI Chat
A real-time chat interface showing how to stream text tokens from the backend using generators.
from casp.rpc import rpc import asyncio # Rate limited to 10 requests per minute @rpc(limits="10/minute") async def chat_stream(message: str): # In production, this would be an OpenAI/Anthropic call await asyncio.sleep(0.5) # Simulate latency response = f"You said: {message}. Here is my streamed response..." # Yielding strings automatically sends SSE 'data' events for word in response.split(): yield word + " " await asyncio.sleep(0.1)
<!-- UI Markup --> <div class="w-full max-w-lg space-y-4"> <div class="h-64 border rounded-lg p-4 overflow-y-auto bg-muted/50"> <p class="text-sm whitespace-pre-wrap">{response}</p> <span hidden="{!isStreaming}" class="animate-pulse text-primary">...</span> </div> <div class="flex gap-2"> <input value="{msg}" oninput="setMsg(this.value)" placeholder="Type a message..." class="flex-1 px-3 py-2 border rounded" /> <button onclick="send()" disabled="{isStreaming || !msg}" class="px-4 py-2 bg-primary text-primary-foreground rounded disabled:opacity-50" > Send </button> </div> </div> <script> const [msg, setMsg] = pp.state(""); const [response, setResponse] = pp.state(""); const [isStreaming, setIsStreaming] = pp.state(false); async function send() { if (!msg) return; setIsStreaming(true); setResponse(""); // Pass the 'onStream' callback to enable streaming mode await pp.rpc('chat_stream', { message: msg }, { onStream: (chunk) => { setResponse(prev => prev + chunk); }, onStreamComplete: () => { setIsStreaming(false); } }); } </script>
Example 3: File Uploads
Secure file uploads with automatic progress tracking using the onUploadProgress callback.
from casp.rpc import rpc from fastapi import UploadFile, File import shutil @rpc() def upload_file(file: UploadFile = File(...)): # Save file logic with open(f"uploads/{file.filename}", "wb") as buffer: shutil.copyfileobj(file.file, buffer) return { "status": "success", "size": file.size, "filename": file.filename }
<!-- UI Markup --> <div class="w-full max-w-sm"> <!-- Progress Bar --> <div class="h-2 w-full bg-muted rounded overflow-hidden mb-4"> <div class="h-full bg-primary transition-all duration-200" style="width: {progress}%" ></div> </div> <input type="file" onchange="upload(this.files[0])" class="block w-full text-sm text-muted-foreground file:mr-4 file:py-2 file:px-4 file:rounded-full file:border-0 file:text-sm file:font-semibold file:bg-primary/10 file:text-primary hover:file:bg-primary/20" /> </div> <script> const [progress, setProgress] = pp.state(0); async function upload(file) { if (!file) return; await pp.rpc('upload_file', { file: file }, { onUploadProgress: (info) => { // { loaded, total, percent } if (info.percent) setProgress(Math.round(info.percent)); } }); console.log("Upload Complete"); } </script>