#!/usr/bin/env python3 import argparse import json import os import re import shlex import subprocess import sys import time from dataclasses import dataclass from typing import Any, Iterable, List, Optional, Sequence, Tuple from urllib.parse import urlparse @dataclass class TabEntry: token: str window_index: int tab_index: int title: str url: str active: bool session_name: str = "" window_active_title: str = "" def menu_line(self) -> str: return f"{self.token}\t{self.label()}" def label(self) -> str: host = "" if self.url: parsed = urlparse(self.url) host = parsed.netloc active_marker = "*" if self.active else " " title = self.title if self.title else "(untitled)" host_segment = f" [{host}]" if host else "" session_segment = f" {self.session_name}" if self.session_name else "" return f"[{active_marker}{session_segment} w{self.window_index + 1} t{self.tab_index + 1}] {title}{host_segment}" @dataclass class ChromeWindow: window_id: str class_name: str title: str def _run_capture(args: Sequence[str], stdin_text: Optional[str] = None) -> str: proc = subprocess.run( list(args), input=stdin_text, text=True, capture_output=True, check=False, ) if proc.returncode != 0: stderr = (proc.stderr or "").strip() stdout = (proc.stdout or "").strip() details = stderr or stdout or f"exit {proc.returncode}" raise RuntimeError(f"command failed: {' '.join(args)}: {details}") return proc.stdout def _run(args: Sequence[str]) -> None: proc = subprocess.run(list(args), check=False) if proc.returncode != 0: raise RuntimeError(f"command failed: {' '.join(args)} (exit {proc.returncode})") def _as_bool(v: Any) -> bool: if isinstance(v, bool): return v if isinstance(v, (int, float)): return v != 0 if isinstance(v, str): return v.lower() in {"1", "true", "yes", "on"} return False def _windows_from_payload(payload: Any) -> List[Any]: if isinstance(payload, list): return payload if isinstance(payload, dict): windows = payload.get("windows") if isinstance(windows, list): return windows raise ValueError("unsupported chrome-session-dump JSON shape") def _session_name_from_dir(path: str) -> str: clean = path.rstrip("/") if not clean: return "session" return os.path.basename(clean) or clean def _extract_user_data_dir(cmdline: str) -> Optional[str]: pattern = r"--user-data-dir(?:=|\s+)(?:\"([^\"]+)\"|'([^']+)'|(\S+))" match = re.search(pattern, cmdline) if not match: return None for group in match.groups(): if group: return os.path.expanduser(group) return None def discover_active_session_dirs(ps_output: str, home_dir: str) -> List[str]: dirs: List[str] = [] default_needed = False for line in ps_output.splitlines(): line_lower = line.lower() if "chrome-session-dump" in line_lower: continue if "chrome" not in line_lower and "chromium" not in line_lower: continue user_data_dir = _extract_user_data_dir(line) if user_data_dir: dirs.append(user_data_dir) else: default_needed = True if default_needed: for candidate in ( os.path.join(home_dir, ".config", "google-chrome"), os.path.join(home_dir, ".config", "chromium"), os.path.join(home_dir, ".config", "chrome"), ): if os.path.isdir(candidate): dirs.append(candidate) deduped: List[str] = [] seen = set() for path in dirs: normalized = os.path.normpath(path) if normalized not in seen: deduped.append(normalized) seen.add(normalized) return deduped def discover_active_session_dirs_from_system(ps_command: str) -> List[str]: ps_output = _run_capture(shlex.split(ps_command)) return discover_active_session_dirs(ps_output, os.path.expanduser("~")) def parse_tabs_from_payload( payload: Any, *, token_prefix: str = "W", session_name: str = "", window_offset: int = 0, ) -> Tuple[List[TabEntry], int]: windows = _windows_from_payload(payload) entries: List[TabEntry] = [] for w_idx, window in enumerate(windows): if not isinstance(window, dict): continue tabs = window.get("tabs", []) if not isinstance(tabs, list): continue window_active_title = "" active_tab_index = window.get("activeTabIndex") if not isinstance(active_tab_index, int): active_tab_index = window.get("selected") if isinstance(active_tab_index, int): # Some tools use 1-based selected index. active_tab_index = max(active_tab_index - 1, 0) else: active_tab_index = None for t_idx, tab in enumerate(tabs): if not isinstance(tab, dict): continue title = str(tab.get("title") or tab.get("name") or "") url = str(tab.get("url") or tab.get("uri") or "") active = _as_bool(tab.get("active")) if active_tab_index is not None and t_idx == active_tab_index: active = True if active and not window_active_title and title: window_active_title = title for t_idx, tab in enumerate(tabs): if not isinstance(tab, dict): continue title = str(tab.get("title") or tab.get("name") or "") url = str(tab.get("url") or tab.get("uri") or "") active = _as_bool(tab.get("active")) if active_tab_index is not None and t_idx == active_tab_index: active = True token = f"{token_prefix}{w_idx}T{t_idx}" entries.append( TabEntry( token=token, window_index=window_offset + w_idx, tab_index=t_idx, title=title, url=url, active=active, session_name=session_name, window_active_title=window_active_title, ) ) return entries, len(windows) def parse_tabs(json_text: str) -> List[TabEntry]: payload = json.loads(json_text) entries, _ = parse_tabs_from_payload(payload) return entries def load_entries( *, json_file: Optional[str], read_stdin: bool, single_session: bool, session_command: str, ps_command: str, ) -> List[TabEntry]: if json_file: with open(json_file, "r", encoding="utf-8") as f: return parse_tabs(f.read()) if read_stdin: return parse_tabs(sys.stdin.read()) if single_session: return parse_tabs(_run_capture(shlex.split(session_command))) session_dirs = discover_active_session_dirs_from_system(ps_command) if not session_dirs: # Fall back to legacy behavior when process discovery cannot find active sessions. return parse_tabs(_run_capture(shlex.split(session_command))) all_entries: List[TabEntry] = [] window_offset = 0 errors: List[str] = [] for s_idx, session_dir in enumerate(session_dirs): try: json_text = _run_capture(["chrome-session-dump", "--json", session_dir]) payload = json.loads(json_text) session_name = _session_name_from_dir(session_dir) entries, window_count = parse_tabs_from_payload( payload, token_prefix=f"S{s_idx}W", session_name=session_name, window_offset=window_offset, ) all_entries.extend(entries) window_offset += window_count except Exception as exc: errors.append(f"{session_dir}: {exc}") if all_entries: return all_entries if errors: raise RuntimeError("; ".join(errors)) raise RuntimeError("no tabs found in active Chrome sessions") def parse_menu_selection(line: str) -> str: selected = line.strip() if not selected: raise ValueError("empty selection") token = selected.split("\t", 1)[0] if not token: raise ValueError("missing selection token") return token def list_chrome_windows() -> List[ChromeWindow]: out = _run_capture(["wmctrl", "-lx"]) windows: List[ChromeWindow] = [] for line in out.splitlines(): parts = line.split(None, 4) if len(parts) < 3: continue class_name = parts[2].lower() if "chrome" in class_name or "chromium" in class_name: title = parts[4] if len(parts) >= 5 else "" windows.append(ChromeWindow(window_id=parts[0], class_name=class_name, title=title)) return windows def _title_match_score(hint: str, title: str) -> int: hint_norm = hint.strip().lower() title_norm = title.strip().lower() if not hint_norm or not title_norm: return 0 if hint_norm == title_norm: return 3 if hint_norm in title_norm: return 2 if title_norm in hint_norm: return 1 return 0 def resolve_window_id(entry: TabEntry, windows: List[ChromeWindow]) -> str: if not windows: raise RuntimeError("no Chrome windows found") best_idx = -1 best_score = 0 for idx, window in enumerate(windows): score = _title_match_score(entry.window_active_title, window.title) if entry.session_name and entry.session_name.lower() in window.title.lower(): score += 1 if score > best_score: best_score = score best_idx = idx if best_idx >= 0 and best_score > 0: return windows[best_idx].window_id if entry.window_index >= len(windows): raise RuntimeError( f"chrome window index {entry.window_index} not available (found {len(windows)} chrome windows)" ) return windows[entry.window_index].window_id def build_switch_commands(window_id: str, tab_index: int) -> List[List[str]]: cmds: List[List[str]] = [["wmctrl", "-ia", window_id], ["xdotool", "key", "--clearmodifiers", "ctrl+1"]] if tab_index > 0: cmds.append(["xdotool", "key", "--clearmodifiers", "--repeat", str(tab_index), "ctrl+Tab"]) return cmds def switch_to_tab(entry: TabEntry, focus_delay_ms: int, dry_run: bool) -> None: windows = list_chrome_windows() window_id = resolve_window_id(entry, windows) commands = build_switch_commands(window_id, entry.tab_index) if dry_run: for cmd in commands: print("DRY-RUN:", " ".join(cmd)) return _run(commands[0]) if focus_delay_ms > 0: time.sleep(focus_delay_ms / 1000.0) for cmd in commands[1:]: _run(cmd) def rofi_select(entries: Iterable[TabEntry], rofi_command: str) -> str: lines = "\n".join(entry.menu_line() for entry in entries) out = _run_capture(shlex.split(rofi_command), stdin_text=lines) return parse_menu_selection(out) def _entry_by_token(entries: List[TabEntry], token: str) -> TabEntry: for entry in entries: if entry.token == token: return entry raise RuntimeError(f"tab token not found: {token}") def main() -> int: parser = argparse.ArgumentParser(description="Switch Chrome tabs using chrome-session-dump JSON and rofi.") parser.add_argument("--json-file", help="Read chrome-session-dump JSON from a file") parser.add_argument("--stdin", action="store_true", help="Read chrome-session-dump JSON from stdin") parser.add_argument( "--session-command", default="chrome-session-dump --json", help="Command used to obtain session JSON in single-session mode", ) parser.add_argument( "--single-session", action="store_true", help="Disable active-session discovery and only read one session via --session-command", ) parser.add_argument( "--ps-command", default="ps -eo args", help="Process listing command used to discover active Chrome user-data directories", ) parser.add_argument("--list-only", action="store_true", help="Print tab list and exit") parser.add_argument("--select", help="Tab token to switch to") parser.add_argument("--dry-run", action="store_true", help="Print switch commands without executing") parser.add_argument("--focus-delay-ms", type=int, default=80, help="Delay after focusing window before tab keys") parser.add_argument( "--rofi-command", default="rofi -dmenu -i -p ChromeTabs", help="Rofi command used for interactive selection", ) args = parser.parse_args() try: entries = load_entries( json_file=args.json_file, read_stdin=args.stdin, single_session=args.single_session, session_command=args.session_command, ps_command=args.ps_command, ) if not entries: raise RuntimeError("no tabs found in session dump") if args.list_only: for entry in entries: print(entry.menu_line()) return 0 token = args.select if args.select else rofi_select(entries, args.rofi_command) entry = _entry_by_token(entries, token) switch_to_tab(entry, args.focus_delay_ms, args.dry_run) return 0 except Exception as exc: print(f"error: {exc}", file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())