|
|
@@ -0,0 +1,429 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+
|
|
|
+import argparse
|
|
|
+import json
|
|
|
+import os
|
|
|
+import re
|
|
|
+import shlex
|
|
|
+import subprocess
|
|
|
+import sys
|
|
|
+import time
|
|
|
+from dataclasses import dataclass
|
|
|
+from typing import Any, Iterable, List, Optional, Sequence, Tuple
|
|
|
+from urllib.parse import urlparse
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class TabEntry:
|
|
|
+ token: str
|
|
|
+ window_index: int
|
|
|
+ tab_index: int
|
|
|
+ title: str
|
|
|
+ url: str
|
|
|
+ active: bool
|
|
|
+ session_name: str = ""
|
|
|
+ window_active_title: str = ""
|
|
|
+
|
|
|
+ def menu_line(self) -> str:
|
|
|
+ return f"{self.token}\t{self.label()}"
|
|
|
+
|
|
|
+ def label(self) -> str:
|
|
|
+ host = ""
|
|
|
+ if self.url:
|
|
|
+ parsed = urlparse(self.url)
|
|
|
+ host = parsed.netloc
|
|
|
+ active_marker = "*" if self.active else " "
|
|
|
+ title = self.title if self.title else "(untitled)"
|
|
|
+ host_segment = f" [{host}]" if host else ""
|
|
|
+ session_segment = f" {self.session_name}" if self.session_name else ""
|
|
|
+ return f"[{active_marker}{session_segment} w{self.window_index + 1} t{self.tab_index + 1}] {title}{host_segment}"
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class ChromeWindow:
|
|
|
+ window_id: str
|
|
|
+ class_name: str
|
|
|
+ title: str
|
|
|
+
|
|
|
+
|
|
|
+def _run_capture(args: Sequence[str], stdin_text: Optional[str] = None) -> str:
|
|
|
+ proc = subprocess.run(
|
|
|
+ list(args),
|
|
|
+ input=stdin_text,
|
|
|
+ text=True,
|
|
|
+ capture_output=True,
|
|
|
+ check=False,
|
|
|
+ )
|
|
|
+ if proc.returncode != 0:
|
|
|
+ stderr = (proc.stderr or "").strip()
|
|
|
+ stdout = (proc.stdout or "").strip()
|
|
|
+ details = stderr or stdout or f"exit {proc.returncode}"
|
|
|
+ raise RuntimeError(f"command failed: {' '.join(args)}: {details}")
|
|
|
+ return proc.stdout
|
|
|
+
|
|
|
+
|
|
|
+def _run(args: Sequence[str]) -> None:
|
|
|
+ proc = subprocess.run(list(args), check=False)
|
|
|
+ if proc.returncode != 0:
|
|
|
+ raise RuntimeError(f"command failed: {' '.join(args)} (exit {proc.returncode})")
|
|
|
+
|
|
|
+
|
|
|
+def _as_bool(v: Any) -> bool:
|
|
|
+ if isinstance(v, bool):
|
|
|
+ return v
|
|
|
+ if isinstance(v, (int, float)):
|
|
|
+ return v != 0
|
|
|
+ if isinstance(v, str):
|
|
|
+ return v.lower() in {"1", "true", "yes", "on"}
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+def _windows_from_payload(payload: Any) -> List[Any]:
|
|
|
+ if isinstance(payload, list):
|
|
|
+ return payload
|
|
|
+ if isinstance(payload, dict):
|
|
|
+ windows = payload.get("windows")
|
|
|
+ if isinstance(windows, list):
|
|
|
+ return windows
|
|
|
+ raise ValueError("unsupported chrome-session-dump JSON shape")
|
|
|
+
|
|
|
+
|
|
|
+def _session_name_from_dir(path: str) -> str:
|
|
|
+ clean = path.rstrip("/")
|
|
|
+ if not clean:
|
|
|
+ return "session"
|
|
|
+ return os.path.basename(clean) or clean
|
|
|
+
|
|
|
+
|
|
|
+def _extract_user_data_dir(cmdline: str) -> Optional[str]:
|
|
|
+ pattern = r"--user-data-dir(?:=|\s+)(?:\"([^\"]+)\"|'([^']+)'|(\S+))"
|
|
|
+ match = re.search(pattern, cmdline)
|
|
|
+ if not match:
|
|
|
+ return None
|
|
|
+ for group in match.groups():
|
|
|
+ if group:
|
|
|
+ return os.path.expanduser(group)
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def discover_active_session_dirs(ps_output: str, home_dir: str) -> List[str]:
|
|
|
+ dirs: List[str] = []
|
|
|
+ default_needed = False
|
|
|
+
|
|
|
+ for line in ps_output.splitlines():
|
|
|
+ line_lower = line.lower()
|
|
|
+ if "chrome-session-dump" in line_lower:
|
|
|
+ continue
|
|
|
+ if "chrome" not in line_lower and "chromium" not in line_lower:
|
|
|
+ continue
|
|
|
+
|
|
|
+ user_data_dir = _extract_user_data_dir(line)
|
|
|
+ if user_data_dir:
|
|
|
+ dirs.append(user_data_dir)
|
|
|
+ else:
|
|
|
+ default_needed = True
|
|
|
+
|
|
|
+ if default_needed:
|
|
|
+ for candidate in (
|
|
|
+ os.path.join(home_dir, ".config", "google-chrome"),
|
|
|
+ os.path.join(home_dir, ".config", "chromium"),
|
|
|
+ os.path.join(home_dir, ".config", "chrome"),
|
|
|
+ ):
|
|
|
+ if os.path.isdir(candidate):
|
|
|
+ dirs.append(candidate)
|
|
|
+
|
|
|
+ deduped: List[str] = []
|
|
|
+ seen = set()
|
|
|
+ for path in dirs:
|
|
|
+ normalized = os.path.normpath(path)
|
|
|
+ if normalized not in seen:
|
|
|
+ deduped.append(normalized)
|
|
|
+ seen.add(normalized)
|
|
|
+ return deduped
|
|
|
+
|
|
|
+
|
|
|
+def discover_active_session_dirs_from_system(ps_command: str) -> List[str]:
|
|
|
+ ps_output = _run_capture(shlex.split(ps_command))
|
|
|
+ return discover_active_session_dirs(ps_output, os.path.expanduser("~"))
|
|
|
+
|
|
|
+
|
|
|
+def parse_tabs_from_payload(
|
|
|
+ payload: Any,
|
|
|
+ *,
|
|
|
+ token_prefix: str = "W",
|
|
|
+ session_name: str = "",
|
|
|
+ window_offset: int = 0,
|
|
|
+) -> Tuple[List[TabEntry], int]:
|
|
|
+ windows = _windows_from_payload(payload)
|
|
|
+
|
|
|
+ entries: List[TabEntry] = []
|
|
|
+ for w_idx, window in enumerate(windows):
|
|
|
+ if not isinstance(window, dict):
|
|
|
+ continue
|
|
|
+
|
|
|
+ tabs = window.get("tabs", [])
|
|
|
+ if not isinstance(tabs, list):
|
|
|
+ continue
|
|
|
+
|
|
|
+ window_active_title = ""
|
|
|
+ active_tab_index = window.get("activeTabIndex")
|
|
|
+ if not isinstance(active_tab_index, int):
|
|
|
+ active_tab_index = window.get("selected")
|
|
|
+ if isinstance(active_tab_index, int):
|
|
|
+ # Some tools use 1-based selected index.
|
|
|
+ active_tab_index = max(active_tab_index - 1, 0)
|
|
|
+ else:
|
|
|
+ active_tab_index = None
|
|
|
+
|
|
|
+ for t_idx, tab in enumerate(tabs):
|
|
|
+ if not isinstance(tab, dict):
|
|
|
+ continue
|
|
|
+
|
|
|
+ title = str(tab.get("title") or tab.get("name") or "")
|
|
|
+ url = str(tab.get("url") or tab.get("uri") or "")
|
|
|
+ active = _as_bool(tab.get("active"))
|
|
|
+ if active_tab_index is not None and t_idx == active_tab_index:
|
|
|
+ active = True
|
|
|
+ if active and not window_active_title and title:
|
|
|
+ window_active_title = title
|
|
|
+
|
|
|
+ for t_idx, tab in enumerate(tabs):
|
|
|
+ if not isinstance(tab, dict):
|
|
|
+ continue
|
|
|
+
|
|
|
+ title = str(tab.get("title") or tab.get("name") or "")
|
|
|
+ url = str(tab.get("url") or tab.get("uri") or "")
|
|
|
+ active = _as_bool(tab.get("active"))
|
|
|
+ if active_tab_index is not None and t_idx == active_tab_index:
|
|
|
+ active = True
|
|
|
+
|
|
|
+ token = f"{token_prefix}{w_idx}T{t_idx}"
|
|
|
+ entries.append(
|
|
|
+ TabEntry(
|
|
|
+ token=token,
|
|
|
+ window_index=window_offset + w_idx,
|
|
|
+ tab_index=t_idx,
|
|
|
+ title=title,
|
|
|
+ url=url,
|
|
|
+ active=active,
|
|
|
+ session_name=session_name,
|
|
|
+ window_active_title=window_active_title,
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ return entries, len(windows)
|
|
|
+
|
|
|
+
|
|
|
+def parse_tabs(json_text: str) -> List[TabEntry]:
|
|
|
+ payload = json.loads(json_text)
|
|
|
+ entries, _ = parse_tabs_from_payload(payload)
|
|
|
+ return entries
|
|
|
+
|
|
|
+
|
|
|
+def load_entries(
|
|
|
+ *,
|
|
|
+ json_file: Optional[str],
|
|
|
+ read_stdin: bool,
|
|
|
+ single_session: bool,
|
|
|
+ session_command: str,
|
|
|
+ ps_command: str,
|
|
|
+) -> List[TabEntry]:
|
|
|
+ if json_file:
|
|
|
+ with open(json_file, "r", encoding="utf-8") as f:
|
|
|
+ return parse_tabs(f.read())
|
|
|
+ if read_stdin:
|
|
|
+ return parse_tabs(sys.stdin.read())
|
|
|
+
|
|
|
+ if single_session:
|
|
|
+ return parse_tabs(_run_capture(shlex.split(session_command)))
|
|
|
+
|
|
|
+ session_dirs = discover_active_session_dirs_from_system(ps_command)
|
|
|
+ if not session_dirs:
|
|
|
+ # Fall back to legacy behavior when process discovery cannot find active sessions.
|
|
|
+ return parse_tabs(_run_capture(shlex.split(session_command)))
|
|
|
+
|
|
|
+ all_entries: List[TabEntry] = []
|
|
|
+ window_offset = 0
|
|
|
+ errors: List[str] = []
|
|
|
+
|
|
|
+ for s_idx, session_dir in enumerate(session_dirs):
|
|
|
+ try:
|
|
|
+ json_text = _run_capture(["chrome-session-dump", "--json", session_dir])
|
|
|
+ payload = json.loads(json_text)
|
|
|
+ session_name = _session_name_from_dir(session_dir)
|
|
|
+ entries, window_count = parse_tabs_from_payload(
|
|
|
+ payload,
|
|
|
+ token_prefix=f"S{s_idx}W",
|
|
|
+ session_name=session_name,
|
|
|
+ window_offset=window_offset,
|
|
|
+ )
|
|
|
+ all_entries.extend(entries)
|
|
|
+ window_offset += window_count
|
|
|
+ except Exception as exc:
|
|
|
+ errors.append(f"{session_dir}: {exc}")
|
|
|
+
|
|
|
+ if all_entries:
|
|
|
+ return all_entries
|
|
|
+ if errors:
|
|
|
+ raise RuntimeError("; ".join(errors))
|
|
|
+ raise RuntimeError("no tabs found in active Chrome sessions")
|
|
|
+
|
|
|
+
|
|
|
+def parse_menu_selection(line: str) -> str:
|
|
|
+ selected = line.strip()
|
|
|
+ if not selected:
|
|
|
+ raise ValueError("empty selection")
|
|
|
+ token = selected.split("\t", 1)[0]
|
|
|
+ if not token:
|
|
|
+ raise ValueError("missing selection token")
|
|
|
+ return token
|
|
|
+
|
|
|
+
|
|
|
+def list_chrome_windows() -> List[ChromeWindow]:
|
|
|
+ out = _run_capture(["wmctrl", "-lx"])
|
|
|
+ windows: List[ChromeWindow] = []
|
|
|
+ for line in out.splitlines():
|
|
|
+ parts = line.split(None, 4)
|
|
|
+ if len(parts) < 3:
|
|
|
+ continue
|
|
|
+ class_name = parts[2].lower()
|
|
|
+ if "chrome" in class_name or "chromium" in class_name:
|
|
|
+ title = parts[4] if len(parts) >= 5 else ""
|
|
|
+ windows.append(ChromeWindow(window_id=parts[0], class_name=class_name, title=title))
|
|
|
+ return windows
|
|
|
+
|
|
|
+
|
|
|
+def _title_match_score(hint: str, title: str) -> int:
|
|
|
+ hint_norm = hint.strip().lower()
|
|
|
+ title_norm = title.strip().lower()
|
|
|
+ if not hint_norm or not title_norm:
|
|
|
+ return 0
|
|
|
+ if hint_norm == title_norm:
|
|
|
+ return 3
|
|
|
+ if hint_norm in title_norm:
|
|
|
+ return 2
|
|
|
+ if title_norm in hint_norm:
|
|
|
+ return 1
|
|
|
+ return 0
|
|
|
+
|
|
|
+
|
|
|
+def resolve_window_id(entry: TabEntry, windows: List[ChromeWindow]) -> str:
|
|
|
+ if not windows:
|
|
|
+ raise RuntimeError("no Chrome windows found")
|
|
|
+
|
|
|
+ best_idx = -1
|
|
|
+ best_score = 0
|
|
|
+ for idx, window in enumerate(windows):
|
|
|
+ score = _title_match_score(entry.window_active_title, window.title)
|
|
|
+ if entry.session_name and entry.session_name.lower() in window.title.lower():
|
|
|
+ score += 1
|
|
|
+ if score > best_score:
|
|
|
+ best_score = score
|
|
|
+ best_idx = idx
|
|
|
+
|
|
|
+ if best_idx >= 0 and best_score > 0:
|
|
|
+ return windows[best_idx].window_id
|
|
|
+
|
|
|
+ if entry.window_index >= len(windows):
|
|
|
+ raise RuntimeError(
|
|
|
+ f"chrome window index {entry.window_index} not available (found {len(windows)} chrome windows)"
|
|
|
+ )
|
|
|
+ return windows[entry.window_index].window_id
|
|
|
+
|
|
|
+
|
|
|
+def build_switch_commands(window_id: str, tab_index: int) -> List[List[str]]:
|
|
|
+ cmds: List[List[str]] = [["wmctrl", "-ia", window_id], ["xdotool", "key", "--clearmodifiers", "ctrl+1"]]
|
|
|
+ if tab_index > 0:
|
|
|
+ cmds.append(["xdotool", "key", "--clearmodifiers", "--repeat", str(tab_index), "ctrl+Tab"])
|
|
|
+ return cmds
|
|
|
+
|
|
|
+
|
|
|
+def switch_to_tab(entry: TabEntry, focus_delay_ms: int, dry_run: bool) -> None:
|
|
|
+ windows = list_chrome_windows()
|
|
|
+ window_id = resolve_window_id(entry, windows)
|
|
|
+ commands = build_switch_commands(window_id, entry.tab_index)
|
|
|
+
|
|
|
+ if dry_run:
|
|
|
+ for cmd in commands:
|
|
|
+ print("DRY-RUN:", " ".join(cmd))
|
|
|
+ return
|
|
|
+
|
|
|
+ _run(commands[0])
|
|
|
+ if focus_delay_ms > 0:
|
|
|
+ time.sleep(focus_delay_ms / 1000.0)
|
|
|
+ for cmd in commands[1:]:
|
|
|
+ _run(cmd)
|
|
|
+
|
|
|
+
|
|
|
+def rofi_select(entries: Iterable[TabEntry], rofi_command: str) -> str:
|
|
|
+ lines = "\n".join(entry.menu_line() for entry in entries)
|
|
|
+ out = _run_capture(shlex.split(rofi_command), stdin_text=lines)
|
|
|
+ return parse_menu_selection(out)
|
|
|
+
|
|
|
+
|
|
|
+def _entry_by_token(entries: List[TabEntry], token: str) -> TabEntry:
|
|
|
+ for entry in entries:
|
|
|
+ if entry.token == token:
|
|
|
+ return entry
|
|
|
+ raise RuntimeError(f"tab token not found: {token}")
|
|
|
+
|
|
|
+
|
|
|
+def main() -> int:
|
|
|
+ parser = argparse.ArgumentParser(description="Switch Chrome tabs using chrome-session-dump JSON and rofi.")
|
|
|
+ parser.add_argument("--json-file", help="Read chrome-session-dump JSON from a file")
|
|
|
+ parser.add_argument("--stdin", action="store_true", help="Read chrome-session-dump JSON from stdin")
|
|
|
+ parser.add_argument(
|
|
|
+ "--session-command",
|
|
|
+ default="chrome-session-dump --json",
|
|
|
+ help="Command used to obtain session JSON in single-session mode",
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--single-session",
|
|
|
+ action="store_true",
|
|
|
+ help="Disable active-session discovery and only read one session via --session-command",
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--ps-command",
|
|
|
+ default="ps -eo args",
|
|
|
+ help="Process listing command used to discover active Chrome user-data directories",
|
|
|
+ )
|
|
|
+ parser.add_argument("--list-only", action="store_true", help="Print tab list and exit")
|
|
|
+ parser.add_argument("--select", help="Tab token to switch to")
|
|
|
+ parser.add_argument("--dry-run", action="store_true", help="Print switch commands without executing")
|
|
|
+ parser.add_argument("--focus-delay-ms", type=int, default=80, help="Delay after focusing window before tab keys")
|
|
|
+ parser.add_argument(
|
|
|
+ "--rofi-command",
|
|
|
+ default="rofi -dmenu -i -p ChromeTabs",
|
|
|
+ help="Rofi command used for interactive selection",
|
|
|
+ )
|
|
|
+
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ try:
|
|
|
+ entries = load_entries(
|
|
|
+ json_file=args.json_file,
|
|
|
+ read_stdin=args.stdin,
|
|
|
+ single_session=args.single_session,
|
|
|
+ session_command=args.session_command,
|
|
|
+ ps_command=args.ps_command,
|
|
|
+ )
|
|
|
+
|
|
|
+ if not entries:
|
|
|
+ raise RuntimeError("no tabs found in session dump")
|
|
|
+
|
|
|
+ if args.list_only:
|
|
|
+ for entry in entries:
|
|
|
+ print(entry.menu_line())
|
|
|
+ return 0
|
|
|
+
|
|
|
+ token = args.select if args.select else rofi_select(entries, args.rofi_command)
|
|
|
+ entry = _entry_by_token(entries, token)
|
|
|
+ switch_to_tab(entry, args.focus_delay_ms, args.dry_run)
|
|
|
+ return 0
|
|
|
+ except Exception as exc:
|
|
|
+ print(f"error: {exc}", file=sys.stderr)
|
|
|
+ return 1
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ raise SystemExit(main())
|