| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- #!/usr/bin/env python3
- import os
- import tempfile
- import threading
- import time
- from flask import Flask, jsonify, request, abort, render_template_string
- app = Flask(__name__)
- VALUE_FILE = "/data/pct"
- _write_lock = threading.Lock()
- # --- Helpers ---
- def _clamp_to_range(val: int) -> int:
- return max(0, min(99, int(val)))
- def _ensure_file_exists():
- if not os.path.exists(VALUE_FILE):
- _atomic_write("0")
- def _read_value() -> int:
- with open(VALUE_FILE, "r", encoding="utf-8") as f:
- raw = f.read().strip()
- if raw == "":
- return 0
- return _clamp_to_range(int(raw))
- def _atomic_write(text: str):
- dir_name = os.path.dirname(os.path.abspath(VALUE_FILE)) or "."
- os.makedirs(dir_name, exist_ok=True)
- fd, tmp_path = tempfile.mkstemp(prefix=".value-", dir=dir_name, text=True)
- with open(VALUE_FILE, "w") as tmp:
- tmp.write(text)
- tmp.flush()
- os.fsync(tmp.fileno())
- def _get_mtime() -> float:
- try:
- return os.path.getmtime(VALUE_FILE)
- except FileNotFoundError:
- return 0.0
- def _fmt_time(mtime: float) -> str:
- if not mtime:
- return "never"
- return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime))
- # --- Routes ---
- @app.route("/")
- def index():
- _ensure_file_exists()
- val = _read_value()
- mtime = _get_mtime()
- html = """
- <!doctype html>
- <html>
- <head>
- <meta charset="utf-8"/>
- <title>Astoria Manual Control</title>
- <style>
- body { font-family: sans-serif; margin: 2em; }
- .value { font-size: 64px; margin-bottom: 20px; }
- button { font-size: 24px; margin: 6px; padding: 12px 20px; }
- </style>
- </head>
- <body>
- <h1>Astoria Manual Control</h1>
- <div class="value" id="val">{{value}}</div>
- <div id="mtime">Last updated: {{mtime}}</div>
- <div>
- <button onclick="setVal(0)">0</button>
- <button onclick="setVal(7)">7</button>
- <button onclick="setVal(33)">33</button>
- <button onclick="setVal(99)">99</button>
- </div>
- <div>
- <input type="number" id="num" min="0" max="99" value="{{value}}"/>
- <button onclick="saveNum()">Save</button>
- </div>
- <script>
- let lastMtime = {{mtime_num}};
- async function waitForChange() {
- try {
- const resp = await fetch("/wait?since=" + lastMtime);
- if (!resp.ok) throw new Error("HTTP " + resp.status);
- const data = await resp.json();
- document.getElementById("val").textContent = data.value;
- document.getElementById("mtime").textContent = "Last updated: " + data.mtime_human;
- document.getElementById("num").value = data.value;
- lastMtime = data.mtime;
- } catch (e) {
- console.error("Polling failed:", e);
- }
- waitForChange(); // restart long poll
- }
- async function setVal(v) {
- await fetch("/set", {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify({ value: v })
- });
- }
- function saveNum() {
- const v = parseInt(document.getElementById("num").value);
- setVal(v);
- }
- waitForChange();
- </script>
- </body>
- </html>
- """
- return render_template_string(html, value=val, mtime=_fmt_time(mtime), mtime_num=mtime)
- @app.route("/set", methods=["POST"])
- def set_value():
- if not request.is_json:
- abort(400, "Expected JSON")
- body = request.get_json(silent=True) or {}
- if "value" not in body:
- abort(400, "Missing value")
- try:
- new_val = _clamp_to_range(int(body["value"]))
- except Exception:
- abort(400, "Value must be int 0–99")
- with _write_lock:
- _atomic_write(str(new_val))
- mtime = _get_mtime()
- return jsonify(value=new_val, mtime=mtime, mtime_human=_fmt_time(mtime))
- @app.route("/wait")
- def wait_for_change():
- """Long poll: wait until file changes after `since` timestamp."""
- since = float(request.args.get("since", "0"))
- timeout = 30 # max seconds to wait
- start = time.time()
- while time.time() - start < timeout:
- cur_mtime = _get_mtime()
- if cur_mtime > since:
- val = _read_value()
- return jsonify(value=val, mtime=cur_mtime, mtime_human=_fmt_time(cur_mtime))
- time.sleep(0.2) # avoid busy loop
- # timeout, respond with no change
- return jsonify(value=_read_value(), mtime=_get_mtime(), mtime_human=_fmt_time(_get_mtime()))
- if __name__ == "__main__":
- _ensure_file_exists()
- app.run(host="0.0.0.0", port=8080, debug=True)
|