chrome-tab-switcher.py 13 KB


  1. #!/usr/bin/env python3
  2. import argparse
  3. import json
  4. import os
  5. import re
  6. import shlex
  7. import subprocess
  8. import sys
  9. import time
  10. from dataclasses import dataclass
  11. from typing import Any, Iterable, List, Optional, Sequence, Tuple
  12. from urllib.parse import urlparse
  13. @dataclass
  14. class TabEntry:
  15. token: str
  16. window_index: int
  17. tab_index: int
  18. title: str
  19. url: str
  20. active: bool
  21. session_name: str = ""
  22. window_active_title: str = ""
  23. def menu_line(self) -> str:
  24. return f"{self.token}\t{self.label()}"
  25. def label(self) -> str:
  26. host = ""
  27. if self.url:
  28. parsed = urlparse(self.url)
  29. host = parsed.netloc
  30. active_marker = "*" if self.active else " "
  31. title = self.title if self.title else "(untitled)"
  32. host_segment = f" [{host}]" if host else ""
  33. session_segment = f" {self.session_name}" if self.session_name else ""
  34. return f"[{active_marker}{session_segment} w{self.window_index + 1} t{self.tab_index + 1}] {title}{host_segment}"
  35. @dataclass
  36. class ChromeWindow:
  37. window_id: str
  38. class_name: str
  39. title: str
  40. def _run_capture(args: Sequence[str], stdin_text: Optional[str] = None) -> str:
  41. proc = subprocess.run(
  42. list(args),
  43. input=stdin_text,
  44. text=True,
  45. capture_output=True,
  46. check=False,
  47. )
  48. if proc.returncode != 0:
  49. stderr = (proc.stderr or "").strip()
  50. stdout = (proc.stdout or "").strip()
  51. details = stderr or stdout or f"exit {proc.returncode}"
  52. raise RuntimeError(f"command failed: {' '.join(args)}: {details}")
  53. return proc.stdout
  54. def _run(args: Sequence[str]) -> None:
  55. proc = subprocess.run(list(args), check=False)
  56. if proc.returncode != 0:
  57. raise RuntimeError(f"command failed: {' '.join(args)} (exit {proc.returncode})")
  58. def _as_bool(v: Any) -> bool:
  59. if isinstance(v, bool):
  60. return v
  61. if isinstance(v, (int, float)):
  62. return v != 0
  63. if isinstance(v, str):
  64. return v.lower() in {"1", "true", "yes", "on"}
  65. return False
  66. def _windows_from_payload(payload: Any) -> List[Any]:
  67. if isinstance(payload, list):
  68. return payload
  69. if isinstance(payload, dict):
  70. windows = payload.get("windows")
  71. if isinstance(windows, list):
  72. return windows
  73. raise ValueError("unsupported chrome-session-dump JSON shape")
  74. def _session_name_from_dir(path: str) -> str:
  75. clean = path.rstrip("/")
  76. if not clean:
  77. return "session"
  78. return os.path.basename(clean) or clean
  79. def _extract_user_data_dir(cmdline: str) -> Optional[str]:
  80. pattern = r"--user-data-dir(?:=|\s+)(?:\"([^\"]+)\"|'([^']+)'|(\S+))"
  81. match = re.search(pattern, cmdline)
  82. if not match:
  83. return None
  84. for group in match.groups():
  85. if group:
  86. return os.path.expanduser(group)
  87. return None
  88. def discover_active_session_dirs(ps_output: str, home_dir: str) -> List[str]:
  89. dirs: List[str] = []
  90. default_needed = False
  91. for line in ps_output.splitlines():
  92. line_lower = line.lower()
  93. if "chrome-session-dump" in line_lower:
  94. continue
  95. if "chrome" not in line_lower and "chromium" not in line_lower:
  96. continue
  97. user_data_dir = _extract_user_data_dir(line)
  98. if user_data_dir:
  99. dirs.append(user_data_dir)
  100. else:
  101. default_needed = True
  102. if default_needed:
  103. for candidate in (
  104. os.path.join(home_dir, ".config", "google-chrome"),
  105. os.path.join(home_dir, ".config", "chromium"),
  106. os.path.join(home_dir, ".config", "chrome"),
  107. ):
  108. if os.path.isdir(candidate):
  109. dirs.append(candidate)
  110. deduped: List[str] = []
  111. seen = set()
  112. for path in dirs:
  113. normalized = os.path.normpath(path)
  114. if normalized not in seen:
  115. deduped.append(normalized)
  116. seen.add(normalized)
  117. return deduped
  118. def discover_active_session_dirs_from_system(ps_command: str) -> List[str]:
  119. ps_output = _run_capture(shlex.split(ps_command))
  120. return discover_active_session_dirs(ps_output, os.path.expanduser("~"))
  121. def parse_tabs_from_payload(
  122. payload: Any,
  123. *,
  124. token_prefix: str = "W",
  125. session_name: str = "",
  126. window_offset: int = 0,
  127. ) -> Tuple[List[TabEntry], int]:
  128. windows = _windows_from_payload(payload)
  129. entries: List[TabEntry] = []
  130. for w_idx, window in enumerate(windows):
  131. if not isinstance(window, dict):
  132. continue
  133. tabs = window.get("tabs", [])
  134. if not isinstance(tabs, list):
  135. continue
  136. window_active_title = ""
  137. active_tab_index = window.get("activeTabIndex")
  138. if not isinstance(active_tab_index, int):
  139. active_tab_index = window.get("selected")
  140. if isinstance(active_tab_index, int):
  141. # Some tools use 1-based selected index.
  142. active_tab_index = max(active_tab_index - 1, 0)
  143. else:
  144. active_tab_index = None
  145. for t_idx, tab in enumerate(tabs):
  146. if not isinstance(tab, dict):
  147. continue
  148. title = str(tab.get("title") or tab.get("name") or "")
  149. url = str(tab.get("url") or tab.get("uri") or "")
  150. active = _as_bool(tab.get("active"))
  151. if active_tab_index is not None and t_idx == active_tab_index:
  152. active = True
  153. if active and not window_active_title and title:
  154. window_active_title = title
  155. for t_idx, tab in enumerate(tabs):
  156. if not isinstance(tab, dict):
  157. continue
  158. title = str(tab.get("title") or tab.get("name") or "")
  159. url = str(tab.get("url") or tab.get("uri") or "")
  160. active = _as_bool(tab.get("active"))
  161. if active_tab_index is not None and t_idx == active_tab_index:
  162. active = True
  163. token = f"{token_prefix}{w_idx}T{t_idx}"
  164. entries.append(
  165. TabEntry(
  166. token=token,
  167. window_index=window_offset + w_idx,
  168. tab_index=t_idx,
  169. title=title,
  170. url=url,
  171. active=active,
  172. session_name=session_name,
  173. window_active_title=window_active_title,
  174. )
  175. )
  176. return entries, len(windows)
  177. def parse_tabs(json_text: str) -> List[TabEntry]:
  178. payload = json.loads(json_text)
  179. entries, _ = parse_tabs_from_payload(payload)
  180. return entries
  181. def load_entries(
  182. *,
  183. json_file: Optional[str],
  184. read_stdin: bool,
  185. single_session: bool,
  186. session_command: str,
  187. ps_command: str,
  188. ) -> List[TabEntry]:
  189. if json_file:
  190. with open(json_file, "r", encoding="utf-8") as f:
  191. return parse_tabs(f.read())
  192. if read_stdin:
  193. return parse_tabs(sys.stdin.read())
  194. if single_session:
  195. return parse_tabs(_run_capture(shlex.split(session_command)))
  196. session_dirs = discover_active_session_dirs_from_system(ps_command)
  197. if not session_dirs:
  198. # Fall back to legacy behavior when process discovery cannot find active sessions.
  199. return parse_tabs(_run_capture(shlex.split(session_command)))
  200. all_entries: List[TabEntry] = []
  201. window_offset = 0
  202. errors: List[str] = []
  203. for s_idx, session_dir in enumerate(session_dirs):
  204. try:
  205. json_text = _run_capture(["chrome-session-dump", "--json", session_dir])
  206. payload = json.loads(json_text)
  207. session_name = _session_name_from_dir(session_dir)
  208. entries, window_count = parse_tabs_from_payload(
  209. payload,
  210. token_prefix=f"S{s_idx}W",
  211. session_name=session_name,
  212. window_offset=window_offset,
  213. )
  214. all_entries.extend(entries)
  215. window_offset += window_count
  216. except Exception as exc:
  217. errors.append(f"{session_dir}: {exc}")
  218. if all_entries:
  219. return all_entries
  220. if errors:
  221. raise RuntimeError("; ".join(errors))
  222. raise RuntimeError("no tabs found in active Chrome sessions")
  223. def parse_menu_selection(line: str) -> str:
  224. selected = line.strip()
  225. if not selected:
  226. raise ValueError("empty selection")
  227. token = selected.split("\t", 1)[0]
  228. if not token:
  229. raise ValueError("missing selection token")
  230. return token
  231. def list_chrome_windows() -> List[ChromeWindow]:
  232. out = _run_capture(["wmctrl", "-lx"])
  233. windows: List[ChromeWindow] = []
  234. for line in out.splitlines():
  235. parts = line.split(None, 4)
  236. if len(parts) < 3:
  237. continue
  238. class_name = parts[2].lower()
  239. if "chrome" in class_name or "chromium" in class_name:
  240. title = parts[4] if len(parts) >= 5 else ""
  241. windows.append(ChromeWindow(window_id=parts[0], class_name=class_name, title=title))
  242. return windows
  243. def _title_match_score(hint: str, title: str) -> int:
  244. hint_norm = hint.strip().lower()
  245. title_norm = title.strip().lower()
  246. if not hint_norm or not title_norm:
  247. return 0
  248. if hint_norm == title_norm:
  249. return 3
  250. if hint_norm in title_norm:
  251. return 2
  252. if title_norm in hint_norm:
  253. return 1
  254. return 0
  255. def resolve_window_id(entry: TabEntry, windows: List[ChromeWindow]) -> str:
  256. if not windows:
  257. raise RuntimeError("no Chrome windows found")
  258. best_idx = -1
  259. best_score = 0
  260. for idx, window in enumerate(windows):
  261. score = _title_match_score(entry.window_active_title, window.title)
  262. if entry.session_name and entry.session_name.lower() in window.title.lower():
  263. score += 1
  264. if score > best_score:
  265. best_score = score
  266. best_idx = idx
  267. if best_idx >= 0 and best_score > 0:
  268. return windows[best_idx].window_id
  269. if entry.window_index >= len(windows):
  270. raise RuntimeError(
  271. f"chrome window index {entry.window_index} not available (found {len(windows)} chrome windows)"
  272. )
  273. return windows[entry.window_index].window_id
  274. def build_switch_commands(window_id: str, tab_index: int) -> List[List[str]]:
  275. cmds: List[List[str]] = [["wmctrl", "-ia", window_id], ["xdotool", "key", "--clearmodifiers", "ctrl+1"]]
  276. if tab_index > 0:
  277. cmds.append(["xdotool", "key", "--clearmodifiers", "--repeat", str(tab_index), "ctrl+Tab"])
  278. return cmds
  279. def switch_to_tab(entry: TabEntry, focus_delay_ms: int, dry_run: bool) -> None:
  280. windows = list_chrome_windows()
  281. window_id = resolve_window_id(entry, windows)
  282. commands = build_switch_commands(window_id, entry.tab_index)
  283. if dry_run:
  284. for cmd in commands:
  285. print("DRY-RUN:", " ".join(cmd))
  286. return
  287. _run(commands[0])
  288. if focus_delay_ms > 0:
  289. time.sleep(focus_delay_ms / 1000.0)
  290. for cmd in commands[1:]:
  291. _run(cmd)
  292. def rofi_select(entries: Iterable[TabEntry], rofi_command: str) -> str:
  293. lines = "\n".join(entry.menu_line() for entry in entries)
  294. out = _run_capture(shlex.split(rofi_command), stdin_text=lines)
  295. return parse_menu_selection(out)
  296. def _entry_by_token(entries: List[TabEntry], token: str) -> TabEntry:
  297. for entry in entries:
  298. if entry.token == token:
  299. return entry
  300. raise RuntimeError(f"tab token not found: {token}")
  301. def main() -> int:
  302. parser = argparse.ArgumentParser(description="Switch Chrome tabs using chrome-session-dump JSON and rofi.")
  303. parser.add_argument("--json-file", help="Read chrome-session-dump JSON from a file")
  304. parser.add_argument("--stdin", action="store_true", help="Read chrome-session-dump JSON from stdin")
  305. parser.add_argument(
  306. "--session-command",
  307. default="chrome-session-dump --json",
  308. help="Command used to obtain session JSON in single-session mode",
  309. )
  310. parser.add_argument(
  311. "--single-session",
  312. action="store_true",
  313. help="Disable active-session discovery and only read one session via --session-command",
  314. )
  315. parser.add_argument(
  316. "--ps-command",
  317. default="ps -eo args",
  318. help="Process listing command used to discover active Chrome user-data directories",
  319. )
  320. parser.add_argument("--list-only", action="store_true", help="Print tab list and exit")
  321. parser.add_argument("--select", help="Tab token to switch to")
  322. parser.add_argument("--dry-run", action="store_true", help="Print switch commands without executing")
  323. parser.add_argument("--focus-delay-ms", type=int, default=80, help="Delay after focusing window before tab keys")
  324. parser.add_argument(
  325. "--rofi-command",
  326. default="rofi -dmenu -i -p ChromeTabs",
  327. help="Rofi command used for interactive selection",
  328. )
  329. args = parser.parse_args()
  330. try:
  331. entries = load_entries(
  332. json_file=args.json_file,
  333. read_stdin=args.stdin,
  334. single_session=args.single_session,
  335. session_command=args.session_command,
  336. ps_command=args.ps_command,
  337. )
  338. if not entries:
  339. raise RuntimeError("no tabs found in session dump")
  340. if args.list_only:
  341. for entry in entries:
  342. print(entry.menu_line())
  343. return 0
  344. token = args.select if args.select else rofi_select(entries, args.rofi_command)
  345. entry = _entry_by_token(entries, token)
  346. switch_to_tab(entry, args.focus_delay_ms, args.dry_run)
  347. return 0
  348. except Exception as exc:
  349. print(f"error: {exc}", file=sys.stderr)
  350. return 1
  351. if __name__ == "__main__":
  352. raise SystemExit(main())