source_api_apiPool.bs

import "pkg:/source/constants/timeouts.bs"
import "pkg:/source/utils/misc.bs"

' Submits a request to the API queue coordinator and blocks the calling Task thread
' until the response arrives (or the timeouts.API_WAIT_MS deadline elapses).
'
' Safe to call concurrently from multiple Task threads — each request gets its own
' ApiResultNode, so responses are never mixed up between callers. The coordinator
' (ApiQueueTask) is the only entity that dispatches to pool slots, eliminating the
' TOCTOU race that existed when callers claimed slots directly.
'
' Delivery mechanism: the ApiResultNode is appended as a CHILD of the coordinator
' node. This is immune to SceneGraph port-event coalescing — even if multiple
' concurrent enqueue wakeup events merge into one, the coordinator reads ALL
' unprocessed children when it wakes up, so no request is ever lost.
'
' Side effect: req is mutated — its requestId key is overwritten with requestId.
'
' @param req       - request AA from a Build*Request() method; the function returns
'                    invalid immediately when req is invalid
' @param requestId - unique string identifying this request (echoed in the response AA)
' @return response AA { requestId, ok, statusCode, json, text }, or invalid when req or
'         m.global.apiQueue is invalid, or when timeouts.API_WAIT_MS elapses first
function fetchRes(req as dynamic, requestId as string) as dynamic
  if not isValid(req) then return invalid

  apiQueue = m.global.apiQueue
  if not isValid(apiQueue) then return invalid

  ' Startup ordering guard: the coordinator sets isReady=true only AFTER all pool
  ' tasks have registered their request observers AND the coordinator has registered
  ' its own observers, so the full pipeline (pool → coordinator → callers) is up
  ' before requests flow. Once isReady is true this costs a single field read.
  if not apiQueue.isReady
    readyPort = CreateObject("roMessagePort")
    ' Scoped observer: apiQueue is a shared global node. The plain unobserveField()
    ' is not limited to the calling component, so using it here could strip the
    ' isReady observers that other components registered on the same node. The
    ' *Scoped pair only touches the observer owned by this component.
    apiQueue.observeFieldScoped("isReady", readyPort)
    ' Poll the real field state between short waits instead of trusting a single
    ' event: isReady may flip between the check above and the observe call, and
    ' event delivery for that window is not reliable. 51 iterations x 200 ms gives
    ' an upper bound of roughly 10 seconds.
    for _attempt = 0 to 50
      if apiQueue.isReady then exit for
      wait(200, readyPort)
    end for
    apiQueue.unobserveFieldScoped("isReady")
    ' NOTE(review): if the coordinator is still not ready here we fall through and
    ' enqueue anyway (original behaviour) — confirm that is intended rather than
    ' returning invalid like submitApiRequest() does.
  end if

  req.requestId = requestId

  ' One result node per request — carries the request in and the response out.
  ' The coordinator reads 'request', dispatches to a pool slot, then writes
  ' 'result' and signals completion through 'isDone'.
  resultNode = CreateObject("roSGNode", "ApiResultNode")
  resultNode.request = req

  ' Observe BEFORE appending. If the request completes before we reach wait(),
  ' the 'isDone' event is already in the port queue and wait() returns immediately.
  port = CreateObject("roMessagePort")
  resultNode.observeField("isDone", port)

  ' Append the result node as a child of the coordinator. The coordinator reads
  ' ALL unprocessed children when woken up — this is immune to event coalescing,
  ' unlike writing data to a shared field where concurrent writes can merge events.
  apiQueue.appendChild(resultNode)
  ' Wake-up signal — coordinator ignores the value and reads children instead.
  apiQueue.enqueue = requestId

  msg = wait(timeouts.API_WAIT_MS, port)

  ' Always drop the observer, on success and on timeout alike, so the node holds
  ' no reference to a port that is about to go out of scope.
  resultNode.unobserveField("isDone")

  if type(msg) = "roSGNodeEvent" then return resultNode.result

  ' Timeout. The result node stays as a child of the coordinator — do NOT
  ' removeChild(), as that shifts child indices and corrupts the coordinator's
  ' m.processedIndex. When the coordinator eventually delivers the response it
  ' will write to the abandoned node (no crash), free the slot, and dispatch the
  ' next queued request normally. The node is pruned at the 50-child threshold.
  return invalid
end function

' Blocking helper built on fetchRes(): sends the request through the queue and
' hands back only the parsed JSON payload of a successful response.
'
' Yields invalid whenever fetchRes() yields invalid (invalid req, missing queue,
' timeout) or when the response's 'ok' flag is false.
'
' @param req       - request AA from a Build*Request() method
' @param requestId - unique string identifying this request
' @return the response's json field, or invalid
function fetchJson(req as dynamic, requestId as string) as dynamic
  response = fetchRes(req, requestId)
  if isValid(response)
    ' Only a response flagged ok exposes its body to the caller.
    if response.ok then return response.json
  end if
  return invalid
end function

' Non-blocking submission: hands a request to the API queue coordinator and
' returns straight away with the ApiResultNode that will carry the response.
' The caller observes the node's completion field from a render-thread callback
' ('isDone' is the field fetchRes() waits on) and then reads 'result'.
'
' The render thread does NOT make the HTTP call — it only creates a node,
' appends it to the coordinator, and returns. The coordinator (ApiQueueTask,
' a Task thread) dispatches to an ApiTask pool slot (another Task thread) which
' performs the actual HTTP request and writes the outcome back to the node.
'
' Use for single API calls with trivial callbacks (set a boolean, read one value).
' Do NOT use when the callback needs data transforms or array processing —
' use an Orchestrator Task with fetchJson() instead.
'
' Unlike fetchRes(), this never waits for the coordinator to start: if it is not
' ready yet the call is refused. req is mutated (requestId key is overwritten).
'
' @param req       - request AA from a Build*Request() method; invalid req yields invalid
' @param requestId - unique string identifying this request
' @return ApiResultNode to observe, or invalid if the queue is missing or not ready
function submitApiRequest(req as dynamic, requestId as string) as dynamic
  if not isValid(req) then return invalid

  coordinator = m.global.apiQueue
  if not isValid(coordinator) then return invalid
  if not coordinator.isReady then return invalid

  ' Stamp the id before attaching the AA so the node's 'request' field carries it.
  req.requestId = requestId

  pending = CreateObject("roSGNode", "ApiResultNode")
  pending.request = req

  ' Attach first, then poke 'enqueue' so the coordinator wakes up and scans its children.
  coordinator.appendChild(pending)
  coordinator.enqueue = requestId

  return pending
end function

' Fire-and-forget submission to the global SideEffectTask (m.global.sideEffectTask).
' Nothing is observed and nothing is returned. Calls are serialised, so this is
' not suitable for cancellable UI requests.
'
' Silently does nothing when req is invalid or the task node is not available.
'
' @param req - request AA: { method, url, headers?, body?, timeout? }
sub SubmitSideEffect(req as dynamic)
  if isValid(req)
    worker = m.global.sideEffectTask
    if isValid(worker)
      ' Hand over the payload first, then start the task.
      worker.request = req
      worker.control = "RUN"
    end if
  end if
end sub