Headless Core & JS Blur & MultiThreading Addet

This commit is contained in:
Joey Pillunat 2025-10-24 16:02:19 +02:00
parent a41f765211
commit 5b99621044
4 changed files with 235 additions and 68 deletions

View file

@ -38,29 +38,33 @@ document.addEventListener('DOMContentLoaded', () => {
const status = document.getElementById('status');
const consoleEl = document.getElementById('console');
// SSE: Logs anhören
const es = new EventSource('/api/logs');
es.onmessage = (ev) => {
consoleEl.textContent += ev.data + '\n';
consoleEl.scrollTop = consoleEl.scrollHeight;
};
es.onerror = () => {
// optional: Reconnect-Info
};
btn.addEventListener('click', async () => {
const val = (ip.value || '').trim();
if (!val) { status.textContent = 'Bitte IP eingeben.'; return; }
status.textContent = `Starte`;
status.textContent = 'Starte';
try {
// 1) Job starten → job_id erhalten
const res = await fetch('/api/start', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ ip: val })
});
const text = await res.text();
status.textContent = text || 'OK';
const data = await res.json();
const jobId = data.job_id;
status.textContent = `Job: ${jobId}`;
consoleEl.textContent = '';
// 2) individuellen Log-Stream für diesen Job öffnen
const es = new EventSource(`/api/logs?job=${encodeURIComponent(jobId)}`);
es.onmessage = (ev) => {
consoleEl.textContent += ev.data + '\n';
consoleEl.scrollTop = consoleEl.scrollHeight;
};
es.onerror = () => {
// Stream beendet/Fehler optional kennzeichnen
};
} catch (err) {
status.textContent = 'Fehler: ' + (err?.message || err);
}

View file

@ -1,25 +1,29 @@
mod assets;
use std::{
collections::HashMap,
env, fs,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
};
use axum::{
extract::{Json, Query, State as AxumState},
response::{Html, sse::{Event, KeepAlive, Sse}},
routing::{get, post},
extract::Json,
Router,
response::{Html, sse::{Sse, Event, KeepAlive}},
};
use tower_http::services::ServeDir;
use serde::Deserialize;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::{sync::{broadcast, Mutex}};
use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::BroadcastStream;
use tower_http::services::ServeDir;
use uuid::Uuid;
use assets::{DEFAULT_INDEX_HTML, DEFAULT_APP_JS, DEFAULT_STYLES_CSS};
use crate::{state::SharedState, headless};
use assets::{DEFAULT_APP_JS, DEFAULT_INDEX_HTML, DEFAULT_STYLES_CSS};
use crate::{headless, state::SharedState};
const WEB_DIR: &str = "web";
@ -45,7 +49,10 @@ fn rebuild_web_assets(dir: &Path) -> std::io::Result<()> {
#[derive(Clone)]
struct HttpState {
shared: SharedState,
log_tx: broadcast::Sender<String>,
// Optional: globaler Stream (falls du eine Gesamtkonsole willst)
_global_log_tx: broadcast::Sender<String>,
// Pro-Job Sender-Registry
jobs: Arc<Mutex<HashMap<String, broadcast::Sender<String>>>>,
}
#[derive(Deserialize)]
@ -53,67 +60,120 @@ struct StartRequest {
ip: String,
}
#[derive(Serialize)]
struct StartResponse {
job_id: String,
}
async fn start_job(
axum::extract::State(st): axum::extract::State<HttpState>,
Json(req): Json<StartRequest>,
) {
) -> axum::Json<StartResponse> {
use axum::Json as AxumJson;
let ip = req.ip.trim().to_string();
let tx_for_task = st.log_tx.clone();
// neuen Job anlegen
let job_id = uuid::Uuid::new_v4().to_string();
let (tx, _rx) = tokio::sync::broadcast::channel::<String>(200);
{
// in Registry eintragen
let mut map = st.jobs.lock().await;
map.insert(job_id.clone(), tx.clone());
}
// Klone für den Task
let job_id_for_task = job_id.clone();
let jobs_map = st.jobs.clone();
tokio::spawn(async move {
if let Err(e) = headless::run_test(&ip, tx_for_task.clone()).await {
let _ = tx_for_task.send(format!("[Headless Error] {e}"));
sleep(Duration::from_millis(500)).await;
tx.send(format!("[Job {job_id_for_task}] Starte Setup für {ip}")).ok();
// WICHTIG: Ergebnis NICHT in Variable halten, sondern sofort matchen,
// damit kein non-Send Error über das spätere .await (Mutex) "lebt".
if let Err(e) = crate::headless::run_test(&ip, tx.clone()).await {
tx.send(format!("[Job {job_id_for_task}] ❌ Fehler: {e}")).ok();
} else {
let _ = tx_for_task.send(format!("[Headless] Fertig für {ip}"));
tx.send(format!("[Job {job_id_for_task}] ✅ Fertig")).ok();
}
// Erst NACH dem match den Mutex awaiten -> der Error ist schon gedroppt
let mut m = jobs_map.lock().await;
m.remove(&job_id_for_task);
});
AxumJson(StartResponse { job_id })
}
#[derive(Deserialize)]
struct LogsQuery {
job: String,
}
async fn stream_logs(
axum::extract::State(st): axum::extract::State<HttpState>
) -> Sse<impl futures::Stream<Item = Result<Event, std::convert::Infallible>>> {
let rx = st.log_tx.subscribe();
AxumState(st): AxumState<HttpState>,
Query(q): Query<LogsQuery>,
) -> Result<Sse<impl futures::Stream<Item = Result<Event, std::convert::Infallible>>>, axum::http::StatusCode> {
// Sender für diesen Job holen
let tx = {
let map = st.jobs.lock().await;
match map.get(&q.job) {
Some(tx) => tx.clone(),
None => return Err(axum::http::StatusCode::NOT_FOUND),
}
};
// Subscribe & streamen
let rx = tx.subscribe();
let stream = BroadcastStream::new(rx)
.filter_map(|res| async move { res.ok() })
.map(|msg| Ok(Event::default().data(msg)));
Sse::new(stream).keep_alive(
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(std::time::Duration::from_secs(10))
.text("💓"),
)
))
}
// Port wird jetzt von außen übergeben
pub async fn start(state: SharedState, port: u16)
-> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
{
pub async fn start(
state: SharedState,
port: u16,
) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>> {
let dir = exe_dir();
rebuild_web_assets(&dir)?;
// Broadcast-Kanal für Web-Logs
let (log_tx, _rx) = broadcast::channel::<String>(200);
let http_state = HttpState { shared: state.clone(), log_tx: log_tx.clone() };
let (global_log_tx, _rx) = broadcast::channel::<String>(200);
let http_state = HttpState {
shared: state.clone(),
_global_log_tx: global_log_tx.clone(),
jobs: Arc::new(Mutex::new(HashMap::new())),
};
let web_root = dir.join(WEB_DIR);
let static_root = web_root.clone();
let index_path = web_root.join("index.html");
let app = Router::new()
.route("/", get({
let index_path = index_path.clone();
move || {
.route(
"/",
get({
let index_path = index_path.clone();
async move {
let html = fs::read_to_string(&index_path)
.unwrap_or_else(|_| "<h1>index.html fehlt</h1>".to_string());
Html(html)
move || {
let index_path = index_path.clone();
async move {
let html = fs::read_to_string(&index_path)
.unwrap_or_else(|_| "<h1>index.html fehlt</h1>".to_string());
Html(html)
}
}
}
}))
.route("/api/start", post(start_job)) // Start-Button
.route("/api/logs", get(stream_logs)) // SSE-Konsole
}),
)
.route("/api/start", post(start_job))
.route("/api/logs", get(stream_logs)) // /api/logs?job=<job_id>
.nest_service("/static", ServeDir::new(static_root))
.route("/healthz", get(|| async { "ok" }))
.with_state(http_state);