How to Design Python-First Interactive Dashboards with Prefab Reactive UI Components and Static HTML Export

0


import random
from collections import Counter, defaultdict
from datetime import date, timedelta
from prefab_ui.actions import AppendState, OpenLink, PopState, SetState, ShowToast, ToggleState
from prefab_ui.app import PrefabApp
from prefab_ui.components import (
Alert, AlertDescription, AlertTitle, Badge, Button, Card, CardContent,
CardDescription, CardFooter, CardHeader, CardTitle, Code, Column,
DataTable, DataTableColumn, Form, Grid, H2, Input, Markdown, Mermaid,
Metric, Muted, Progress, Ring, Row, Slider, Small, Switch, Tab, Tabs,
Text
)
from prefab_ui.components.charts import (
BarChart, ChartSeries, LineChart, PieChart, RadarChart, ScatterChart,
Sparkline
)
from prefab_ui.components.control_flow import Else, ForEach, If
from prefab_ui.rx import EVENT, STATE
random.seed(42)
TODAY = date.today()
DATES = [TODAY – timedelta(days=29 – i) for i in range(30)]
REGIONS = [“All”, “APAC”, “EMEA”, “NA”, “LATAM”]
PIPELINES = [
“Customer 360 ETL”,
“Invoice OCR”,
“LLM Triage”,
“Risk Scoring”,
“Forecast Sync”,
“Warehouse Load”,
]
OWNERS = [“Data Platform”, “AI Apps”, “Revenue Ops”, “Risk Engineering”]
STATES = [“Completed”, “Completed”, “Completed”, “Completed”, “Late”, “Failed”]
PRIORITIES = [“P0”, “P1”, “P2”, “P3”]
runs = []
daily_region_rows = []
for d in DATES:
for region in REGIONS[1:]:
region_bias = {
“APAC”: 0.96,
“EMEA”: 0.94,
“NA”: 0.97,
“LATAM”: 0.91,
}[region]
volume = random.randint(32, 78)
failures = 0
late = 0
total_cost = 0.0
total_latency = 0.0
total_revenue = 0.0
for i in range(volume):
pipeline = random.choice(PIPELINES)
owner = random.choice(OWNERS)
state = random.choices(
STATES,
weights=[
region_bias * 10,
6,
4,
3,
1.2,
max(0.2, (1 – region_bias) * 16),
],
k=1,
)[0]
duration = max(
12,
int(
random.gauss(95, 35)
+ (20 if state == “Late” else 0)
+ (45 if state == “Failed” else 0)
),
)
cost = round(max(0.09, random.lognormvariate(-1.15, 0.55) + duration / 1800), 2)
revenue = round(random.uniform(1.2, 8.5) * (1.3 if state == “Completed” else 0.6), 2)
priority = random.choices(PRIORITIES, weights=[1, 3, 7, 10], k=1)[0]
if state == “Failed”:
failures += 1
if state == “Late”:
late += 1
total_cost += cost
total_latency += duration
total_revenue += revenue
if d >= TODAY – timedelta(days=10) and (state in {“Failed”, “Late”} or random.random() < 0.05):
runs.append({
“run_id”: f”{d.strftime(‘%m%d’)}-{region[:2]}-{len(runs)+1:04d}”,
“date”: d.strftime(“%Y-%m-%d”),
“pipeline”: pipeline,
“owner”: owner,
“region”: region,
“state”: state,
“priority”: priority,
“duration_s”: duration,
“cost_usd”: cost,
“revenue_k”: revenue,
“sla_gap”: round(max(0, duration – 120) / 60, 1),
})
daily_region_rows.append({
“date”: d.strftime(“%b %d”),
“region”: region,
“runs”: volume,
“failures”: failures,
“late”: late,
“success_rate”: round(100 * (volume – failures – late * 0.35) / volume, 1),
“avg_latency”: round(total_latency / volume, 1),
“cost_usd”: round(total_cost, 2),
“revenue_k”: round(total_revenue, 1),
})
runs = sorted(
runs,
key=lambda r: (r[“priority”], r[“state”] != “Failed”, -r[“duration_s”])
)[:80]
def aggregate_daily(rows):
by_date = defaultdict(lambda: {
“date”: “”,
“runs”: 0,
“failures”: 0,
“late”: 0,
“cost_usd”: 0.0,
“revenue_k”: 0.0,
“latency_weighted”: 0.0,
})
for r in rows:
bucket = by_date[r[“date”]]
bucket[“date”] = r[“date”]
bucket[“runs”] += r[“runs”]
bucket[“failures”] += r[“failures”]
bucket[“late”] += r[“late”]
bucket[“cost_usd”] += r[“cost_usd”]
bucket[“revenue_k”] += r[“revenue_k”]
bucket[“latency_weighted”] += r[“avg_latency”] * r[“runs”]
out = []
for d in [x.strftime(“%b %d”) for x in DATES]:
b = by_date[d]
if b[“runs”]:
b[“success_rate”] = round(100 * (b[“runs”] – b[“failures”] – b[“late”] * 0.35) / b[“runs”], 1)
b[“avg_latency”] = round(b[“latency_weighted”] / b[“runs”], 1)
b[“cost_usd”] = round(b[“cost_usd”], 2)
b[“revenue_k”] = round(b[“revenue_k”], 1)
del b[“latency_weighted”]
out.append(dict(b))
return out
def aggregate_regions(rows):
by_region = defaultdict(lambda: {
“region”: “”,
“runs”: 0,
“failures”: 0,
“late”: 0,
“cost_usd”: 0.0,
“revenue_k”: 0.0,
“latency_weighted”: 0.0,
})
for r in rows:
b = by_region[r[“region”]]
b[“region”] = r[“region”]
b[“runs”] += r[“runs”]
b[“failures”] += r[“failures”]
b[“late”] += r[“late”]
b[“cost_usd”] += r[“cost_usd”]
b[“revenue_k”] += r[“revenue_k”]
b[“latency_weighted”] += r[“avg_latency”] * r[“runs”]
out = []
for region in REGIONS[1:]:
b = by_region[region]
b[“success_rate”] = round(100 * (b[“runs”] – b[“failures”] – b[“late”] * 0.35) / b[“runs”], 1)
b[“avg_latency”] = round(b[“latency_weighted”] / b[“runs”], 1)
b[“cost_usd”] = round(b[“cost_usd”], 2)
b[“revenue_k”] = round(b[“revenue_k”], 1)
b[“roi”] = round(b[“revenue_k”] / max(1, b[“cost_usd”]), 1)
del b[“latency_weighted”]
out.append(dict(b))
return out
def make_status_rows(table_rows):
counts = Counter(r[“state”] for r in table_rows)
return [{“state”: k, “count”: v} for k, v in counts.items()]
def make_pipeline_rows(table_rows):
counts = Counter(r[“pipeline”] for r in table_rows)
return [{“pipeline”: k, “count”: v} for k, v in counts.most_common()]
def make_kpis(region, daily_rows, table_rows):
runs_count = sum(r[“runs”] for r in daily_rows)
failures = sum(r[“failures”] for r in daily_rows)
late = sum(r[“late”] for r in daily_rows)
cost = sum(r[“cost_usd”] for r in daily_rows)
revenue = sum(r[“revenue_k”] for r in daily_rows)
return {
“region”: region,
“runs”: runs_count,
“success_rate”: round(100 * (runs_count – failures – late * 0.35) / max(1, runs_count), 1),
“avg_latency”: round(sum(r[“avg_latency”] * r[“runs”] for r in daily_rows) / max(1, runs_count), 1),
“cost_usd”: round(cost, 2),
“revenue_k”: round(revenue, 1),
“roi”: round(revenue / max(1, cost), 1),
“open_issues”: len(table_rows),
“p0p1”: sum(1 for r in table_rows if r[“priority”] in {“P0”, “P1”}),
“failure_rate”: round(100 * failures / max(1, runs_count), 2),
“spark”: [r[“success_rate”] for r in daily_rows[-14:]],
}
DAILY_BY_REGION = {“All”: aggregate_daily(daily_region_rows)}
REGION_ROWS = aggregate_regions(daily_region_rows)
for region in REGIONS[1:]:
DAILY_BY_REGION[region] = [r for r in daily_region_rows if r[“region”] == region]
RUNS_BY_REGION = {
region: [r for r in runs if region == “All” or r[“region”] == region]
for region in REGIONS
}
STATUS_BY_REGION = {
region: make_status_rows(RUNS_BY_REGION[region])
for region in REGIONS
}
PIPELINE_BY_REGION = {
region: make_pipeline_rows(RUNS_BY_REGION[region])
for region in REGIONS
}
KPI_BY_REGION = {
region: make_kpis(region, DAILY_BY_REGION[region], RUNS_BY_REGION[region])
for region in REGIONS
}
WATCHLIST = sorted(
runs,
key=lambda r: (r[“priority”], r[“state”] != “Failed”, -r[“sla_gap”])
)[:8]
SCATTER_ROWS = [
{
“region”: r[“region”],
“success_rate”: r[“success_rate”],
“cost_usd”: r[“cost_usd”],
“avg_latency”: r[“avg_latency”],
}
for r in REGION_ROWS
]
RADAR_ROWS = [
{“metric”: “Success”, **{r[“region”]: r[“success_rate”] for r in REGION_ROWS}},
{“metric”: “ROI”, **{r[“region”]: min(100, r[“roi”] * 8) for r in REGION_ROWS}},
{“metric”: “Latency”, **{r[“region”]: max(0, 100 – r[“avg_latency”] / 2) for r in REGION_ROWS}},
{“metric”: “Cost”, **{r[“region”]: max(0, 100 – r[“cost_usd”] / 20) for r in REGION_ROWS}},
]
REGION_ACTIONS = {
region: [
SetState(“selected_region”, region),
SetState(“line_rows”, DAILY_BY_REGION[region]),
SetState(“table_rows”, RUNS_BY_REGION[region]),
SetState(“status_rows”, STATUS_BY_REGION[region]),
SetState(“pipeline_rows”, PIPELINE_BY_REGION[region]),
SetState(“region_kpis”, KPI_BY_REGION[region]),
SetState(“selected_run”, None),
ShowToast(f”Region set to {region}”, variant=”info”, duration=1800),
]
for region in REGIONS
}
”’



Source link

You might also like
Leave A Reply

Your email address will not be published.

Cube Letter