Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package(default_visibility = ["//visibility:private"])
PREREQUISITES_DATA = glob([
"**/Brewfile*",
"**/*.lock",
"**/*.json",
"**/*.toml",
"**/*.txt",
])
Expand Down
248 changes: 223 additions & 25 deletions setup/install_prereqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import argparse
import functools
import hashlib
import json
import logging
import os
from pathlib import Path
Expand All @@ -16,7 +18,10 @@
import shlex
import subprocess
import sys
import tempfile
import textwrap
import urllib.parse
import urllib.request

_MY_DIR: Path = Path(__file__).parent
"""The directory containing this script, used to locate our data assets."""
Expand Down Expand Up @@ -46,38 +51,227 @@ def _is_ubuntu() -> bool:
return False


@functools.cache
def _os_codename() -> str:
if platform.system() == "Linux":
return platform.freedesktop_os_release()["VERSION_CODENAME"]
raise NotImplementedError(platform.system())


def _check_sudo() -> None:
"""Checks that 'sudo' has sufficient credentials."""
# If sudo is already usable, then we're done.
process = _run(args=["sudo", "-n", "/bin/true"], check=False, quiet=True)
if process.returncode == 0:
return
# If not, then we need to refresh the credentials. N.B. This doesn't work in
# our CI environment, but the prior check should have passed in that case.
subprocess.check_call(["sudo", "-v"])


def _run(
*,
args: list,
cwd: Path | None = None,
check: bool = True,
superuser: bool = False,
quiet: bool = False,
) -> None:
"""Runs a subprocess command given by `args`. Failure of the command is an
`_error`. When `quiet` is true, the command line will not be printed by
default.
"""
interactive: bool = False,
) -> subprocess.CompletedProcess:
"""Runs a subprocess command given by `args`. When `check` is true, failure
of the command is an `_error`. When `superuser` is true, the command will
be run under 'sudo' unless the euid is already root. When `quiet` is
true, the command line will not be printed by default. When `interactive`
is true, input is allowed and output is unbuffered. Returns the completed
process object."""
command = args[0]
if superuser and os.geteuid() != 0:
_check_sudo()
args = ["sudo"] + args
logging.log(
msg=f"Running: {shlex.join(args)} ...",
level=logging.DEBUG if quiet else logging.INFO,
)
process = subprocess.run(
args,
cwd=cwd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.DEVNULL if not interactive else None,
stdout=subprocess.PIPE if not interactive else None,
stderr=subprocess.STDOUT if not interactive else None,
text=True,
)
problem = process.returncode != 0
for line in process.stdout.splitlines():
logging.log(
msg=f"... from {command}: {line}",
level=logging.INFO if problem else logging.DEBUG,
)
problem = check and (process.returncode != 0)
if process.stdout is not None:
for line in process.stdout.splitlines():
logging.log(
msg=f"... from {command}: {line}",
level=logging.INFO if problem else logging.DEBUG,
)
logging.debug(f"... finished {command}.")
if problem:
_error(f"{command} failed with returncode {process.returncode}")
return process


def _get_dpkg_versions(package_names: list[str]) -> dict[str, str]:
"""Returns the installed version of packages. The input is a list of
package names, and the return value is a dict mapping all of those
names to their installed version (or None, if not installed)."""
assert package_names
result = {}
for name in package_names:
result[name] = None
process = _run(
args=[
"dpkg-query",
"--show",
"--showformat=${Package} ${db:Status-Abbrev} ${Version}\n",
]
+ package_names,
check=False,
quiet=True,
)
for line in process.stdout.splitlines():
tokens = line.split()
if len(tokens) != 3:
continue
name, status, version = tokens
if status == "ii":
result[name] = version
logging.debug(f"dpkg_versions = {result!r}")
return result


def _apt_install(*, package_names: list[str], yes: bool) -> None:
"""Installs the given packages using 'apt-get'.
The `yes` flag is passed along to apt as `--yes`."""
assert package_names
args = [
"apt-get",
"install",
"--no-install-recommends",
]
if yes:
args.append("--yes")
args.extend(package_names)
process = _run(args=args, superuser=True, check=yes)
if process.returncode == 0:
return
# We can only reach here when yes=False (i.e., check=False). The apt-get
# command didn't work, and the most likely reason is it needs Y/n input
# from the user, so we'll try it again allowing for user input.
_run(args=args, superuser=True, interactive=True, quiet=True)


def _download(*, temp_dir: Path, package: dict) -> Path:
"""Downloads a `*.deb` package a denoted by the given `package` entry loaded
from the setup/ubuntu/packages.json file. Returns its path inside temp_dir.
"""
name = package["name"]
version = package["version"]
urls = package["urls"]
sha256 = package["sha256"]

logging.info(f"Downloading {name} {version} ...")

# Try each url in turn.
errors = []
for url in urls:
logging.debug(f"Trying {url} ...")
basename = urllib.parse.urlparse(url).path.split("/")[-1]
temp_filename = temp_dir / basename
hasher = hashlib.sha256()
with temp_filename.open("wb") as f:
try:
with urllib.request.urlopen(url=url, timeout=30) as response:
while True:
data = response.read(4096)
if not data:
break
hasher.update(data)
f.write(data)
except OSError as e:
errors.append(f"Candidate {url} failed:\n{e}")
continue
download_sha256 = hasher.hexdigest()
if download_sha256 == sha256:
return temp_filename
errors.append(
f"Candidate {url} failed:\n"
f"Checksum mismatch; was {download_sha256} but wanted {sha256}."
)

# No downloads succeeded.
messages = "\n\n".join(errors)
_error(f"All downloads failed:\n\n{messages}")


def _install_downloaded_debs(*, yes: bool) -> None:
"""Downloads and installs required debs for --developer that are not
available in Ubuntu's apt site.
The `yes` flag is passed along to apt as `--yes`."""
deb_arch = {
"x86_64": "amd64",
"aarch64": "arm64",
}[platform.machine().lower()]

# Load the list of packages and filter for the relevant ones.
json_filename = _MY_DIR / "ubuntu/packages.json"
packages = {}
for package in json.loads(json_filename.read_text(encoding="utf-8")):
assert package["type"] == "download_deb"
name = package["name"]
if deb_arch not in package["arches"]:
continue
if _os_codename() not in package["codenames"]:
continue
assert name not in packages, name
packages[name] = package
if not packages:
return

# Check what's already installed in case we can skip some.
all_names = list(packages.keys())
for name, installed_version in _get_dpkg_versions(all_names).items():
desired_version = packages[name]["version"]
if installed_version is None:
# The package is missing; we will need to install it.
continue

# Check if already installed at the exact version.
if installed_version == desired_version:
logging.debug(f"{name} already installed at the desired version.")
del packages[name]
continue

# Check if already installed at a newer version.
comparison = _run(
args=[
"dpkg",
"--compare-versions",
installed_version,
"gt",
desired_version,
],
quiet=True,
check=False,
)
if comparison.returncode == 0:
logging.info(
f"Not downgrading {name} from {installed_version=} "
f"to {desired_version=}."
)
del packages[name]
continue

# Download and install the necessary file(s).
if packages:
with tempfile.TemporaryDirectory(prefix="drake_prereqs_") as temp:
paths = [
str(_download(temp_dir=Path(temp), package=package))
for package in packages.values()
]
_apt_install(package_names=paths, yes=yes)


def _setup_user_environment():
Expand All @@ -92,12 +286,9 @@ def _setup_user_environment():
# Compute the bazel rcfile snippet. This is always created, but only needs
# content for Drake Developers on Linux.
bazelrc_content = ""
if sys.platform == "linux":
os_release = platform.freedesktop_os_release()
if _is_ubuntu():
developer_txt = (
_MY_DIR
/ "ubuntu"
/ f"packages-{os_release['VERSION_CODENAME']}-developer.txt"
_MY_DIR / "ubuntu" / f"packages-{_os_codename()}-developer.txt"
)
clang_re = re.compile("^clang-([0-9]+)$")
for line in developer_txt.read_text(encoding="utf-8").splitlines():
Expand Down Expand Up @@ -191,12 +382,17 @@ def main():
"but don't install any system-wide packages."
),
)
for name in ("--without-update", "-y"):
parser.add_argument(
name,
action="store_true",
help="Ignored for forwards compatibility.",
)
parser.add_argument(
"--without-update",
action="store_true",
help="Ignored for forwards compatibility.",
)
parser.add_argument(
"-y",
action="store_true",
dest="yes",
help="Install without prompting for confirmation.",
)
parser.add_argument(
"--verbose",
action="store_true",
Expand All @@ -208,6 +404,8 @@ def main():

# We are in the process of migrating our bash setup code into this file.
# Anything not set up here was already setup by install_prereqs.sh.
if _is_ubuntu() and args.developer:
_install_downloaded_debs(yes=args.yes)
if args.developer or args.user_environment_only:
_setup_user_environment()
if args.developer:
Expand Down
Loading