From 068c507c886d4d7d0a7d6500232f0053ca60e07e Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Mon, 31 Jul 2017 15:44:59 -0400 Subject: [PATCH 01/10] Add dynamic scheduling operation mode This commit adds an opt-in scheduler option for dynamic scheduling. Instead of partitioning the test list up-front based on historical timing data, this commit lets each worker ask for the next test dynamically. This is built using Python's multiprocessing module to launch new workers instead of shelling out to call python via subprocess. This hopefully will provide a better worker balance, since we will keep each worker occupied until there are no more tests to be run instead of trying to pack each worker optimally up front. Additionally, this should hopefully improve the pdb story for users who use pdb with tests, since instead of spawning subprocesses that call python to invoke the subunit runner and reading the subunit stream from stdout, it uses multiprocessing to fork workers and uses pipes to pass the subunit streams between workers. 
--- .travis.yml | 3 +- stestr/commands/run.py | 39 +++++++++++++---- stestr/config_file.py | 7 ++- stestr/output.py | 15 +++++-- stestr/scheduler.py | 63 ++++++++++++++++++++++++++ stestr/test_processor.py | 73 ++++++++++++++++++++++++++----- stestr/tests/test_return_codes.py | 11 +++-- 7 files changed, 181 insertions(+), 30 deletions(-) diff --git a/.travis.yml b/.travis.yml index b0da764c..2b6d6665 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,8 +36,9 @@ cache: - $HOME/.cache/pip install: pip install -U tox coveralls language: python +# NOTE(mtreinish): Just to test script: - - tox + - 'tox -- --dynamic' after_success: .travis/coveralls.sh notifications: email: false diff --git a/stestr/commands/run.py b/stestr/commands/run.py index 9664c639..893ebe22 100644 --- a/stestr/commands/run.py +++ b/stestr/commands/run.py @@ -168,6 +168,8 @@ def get_parser(self, prog_name): dest='all_attachments', help='If set print all text attachment contents on' ' a successful test execution') + parser.add_argument('--dynamic', action='store_true', default=False, + help='Enable the EXPERIMENTAL dynamic scheduler') return parser def take_action(self, parsed_args): @@ -244,7 +246,8 @@ def take_action(self, parsed_args): filters=filters, pretty_out=pretty_out, color=color, stdout=stdout, abbreviate=abbreviate, suppress_attachments=suppress_attachments, - all_attachments=all_attachments, pdb=args.pdb) + all_attachments=all_attachments, pdb=args.pdb, + dynamic=args.dynamic) # Always output slowest test info if requested, regardless of other # test run options @@ -285,7 +288,7 @@ def run_command(config='.stestr.conf', repo_type='file', no_discover=False, random=False, combine=False, filters=None, pretty_out=True, color=False, stdout=sys.stdout, abbreviate=False, suppress_attachments=False, - all_attachments=False, pdb=False): + all_attachments=False, pdb=False, dynamic=False): """Function to execute the run command This function implements the run command. 
It will run the tests specified @@ -351,6 +354,7 @@ def run_command(config='.stestr.conf', repo_type='file', :param str pdb: Takes in a single test_id to bypasses test discover and just execute the test specified without launching any additional processes. A file name may be used in place of a test name. + :param bool dynamic: Enable dynamic scheduling :return return_code: The exit code for the command. 0 for success and > 0 for failures. @@ -388,6 +392,13 @@ def run_command(config='.stestr.conf', repo_type='file', ">= 0 must be used.\n" % concurrency) stdout.write(msg) return 2 + if dynamic: + if sys.version_info()[0] < 3 or sys.version_info[1] < 5: + msg = 'Dynamic mode requires python 3.5 or newer.' + exit(1) + + warnings.warn("WARNING: The dynamic scheduler is still experimental. " + "You might encounter issues while using it") if combine: latest_id = repo.latest_id() combine_id = six.text_type(latest_id) @@ -501,7 +512,8 @@ def run_tests(): repo_url=repo_url, serial=serial, worker_path=worker_path, concurrency=concurrency, blacklist_file=blacklist_file, whitelist_file=whitelist_file, black_regex=black_regex, - top_dir=top_dir, test_path=test_path, randomize=random) + top_dir=top_dir, test_path=test_path, randomize=random, + dynamic=dynamic) if isolated: result = 0 cmd.setUp() @@ -525,7 +537,8 @@ def run_tests(): repo_type=repo_type, repo_url=repo_url, pretty_out=pretty_out, color=color, abbreviate=abbreviate, stdout=stdout, suppress_attachments=suppress_attachments, - all_attachments=all_attachments) + all_attachments=all_attachments, dynamic=dynamic) + if run_result > result: result = run_result return result @@ -540,7 +553,8 @@ def run_tests(): stdout=stdout, abbreviate=abbreviate, suppress_attachments=suppress_attachments, - all_attachments=all_attachments) + all_attachments=all_attachments, + dynamic=dynamic) else: # Where do we source data about the cause of conflicts. 
latest_run = repo.get_latest_run() @@ -585,14 +599,21 @@ def _run_tests(cmd, until_failure, subunit_out=False, combine_id=None, repo_type='file', repo_url=None, pretty_out=True, color=False, stdout=sys.stdout, abbreviate=False, suppress_attachments=False, - all_attachments=False): + all_attachments=False, dynamic=False): """Run the tests cmd was parameterised with.""" cmd.setUp() try: def run_tests(): - run_procs = [('subunit', - output.ReturnCodeToSubunit( - proc)) for proc in cmd.run_tests()] + if not dynamic or cmd.concurrency == 1: + run_procs = [('subunit', + output.ReturnCodeToSubunit( + proc, + dynamic=False)) for proc in cmd.run_tests()] + else: + run_procs = [('subunit', + output.ReturnCodeToSubunit( + os.fdopen(proc['stream']), + proc['proc'])) for proc in cmd.run_tests()] if not run_procs: stdout.write("The specified regex doesn't match with anything") return 1 diff --git a/stestr/config_file.py b/stestr/config_file.py index 733ae084..0f1ae68f 100644 --- a/stestr/config_file.py +++ b/stestr/config_file.py @@ -49,7 +49,8 @@ def get_run_command(self, test_ids=None, regexes=None, serial=False, worker_path=None, concurrency=0, blacklist_file=None, whitelist_file=None, black_regex=None, - randomize=False, parallel_class=None): + randomize=False, parallel_class=None, + dynamic=False): """Get a test_processor.TestProcessorFixture for this config file Any parameters about running tests will be used for initialize the @@ -96,6 +97,7 @@ def get_run_command(self, test_ids=None, regexes=None, stestr scheduler by class. If both this and the corresponding config file option which includes `group-regex` are set, this value will be used. 
+ :param bool dynamic: Enable dynamic scheduling :returns: a TestProcessorFixture object for the specified config file and any arguments passed into this function @@ -166,4 +168,5 @@ def group_callback(test_id, regex=re.compile(group_regex)): test_filters=regexes, group_callback=group_callback, serial=serial, worker_path=worker_path, concurrency=concurrency, blacklist_file=blacklist_file, black_regex=black_regex, - whitelist_file=whitelist_file, randomize=randomize) + whitelist_file=whitelist_file, randomize=randomize, + dynamic=dynamic) diff --git a/stestr/output.py b/stestr/output.py index 0f7e3368..2478abd8 100644 --- a/stestr/output.py +++ b/stestr/output.py @@ -164,18 +164,27 @@ class ReturnCodeToSubunit(object): generating subunit. """ - def __init__(self, process): + def __init__(self, process, thread=None, dynamic=True): """Adapt a process to a readable stream.""" self.proc = process self.done = False - self.source = self.proc.stdout + if dynamic: + self.source = process + self.proc = thread + else: + self.source = self.proc.stdout self.lastoutput = six.binary_type(('\n').encode('utf8')[0]) + self.dynamic = dynamic def _append_return_code_as_test(self): if self.done is True: return self.source = six.BytesIO() - returncode = self.proc.wait() + if not self.dynamic: + returncode = self.proc.wait() + else: + self.proc.join() + returncode = self.proc.exitcode if returncode != 0: if self.lastoutput != six.binary_type(('\n').encode('utf8')[0]): # Subunit V1 is line orientated, it has to start on a fresh diff --git a/stestr/scheduler.py b/stestr/scheduler.py index a7216e18..60e01971 100644 --- a/stestr/scheduler.py +++ b/stestr/scheduler.py @@ -21,6 +21,69 @@ from stestr import selection +def get_dynamic_test_list(test_ids, repository=None, group_callback=None, + randomize=False): + dynamic_test_list = [] + _group_callback = group_callback + time_data = {} + if randomize: + return random.shuffle(test_ids) + if repository: + time_data = 
repository.get_test_times(test_ids) + timed_tests = time_data['known'] + unknown_tests = time_data['unknown'] + else: + timed_tests = {} + unknown_tests = set(test_ids) + # Group tests: generate group_id -> test_ids. + group_ids = collections.defaultdict(list) + if _group_callback is None: + group_callback = lambda _: None + else: + group_callback = _group_callback + for test_id in test_ids: + group_id = group_callback(test_id) or test_id + group_ids[group_id].append(test_id) + # Time groups: generate three sets of groups: + # - fully timed dict(group_id -> time), + # - partially timed dict(group_id -> time) and + # - unknown (set of group_id) + # We may in future treat partially timed different for scheduling, but + # at least today we just schedule them after the fully timed groups. + timed = {} + partial = {} + unknown = [] + for group_id, group_tests in group_ids.items(): + untimed_ids = unknown_tests.intersection(group_tests) + group_time = sum( + [timed_tests[test_id] + for test_id in untimed_ids.symmetric_difference( + group_tests)]) + if not untimed_ids: + timed[group_id] = group_time + elif group_time: + partial[group_id] = group_time + else: + unknown.append(group_id) + + # Scheduling is NP complete in general, so we avoid aiming for + # perfection. A quick approximation that is sufficient for our general + # needs: + # sort the groups by time + # allocate to partitions by putting each group in to the partition with + # the current (lowest time, shortest length[in tests]) + def consume_queue(groups): + queue = sorted( + groups.items(), key=operator.itemgetter(1), reverse=True) + dynamic_test_list.extend([group[0] for group in queue]) + + consume_queue(timed) + consume_queue(partial) + dynamic_test_list.extend(unknown) + + return dynamic_test_list + + def partition_tests(test_ids, concurrency, repository, group_callback, randomize=False): """Partition test_ids by concurrency. 
diff --git a/stestr/test_processor.py b/stestr/test_processor.py index d570f4f9..cdc152d3 100644 --- a/stestr/test_processor.py +++ b/stestr/test_processor.py @@ -10,6 +10,8 @@ # License for the specific language governing permissions and limitations # under the License. +import functools +import multiprocessing import os import re import signal @@ -24,6 +26,8 @@ from stestr import results from stestr import scheduler from stestr import selection +from stestr.subunit_runner import program +from stestr.subunit_runner import run from stestr import testlist @@ -80,7 +84,8 @@ def __init__(self, test_ids, cmd_template, listopt, idoption, repository, parallel=True, listpath=None, test_filters=None, group_callback=None, serial=False, worker_path=None, concurrency=0, blacklist_file=None, - black_regex=None, whitelist_file=None, randomize=False): + black_regex=None, whitelist_file=None, randomize=False, + dynamic=False): """Create a TestProcessorFixture.""" self.test_ids = test_ids @@ -101,6 +106,7 @@ def __init__(self, test_ids, cmd_template, listopt, idoption, self.whitelist_file = whitelist_file self.black_regex = black_regex self.randomize = randomize + self.dynamic = dynamic def setUp(self): super(TestProcessorFixture, self).setUp() @@ -228,6 +234,29 @@ def list_tests(self): ids = testlist.parse_enumeration(out) return ids + def _dynamic_run_tests(self, job_queue, subunit_pipe): + while True: + # NOTE(mtreinish): Open on each loop iteration with a dup to + # remove the chance of being garbage collected. 
Without this + # you'll be fighting random Bad file desciptor errors + subunit_pipe = os.fdopen(os.dup(subunit_pipe.fileno()), 'wb') + if job_queue.empty(): + subunit_pipe.close() + return + try: + test_id = job_queue.get(block=False) + except Exception: + subunit_pipe.close() + return + if not test_id: + os.close(subunit_pipe.fileno()) + raise ValueError('Invalid blank test_id: %s' % test_id) + cmd_list = [self.cmd, test_id] + test_runner = run.SubunitTestRunner + program.TestProgram( + module=None, argv=cmd_list, + testRunner=functools.partial(test_runner, stdout=subunit_pipe)) + def run_tests(self): """Run the tests defined by the command @@ -256,14 +285,34 @@ def run_tests(self): self.concurrency, self.repository, self._group_callback) - for test_ids in test_id_groups: - if not test_ids: - # No tests in this partition - continue - fixture = self.useFixture( - TestProcessorFixture(test_ids, - self.template, self.listopt, - self.idoption, self.repository, - parallel=False)) - result.extend(fixture.run_tests()) - return result + if not self.dynamic: + for test_ids in test_id_groups: + if not test_ids: + # No tests in this partition + continue + fixture = self.useFixture( + TestProcessorFixture(test_ids, + self.template, self.listopt, + self.idoption, self.repository, + parallel=False)) + result.extend(fixture.run_tests()) + return result + else: + test_id_list = scheduler.get_dynamic_test_list( + test_ids, self.repository, self._group_callback) + test_list = multiprocessing.Queue() + + for test_id in test_id_list: + test_list.put(test_id) + + for i in range(self.concurrency): + fd_pipe_r, fd_pipe_w = multiprocessing.Pipe(False) + name = 'worker-%s' % i + proc = multiprocessing.Process(target=self._dynamic_run_tests, + name=name, + args=(test_list, fd_pipe_w)) + proc.start() + stream_read = os.dup(fd_pipe_r.fileno()) + result.append({'stream': stream_read, + 'proc': proc}) + return result diff --git a/stestr/tests/test_return_codes.py 
b/stestr/tests/test_return_codes.py index 563074cb..4bafc866 100644 --- a/stestr/tests/test_return_codes.py +++ b/stestr/tests/test_return_codes.py @@ -81,15 +81,18 @@ def _add_dict(test): self.assertThat(len(tests), testtools.matchers.GreaterThan(0)) def assertRunExit(self, cmd, expected, subunit=False, stdin=None): + env = os.environ if stdin: p = subprocess.Popen( "%s" % cmd, shell=True, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + env=env) out, err = p.communicate(stdin) else: p = subprocess.Popen( "%s" % cmd, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + env=env) out, err = p.communicate() if not subunit: @@ -235,8 +238,10 @@ def test_list(self): self.assertRunExit('stestr list', 0) def _get_cmd_stdout(self, cmd): + env = os.environ p = subprocess.Popen(cmd, shell=True, - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + env=env) out = p.communicate() self.assertEqual(0, p.returncode) return out From 59189e630e2904b3f2ea8bd628073ef08f48b3d0 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Mon, 9 Sep 2019 07:42:41 -0400 Subject: [PATCH 02/10] Update stestr/commands/run.py Co-Authored-By: Adam Spiers --- stestr/commands/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stestr/commands/run.py b/stestr/commands/run.py index 893ebe22..e50474d0 100644 --- a/stestr/commands/run.py +++ b/stestr/commands/run.py @@ -393,7 +393,7 @@ def run_command(config='.stestr.conf', repo_type='file', stdout.write(msg) return 2 if dynamic: - if sys.version_info()[0] < 3 or sys.version_info[1] < 5: + if sys.version_info[0] < 3 or sys.version_info[1] < 5: msg = 'Dynamic mode requires python 3.5 or newer.' 
exit(1) From 4196953bbb41ea6d4076364f356b5b8f0587afe9 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Sun, 6 Oct 2019 18:31:46 -0400 Subject: [PATCH 03/10] Fix failing tests This commit fixes the failing tests by catching a couple of missing things from the update. The biggest fix was that for the --no-discover case we still use a subprocess and because of that we need to tell output.ReturnCodeToSubunit to that the input is not dynamic (and therefore a Popen object) so it can handle that properly. The other major change is that the return code tests are updated so that the stdout and stderr from the subprocess calls are always decoded in the non-subunit test cases. This was done primarily for ease of debugging, but it also enabled the removal of several decode() calls when the output is parsed. --- stestr/commands/run.py | 4 +++- stestr/tests/test_config_file.py | 2 +- stestr/tests/test_return_codes.py | 17 +++++++++-------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/stestr/commands/run.py b/stestr/commands/run.py index e50474d0..d738fa0f 100644 --- a/stestr/commands/run.py +++ b/stestr/commands/run.py @@ -433,7 +433,9 @@ def run_command(config='.stestr.conf', repo_type='file', def run_tests(): run_proc = [('subunit', output.ReturnCodeToSubunit( subprocess.Popen(run_cmd, shell=True, - stdout=subprocess.PIPE)))] + stdout=subprocess.PIPE), + dynamic=False))] + return load.load(in_streams=run_proc, subunit_out=subunit_out, repo_type=repo_type, diff --git a/stestr/tests/test_config_file.py b/stestr/tests/test_config_file.py index 31e364f4..7bb32f47 100644 --- a/stestr/tests/test_config_file.py +++ b/stestr/tests/test_config_file.py @@ -60,7 +60,7 @@ def _check_get_run_command(self, mock_sys, mock_TestProcessorFixture, mock_TestProcessorFixture.assert_called_once_with( None, command, "--list", "--load-list $IDFILE", mock_get_repo_open.return_value, black_regex=None, - blacklist_file=None, concurrency=0, + blacklist_file=None, concurrency=0, 
dynamic=False, group_callback=expected_group_callback, test_filters=None, randomize=False, serial=False, whitelist_file=None, worker_path=None) diff --git a/stestr/tests/test_return_codes.py b/stestr/tests/test_return_codes.py index 4bafc866..df14a09b 100644 --- a/stestr/tests/test_return_codes.py +++ b/stestr/tests/test_return_codes.py @@ -94,8 +94,9 @@ def assertRunExit(self, cmd, expected, subunit=False, stdin=None): stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) out, err = p.communicate() - if not subunit: + out = out.decode('utf8') + err = err.decode('utf8') self.assertEqual( p.returncode, expected, "Stdout: %s; Stderr: %s" % (out, err)) @@ -296,14 +297,14 @@ def test_load_force_init_invalid(self): def test_load_from_stdin_quiet(self): out, err = self.assertRunExit('stestr --user-config stestr.yaml -q ' 'run passing', 0) - self.assertEqual(out.decode('utf-8'), '') + self.assertEqual(out, '') # FIXME(masayukig): We get some warnings when we run a coverage job. # So, just ignore 'err' here. 
stream = self._get_cmd_stdout('stestr last --subunit')[0] out, err = self.assertRunExit('stestr --user-config stestr.yaml -q ' 'load', 0, stdin=stream) - self.assertEqual(out.decode('utf-8'), '') - self.assertEqual(err.decode('utf-8'), '') + self.assertEqual(out, '') + self.assertEqual(err, '') def test_no_subunit_trace_force_subunit_trace(self): out, err = self.assertRunExit( @@ -393,21 +394,21 @@ def test_list_from_func(self): def test_run_no_discover_pytest_path(self): passing_string = 'tests/test_passing.py::FakeTestClass::test_pass_list' out, err = self.assertRunExit('stestr run -n %s' % passing_string, 0) - lines = out.decode('utf8').splitlines() + lines = out.splitlines() self.assertIn(' - Passed: 1', lines) self.assertIn(' - Failed: 0', lines) def test_run_no_discover_pytest_path_failing(self): passing_string = 'tests/test_failing.py::FakeTestClass::test_pass_list' out, err = self.assertRunExit('stestr run -n %s' % passing_string, 1) - lines = out.decode('utf8').splitlines() + lines = out.splitlines() self.assertIn(' - Passed: 0', lines) self.assertIn(' - Failed: 1', lines) def test_run_no_discover_file_path(self): passing_string = 'tests/test_passing.py' out, err = self.assertRunExit('stestr run -n %s' % passing_string, 0) - lines = out.decode('utf8').splitlines() + lines = out.splitlines() self.assertIn(' - Passed: 2', lines) self.assertIn(' - Failed: 0', lines) self.assertIn(' - Expected Fail: 1', lines) @@ -415,7 +416,7 @@ def test_run_no_discover_file_path(self): def test_run_no_discover_file_path_failing(self): passing_string = 'tests/test_failing.py' out, err = self.assertRunExit('stestr run -n %s' % passing_string, 1) - lines = out.decode('utf8').splitlines() + lines = out.splitlines() self.assertIn(' - Passed: 0', lines) self.assertIn(' - Failed: 2', lines) self.assertIn(' - Unexpected Success: 1', lines) From e68b5063c8e2830b1542a7611da08410da39defb Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Sun, 6 Oct 2019 19:13:53 -0400 Subject: 
[PATCH 04/10] Clean up test_return_code tests slightly This is a refinement on the previous commit to reduce unnecessary changes to the functional tests in the test_return_codes module. Mainly, always decoding the output from the subprocess for testing broke things unexpectedly when a bytes object was expected. --- stestr/tests/test_return_codes.py | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/stestr/tests/test_return_codes.py b/stestr/tests/test_return_codes.py index df14a09b..9730d4b9 100644 --- a/stestr/tests/test_return_codes.py +++ b/stestr/tests/test_return_codes.py @@ -81,25 +81,21 @@ def _add_dict(test): self.assertThat(len(tests), testtools.matchers.GreaterThan(0)) def assertRunExit(self, cmd, expected, subunit=False, stdin=None): - env = os.environ if stdin: p = subprocess.Popen( "%s" % cmd, shell=True, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=env) + stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate(stdin) else: p = subprocess.Popen( "%s" % cmd, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=env) + stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() if not subunit: - out = out.decode('utf8') - err = err.decode('utf8') self.assertEqual( p.returncode, expected, - "Stdout: %s; Stderr: %s" % (out, err)) + "Stdout: %s; Stderr: %s" % (out.decode('utf8'), + err.decode('utf8'))) return (out, err) else: self.assertEqual(p.returncode, expected, @@ -239,10 +235,8 @@ def test_list(self): self.assertRunExit('stestr list', 0) def _get_cmd_stdout(self, cmd): - env = os.environ p = subprocess.Popen(cmd, shell=True, - stdout=subprocess.PIPE, - env=env) + stdout=subprocess.PIPE) out = p.communicate() self.assertEqual(0, p.returncode) return out @@ -297,14 +291,14 @@ def test_load_force_init_invalid(self): def test_load_from_stdin_quiet(self): out, err = self.assertRunExit('stestr --user-config stestr.yaml -q ' 'run 
passing', 0) - self.assertEqual(out, '') + self.assertEqual('', out.decode('utf-8')) # FIXME(masayukig): We get some warnings when we run a coverage job. # So, just ignore 'err' here. stream = self._get_cmd_stdout('stestr last --subunit')[0] out, err = self.assertRunExit('stestr --user-config stestr.yaml -q ' 'load', 0, stdin=stream) - self.assertEqual(out, '') - self.assertEqual(err, '') + self.assertEqual(out.decode('utf-8'), '') + self.assertEqual(err.decode('utf-8'), '') def test_no_subunit_trace_force_subunit_trace(self): out, err = self.assertRunExit( @@ -394,21 +388,21 @@ def test_list_from_func(self): def test_run_no_discover_pytest_path(self): passing_string = 'tests/test_passing.py::FakeTestClass::test_pass_list' out, err = self.assertRunExit('stestr run -n %s' % passing_string, 0) - lines = out.splitlines() + lines = out.decode('utf8').splitlines() self.assertIn(' - Passed: 1', lines) self.assertIn(' - Failed: 0', lines) def test_run_no_discover_pytest_path_failing(self): passing_string = 'tests/test_failing.py::FakeTestClass::test_pass_list' out, err = self.assertRunExit('stestr run -n %s' % passing_string, 1) - lines = out.splitlines() + lines = out.decode('utf8').splitlines() self.assertIn(' - Passed: 0', lines) self.assertIn(' - Failed: 1', lines) def test_run_no_discover_file_path(self): passing_string = 'tests/test_passing.py' out, err = self.assertRunExit('stestr run -n %s' % passing_string, 0) - lines = out.splitlines() + lines = out.decode('utf8').splitlines() self.assertIn(' - Passed: 2', lines) self.assertIn(' - Failed: 0', lines) self.assertIn(' - Expected Fail: 1', lines) @@ -416,7 +410,7 @@ def test_run_no_discover_file_path(self): def test_run_no_discover_file_path_failing(self): passing_string = 'tests/test_failing.py' out, err = self.assertRunExit('stestr run -n %s' % passing_string, 1) - lines = out.splitlines() + lines = out.decode('utf8').splitlines() self.assertIn(' - Passed: 0', lines) self.assertIn(' - Failed: 2', lines) 
self.assertIn(' - Unexpected Success: 1', lines) From 64d8a0b8b0b7a8e34ffd5fd7bc0a6f8e1e45346e Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Tue, 11 Aug 2020 17:12:06 -0400 Subject: [PATCH 05/10] Try running with --dynamic everywhere --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index f426419e..f959e134 100644 --- a/tox.ini +++ b/tox.ini @@ -12,7 +12,7 @@ deps = -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt commands = python tools/find_and_rm.py - stestr run {posargs} + stestr run --dynamic {posargs} [testenv:pep8] sitepackages = False From 4fcb27a62e53451f7382fdfebcf08da36d95d950 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Thu, 15 Apr 2021 11:12:13 -0400 Subject: [PATCH 06/10] Fix pep8 --- stestr/output.py | 2 +- stestr/scheduler.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/stestr/output.py b/stestr/output.py index a07d0479..d1f771ec 100644 --- a/stestr/output.py +++ b/stestr/output.py @@ -179,7 +179,7 @@ def __init__(self, process, thread=None, dynamic=True): def _append_return_code_as_test(self): if self.done is True: return - self.source = six.BytesIO() + self.source = io.BytesIO() if not self.dynamic: returncode = self.proc.wait() else: diff --git a/stestr/scheduler.py b/stestr/scheduler.py index 6b2a50f1..31aed4f2 100644 --- a/stestr/scheduler.py +++ b/stestr/scheduler.py @@ -38,7 +38,10 @@ def get_dynamic_test_list(test_ids, repository=None, group_callback=None, # Group tests: generate group_id -> test_ids. 
group_ids = collections.defaultdict(list) if _group_callback is None: - group_callback = lambda _: None + + def group_callback(_): + return None + else: group_callback = _group_callback for test_id in test_ids: From e23f8ede892f82805d3fed0b0b68227c4da67d23 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Fri, 14 Jul 2023 15:11:52 -0400 Subject: [PATCH 07/10] Remove check for Python < 3.5 I originally developed this feature when we still supported older python versions in stestr. The dynamic scheduling feature depends on functionality added in Python 3.5. Since then the WIP feature branch sat stale for years since that time we've bumped the minimum version of Python supported to 3.7 so the runtime check for older python versions is no longer needed. --- stestr/commands/run.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/stestr/commands/run.py b/stestr/commands/run.py index 08edcbaf..cfed47c3 100644 --- a/stestr/commands/run.py +++ b/stestr/commands/run.py @@ -527,10 +527,6 @@ def run_command( stdout.write(msg) return 2 if dynamic: - if sys.version_info[0] < 3 or sys.version_info[1] < 5: - msg = "Dynamic mode requires python 3.5 or newer." - exit(1) - warnings.warn( "WARNING: The dynamic scheduler is still experimental. " "You might encounter issues while using it" From 5e9dd7ea9181a82d649001991606cb9825a9b10c Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Sat, 28 Feb 2026 11:15:35 -0500 Subject: [PATCH 08/10] Explicitly use spawn for worker processes and fix queue lifetime This commit fixes an issue that occured in earlier commits on the PR around the initialization of the worker processes and the scope of the launch method. Previously if the method used to launch threads returned before all the workers accessed the queue for the first time the worker wouldn't be able to read from the queue. 
This race condition was caused because the Queue was locally scoped to the method and would be deleted by the main process before other workers could read it. This would specifically occur on systems using "forkserver" or "spawn" multiprocessing start methods because the child processes didn't have the queue object, while "fork" would because the process memory was copied in the child process. This commit fixes this by scoping the Queue object to the instance which means it survives as long as the test processor object does (which is typically the entire run command). As part of this change the start method used by the new dynamic scheduler is set to be fixed to "spawn" to minimize any potential interactions between stestr and the code under test. This mirrors the behavior of running in non-dynamic scheduler mode, because spawn is roughly equivalent to calling python in a subprocess. --- stestr/test_processor.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/stestr/test_processor.py b/stestr/test_processor.py index dd2ce8e0..038a935b 100644 --- a/stestr/test_processor.py +++ b/stestr/test_processor.py @@ -332,18 +332,24 @@ def run_tests(self): test_id_list = scheduler.get_dynamic_test_list( test_ids, self.repository, self._group_callback ) - test_list = multiprocessing.Queue() + # Use spawn to launch a fresh interpreter and have the minimal + # amount of state from stestr when invoking the test runner. + # This is equivalent to non-dynamic mode using subprocess instead + # of multiprocessing. 
+ context = multiprocessing.get_context("spawn") + self._test_list_queue = context.Queue() for test_id in test_id_list: - test_list.put(test_id) + self._test_list_queue.put(test_id) for i in range(self.concurrency): - fd_pipe_r, fd_pipe_w = multiprocessing.Pipe(False) + fd_pipe_r, fd_pipe_w = context.Pipe(False) name = "worker-%s" % i - proc = multiprocessing.Process( + context.Value + proc = context.Process( target=self._dynamic_run_tests, name=name, - args=(test_list, fd_pipe_w), + args=[self._test_list_queue, fd_pipe_w], ) proc.start() stream_read = os.dup(fd_pipe_r.fileno()) From 314c1f9c3ec845fa4374d0d429bda10f2f5a218e Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Sat, 28 Feb 2026 13:31:35 -0500 Subject: [PATCH 09/10] Better document the new feature and disable windows testing This commit improves the documentation of the new --dynamic flag to explain how it operates and what the goal of it is. It also makes it clear the feature is experimental and is an opt-in at your own risk. Also from testing this doesn't currently work on Windows, instead of blocking the feature over a platform used by 2-3% of our users (according to https://pypistats.org/packages/stestr ) this just marks it as currently unsupported. We will have to revisit how to make this work on Windows before we stabilize the feature. 
--- .github/workflows/main.yml | 34 ++++++++++++++++++++++++++++++++++ doc/source/MANUAL.rst | 23 +++++++++++++++++++++++ stestr/commands/run.py | 9 ++++++++- tox.ini | 2 +- 4 files changed, 66 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6987224e..7bdad1b9 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -38,6 +38,40 @@ jobs: .tox/py/bin/pip install gnureadline subunit2sql tox -epy if: runner.os == 'macOS' + test_dynamic: + name: tests-python${{ matrix.python-version }}-${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + python-version: [3.8, "3.14"] + os: ["macOS-latest", "ubuntu-latest"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Pip cache + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-${{ matrix.python-version }}-pip-tests-${{ hashFiles('pyproject.toml','requirements-dev.txt','constraints.txt') }} + restore-keys: | + ${{ runner.os }}-${{ matrix.python-version }}-pip-tests- + ${{ runner.os }}-${{ matrix.python-version }}-pip- + ${{ runner.os }}-${{ matrix.python-version }} + - name: Install Deps + run: python -m pip install -U tox setuptools virtualenv wheel + - name: Install and Run Tests + run: tox -e py -- --dynamic + if: runner.os != 'macOS' + - name: Install and run tests macOS + run: | + tox -epy --notest + .tox/py/bin/pip install gnureadline subunit2sql + tox -epy -- --dynamic + if: runner.os == 'macOS' + lint: name: pep8 runs-on: ubuntu-latest diff --git a/doc/source/MANUAL.rst b/doc/source/MANUAL.rst index fae7c1a4..fabb20d3 100644 --- a/doc/source/MANUAL.rst +++ b/doc/source/MANUAL.rst @@ -502,6 +502,29 @@ There is also an option on ``stestr run``, ``--random`` to randomize the order of tests as they are passed to the workers. 
This is useful in certain use cases, especially when you want to test isolation between test cases. +Dynamic Scheduler +----------------- + +There is a new under development dynamic scheduler mode that is being designed to +hopefully replace the static scheduling that stestr currently uses by default. This +new scheduler will sort all the test ids found during discovery based on timing data +and alphabetical order just like the static scheduler, but this scheduler puts the +list in a unified work queue. Then each individual test worker will query the queue +for the next test to execute. This should result in a better worker balance than the +static partitioning used by default which may translate to a better wall time. + +You can enable this new scheduler with the ``--dynamic`` flag to ``stestr run``. This +option should also be compatible with the grouping options described in the :ref:`group_regex` +section. Using the two in combinationwill result in the scheduler working with the +group of tests instead of individual tests. + +Do note that the dynamic scheduler is still experimental and hasn't been as thoroughly +tested as the default scheduler. It may not work in every use case, if you encounter an +issue with the scheduler please file a bug at: + +https://github.com/mtreinish/stestr/issues/new?template=bug_report.md + +Also, it is a known limitation of the new scheduler that it does not work on Windows. User Config Files ----------------- diff --git a/stestr/commands/run.py b/stestr/commands/run.py index d48305b9..cc0f500a 100644 --- a/stestr/commands/run.py +++ b/stestr/commands/run.py @@ -250,7 +250,14 @@ def get_parser(self, prog_name): "--dynamic", action="store_true", default=False, - help="Enable the EXPERIMENTAL dynamic scheduler", + help="Enable the EXPERIMENTAL dynamic scheduler. This scheduler " + "is designed to improve the worker balance. 
Each worker will " + "dynamically query the next test to run which can potentially " + "result in better wall time. This option is still experimental " + "and is not guaranteed to work in all cases. This option is also not " + "currently supported on Windows. If you encounter issues with this " + "option please file a bug at: " + "https://github.com/mtreinish/stestr/issues/new?template=bug_report.md" ) return parser diff --git a/tox.ini b/tox.ini index 752d44fb..db20af14 100644 --- a/tox.ini +++ b/tox.ini @@ -11,7 +11,7 @@ allowlist_externals = deps = -r{toxinidir}/test-requirements.txt commands = python tools/find_and_rm.py - stestr run --dynamic {posargs} + stestr run {posargs} [testenv:pep8] sitepackages = False From f7725b030acca66f8f28855d6e421d28f1743031 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Sat, 28 Feb 2026 13:40:03 -0500 Subject: [PATCH 10/10] Fix docs and lint --- .github/workflows/main.yml | 2 +- doc/source/MANUAL.rst | 36 +++++++++++++++++++----------------- stestr/commands/run.py | 2 +- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 7bdad1b9..28389fb6 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -39,7 +39,7 @@ jobs: tox -epy if: runner.os == 'macOS' test_dynamic: - name: tests-python${{ matrix.python-version }}-${{ matrix.os }} + name: tests-dynamic-scheduler-python${{ matrix.python-version }}-${{ matrix.os }} runs-on: ${{ matrix.os }} strategy: matrix: diff --git a/doc/source/MANUAL.rst b/doc/source/MANUAL.rst index fabb20d3..914c5110 100644 --- a/doc/source/MANUAL.rst +++ b/doc/source/MANUAL.rst @@ -505,26 +505,28 @@ use cases, especially when you want to test isolation between test cases. Dynamic Scheduler ----------------- -There is a new under development dynamic scheduler mode that is being designed to -hopefully replace the static scheduling that stestr currently uses by default. 
This -new scheduler will sort all the test ids found during discovery based on timing data -and alphabetical order just like the static scheduler, but this scheduler puts the -list in a unified work queue. Then each individual test worker will query the queue -for the next test to execute. This should result in a better worker balance than the -static partitioning used by default which may translate to a better wall time. - -You can enable this new scheduler with the ``--dynamic`` flag to ``stestr run``. This -option should also be compatible with the grouping options described in the :ref:`group_regex` -section. Using the two in combinationwill result in the scheduler working with the -group of tests instead of individual tests. - -Do note that the dynamic scheduler is still experimental and hasn't been as thoroughly -tested as the default scheduler. It may not work in every use case, if you encounter an -issue with the scheduler please file a bug at: +There is a new under development dynamic scheduler mode that is being designed +to hopefully replace the static scheduling that stestr currently uses by +default. This new scheduler will sort all the test ids found during discovery +based on timing data and alphabetical order just like the static scheduler, +but this scheduler puts the list in a unified work queue. Then each individual +test worker will query the queue for the next test to execute. This should +result in a better worker balance than the static partitioning used by +default which may translate to a better wall time. + +You can enable this new scheduler with the ``--dynamic`` flag to +``stestr run``. This option should also be compatible with the grouping +options described in the :ref:`group_regex` section. Using the two in +combination will result in the scheduler working with the group of tests +instead of individual tests. + +Do note that the dynamic scheduler is still experimental and hasn't been as +thoroughly tested as the default scheduler. 
It may not work in every use +case; if you encounter an issue with the scheduler, please file a bug at: https://github.com/mtreinish/stestr/issues/new?template=bug_report.md -Also, it is a known limitation of the new scheduler that it does not work on Windows. +This new scheduler does not currently work on Windows. User Config Files ----------------- diff --git a/stestr/commands/run.py b/stestr/commands/run.py index cc0f500a..7e400aa9 100644 --- a/stestr/commands/run.py +++ b/stestr/commands/run.py @@ -257,7 +257,7 @@ def get_parser(self, prog_name): "and is not guaranteed to work in all cases. This option is also not " "currently supported on Windows. If you encounter issues with this " "option please file a bug at: " - "https://github.com/mtreinish/stestr/issues/new?template=bug_report.md" + "https://github.com/mtreinish/stestr/issues/new?template=bug_report.md", ) return parser