Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 184 additions & 30 deletions src/mss/linux/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any
from urllib.parse import urlencode

from mss.base import MSSBase
from mss.exception import ScreenShotError
from mss.tools import parse_edid

from . import xcb
from .xcb import LIB

if TYPE_CHECKING:
from ctypes import Array

from mss.models import Monitor
from mss.screenshot import ScreenShot

Expand Down Expand Up @@ -66,7 +70,7 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912
# we'll have to ask the server for its depth and visual.
assert self.root == self.drawable # noqa: S101
self.drawable_depth = self.pref_screen.root_depth
self.drawable_visual_id = self.pref_screen.root_visual.value
self.drawable_visual_id = self.pref_screen.root_visual
# Server image byte order
if xcb_setup.image_byte_order != xcb.ImageOrder.LSBFirst:
msg = "Only X11 servers using LSB-First images are supported."
Expand Down Expand Up @@ -103,7 +107,7 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912
msg = "Internal error: drawable's depth not found in screen's supported depths"
raise ScreenShotError(msg)
for visual_info in xcb.depth_visuals(xcb_depth):
if visual_info.visual_id.value == self.drawable_visual_id:
if visual_info.visual_id == self.drawable_visual_id:
break
else:
msg = "Internal error: drawable's visual not found in screen's supported visuals"
Expand Down Expand Up @@ -140,8 +144,37 @@ def _monitors_impl(self) -> None:
msg = "Cannot identify monitors while the connection is closed"
raise ScreenShotError(msg)

# The first entry is the whole X11 screen that the root is on. That's the one that covers all the
# monitors.
self._append_root_monitor()

randr_version = self._randr_get_version()
if randr_version is None or randr_version < (1, 2):
return

# XRandR terminology (very abridged, but enough for this code):
# - X screen / framebuffer: the overall drawable area for this root.
# - CRTC: a display controller that scans out a rectangular region of the X screen. A CRTC with zero
# outputs is inactive. A CRTC may drive multiple outputs in clone/mirroring mode.
# - Output: a physical connector (e.g. "HDMI-1", "DP-1"). The RandR "connection" state (connected vs
# disconnected) is separate from whether the output is currently driven by a CRTC.
# - Monitor (RandR 1.5+): a logical rectangle presented to clients. Monitors may be client-defined (useful
# for tiled displays) and are the closest match to what MSS wants.
#
# This implementation prefers RandR 1.5+ Monitors when available; otherwise it falls back to enumerating
# active CRTCs.

primary_output = self._randr_get_primary_output(randr_version)
edid_atom = self._randr_get_edid_atom()

if randr_version >= (1, 5):
self._monitors_from_randr_monitors(primary_output, edid_atom)
else:
self._monitors_from_randr_crtcs(randr_version, primary_output, edid_atom)

def _append_root_monitor(self) -> None:
if self.conn is None:
msg = "Cannot identify monitors while the connection is closed"
raise ScreenShotError(msg)

root_geom = xcb.get_geometry(self.conn, self.root)
self._monitors.append(
{
Expand All @@ -152,47 +185,168 @@ def _monitors_impl(self) -> None:
}
)

# After that, we have one for each monitor on that X11 screen. For decades, that's been handled by
# Xrandr. We don't presently try to work with Xinerama. So, we're going to check the different outputs,
# according to Xrandr. If that fails, we'll just leave the one root covering everything.
def _randr_get_version(self) -> tuple[int, int] | None:
if self.conn is None:
msg = "Cannot identify monitors while the connection is closed"
raise ScreenShotError(msg)

# Make sure we have the Xrandr extension we need. This will query the cache that we started populating in
# __init__.
randr_ext_data = xcb.get_extension_data(self.conn, LIB.randr_id)
if not randr_ext_data.present:
return
return None

# We ask the server to give us anything up to the version we support (i.e., what we expect the reply
# structs to look like). If the server only supports 1.2, then that's what it'll give us, and we're ok
# with that, but we also use a faster path if the server implements at least 1.3.
randr_version_data = xcb.randr_query_version(self.conn, xcb.RANDR_MAJOR_VERSION, xcb.RANDR_MINOR_VERSION)
randr_version = (randr_version_data.major_version, randr_version_data.minor_version)
if randr_version < (1, 2):
return
return (randr_version_data.major_version, randr_version_data.minor_version)

def _randr_get_primary_output(self, randr_version: tuple[int, int], /) -> xcb.RandrOutput:
if self.conn is None:
msg = "Cannot identify monitors while the connection is closed"
raise ScreenShotError(msg)

if randr_version >= (1, 3):
primary_output_data = xcb.randr_get_output_primary(self.conn, self.drawable)
return primary_output_data.output
return xcb.RandrOutput(0)

def _randr_get_edid_atom(self) -> xcb.Atom | None:
if self.conn is None:
msg = "Cannot identify monitors while the connection is closed"
raise ScreenShotError(msg)

edid_atom = xcb.intern_atom(self.conn, "EDID", only_if_exists=True)
if edid_atom is not None:
return edid_atom

# Formerly, "EDID" was known as "EdidData". I don't know when it changed.
return xcb.intern_atom(self.conn, "EdidData", only_if_exists=True)

def _randr_output_ids(
self,
output: xcb.RandrOutput,
timestamp: xcb.Timestamp,
edid_atom: xcb.Atom | None,
/,
) -> dict[str, Any]:
if self.conn is None:
msg = "Cannot identify monitors while the connection is closed"
raise ScreenShotError(msg)

output_info = xcb.randr_get_output_info(self.conn, output, timestamp)
if output_info.status != 0:
msg = "Display configuration changed while detecting monitors."
raise ScreenShotError(msg)

rv: dict[str, Any] = {}

output_name_arr = xcb.randr_get_output_info_name(output_info)
rv["output"] = bytes(output_name_arr).decode("utf_8", errors="replace")

if edid_atom is not None:
edid_prop = xcb.randr_get_output_property(
self.conn, # connection
output, # output
edid_atom, # property
xcb.XCB_NONE, # property type: Any
0, # long-offset: 0
1024, # long-length: in 4-byte units; 4k is plenty for an EDID
0, # delete: false
0, # pending: false
)
if edid_prop.type_.value != 0:
edid_block = bytes(xcb.randr_get_output_property_data(edid_prop))
edid_data = parse_edid(edid_block)
if "display_name" in edid_data:
rv["name"] = edid_data["display_name"]

edid_params: dict[str, str] = {}
if "id_legacy" in edid_data:
edid_params["model"] = edid_data["id_legacy"]
if "serial_number" in edid_data:
edid_params["serial"] = str(edid_data["serial_number"])
if "manufacture_year" in edid_data:
if "manufacture_week" in edid_data:
edid_params["mfr_date"] = (
f"{edid_data['manufacture_year']:04d}W{edid_data['manufacture_week']:02d}"
)
else:
edid_params["mfr_date"] = f"{edid_data['manufacture_year']:04d}"
if "model_year" in edid_data:
edid_params["model_year"] = f"{edid_data['model_year']:04d}"
if edid_params:
rv["unique_id"] = urlencode(edid_params)

return rv

@staticmethod
def _choose_randr_output(outputs: Array[xcb.RandrOutput], primary_output: xcb.RandrOutput, /) -> xcb.RandrOutput:
if len(outputs) == 0:
msg = "No RandR outputs available"
raise ScreenShotError(msg)
if any(o == primary_output for o in outputs):
return primary_output
return outputs[0]

def _monitors_from_randr_monitors(self, primary_output: xcb.RandrOutput, edid_atom: xcb.Atom | None, /) -> None:
if self.conn is None:
msg = "Cannot identify monitors while the connection is closed"
raise ScreenShotError(msg)

monitors_reply = xcb.randr_get_monitors(self.conn, self.drawable, 1)
timestamp = monitors_reply.timestamp
for randr_monitor in xcb.randr_get_monitors_monitors(monitors_reply):
monitor = {
"left": randr_monitor.x,
"top": randr_monitor.y,
"width": randr_monitor.width,
"height": randr_monitor.height,
}
if randr_monitor.primary:
monitor["is_primary"] = True

if randr_monitor.nOutput > 0:
outputs = xcb.randr_monitor_info_outputs(randr_monitor)
chosen_output = self._choose_randr_output(outputs, primary_output)
monitor |= self._randr_output_ids(chosen_output, timestamp, edid_atom)

self._monitors.append(monitor)

def _monitors_from_randr_crtcs(
self,
randr_version: tuple[int, int],
primary_output: xcb.RandrOutput,
edid_atom: xcb.Atom | None,
/,
) -> None:
if self.conn is None:
msg = "Cannot identify monitors while the connection is closed"
raise ScreenShotError(msg)

screen_resources: xcb.RandrGetScreenResourcesReply | xcb.RandrGetScreenResourcesCurrentReply
# Check to see if we have the xcb_randr_get_screen_resources_current function in libxcb-randr, and that
# the server supports it.
if hasattr(LIB.randr, "xcb_randr_get_screen_resources_current") and randr_version >= (1, 3):
screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable.value)
screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable)
crtcs = xcb.randr_get_screen_resources_current_crtcs(screen_resources)
else:
# Either the client or the server doesn't support the _current form. That's ok; we'll use the old
# function, which forces a new query to the physical monitors.
screen_resources = xcb.randr_get_screen_resources(self.conn, self.drawable)
crtcs = xcb.randr_get_screen_resources_crtcs(screen_resources)
timestamp = screen_resources.config_timestamp

for crtc in crtcs:
crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, screen_resources.config_timestamp)
crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, timestamp)
if crtc_info.num_outputs == 0:
continue
self._monitors.append(
{"left": crtc_info.x, "top": crtc_info.y, "width": crtc_info.width, "height": crtc_info.height}
)
monitor = {
"left": crtc_info.x,
"top": crtc_info.y,
"width": crtc_info.width,
"height": crtc_info.height,
}

outputs = xcb.randr_get_crtc_info_outputs(crtc_info)
chosen_output = self._choose_randr_output(outputs, primary_output)
monitor |= self._randr_output_ids(chosen_output, timestamp, edid_atom)
if chosen_output == primary_output:
monitor["is_primary"] = True

# Extra credit would be to enumerate the virtual desktops; see
# https://specifications.freedesktop.org/wm/latest/ar01s03.html. But I don't know how widely-used that
# style is.
self._monitors.append(monitor)

def _cursor_impl_check_xfixes(self) -> bool:
"""Check XFixes availability and version.
Expand Down Expand Up @@ -277,11 +431,11 @@ def _grab_impl_xgetimage(self, monitor: Monitor, /) -> ScreenShot:
# Copy this into a new bytearray, so that it will persist after we clear the image structure.
img_data = bytearray(img_data_arr)

if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id:
if img_reply.depth != self.drawable_depth or img_reply.visual != self.drawable_visual_id:
# This should never happen; a window can't change its visual.
msg = (
"Server returned an image with a depth or visual different than it initially reported: "
f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, "
f"expected {self.drawable_depth},{hex(self.drawable_visual_id.value)}, "
f"got {img_reply.depth},{hex(img_reply.visual.value)}"
)
raise ScreenShotError(msg)
Expand Down
Loading