diff --git a/c-questdb-client b/c-questdb-client index 34905ab2..3ed209aa 160000 --- a/c-questdb-client +++ b/c-questdb-client @@ -1 +1 @@ -Subproject commit 34905ab227bb95acb27c0b5c38ae31a9a084f2a3 +Subproject commit 3ed209aab5ff9420bb6d970422341390ea6bed49 diff --git a/src/questdb/ingress.pyi b/src/questdb/ingress.pyi index 06278a84..712aaeff 100644 --- a/src/questdb/ingress.pyi +++ b/src/questdb/ingress.pyi @@ -36,7 +36,7 @@ __all__ = [ from datetime import datetime, timedelta from enum import Enum -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Sequence, Tuple, Union import numpy as np import pandas as pd @@ -831,6 +831,7 @@ class Sender: host: str, port: Union[int, str], *, + addresses: Optional[Sequence[Tuple[str, Union[str, int]]]] = None, bind_interface: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, @@ -852,7 +853,13 @@ class Sender: protocol_version=None, init_buf_size: int = 65536, max_name_len: int = 127, - ): ... + ): + """ + :param addresses: Additional ``(host, port)`` pairs for failover + (HTTP/HTTPS only). On retriable errors the sender rotates through + all configured addresses in round-robin order. + """ + ... @staticmethod def from_conf( conf_str: str, diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index 17e43296..66883e4b 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -1784,6 +1784,9 @@ cdef object parse_conf_str( str conf_str): """ Parse a config string to a tuple of (Protocol, dict[str, str]). + The 'addr' key may appear multiple times for multi-url support; + all values are collected into a list under the '_addrs' key, + and the last 'addr' value is kept under the 'addr' key. """ cdef size_t c_len1 cdef const char* c_buf1 @@ -1794,6 +1797,7 @@ cdef object parse_conf_str( cdef str key cdef str value cdef dict params = {} + cdef list addrs = [] cdef line_sender_utf8 c_conf_str_utf8 cdef questdb_conf_str_parse_err* err cdef questdb_conf_str* c_conf_str @@ -1812,6 +1816,8 @@ cdef object parse_conf_str( while questdb_conf_str_iter_next(c_iter, &c_buf1, &c_len1, &c_buf2, &c_len2): key = PyUnicode_FromStringAndSize(c_buf1, c_len1) value = PyUnicode_FromStringAndSize(c_buf2, c_len2) + if key == 'addr': + addrs.append(value) params[key] = value questdb_conf_str_iter_free(c_iter) @@ -1848,6 +1854,13 @@ cdef object parse_conf_str( k: type_mappings.get(k, str)(v) for k, v in params.items() } + + # Store the full list of addresses for multi-url support. + # This is set AFTER the type_mappings comprehension to avoid + # the list being converted to a string by the default str() coercion. + if addrs: + params['_addrs'] = addrs + return (Protocol.parse(service), params) @@ -2089,6 +2102,7 @@ cdef class Sender: str host, object port, *, + object addresses=None, str bind_interface=None, str username=None, str password=None, @@ -2115,6 +2129,9 @@ cdef class Sender: cdef str port_str cdef line_sender_protocol c_protocol cdef line_sender_utf8 c_port + cdef line_sender_error* err = NULL + cdef line_sender_utf8 c_addr_host + cdef line_sender_utf8 c_addr_port cdef qdb_pystr_buf* b = qdb_pystr_buf_new() try: protocol = Protocol.parse(protocol) @@ -2130,6 +2147,31 @@ cdef class Sender: str_to_utf8(b, port_str, &c_port) self._opts = line_sender_opts_new_service(c_protocol, c_host, c_port) + if addresses is not None: + for addr_index, addr_entry in enumerate(addresses): + if not isinstance(addr_entry, (tuple, list)) or len(addr_entry) != 2: + raise TypeError( + f'addresses[{addr_index}] must be a (host, port) tuple') + if not isinstance(addr_entry[0], str): + raise TypeError( + f'addresses[{addr_index}] host must be a str, ' + f'not {_fqn(type(addr_entry[0]))}') + addr_host_str = addr_entry[0] + addr_port_val = addr_entry[1] + if isinstance(addr_port_val, int): + addr_port_str = str(addr_port_val) + elif isinstance(addr_port_val, str): + addr_port_str = addr_port_val + else: + raise TypeError( + f'addresses[{addr_index}] port must be an int or a str, ' + f'not {_fqn(type(addr_port_val))}') + str_to_utf8(b, addr_host_str, &c_addr_host) + str_to_utf8(b, addr_port_str, &c_addr_port) + if not line_sender_opts_address( + self._opts, c_addr_host, c_addr_port, &err): + raise c_err_to_py(err) + self._set_sender_fields( b, protocol, @@ -2248,11 +2290,16 @@ cdef class Sender: sender = Sender.__new__(Sender) - # Forward only the `addr=` parameter to the C API. - synthetic_conf_str = f'{protocol.tag}::addr={addr};' + # Forward addr parameter(s) to the C API. + # Multiple addr entries are supported for multi-url failover. + addrs = params.get('_addrs', [addr]) + addr_parts = ';'.join(f'addr={a}' for a in addrs) + synthetic_conf_str = f'{protocol.tag}::{addr_parts};' str_to_utf8(b, synthetic_conf_str, &c_synthetic_conf_str) sender._opts = line_sender_opts_from_conf( c_synthetic_conf_str, &err) + if err != NULL: + raise c_err_to_py(err) sender._set_sender_fields( b, diff --git a/src/questdb/line_sender.pxd b/src/questdb/line_sender.pxd index ad364d24..a6b07119 100644 --- a/src/questdb/line_sender.pxd +++ b/src/questdb/line_sender.pxd @@ -337,6 +337,13 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error** err_out ) noexcept nogil + bint line_sender_opts_address( + line_sender_opts* opts, + line_sender_utf8 host, + line_sender_utf8 port, + line_sender_error** err_out + ) noexcept nogil + bint line_sender_opts_username( line_sender_opts* opts, line_sender_utf8 username, diff --git a/test/system_test.py b/test/system_test.py index 09acfd4d..0b037f32 100755 --- a/test/system_test.py +++ b/test/system_test.py @@ -1,6 +1,9 @@ #!/usr/bin/env python3 import sys + +from questdb.ingress import TimestampNanos + sys.dont_write_bytecode = True import os import shutil @@ -29,7 +32,7 @@ import questdb.ingress as qi -QUESTDB_VERSION = '9.2.0' +QUESTDB_VERSION = '9.3.0' QUESTDB_PLAIN_INSTALL_PATH = None QUESTDB_AUTH_INSTALL_PATH = None FIRST_ARRAY_RELEASE = (8, 4, 0) @@ -325,5 +328,10 @@ def test_decimal_pyarrow(self): scrubbed_data = [row[:-1] for row in resp['dataset']] self.assertEqual(scrubbed_data, expected_data) + def test_sending_just_timestamps(self): + with qi.Sender('http', 'localhost', self.qdb_plain.http_server_port) as sender: + sender.row(table_name="just_timestamp_test", at=TimestampNanos.now()) + sender.flush() + if __name__ == '__main__': unittest.main() diff --git a/test/test.py b/test/test.py index f32c5034..543aa37a 100755 --- a/test/test.py +++ b/test/test.py @@ -1483,6 +1483,309 @@ class TestBufferProtocolVersionV3(TestBases.TestBuffer): version = 3 +class TestMultiUrl(unittest.TestCase): + """Tests for multi-URL failover support.""" + + def test_addresses_param_basic(self): + """Test that the addresses kwarg is accepted and creates a valid sender.""" + with HttpServer() as server1, HttpServer() as server2: + with qi.Sender( + 'http', '127.0.0.1', server1.port, + addresses=[('127.0.0.1', server2.port)], + protocol_version='2', + auto_flush=False) as sender: + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + self.assertEqual(len(server1.requests), 1) + + def test_addresses_param_with_int_port(self): + """Test that addresses accept integer ports.""" + with HttpServer() as server1, HttpServer() as server2: + with qi.Sender( + 'http', '127.0.0.1', server1.port, + addresses=[('127.0.0.1', int(server2.port))], + protocol_version='2', + auto_flush=False) as sender: + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + + def test_addresses_param_with_str_port(self): + """Test that addresses accept string ports.""" + with HttpServer() as server1, HttpServer() as server2: + with qi.Sender( + 'http', '127.0.0.1', server1.port, + addresses=[('127.0.0.1', str(server2.port))], + protocol_version='2', + auto_flush=False) as sender: + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + + def test_addresses_param_none_is_no_op(self): + """Test that addresses=None works (single address, backward compat).""" + with HttpServer() as server: + with qi.Sender( + 'http', '127.0.0.1', server.port, + addresses=None, + protocol_version='2', + auto_flush=False) as sender: + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + self.assertEqual(len(server.requests), 1) + + def test_addresses_param_empty_list_is_no_op(self): + """Test that addresses=[] works (single address, no extras).""" + with HttpServer() as server: + with qi.Sender( + 'http', '127.0.0.1', server.port, + addresses=[], + protocol_version='2', + auto_flush=False) as sender: + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + self.assertEqual(len(server.requests), 1) + + def test_addresses_param_bad_type_rejected(self): + """Test that non-tuple entries in addresses raise TypeError.""" + with HttpServer() as server: + with self.assertRaises(TypeError): + qi.Sender( + 'http', '127.0.0.1', server.port, + addresses=['not-a-tuple'], + protocol_version='2') + + def test_addresses_param_bad_port_type_rejected(self): + """Test that non-int/str port in addresses raises TypeError.""" + with HttpServer() as server: + with self.assertRaises(TypeError): + qi.Sender( + 'http', '127.0.0.1', server.port, + addresses=[('host2', 3.14)], + protocol_version='2') + + def test_addresses_tcp_rejected(self): + """Test that TCP protocol rejects additional addresses.""" + with self.assertRaises(qi.IngressError) as ctx: + qi.Sender( + 'tcp', '127.0.0.1', 9009, + addresses=[('127.0.0.1', 9010)]) + self.assertEqual(ctx.exception.code, qi.IngressErrorCode.ConfigError) + + def test_multi_url_from_conf(self): + """Test multi-url via config string.""" + with HttpServer() as server1, HttpServer() as server2: + conf = ( + f'http::addr=127.0.0.1:{server1.port};' + f'addr=127.0.0.1:{server2.port};' + f'protocol_version=2;' + ) + with qi.Sender.from_conf(conf, auto_flush=False) as sender: + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + self.assertEqual(len(server1.requests), 1) + + def test_multi_url_failover(self): + """Test that a 500 on server1 causes failover to server2.""" + with HttpServer() as server1, HttpServer() as server2: + conf = ( + f'http::addr=127.0.0.1:{server1.port};' + f'addr=127.0.0.1:{server2.port};' + f'protocol_version=2;' + f'retry_timeout=5000;' + ) + with qi.Sender.from_conf(conf, auto_flush=False) as sender: + # First request to server1 returns 500, retry goes to server2. + server1.responses.append( + (0, 500, 'text/plain', b'server1 down')) + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + # server1 got the first attempt (500), server2 got the retry (200). + self.assertEqual(len(server1.requests), 1) + self.assertEqual(len(server2.requests), 1) + + def test_multi_url_non_retriable_no_failover(self): + """Test that a 400 error does not trigger failover.""" + with HttpServer() as server1, HttpServer() as server2: + conf = ( + f'http::addr=127.0.0.1:{server1.port};' + f'addr=127.0.0.1:{server2.port};' + f'protocol_version=2;' + f'retry_timeout=5000;' + ) + with qi.Sender.from_conf(conf, auto_flush=False) as sender: + server1.responses.append( + (0, 400, 'text/plain', b'Bad Request')) + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + with self.assertRaises(qi.IngressError): + sender.flush() + # Only server1 was contacted, no failover. + self.assertEqual(len(server1.requests), 1) + self.assertEqual(len(server2.requests), 0) + + def test_multi_url_ipv6_from_conf(self): + """Test IPv6 bracket notation in config string parses correctly.""" + # We can't actually connect to [::1] in all environments, + # but we can verify parsing works by checking it doesn't error + # on the conf string itself (it will fail at connect time). + sender = None + try: + sender = qi.Sender.from_conf( + 'http::addr=[::1]:9000;addr=[::1]:9001;protocol_version=2;', + auto_flush=False) + except qi.IngressError as e: + # Connection errors are expected — we just care that + # config parsing succeeded (not a ConfigError). + self.assertNotEqual(e.code, qi.IngressErrorCode.ConfigError) + finally: + if sender is not None: + sender.close() + + def test_multi_url_addresses_param_failover(self): + """Test failover using the programmatic addresses parameter.""" + with HttpServer() as server1, HttpServer() as server2: + with qi.Sender( + 'http', '127.0.0.1', server1.port, + addresses=[('127.0.0.1', server2.port)], + protocol_version='2', + retry_timeout=5000, + auto_flush=False) as sender: + server1.responses.append( + (0, 500, 'text/plain', b'server1 down')) + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + self.assertEqual(len(server1.requests), 1) + self.assertEqual(len(server2.requests), 1) + + def test_multi_url_multiple_addresses_param(self): + """Test specifying multiple extra addresses programmatically.""" + with HttpServer() as s1, HttpServer() as s2, HttpServer() as s3: + with qi.Sender( + 'http', '127.0.0.1', s1.port, + addresses=[ + ('127.0.0.1', s2.port), + ('127.0.0.1', s3.port), + ], + protocol_version='2', + retry_timeout=5000, + auto_flush=False) as sender: + # s1 returns 500, s2 returns 500, s3 succeeds. + s1.responses.append( + (0, 500, 'text/plain', b's1 down')) + s2.responses.append( + (0, 500, 'text/plain', b's2 down')) + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + self.assertEqual(len(s1.requests), 1) + self.assertEqual(len(s2.requests), 1) + self.assertEqual(len(s3.requests), 1) + + def test_addresses_tcps_rejected(self): + """Test that TCPS protocol also rejects additional addresses.""" + with self.assertRaises(qi.IngressError) as ctx: + qi.Sender( + 'tcps', '127.0.0.1', 9009, + addresses=[('127.0.0.1', 9010)]) + self.assertEqual(ctx.exception.code, qi.IngressErrorCode.ConfigError) + + def test_multi_url_all_servers_fail(self): + """All servers return 500 — should raise after retry_timeout.""" + with HttpServer() as server1, HttpServer() as server2: + conf = ( + f'http::addr=127.0.0.1:{server1.port};' + f'addr=127.0.0.1:{server2.port};' + f'protocol_version=2;' + f'retry_timeout=200;' + ) + with qi.Sender.from_conf(conf, auto_flush=False) as sender: + # Pre-load enough 500 responses to outlast all retries. + for _ in range(20): + server1.responses.append( + (0, 500, 'text/plain', b'server1 down')) + server2.responses.append( + (0, 500, 'text/plain', b'server2 down')) + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + with self.assertRaises(qi.IngressError) as ctx: + sender.flush() + self.assertEqual( + ctx.exception.code, + qi.IngressErrorCode.ServerFlushError) + # Both servers should have been contacted. + total = len(server1.requests) + len(server2.requests) + self.assertGreaterEqual(total, 2) + + def test_multi_url_failover_body_content(self): + """Verify the failover server receives the same ILP data.""" + with HttpServer() as server1, HttpServer() as server2: + conf = ( + f'http::addr=127.0.0.1:{server1.port};' + f'addr=127.0.0.1:{server2.port};' + f'protocol_version=2;' + f'retry_timeout=5000;' + ) + with qi.Sender.from_conf(conf, auto_flush=False) as sender: + server1.responses.append( + (0, 500, 'text/plain', b'server1 down')) + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + self.assertEqual(len(server2.requests), 1) + body = server2.requests[0] + self.assertIn(b'tbl1', body) + self.assertIn(b'x=42i', body) + + def test_addresses_param_with_list_entries(self): + """Test that list entries (not just tuples) work in addresses.""" + with HttpServer() as server1, HttpServer() as server2: + with qi.Sender( + 'http', '127.0.0.1', server1.port, + addresses=[['127.0.0.1', server2.port]], + protocol_version='2', + auto_flush=False) as sender: + sender.row('tbl1', columns={'x': 42}, at=qi.ServerTimestamp) + sender.flush() + self.assertEqual(len(server1.requests), 1) + + def test_addresses_non_str_host_rejected(self): + """Test that a non-str host (e.g. int) in addresses raises TypeError.""" + with HttpServer() as server: + with self.assertRaises(TypeError) as ctx: + qi.Sender( + 'http', '127.0.0.1', server.port, + addresses=[(42, 9000)], + protocol_version='2') + self.assertIn('addresses[0]', str(ctx.exception)) + self.assertIn('str', str(ctx.exception)) + + def test_addresses_none_host_rejected(self): + """Test that None as host in addresses raises TypeError.""" + with HttpServer() as server: + with self.assertRaises(TypeError) as ctx: + qi.Sender( + 'http', '127.0.0.1', server.port, + addresses=[(None, 9000)], + protocol_version='2') + self.assertIn('addresses[0]', str(ctx.exception)) + self.assertIn('str', str(ctx.exception)) + + def test_addresses_three_element_tuple_rejected(self): + """Test that a 3-element tuple in addresses raises TypeError.""" + with HttpServer() as server: + with self.assertRaises(TypeError) as ctx: + qi.Sender( + 'http', '127.0.0.1', server.port, + addresses=[('host', 9000, 'extra')], + protocol_version='2') + self.assertIn('addresses[0]', str(ctx.exception)) + + def test_addresses_zero_element_tuple_rejected(self): + """Test that an empty tuple in addresses raises TypeError.""" + with HttpServer() as server: + with self.assertRaises(TypeError) as ctx: + qi.Sender( + 'http', '127.0.0.1', server.port, + addresses=[()], + protocol_version='2') + self.assertIn('addresses[0]', str(ctx.exception)) + + if __name__ == '__main__': if os.environ.get('TEST_QUESTDB_PROFILE') == '1': import cProfile