' components_api_ApiQueueTask.bs
import "pkg:/source/utils/misc.bs"
sub init()
    ' Component constructor: name the sub that the Task node should run as its
    ' thread body. Nothing is started here; this only assigns the field.
    taskNode = m.top
    taskNode.functionName = "runQueueLoop"
end sub
' FIFO request queue coordinator (Architecture C — continuous-server).
'
' Receives ApiResultNode children appended by fetchRes() callers. Each node carries
' the request AA on its 'request' field. The coordinator reads ALL unprocessed
' children when woken up, dispatches each to the first free ApiTask pool slot, then
' routes the completed response back to the ApiResultNode so the orchestrator unblocks.
'
' Why children instead of a shared field:
' SceneGraph can coalesce port events when multiple Task threads write to the same
' field in rapid succession — the coordinator would see fewer events than writes,
' silently losing requests. Children are immune: even if wakeup events coalesce,
' the coordinator reads ALL pending children from the node tree, so no request is
' ever lost regardless of timing.
'
' Startup ordering guarantee:
' 1. Each ApiTask pool slot sets isReady=true after registering its request observer.
' 2. This coordinator waits for ALL pool slots to be ready before registering its
' own observers and setting isReady=true.
' 3. fetchRes() waits for this coordinator's isReady=true before appending children.
' Result: pool observers → coordinator ready → fetchRes appends. No startup race.
'
' Thread safety:
' Only this task thread reads/writes m.queue, m.inFlight, and m.processedIndex —
' no TOCTOU race. Child appends from other threads go through SceneGraph rendezvous.
sub runQueueLoop()
    ' Task thread body (assigned to m.top.functionName in init()).
    ' Startup: wait for the pool slots, register observers, publish isReady.
    ' Steady state: block on the port forever, routing slot responses back to
    ' their result nodes and dispatching newly appended children. Never returns.
    m.queue = []
    ' Index of the next unprocessed child node (children accumulate, never re-processed)
    m.processedIndex = 0
    pool = [m.global.apiPool0, m.global.apiPool1, m.global.apiPool2]
    slotCount = pool.Count()
    ' One entry per pool node: invalid = free, AA entry = in-flight.
    ' Sized from the pool so that adding a slot above is a one-line change.
    m.inFlight = []
    for i = 0 to slotCount - 1
        m.inFlight.Push(invalid)
    end for

    ' ── Wait for all pool tasks to register their request observers ──
    ' Without this, dispatching a request to a slot that hasn't called
    ' observeField("request", port) yet would silently drop the write —
    ' the slot would never wake up, m.inFlight would never clear, and
    ' the slot would be permanently dead for the rest of the session.
    '
    ' Strategy: observe isReady on all not-yet-ready slots, then check ACTUAL
    ' field state in each iteration (never count events — event counting is
    ' vulnerable to double-counting when a slot becomes ready between
    ' observeField() and a subsequent field read).
    waitPort = CreateObject("roMessagePort")
    for each slot in pool
        if not slot.isReady
            slot.observeField("isReady", waitPort)
        end if
    end for

    ' Check actual state until every slot is ready. Events just wake us up.
    allReady = false
    for attempts = 0 to 50
        allReady = true
        for each slot in pool
            if not slot.isReady
                allReady = false
                exit for
            end if
        end for
        if allReady then exit for
        ' At least one slot isn't ready — wait for any ready event to recheck.
        ' 5s timeout per attempt; pool tasks start before us so this is fast.
        wait(5000, waitPort)
    end for
    if not allReady
        ' We gave up waiting. Startup continues exactly as before, but leave a
        ' trace: requests routed to a slot that never became ready can hang.
        print "[ApiQueueTask] WARNING: pool slots not confirmed ready after startup wait; continuing"
    end if

    for each slot in pool
        slot.unobserveField("isReady")
    end for

    ' ── Now safe to accept requests ──
    port = CreateObject("roMessagePort")
    m.top.observeField("enqueue", port)
    ' Signal readiness AFTER the pool wait and after our enqueue observer is
    ' registered. fetchRes() callers waiting on this flag can now append
    ' children. Nothing is dispatched to a slot until processNewChildren()
    ' runs below, i.e. after the response observers are in place.
    m.top.isReady = true

    ' Store node IDs so we can match response events back to slot indices
    ' (msg.getNode() returns the ID string of the node whose field changed),
    ' and observe each slot's response field so we know when a slot completes.
    m.slotNodeIds = []
    for each slot in pool
        m.slotNodeIds.Push(slot.id)
    end for
    for each slot in pool
        slot.observeField("response", port)
    end for

    ' Process any children that were appended BEFORE the enqueue observer was
    ' registered. Their wakeup events would have been lost because our observer
    ' wasn't registered yet. Reading children here catches those stragglers —
    ' the node tree is the source of truth.
    processNewChildren(pool)

    while true
        msg = wait(0, port)
        if type(msg) = "roSGNodeEvent"
            ' "enqueue" events carry no data — they are only a wake-up signal and
            ' are handled by the processNewChildren() call at the bottom of the loop.
            if msg.getField() = "response"
                ' A pool slot finished. Find which slot by matching node ID.
                nodeId = msg.getNode()
                slotIdx = -1
                for i = 0 to m.slotNodeIds.Count() - 1
                    if m.slotNodeIds[i] = nodeId
                        slotIdx = i
                        exit for
                    end if
                end for
                if slotIdx >= 0
                    entry = m.inFlight[slotIdx]
                    if isValid(entry)
                        ' Deliver response to the orchestrator's result node.
                        ' Write result before isDone so it is readable when the
                        ' isDone observer fires.
                        entry.resultNode.result = msg.getData()
                        entry.resultNode.isDone = true
                        ' Free the slot so the next queued request can use it.
                        m.inFlight[slotIdx] = invalid
                    end if
                end if
            end if
        end if
        ' Always process new children when woken up (by ANY event type).
        ' This is the key coalescing defense: even if multiple enqueue wakeup
        ' events merged into one, we read ALL unprocessed children here. It also
        ' re-runs dispatch after a slot has just been freed.
        processNewChildren(pool)
    end while
end sub
' Drain every child of m.top that has not been looked at yet, queue the ones that
' carry a request, run a dispatch pass, and prune already-processed children once
' enough of them have piled up.
'
' pool: array of pool slot nodes, passed straight through to tryDispatch().
'
' NOTE(review): a child whose 'request' field is invalid is skipped and never
' completed here — confirm callers always set 'request' before appending.
sub processNewChildren(pool as object)
    total = m.top.getChildCount()
    while m.processedIndex < total
        node = m.top.getChild(m.processedIndex)
        m.processedIndex = m.processedIndex + 1
        if isValid(node)
            req = node.request
            if isValid(req)
                ' Keep the node alongside its request so the response can be
                ' written back to it later.
                m.queue.Push({ req: req, resultNode: node })
            end if
        end if
    end while

    tryDispatch(pool)

    ' Below the pruning threshold there is nothing more to do.
    if m.processedIndex < 50 then return

    ' Drop the first m.processedIndex children (all already processed) so the
    ' child list does not grow without bound in long sessions. Queue and in-flight
    ' entries keep their own references to the nodes. Children appended after
    ' 'total' was read sit past that prefix and move down to index 0, which is
    ' where the reset cursor points.
    m.top.removeChildrenIndex(m.processedIndex, 0)
    m.processedIndex = 0
end sub
' Dispatch queued requests to free pool slots, oldest first (FIFO).
' Stops when the queue is empty or every slot is busy.
'
' pool: array of pool slot nodes, index-aligned with m.inFlight.
'
' Reads and mutates m.queue and m.inFlight. The slot count is taken from
' m.inFlight rather than hard-coded, so it follows whatever size the caller set up.
sub tryDispatch(pool as object)
    slotCount = m.inFlight.Count()
    ' No slots at all: nothing can be dispatched, leave the queue untouched.
    if slotCount = 0 then return

    while m.queue.Count() > 0
        ' Find the lowest-indexed free slot (invalid marks a free slot).
        freeSlot = -1
        for i = 0 to slotCount - 1
            if not isValid(m.inFlight[i])
                freeSlot = i
                exit for
            end if
        end for
        ' Every slot is busy — remaining entries stay queued for a later pass.
        if freeSlot = -1 then return

        ' Take the oldest queued entry and mark the slot busy before handing
        ' the request over.
        entry = m.queue.Shift()
        m.inFlight[freeSlot] = entry
        ' Writing the request field is what hands the work to the pool slot.
        pool[freeSlot].request = entry.req
    end while
end sub