components_api_ApiQueueTask.bs

import "pkg:/source/utils/misc.bs"

sub init()
  m.top.functionName = "runQueueLoop"
end sub

' FIFO request queue coordinator (Architecture C — continuous-server).
'
' Receives ApiResultNode children appended by fetchRes() callers. Each node carries
' the request AA on its 'request' field. The coordinator reads ALL unprocessed
' children when woken up, dispatches each to the first free ApiTask pool slot, then
' routes the completed response back to the ApiResultNode so the orchestrator unblocks.
'
' Why children instead of a shared field:
'   SceneGraph can coalesce port events when multiple Task threads write to the same
'   field in rapid succession — the coordinator would see fewer events than writes,
'   silently losing requests. Children are immune: even if wakeup events coalesce,
'   the coordinator reads ALL pending children from the node tree, so no request is
'   ever lost regardless of timing.
'
' Startup ordering guarantee:
'   1. Each ApiTask pool slot sets isReady=true after registering its request observer.
'   2. This coordinator waits for ALL pool slots to be ready before registering its
'      own observers and setting isReady=true.
'   3. fetchRes() waits for this coordinator's isReady=true before appending children.
'   Result: pool observers → coordinator ready → fetchRes appends.  No startup race.
'
' Thread safety:
'   Only this task thread reads/writes m.queue, m.inFlight, and m.processedIndex —
'   no TOCTOU race. Child appends from other threads go through SceneGraph rendezvous.
sub runQueueLoop()
  m.queue = []
  ' One slot per pool node: invalid = free, AA entry = in-flight
  m.inFlight = [invalid, invalid, invalid]
  ' Index of the next unprocessed child node (children accumulate, never re-processed)
  m.processedIndex = 0

  pool = [m.global.apiPool0, m.global.apiPool1, m.global.apiPool2]

  ' ── Wait for all pool tasks to register their request observers ──
  ' Without this, dispatching a request to a slot that hasn't called
  ' observeField("request", port) yet would silently drop the write —
  ' the slot would never wake up, m.inFlight would never clear, and
  ' the slot would be permanently dead for the rest of the session.
  '
  ' Strategy: observe ready on all not-yet-ready slots, then check ACTUAL
  ' field state in each iteration (never count events — event counting is
  ' vulnerable to double-counting when a slot becomes ready between
  ' observeField() and a subsequent field read).
  waitPort = CreateObject("roMessagePort")
  for each slot in pool
    if not slot.isReady
      slot.observeField("isReady", waitPort)
    end if
  end for
  ' Check actual state until all 3 are ready. Events just wake us up.
  for attempts = 0 to 50
    allReady = true
    for each slot in pool
      if not slot.isReady
        allReady = false
        exit for
      end if
    end for
    if allReady then exit for
    ' At least one slot isn't ready — wait for any ready event to recheck.
    ' 5s timeout per attempt; pool tasks start before us so this is fast.
    wait(5000, waitPort)
  end for
  for each slot in pool
    slot.unobserveField("isReady")
  end for

  ' ── Now safe to accept requests ──
  port = CreateObject("roMessagePort")
  m.top.observeField("enqueue", port)
  ' Signal readiness AFTER pool slots are confirmed ready and our enqueue
  ' observer is registered. fetchRes() callers waiting on this flag can
  ' now safely append children — the full pipeline is operational.
  m.top.isReady = true

  ' Store node IDs so we can match response events back to slot indices.
  ' msg.getNode() returns the ID string of the node whose field changed.
  m.slotNodeIds = [pool[0].id, pool[1].id, pool[2].id]

  ' Observe each pool slot's response field so we know when a slot completes.
  for each slot in pool
    slot.observeField("response", port)
  end for

  ' Process any children that were appended BEFORE the enqueue observer was
  ' registered. If fetchRes() callers bypassed the ready guard (e.g. ready
  ' became true between their check and the appendChild), their wakeup events
  ' were lost because our observer wasn't registered yet. Reading children
  ' here catches those stragglers — the node tree is the source of truth.
  processNewChildren(pool)

  while true
    msg = wait(0, port)
    if type(msg) = "roSGNodeEvent"
      field = msg.getField()

      if field = "enqueue"
        ' Wake-up signal from fetchRes — actual data is in children, not here.
        ' processNewChildren below handles it.

      else if field = "response"
        ' A pool slot finished. Find which slot by matching node ID.
        nodeId = msg.getNode()
        slotIdx = -1
        for i = 0 to 2
          if m.slotNodeIds[i] = nodeId
            slotIdx = i
            exit for
          end if
        end for

        if slotIdx >= 0
          entry = m.inFlight[slotIdx]
          if isValid(entry)
            ' Deliver response to the orchestrator's result node.
            ' Write result before done so it is readable when the observer fires.
            entry.resultNode.result = msg.getData()
            entry.resultNode.isDone = true
            m.inFlight[slotIdx] = invalid
          end if
        end if
      end if
    end if

    ' Always process new children when woken up (by ANY event type).
    ' This is the key coalescing defense: even if multiple enqueue wakeup
    ' events merged into one, we read ALL unprocessed children here.
    processNewChildren(pool)
  end while
end sub

' Read all unprocessed children (ApiResultNodes appended by fetchRes callers),
' extract their request AAs, add to the internal FIFO queue, and dispatch.
' Periodically prunes processed children to prevent unbounded accumulation.
sub processNewChildren(pool as object)
  childCount = m.top.getChildCount()
  while m.processedIndex < childCount
    child = m.top.getChild(m.processedIndex)
    m.processedIndex++
    if isValid(child) and isValid(child.request)
      m.queue.Push({ req: child.request, resultNode: child })
    end if
  end while
  tryDispatch(pool)

  ' Prune processed children to prevent unbounded node accumulation in long sessions.
  ' Safe: entry objects (in m.queue/m.inFlight) and fetchRes() callers hold direct node
  ' references — removing from the child tree only detaches the parent link; node fields
  ' and observers remain valid until all references are dropped.
  if m.processedIndex >= 50
    m.top.removeChildrenIndex(m.processedIndex, 0)
    m.processedIndex = 0
  end if
end sub

' Dispatch queued requests to free pool slots (FIFO).
' Only ever called from this coordinator's task thread — no concurrent access.
sub tryDispatch(pool as object)
  while m.queue.Count() > 0
    freeSlot = -1
    for i = 0 to 2
      if not isValid(m.inFlight[i])
        freeSlot = i
        exit for
      end if
    end for
    if freeSlot = -1 then return

    entry = m.queue[0]
    m.queue.Delete(0)
    m.inFlight[freeSlot] = entry
    ' Writing request triggers ApiTask's port observer, starting execution.
    pool[freeSlot].request = entry.req
  end while
end sub