||
- #!/usr/bin/env python3
- import argparse
- import json
- import os
- import re
- import shlex
- import subprocess
- import sys
- import time
- from dataclasses import dataclass
- from typing import Any, Iterable, List, Optional, Sequence, Tuple
- from urllib.parse import urlparse
@dataclass
class TabEntry:
    """One browser tab taken from a session dump, ready to be shown in a menu."""

    token: str  # selection key; emitted as the first tab-separated field of menu_line()
    window_index: int  # zero-based; rendered as window_index + 1 in the label
    tab_index: int  # zero-based; rendered as tab_index + 1 in the label
    title: str
    url: str
    active: bool
    session_name: str = ""
    window_active_title: str = ""

    def menu_line(self) -> str:
        """Return the ``token<TAB>label`` line used for menu display."""
        return f"{self.token}\t{self.label()}"

    def label(self) -> str:
        """Return a human-readable one-line description of the tab.

        The format is ``[<marker><session> w<N> t<M>] <title> [<host>]`` where
        the marker is ``*`` for an active tab and a space otherwise, ``N`` and
        ``M`` are one-based, an empty title is shown as ``(untitled)``, and the
        session and host segments are omitted when empty.
        """
        host = ""
        if self.url:
            try:
                host = urlparse(self.url).netloc
            except ValueError:
                # urlparse rejects some malformed URLs (for example unbalanced
                # IPv6 brackets). One bad URL must not abort rendering of the
                # whole menu, so simply show the entry without a host.
                host = ""
        active_marker = "*" if self.active else " "
        title = self.title if self.title else "(untitled)"
        host_segment = f" [{host}]" if host else ""
        session_segment = f" {self.session_name}" if self.session_name else ""
        return (
            f"[{active_marker}{session_segment} "
            f"w{self.window_index + 1} t{self.tab_index + 1}] "
            f"{title}{host_segment}"
        )
@dataclass
class ChromeWindow:
    """Identity of a single desktop window: its id, class name and title."""

    window_id: str  # identifier string used to address the window
    class_name: str  # window class name
    title: str  # window title text (may be empty)
def _run_capture(args: Sequence[str], stdin_text: Optional[str] = None) -> str:
    """Run a command, optionally feeding it text on stdin, and return its stdout.

    Args:
        args: Program and arguments; executed without a shell.
        stdin_text: Text written to the child's stdin, or None for no input.

    Returns:
        The child's standard output as text.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The message
            carries stderr, or stdout when stderr is blank, or the exit code
            when both are blank.
    """
    completed = subprocess.run(
        list(args),
        input=stdin_text,
        text=True,
        capture_output=True,
        check=False,
    )
    if completed.returncode == 0:
        return completed.stdout

    # Pick the most informative non-blank description of the failure.
    for stream in (completed.stderr, completed.stdout):
        details = (stream or "").strip()
        if details:
            break
    else:
        details = f"exit {completed.returncode}"
    raise RuntimeError(f"command failed: {' '.join(args)}: {details}")
def _run(args: Sequence[str]) -> None:
    """Run a command with inherited stdio.

    Raises:
        RuntimeError: If the command exits with a non-zero status.
    """
    exit_code = subprocess.run(list(args), check=False).returncode
    if exit_code == 0:
        return
    raise RuntimeError(f"command failed: {' '.join(args)} (exit {exit_code})")
def _as_bool(v: Any) -> bool:
    """Coerce a loosely typed value to a boolean.

    Strings are true only when they equal (case-insensitively) one of
    ``1``, ``true``, ``yes`` or ``on``; numbers are true when non-zero;
    booleans pass through; every other type is false.
    """
    if isinstance(v, str):
        return v.lower() in {"1", "true", "yes", "on"}
    if isinstance(v, (bool, int, float)):
        return bool(v)
    return False
def _windows_from_payload(payload: Any) -> List[Any]:
    """Return the list of window objects from a decoded session dump.

    Accepts either a bare list of windows or a dict whose ``windows`` key
    holds that list.

    Raises:
        ValueError: If the payload has neither shape.
    """
    candidate = payload.get("windows") if isinstance(payload, dict) else payload
    if isinstance(candidate, list):
        return candidate
    raise ValueError("unsupported chrome-session-dump JSON shape")
def _session_name_from_dir(path: str) -> str:
    """Derive a short display name from a session directory path.

    Trailing slashes are ignored and the last path component is used; a path
    that is empty after trimming yields the literal ``session``.
    """
    trimmed = path.rstrip("/")
    if trimmed:
        return os.path.basename(trimmed) or trimmed
    return "session"
def _extract_user_data_dir(cmdline: str) -> Optional[str]:
    """Return the ``--user-data-dir`` value found in a command line, if any.

    The value may follow ``=`` or whitespace and may be double-quoted,
    single-quoted, or a bare run of non-space characters. A leading ``~`` is
    expanded. Returns None when the flag is absent.
    """
    found = re.search(
        r"--user-data-dir(?:=|\s+)(?:\"([^\"]+)\"|'([^']+)'|(\S+))",
        cmdline,
    )
    if found is None:
        return None
    # Exactly one alternative captures; take whichever group is non-empty.
    value = next((captured for captured in found.groups() if captured), None)
    return os.path.expanduser(value) if value is not None else None
def discover_active_session_dirs(ps_output: str, home_dir: str) -> List[str]:
    """Find browser user-data directories referenced by a process listing.

    Every line mentioning ``chrome`` or ``chromium`` (case-insensitive) is
    inspected, except lines mentioning ``chrome-session-dump``. A line with a
    ``--user-data-dir`` flag contributes that directory. If any matching line
    lacks the flag, the default profile directories under
    ``<home_dir>/.config`` that exist on disk are appended as well.

    Returns:
        Normalized paths, de-duplicated, in first-seen order.
    """
    found: List[str] = []
    saw_flagless_browser = False
    for raw_line in ps_output.splitlines():
        lowered = raw_line.lower()
        mentions_browser = "chrome" in lowered or "chromium" in lowered
        if "chrome-session-dump" in lowered or not mentions_browser:
            continue
        explicit_dir = _extract_user_data_dir(raw_line)
        if explicit_dir:
            found.append(explicit_dir)
        else:
            saw_flagless_browser = True

    if saw_flagless_browser:
        config_root = os.path.join(home_dir, ".config")
        for profile_name in ("google-chrome", "chromium", "chrome"):
            default_dir = os.path.join(config_root, profile_name)
            if os.path.isdir(default_dir):
                found.append(default_dir)

    # dict.fromkeys keeps the first occurrence of each normalized path.
    return list(dict.fromkeys(os.path.normpath(entry) for entry in found))
def discover_active_session_dirs_from_system(ps_command: str) -> List[str]:
    """Run the process-listing command and extract session directories from it.

    The command string is split shell-style and executed without a shell; the
    current user's home directory is used to locate default profiles.
    """
    listing = _run_capture(shlex.split(ps_command))
    home = os.path.expanduser("~")
    return discover_active_session_dirs(listing, home)
def parse_tabs_from_payload(
    payload: Any,
    *,
    token_prefix: str = "W",
    session_name: str = "",
    window_offset: int = 0,
) -> Tuple[List[TabEntry], int]:
    """Convert a decoded session dump into a flat list of tab entries.

    Args:
        payload: Decoded JSON; a list of windows or a dict with ``windows``.
        token_prefix: Prefix of each token, which has the form
            ``<prefix><window>T<tab>`` using indices local to this payload.
        session_name: Stored on every produced entry.
        window_offset: Added to the local window index to form
            ``window_index`` on each entry.

    Returns:
        ``(entries, window_count)`` where ``window_count`` is the number of
        window items in the payload, including any that were skipped.

    Raises:
        ValueError: If the payload shape is not recognized.
    """
    windows = _windows_from_payload(payload)
    entries: List[TabEntry] = []
    for w_idx, window in enumerate(windows):
        if not isinstance(window, dict):
            continue
        tabs = window.get("tabs", [])
        if not isinstance(tabs, list):
            continue

        # Window-level hint for the active tab: prefer ``activeTabIndex``;
        # otherwise fall back to ``selected``.
        selected_index = window.get("activeTabIndex")
        if not isinstance(selected_index, int):
            legacy_selected = window.get("selected")
            if isinstance(legacy_selected, int):
                # Some tools use 1-based selected index.
                selected_index = max(legacy_selected - 1, 0)
            else:
                selected_index = None

        # Parse each tab once; non-dict items are skipped but keep their index.
        parsed = []
        for t_idx, tab in enumerate(tabs):
            if not isinstance(tab, dict):
                continue
            title = str(tab.get("title") or tab.get("name") or "")
            url = str(tab.get("url") or tab.get("uri") or "")
            hinted = selected_index is not None and t_idx == selected_index
            is_active = hinted or _as_bool(tab.get("active"))
            parsed.append((t_idx, title, url, is_active))

        # Title of the first active tab that has a non-empty title.
        window_active_title = next(
            (title for _, title, _, is_active in parsed if is_active and title),
            "",
        )

        entries.extend(
            TabEntry(
                token=f"{token_prefix}{w_idx}T{t_idx}",
                window_index=window_offset + w_idx,
                tab_index=t_idx,
                title=title,
                url=url,
                active=is_active,
                session_name=session_name,
                window_active_title=window_active_title,
            )
            for t_idx, title, url, is_active in parsed
        )
    return entries, len(windows)
def parse_tabs(json_text: str) -> List[TabEntry]:
    """Decode session-dump JSON text and return its tab entries.

    Uses the default token prefix, no session name and no window offset.
    """
    return parse_tabs_from_payload(json.loads(json_text))[0]
def load_entries(
    *,
    json_file: Optional[str],
    read_stdin: bool,
    single_session: bool,
    session_command: str,
    ps_command: str,
) -> List[TabEntry]:
    """Load tab entries from the first applicable source.

    Sources are tried in this order: ``json_file``, stdin, then either the
    single ``session_command`` or one ``chrome-session-dump --json <dir>``
    run per discovered session directory. When discovery finds nothing,
    ``session_command`` is used as a fallback.

    Raises:
        RuntimeError: In multi-session mode, when no session produced any
            tabs; the message lists per-session failures if there were any.
    """
    if json_file:
        with open(json_file, "r", encoding="utf-8") as handle:
            return parse_tabs(handle.read())
    if read_stdin:
        return parse_tabs(sys.stdin.read())

    discovered: List[str] = (
        [] if single_session else discover_active_session_dirs_from_system(ps_command)
    )
    if not discovered:
        # Single-session mode, or legacy fallback when process discovery
        # cannot find active sessions.
        return parse_tabs(_run_capture(shlex.split(session_command)))

    collected: List[TabEntry] = []
    failures: List[str] = []
    next_window = 0
    for position, directory in enumerate(discovered):
        try:
            dump_text = _run_capture(["chrome-session-dump", "--json", directory])
            session_tabs, session_windows = parse_tabs_from_payload(
                json.loads(dump_text),
                token_prefix=f"S{position}W",
                session_name=_session_name_from_dir(directory),
                window_offset=next_window,
            )
        except Exception as exc:
            # Best effort: one unreadable session must not hide the others.
            failures.append(f"{directory}: {exc}")
        else:
            collected.extend(session_tabs)
            next_window += session_windows

    if collected:
        return collected
    raise RuntimeError(
        "; ".join(failures) if failures else "no tabs found in active Chrome sessions"
    )
def parse_menu_selection(line: str) -> str:
    """Extract the token (text before the first tab) from a selected menu line.

    Raises:
        ValueError: If the line is blank or has no token.
    """
    stripped = line.strip()
    if stripped == "":
        raise ValueError("empty selection")
    token, _, _ = stripped.partition("\t")
    if token == "":
        raise ValueError("missing selection token")
    return token
def list_chrome_windows() -> List[ChromeWindow]:
    """List windows whose class name contains ``chrome`` or ``chromium``.

    Parses the output of ``wmctrl -lx``: the first whitespace-separated field
    is taken as the window id, the third as the class name (lower-cased), and
    everything from the fifth field on as the title.
    """
    listing = _run_capture(["wmctrl", "-lx"])
    matches: List[ChromeWindow] = []
    for row in listing.splitlines():
        fields = row.split(None, 4)
        if len(fields) < 3:
            continue
        wm_class = fields[2].lower()
        if not ("chrome" in wm_class or "chromium" in wm_class):
            continue
        window_title = fields[4] if len(fields) >= 5 else ""
        matches.append(
            ChromeWindow(window_id=fields[0], class_name=wm_class, title=window_title)
        )
    return matches
def _title_match_score(hint: str, title: str) -> int:
    """Score how well a window title matches a hint (case/whitespace-insensitive).

    Returns 3 for equality, 2 when the hint is contained in the title, 1 when
    the title is contained in the hint, and 0 otherwise or when either side
    is blank.
    """
    needle = hint.strip().lower()
    haystack = title.strip().lower()
    if not (needle and haystack):
        return 0
    if needle == haystack:
        return 3
    if needle in haystack:
        return 2
    return 1 if haystack in needle else 0
def resolve_window_id(entry: TabEntry, windows: List[ChromeWindow]) -> str:
    """Pick the window id that most likely hosts the given tab entry.

    Each window is scored by how well its title matches the entry's
    ``window_active_title``, plus one point when the entry's session name
    occurs in the window title. The first window with the highest positive
    score wins; with no positive score, the entry's ``window_index`` is used
    as a position in ``windows``.

    Raises:
        RuntimeError: If ``windows`` is empty, or the positional fallback is
            out of range.
    """
    if not windows:
        raise RuntimeError("no Chrome windows found")

    scores: List[int] = []
    for window in windows:
        score = _title_match_score(entry.window_active_title, window.title)
        if entry.session_name and entry.session_name.lower() in window.title.lower():
            score += 1
        scores.append(score)

    top_score = max(scores)
    if top_score > 0:
        # list.index returns the earliest window holding the top score.
        return windows[scores.index(top_score)].window_id

    if entry.window_index < len(windows):
        return windows[entry.window_index].window_id
    raise RuntimeError(
        f"chrome window index {entry.window_index} not available (found {len(windows)} chrome windows)"
    )
def build_switch_commands(window_id: str, tab_index: int) -> List[List[str]]:
    """Build the command lines that focus a window and move to a tab.

    The first command activates the window by id via ``wmctrl``; the second
    sends ``ctrl+1`` via ``xdotool``; for a positive ``tab_index`` a third
    sends ``ctrl+Tab`` repeated ``tab_index`` times (presumably: jump to the
    first tab, then step forward to the target).
    """
    key_prefix = ["xdotool", "key", "--clearmodifiers"]
    commands: List[List[str]] = [
        ["wmctrl", "-ia", window_id],
        key_prefix + ["ctrl+1"],
    ]
    if tab_index > 0:
        commands.append(key_prefix + ["--repeat", str(tab_index), "ctrl+Tab"])
    return commands
def switch_to_tab(entry: TabEntry, focus_delay_ms: int, dry_run: bool) -> None:
    """Focus the window hosting ``entry`` and send the keys that select its tab.

    Args:
        entry: The tab to switch to.
        focus_delay_ms: Pause after the focus command and before the key
            commands; skipped when not positive.
        dry_run: When true, print each command prefixed with ``DRY-RUN:``
            instead of executing it.
    """
    target_window = resolve_window_id(entry, list_chrome_windows())
    focus_cmd, *key_cmds = build_switch_commands(target_window, entry.tab_index)

    if dry_run:
        for planned in (focus_cmd, *key_cmds):
            print("DRY-RUN:", " ".join(planned))
        return

    _run(focus_cmd)
    if focus_delay_ms > 0:
        # Wait for the focus change before sending keystrokes.
        time.sleep(focus_delay_ms / 1000.0)
    for key_cmd in key_cmds:
        _run(key_cmd)
def rofi_select(entries: Iterable[TabEntry], rofi_command: str) -> str:
    """Show the entries in the menu program and return the chosen token.

    One menu line per entry is written to the command's stdin; the command's
    stdout is parsed as the selection.
    """
    menu_rows = [item.menu_line() for item in entries]
    argv = shlex.split(rofi_command)
    chosen = _run_capture(argv, stdin_text="\n".join(menu_rows))
    return parse_menu_selection(chosen)
def _entry_by_token(entries: List[TabEntry], token: str) -> TabEntry:
    """Return the first entry whose token equals ``token``.

    Raises:
        RuntimeError: If no entry carries that token.
    """
    found = next((candidate for candidate in entries if candidate.token == token), None)
    if found is None:
        raise RuntimeError(f"tab token not found: {token}")
    return found
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with all supported options."""
    parser = argparse.ArgumentParser(
        description="Switch Chrome tabs using chrome-session-dump JSON and rofi."
    )
    add = parser.add_argument

    # Input sources.
    add("--json-file", help="Read chrome-session-dump JSON from a file")
    add("--stdin", action="store_true", help="Read chrome-session-dump JSON from stdin")
    add(
        "--session-command",
        default="chrome-session-dump --json",
        help="Command used to obtain session JSON in single-session mode",
    )
    add(
        "--single-session",
        action="store_true",
        help="Disable active-session discovery and only read one session via --session-command",
    )
    add(
        "--ps-command",
        default="ps -eo args",
        help="Process listing command used to discover active Chrome user-data directories",
    )

    # Actions and behavior.
    add("--list-only", action="store_true", help="Print tab list and exit")
    add("--select", help="Tab token to switch to")
    add("--dry-run", action="store_true", help="Print switch commands without executing")
    add("--focus-delay-ms", type=int, default=80, help="Delay after focusing window before tab keys")
    add(
        "--rofi-command",
        default="rofi -dmenu -i -p ChromeTabs",
        help="Rofi command used for interactive selection",
    )
    return parser


def main() -> int:
    """Command-line entry point.

    Loads the tab entries, then either prints them (``--list-only``) or
    switches to the tab chosen via ``--select`` or the interactive menu.

    Returns:
        0 on success; 1 after printing ``error: ...`` to stderr on failure.
    """
    args = _build_arg_parser().parse_args()
    try:
        entries = load_entries(
            json_file=args.json_file,
            read_stdin=args.stdin,
            single_session=args.single_session,
            session_command=args.session_command,
            ps_command=args.ps_command,
        )
        if not entries:
            raise RuntimeError("no tabs found in session dump")

        if args.list_only:
            for entry in entries:
                print(entry.menu_line())
            return 0

        # An explicit --select token bypasses the interactive menu.
        token = args.select or rofi_select(entries, args.rofi_command)
        switch_to_tab(_entry_by_token(entries, token), args.focus_delay_ms, args.dry_run)
        return 0
    except Exception as exc:
        # Top-level boundary: report any failure as a one-line error.
        print(f"error: {exc}", file=sys.stderr)
        return 1
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    sys.exit(main())
|