Skip to content

Commit 6a703fb

Browse files
authored
tests: add test_set_ui_element_request_manager.py (marimo-team#7180)
Just some tests and logging that was helpful when debugging an issue
1 parent 88e1b31 commit 6a703fb

File tree

2 files changed

+201
-0
lines changed

2 files changed

+201
-0
lines changed

marimo/_runtime/requests.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
import msgspec
1818

19+
from marimo import _loggers
1920
from marimo._ast.app_config import _AppConfig
2021
from marimo._config.config import MarimoConfig
2122
from marimo._data.models import DataTableSource
2223
from marimo._types.ids import CellId_t, RequestId, UIElementId, WidgetModelId
2324

25+
LOGGER = _loggers.marimo_logger()
26+
2427
if TYPE_CHECKING:
2528
from collections.abc import Iterator
2629

@@ -229,6 +232,11 @@ def __post_init__(self) -> None:
229232
assert len(self.object_ids) == len(self.values), (
230233
"Mismatched object_ids and values"
231234
)
235+
# Empty token is not valid (but let's not fail)
236+
if not self.token:
237+
LOGGER.warning(
238+
"SetUIElementValueRequest with empty token is invalid"
239+
)
232240

233241
@staticmethod
234242
def from_ids_and_values(
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
# Copyright 2024 Marimo. All rights reserved.
2+
from __future__ import annotations
3+
4+
import asyncio
5+
import queue
6+
import threading
7+
import time
8+
9+
from marimo._runtime.requests import SetUIElementValueRequest
10+
from marimo._runtime.utils.set_ui_element_request_manager import (
11+
SetUIElementRequestManager,
12+
)
13+
14+
15+
def test_process_request_dedupes_by_token() -> None:
16+
"""Test that duplicate tokens are properly deduplicated."""
17+
q: queue.Queue[SetUIElementValueRequest] = queue.Queue()
18+
manager = SetUIElementRequestManager(q)
19+
20+
# Create two requests with the same token
21+
request1 = SetUIElementValueRequest(
22+
object_ids=["obj1"], values=[1], token="token1"
23+
)
24+
request2 = SetUIElementValueRequest(
25+
object_ids=["obj2"], values=[2], token="token1"
26+
)
27+
28+
# Put the duplicate in the queue
29+
q.put(request2)
30+
31+
# Process the first request
32+
result = manager.process_request(request1)
33+
34+
# Should only get one request (the second is a duplicate)
35+
assert result is not None
36+
assert len(result.object_ids) == 1
37+
assert result.object_ids[0] == "obj1"
38+
39+
40+
def test_process_request_merges_different_tokens() -> None:
41+
"""Test that requests with different tokens are merged."""
42+
q: queue.Queue[SetUIElementValueRequest] = queue.Queue()
43+
manager = SetUIElementRequestManager(q)
44+
45+
request1 = SetUIElementValueRequest(
46+
object_ids=["obj1"], values=[1], token="token1"
47+
)
48+
request2 = SetUIElementValueRequest(
49+
object_ids=["obj2"], values=[2], token="token2"
50+
)
51+
52+
# Put the second request in the queue
53+
q.put(request2)
54+
55+
# Process the first request
56+
result = manager.process_request(request1)
57+
58+
# Should merge both requests
59+
assert result is not None
60+
assert len(result.object_ids) == 2
61+
assert set(result.object_ids) == {"obj1", "obj2"}
62+
63+
64+
def test_process_request_keeps_latest_value_per_id() -> None:
65+
"""Test that when multiple requests update the same UI element, the latest value wins."""
66+
q: queue.Queue[SetUIElementValueRequest] = queue.Queue()
67+
manager = SetUIElementRequestManager(q)
68+
69+
request1 = SetUIElementValueRequest(
70+
object_ids=["obj1"], values=[1], token="token1"
71+
)
72+
request2 = SetUIElementValueRequest(
73+
object_ids=["obj1"], values=[2], token="token2"
74+
)
75+
76+
# Put the second request in the queue
77+
q.put(request2)
78+
79+
# Process the first request
80+
result = manager.process_request(request1)
81+
82+
# Should keep the latest value (from request2)
83+
assert result is not None
84+
assert len(result.object_ids) == 1
85+
assert result.object_ids[0] == "obj1"
86+
assert result.values[0] == 2
87+
88+
89+
def test_process_request_handles_concurrent_queue_updates() -> None:
90+
"""Test that the manager properly drains the queue even with concurrent updates.
91+
92+
This simulates the race condition that occurs with ZeroMQ IPC where a
93+
receiver thread continuously adds messages to the queue.
94+
"""
95+
q: queue.Queue[SetUIElementValueRequest] = queue.Queue()
96+
manager = SetUIElementRequestManager(q)
97+
98+
# Track how many requests were added
99+
requests_added = []
100+
stop_event = threading.Event()
101+
102+
def producer():
103+
"""Simulate a ZeroMQ receiver thread adding messages to the queue."""
104+
counter = 0
105+
while not stop_event.is_set():
106+
request = SetUIElementValueRequest(
107+
object_ids=[f"obj{counter}"],
108+
values=[counter],
109+
token=f"token{counter}",
110+
)
111+
q.put(request)
112+
requests_added.append(request)
113+
counter += 1
114+
time.sleep(0.001) # Small delay to simulate real conditions
115+
116+
# Start the producer thread
117+
producer_thread = threading.Thread(target=producer, daemon=True)
118+
producer_thread.start()
119+
120+
# Let it produce some messages
121+
time.sleep(0.05)
122+
123+
# Process a request while the producer is still running
124+
initial_request = SetUIElementValueRequest(
125+
object_ids=["obj_initial"], values=[999], token="token_initial"
126+
)
127+
128+
result = manager.process_request(initial_request)
129+
130+
# Stop the producer
131+
stop_event.set()
132+
producer_thread.join(timeout=1)
133+
134+
# Verify that we got a merged result
135+
assert result is not None
136+
assert len(result.object_ids) > 1 # Should have merged multiple requests
137+
assert "obj_initial" in result.object_ids
138+
139+
# Verify the queue is actually empty after processing
140+
# (small delay to let any in-flight messages arrive)
141+
time.sleep(0.01)
142+
remaining = []
143+
while not q.empty():
144+
try:
145+
remaining.append(q.get_nowait())
146+
except queue.Empty:
147+
break
148+
149+
# There might be a few messages that arrived after we finished processing,
150+
# but it should be small compared to what we processed
151+
assert len(remaining) < 5 # Allow for a few stragglers
152+
153+
154+
async def test_process_request_with_asyncio_queue() -> None:
155+
"""Test that the manager works with asyncio.Queue."""
156+
q: asyncio.Queue[SetUIElementValueRequest] = asyncio.Queue()
157+
manager = SetUIElementRequestManager(q)
158+
159+
request1 = SetUIElementValueRequest(
160+
object_ids=["obj1"], values=[1], token="token1"
161+
)
162+
request2 = SetUIElementValueRequest(
163+
object_ids=["obj2"], values=[2], token="token2"
164+
)
165+
166+
# Put the second request in the queue
167+
q.put_nowait(request2)
168+
169+
# Process the first request
170+
result = manager.process_request(request1)
171+
172+
# Should merge both requests
173+
assert result is not None
174+
assert len(result.object_ids) == 2
175+
assert set(result.object_ids) == {"obj1", "obj2"}
176+
177+
178+
def test_process_request_returns_none_for_empty_batch() -> None:
179+
"""Test that None is returned when all requests are duplicates."""
180+
q: queue.Queue[SetUIElementValueRequest] = queue.Queue()
181+
manager = SetUIElementRequestManager(q)
182+
183+
# Create a request and process it
184+
request = SetUIElementValueRequest(
185+
object_ids=["obj1"], values=[1], token="token1"
186+
)
187+
result1 = manager.process_request(request)
188+
assert result1 is not None
189+
190+
# Process the same token again (duplicate)
191+
result2 = manager.process_request(request)
192+
# Should return None since it's a duplicate
193+
assert result2 is None

0 commit comments

Comments
 (0)