You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

88 lines
2.6 KiB

import hmac
import json
import os
import subprocess
import time
from datetime import timedelta

from flask import Flask, render_template, request, Response, jsonify, redirect, url_for, session
app = Flask(__name__)
# Flask signs the session cookie with this key. Prefer a value supplied through
# the FLASK_SECRET_KEY environment variable; the literal is only the legacy
# fallback and is not secret, because it is committed with the source.
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or "your_secret_key"
# Lifetime applied to sessions that are flagged as permanent.
app.permanent_session_lifetime = timedelta(days=1)
# Accounts and their passwords, keyed by username.
# NOTE(review): the passwords are stored in plain text in this source file.
USERS = dict(
    admin="123456",
    user1="password1",
    user2="password2",
)
# Path of the JSON file that holds the service list, located in the "data"
# directory next to this module.
SERVICES_FILE = os.path.join(os.path.dirname(__file__), "data", "services.json")


def load_services():
    """Read ``SERVICES_FILE`` as UTF-8 text and return the decoded JSON value."""
    with open(SERVICES_FILE, mode="r", encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
@app.route("/")
def home():
    """Render the service index for a logged-in user.

    A request whose session has no ``"user"`` entry is redirected to the login
    view. Otherwise ``index.html`` is rendered with the loaded service list
    passed to the template as ``listService``.
    """
    if "user" in session:
        services = load_services()
        return render_template("index.html", listService=services)
    return redirect(url_for("login"))
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate submitted credentials.

    GET renders ``login.html``. POST reads ``username`` and ``password`` from
    the submitted form. When they match an entry in ``USERS`` the username is
    stored in the session, the session is marked permanent, and the client is
    redirected to the home view. Otherwise ``login.html`` is rendered again
    with an error message.
    """
    if request.method != "POST":
        return render_template("login.html")

    username = request.form.get("username")
    password = request.form.get("password")
    expected = USERS.get(username)
    # compare_digest takes time independent of where the two values differ, so
    # response timing does not reveal how much of a password guess was right.
    # It only accepts non-ASCII text as bytes, hence the explicit encoding.
    # The None checks keep a missing form field from ever authenticating.
    authenticated = (
        expected is not None
        and password is not None
        and hmac.compare_digest(expected.encode("utf-8"), password.encode("utf-8"))
    )
    if not authenticated:
        return render_template("login.html", error="Invalid username or password")

    session["user"] = username
    session.permanent = True
    return redirect(url_for("home"))
@app.route("/logout")
def logout():
    """Drop the ``"user"`` entry from the session and redirect to the login view."""
    # The default argument makes this a no-op lookup when nobody is logged in.
    session.pop("user", None)
    login_url = url_for("login")
    return redirect(login_url)
# Stream a command's output in real time.
def generate_output(command_list):
    """Run a shell command and yield its output as Server-Sent Event frames.

    Args:
        command_list: Strings that are joined with single spaces into one
            command line, which is executed by ``/bin/bash``.

    Yields:
        One ``"data: <line>\\n\\n"`` frame per output line (surrounding
        whitespace stripped), with stderr merged into stdout in the order the
        command wrote it. The final frame is ``"data: SUCCESS\\n\\n"`` when the
        command exits with status 0 and ``"data: ERROR\\n\\n"`` otherwise.
    """
    # NOTE(security): the joined string is interpreted by bash, so every
    # element is shell syntax. Callers must not pass untrusted text here.
    command_str = " ".join(command_list)
    process = subprocess.Popen(
        command_str,
        shell=True,
        stdout=subprocess.PIPE,
        # Merge stderr into stdout: draining stdout to EOF before touching a
        # separate stderr pipe deadlocks once the child fills the stderr pipe.
        stderr=subprocess.STDOUT,
        text=True,
        executable="/bin/bash",
    )
    try:
        for line in process.stdout:
            yield f"data: {line.strip()}\n\n"
        process.wait()
    except BaseException:
        # Covers the consumer closing the generator early (GeneratorExit) as
        # well as read errors: stop the shell instead of leaving it running.
        process.kill()
        raise
    finally:
        # Release the pipe and reap the child on every exit path.
        process.stdout.close()
        process.wait()

    if process.returncode == 0:
        yield "data: SUCCESS\n\n"
    else:
        yield "data: ERROR\n\n"
@app.route("/run-command", methods=["GET"])
def run_command():
    """Stream the output of the requested command as a Server-Sent Event response.

    Responds with a 401 JSON error when the session has no ``"user"`` entry and
    with a 400 JSON error when no ``command[]`` query values were sent.
    Otherwise the values are handed to ``generate_output`` and its frames are
    streamed back with the ``text/event-stream`` MIME type.
    """
    if "user" not in session:
        return jsonify({"error": "Unauthorized"}), 401

    # Command parts arrive from the frontend as repeated "command[]" values.
    # NOTE(review): the parts are not checked against the configured service
    # list before being executed - confirm this is intended.
    requested = request.args.getlist("command[]")
    if requested:
        return Response(generate_output(requested), mimetype="text/event-stream")
    return jsonify({"error": "No command provided"}), 400
if __name__ == "__main__":
    # The interactive debugger lets a browser execute code on the server, and
    # this server listens on every interface, so debug mode is opt-in: set
    # FLASK_DEBUG to 1/true/yes/on to enable it for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)