vLLM Tui Monitor¶
Source https://github.com/vllm-project/vllm/blob/main/examples/online_serving/vllm_tui_monitor.py.
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
vLLM TUI Monitor
================
A retro-futuristic terminal dashboard for monitoring vLLM instances.
Visualizes KV cache usage, throughput, and request load in real-time.
Usage:
python vllm_tui_monitor.py --url http://localhost:8000/metrics
python vllm_tui_monitor.py --mock
"""
import argparse
import asyncio
import contextlib
import math
import random
import re
from collections import deque
from datetime import datetime
import requests
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Grid
from textual.reactive import reactive
from textual.widgets import Digits, Footer, Label, RichLog, Sparkline, Static
# --- Constants & Configuration ---
REFRESH_RATE = 1.0 # Seconds between metric polls
HISTORY_SIZE = 60 # Number of data points for sparklines
# Theme Colors
COLOR_PRIMARY = "#00ff00" # Phosphor Green
COLOR_WARNING = "#ffff00" # Amber
COLOR_DANGER = "#ff0000" # Red
COLOR_BG = "#000000" # Deep Black
# ASCII Art Header
ASCII_LOGO = """
___ ___ ___ ___
/\__\ /\__\ /\__\ /\__\
/:/ / /:/ / /:/ / /::| |
/:/ / /:/ / /:/ / /:|:| |
/:/__/ /:/ / /:/ / /:/|:|__|__
|:| | |:|__|__ /:/__/ /:/ |::::\__\\
|:| | |:| | | /::\ \ \/__/~~/:/ /
|:| | |:| | | /:/\:\ \ /:/ /
|:|__| |:|__|__| \/__\:\ \ /:/ /
\____\ \____\ \:\__\ /:/ /
\/__/ \/__/
"""
# --- Metric Parsing & Fetching ---
class MetricPoller:
"""Handles fetching and parsing metrics from vLLM or generating mock data."""
def __init__(self, url: str, mock: bool = False):
self.url = url
self.mock = mock
self._mock_time = 0.0
# Metric storage
self.metrics = {
"gpu_cache_usage_perc": 0.0,
"num_requests_running": 0,
"num_requests_waiting": 0,
"num_requests_swapped": 0,
"avg_generation_throughput_toks_per_s": 0.0,
"avg_prompt_throughput_toks_per_s": 0.0,
}
def fetch(self) -> dict[str, float]:
if self.mock:
return self._generate_mock_metrics()
try:
response = requests.get(self.url, timeout=2)
response.raise_for_status()
return self._parse_prometheus(response.text)
except requests.exceptions.RequestException as e:
# Re-raise as is to be handled by the caller
raise e
except Exception as e:
# Wrap other errors
raise RuntimeError(f"Failed to fetch metrics: {e}") from e
def _parse_prometheus(self, text: str) -> dict[str, float]:
"""Simple regex-based Prometheus parser for specific vLLM metrics."""
parsed = {}
# Regex to match: name{labels} value
# Handles scientific notation (e.g. 1.23e-05), integers, and floats
number_pattern = r"([-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?)"
# Patterns broken down for line length
p_cache = rf"vllm:gpu_cache_usage_perc{{?[^}}]*}}?\s+{number_pattern}"
p_run = rf"vllm:num_requests_running{{?[^}}]*}}?\s+{number_pattern}"
p_wait = rf"vllm:num_requests_waiting{{?[^}}]*}}?\s+{number_pattern}"
p_swap = rf"vllm:num_requests_swapped{{?[^}}]*}}?\s+{number_pattern}"
p_gen = (
rf"vllm:avg_generation_throughput_toks_per_s{{?[^}}]*}}?\s+{number_pattern}"
)
p_prompt = (
rf"vllm:avg_prompt_throughput_toks_per_s{{?[^}}]*}}?\s+{number_pattern}"
)
patterns = {
"gpu_cache_usage_perc": p_cache,
"num_requests_running": p_run,
"num_requests_waiting": p_wait,
"num_requests_swapped": p_swap,
"avg_generation_throughput_toks_per_s": p_gen,
"avg_prompt_throughput_toks_per_s": p_prompt,
}
for key, pattern in patterns.items():
match = re.search(pattern, text)
if match:
try:
parsed[key] = float(match.group(1))
except ValueError:
parsed[key] = 0.0
else:
# Keep previous value if missing or default to 0 if never found
parsed[key] = self.metrics.get(key, 0.0)
self.metrics.update(parsed)
return self.metrics
def _generate_mock_metrics(self) -> dict[str, float]:
self._mock_time += 0.1
# Simulate load with sine waves and noise
load_factor = (math.sin(self._mock_time) + 1) / 2 # 0 to 1
self.metrics["gpu_cache_usage_perc"] = max(
0.0, min(1.0, load_factor * 0.8 + random.uniform(-0.05, 0.05))
)
self.metrics["num_requests_running"] = int(
load_factor * 50 + random.randint(0, 5)
)
self.metrics["num_requests_waiting"] = int(max(0, load_factor * 20 - 10))
self.metrics["avg_generation_throughput_toks_per_s"] = (
load_factor * 2000 + random.uniform(0, 100)
)
return self.metrics
# --- UI Widgets ---
class RetroHeader(Static):
"""Displays the ASCII logo and connection status."""
status = reactive("CONNECTING")
def compose(self) -> ComposeResult:
yield Label(ASCII_LOGO, id="logo")
yield Label(f"STATUS: {self.status}", id="status_label")
def watch_status(self, status: str) -> None:
"""Update the status label when the reactive status changes."""
with contextlib.suppress(Exception):
self.query_one("#status_label").update(f"STATUS: {status}")
def update_status(self, status: str, color: str = "green"):
self.status = status
with contextlib.suppress(Exception):
self.query_one("#status_label").styles.color = color
class ReactorCore(Static):
"""Visualizes GPU Cache Usage as a grid of blocks."""
usage = reactive(0.0)
def watch_usage(self, usage: float) -> None:
self.update(self._render_core(usage))
def _render_core(self, usage: float) -> str:
# Create a 10x20 grid (approx)
rows = 10
cols = 20
total_cells = rows * cols
filled_cells = int(usage * total_cells)
# Block characters
FULL_BLOCK = "█"
EMPTY_BLOCK = "·"
out = []
for i in range(total_cells):
if i < filled_cells:
out.append(f"[{self._get_color(i, total_cells)}]{FULL_BLOCK}[/]")
else:
out.append(f"[#333333]{EMPTY_BLOCK}[/]")
if (i + 1) % cols == 0:
out.append("\n")
return "".join(out)
def _get_color(self, index: int, total: int) -> str:
ratio = index / total
if ratio < 0.6:
return COLOR_PRIMARY
elif ratio < 0.85:
return COLOR_WARNING
else:
return COLOR_DANGER
class MetricSparkline(Static):
"""A labelled sparkline graph."""
def __init__(self, title: str, color: str = "green", **kwargs):
super().__init__(**kwargs)
self.title_text = title
self.spark_color = color
self.data = deque([0.0] * HISTORY_SIZE, maxlen=HISTORY_SIZE)
def compose(self) -> ComposeResult:
yield Label(self.title_text, classes="spark_title")
yield Sparkline(self.data, summary_function=max, color=self.spark_color)
yield Label("0.0", classes="spark_value")
def add_data(self, value: float):
self.data.append(value)
self.query_one(Sparkline).data = self.data
self.query_one(".spark_value").update(f"{value:.1f}")
class LogStream(RichLog):
"""Scrolling log of events."""
pass
# --- Main Application ---
class VLLMTUIApp(App):
"""The main TUI Application."""
CSS = """
Screen {
background: #000000;
color: #00ff00;
font-family: monospace;
}
#logo {
color: #00ff00;
text-align: center;
width: 100%;
}
#status_label {
text-align: center;
width: 100%;
background: #111111;
padding: 1;
text-style: bold;
}
RetroHeader {
height: auto;
dock: top;
margin-bottom: 1;
}
Grid {
grid-size: 2;
grid-columns: 1fr 1fr;
grid-rows: 1fr;
}
Container {
border: solid #00ff00;
padding: 1;
margin: 1;
}
.box_title {
background: #00ff00;
color: #000000;
padding: 0 1;
margin-bottom: 1;
text-style: bold;
}
ReactorCore {
height: 100%;
content-align: center middle;
}
MetricSparkline {
height: auto;
margin-bottom: 1;
border-bottom: solid #333333;
padding-bottom: 1;
}
.spark_title {
color: #888888;
}
.spark_value {
text-align: right;
color: #ffffff;
text-style: bold;
}
LogStream {
height: 10fr;
border-top: solid #00ff00;
background: #050505;
color: #aaaaaa;
}
"""
BINDINGS = [
Binding("q", "quit", "Quit"),
]
def __init__(self, args):
super().__init__()
self.args = args
self.poller = MetricPoller(args.url, args.mock)
def compose(self) -> ComposeResult:
yield RetroHeader()
with Grid():
# Left Column: Reactor Core (KV Cache)
with Container(classes="panel"):
yield Label(" REACTOR CORE (KV CACHE) ", classes="box_title")
yield ReactorCore(id="reactor")
yield Label("\nSYSTEM LOAD", classes="box_title")
yield Digits(id="load_digits")
# Right Column: Metrics & Logs
with Container(classes="panel"):
yield Label(" TELEMETRY ", classes="box_title")
yield MetricSparkline(
"Throughput (tok/s)", color="green", id="spark_throughput"
)
yield MetricSparkline(
"Active Requests", color="yellow", id="spark_active"
)
yield MetricSparkline(
"Waiting Requests", color="red", id="spark_waiting"
)
yield Label(" EVENT LOG ", classes="box_title")
yield LogStream(highlight=True, markup=True)
yield Footer()
def on_mount(self) -> None:
self.title = "vLLM OPS TERMINAL"
self.set_interval(REFRESH_RATE, self.update_metrics)
self.query_one(LogStream).write(
f"[bold green]SYSTEM ONLINE[/] - Connecting to {self.args.url}..."
)
if self.args.mock:
self.query_one(LogStream).write(
"[bold yellow]WARNING: MOCK MODE ENGAGED[/]"
)
async def update_metrics(self) -> None:
"""Fetch metrics in a background thread and update UI."""
try:
# Run the blocking fetch in a separate thread to avoid freezing the UI
metrics = await asyncio.to_thread(self.poller.fetch)
# Update Header
self.query_one(RetroHeader).update_status("CONNECTED", "green")
# Update Reactor Core
self.query_one(ReactorCore).usage = metrics.get("gpu_cache_usage_perc", 0.0)
# Update Load Digits (just showing running requests as a big number)
self.query_one("#load_digits", Digits).update(
f"{int(metrics.get('num_requests_running', 0)):03d}"
)
# Update Sparklines using IDs
self.query_one("#spark_throughput", MetricSparkline).add_data(
metrics.get("avg_generation_throughput_toks_per_s", 0.0)
)
self.query_one("#spark_active", MetricSparkline).add_data(
metrics.get("num_requests_running", 0.0)
)
self.query_one("#spark_waiting", MetricSparkline).add_data(
metrics.get("num_requests_waiting", 0.0)
)
# Log significant events
log = self.query_one(LogStream)
if metrics.get("num_requests_waiting", 0) > 5:
log.write(
f"[{datetime.now().strftime('%H:%M:%S')}] "
"[bold red]ALERT:[/] High queue depth detected!"
)
except requests.exceptions.RequestException as e:
self.query_one(RetroHeader).update_status("CONNECTION LOST", "red")
self.query_one(LogStream).write(
f"[{datetime.now().strftime('%H:%M:%S')}] "
f"[red]Connection Error: {str(e)}[/]"
)
except Exception as e:
self.query_one(RetroHeader).update_status("ERROR", "red")
self.query_one(LogStream).write(
f"[{datetime.now().strftime('%H:%M:%S')}] "
f"[red]System Error: {str(e)}[/]"
)
def main():
parser = argparse.ArgumentParser(description="vLLM TUI Monitor")
parser.add_argument(
"--url",
type=str,
default="http://localhost:8000/metrics",
help="vLLM Prometheus metrics URL",
)
parser.add_argument(
"--mock", action="store_true", help="Run in mock mode with fake data"
)
args = parser.parse_args()
app = VLLMTUIApp(args)
app.run()
if __name__ == "__main__":
main()