Skip to content

Orchestrator load, plugins, and settings

How chat reaches an orchestrator from the pool, how OrchestratorFactory wires adapters and SDKPluginManager, plugin contract resolution, settings injection, and stateless bundle caching.

Intended audience: Solution architects, Developers

Runtime loading path in `engine/` and `infra/plugins/`. Pair with the upload pipeline guide for ZIP validation vs pool-time loading.

Learning outcomes by role

Solution architects

  • Map hot vs demand pool, S3-backed cache, and shared bundle cache to scaling and isolation concerns.

Developers

  • Trace `OrchestratorPool.get` through `factory.create`, `load_plugins`, and bundle creation in source order.
  • Explain registry vs filesystem resolution, `ensure_local`, and `PluginSettingsResolver.resolve` for a given `PluginRef`.

This guide walks through how an orchestrator instance is obtained at runtime, how plugins are resolved and turned into SDKPluginBundle objects, and how per-instance settings are merged and decrypted. For uploading ZIPs to the catalog, see Plugin upload and verification. For product context, see Hot reload AI App pool, AI Agent system, and Chat and engine.

On a chat request, the engine resolves an orchestrator instance id, then OrchestratorPool.get(instance_id) returns a BaseOrchestrator. If the instance is not in the hot or demand pool, the pool loads configuration from the database and OrchestratorFactory.create builds: adapter → SDKPluginManagerload_plugins → streaming wrapper → orchestrator → initialize().

flowchart TB
  subgraph poolLayer [OrchestratorPool]
    get[get instance_id]
    hot[hot_pool dict]
    dem[demand_pool TTL]
    cold[_load_from_db]
  end
  subgraph factoryLayer [OrchestratorFactory.create]
    reg[registry lookup]
    pm[SDKPluginManager.load_plugins]
    sw[streaming wrapper]
    orch[orchestrator.initialize]
  end
  get --> hot
  get --> dem
  get --> cold
  cold --> reg
  reg --> pm
  pm --> sw
  sw --> orch

Why: Orchestrators are expensive; hot instances stay resident; demand instances are TTL-evicted and capped.

What: OrchestratorPool.get in src/cadence/engine/pool/pool.py.

  1. If instance_id in self.hot_pool → return immediately.
  2. Else self.demand_pool.get(instance_id) — on hit, TTL is extended (DemandPool.get updates _last_accessed).
  3. Else acquire self.locks[instance_id], double-check hot and demand, then await self._load_from_db(instance_id, source="on_demand").

DemandPool.peek returns an instance without extending TTL (used when you must inspect without treating access as “use” — e.g. reload paths).

async def get(self, instance_id: str) -> BaseOrchestrator:
if instance_id in self.hot_pool:
return self.hot_pool[instance_id]
orch = self.demand_pool.get(instance_id)
if orch is not None:
return orch
self._ensure_lock(instance_id)
async with self.locks[instance_id]:
if instance_id in self.hot_pool:
return self.hot_pool[instance_id]
orch = self.demand_pool.get(instance_id)
if orch is not None:
return orch
return await self._load_from_db(instance_id, source="on_demand")

Why: Demand-tier misses must hydrate framework_type, mode, config, plugin_settings, and whoami from persistence before construction.

What: _load_from_db loads the instance row, then build_resolved_instance_config (src/cadence/engine/config_builder.py), then await self.factory.create(...), then self.demand_pool.set(instance_id, orchestrator).

  • instance_configconfig merged with plugin_settings from the row (org overrides for each plugin key).
  • resolved_configinstance_config plus org_id and whoami, passed into the orchestrator stack.
instance_config = {
**config,
"plugin_settings": instance.get("plugin_settings", {}),
}
resolved_config = {
**instance_config,
"org_id": instance["org_id"],
"whoami": instance.get("whoami") or "",
}

What: OrchestratorFactory.create in src/cadence/engine/factory.py.

Order:

  1. registry[(framework_type, mode)]BackendRegistryEntry (adapter, orchestrator, streaming wrapper classes).
  2. adapter = adapter_class()
  3. SDKPluginManager(...) with llm_factory, org_id, tenant/system plugin roots, plugin_repo, bundle_cache
  4. await plugin_manager.load_plugins(plugin_specs, instance_config) where plugin_specs = instance_config.get("active_plugins", [])
  5. streaming_wrapper = streaming_wrapper_class()
  6. orchestrator = orchestrator_class(plugin_manager=..., llm_factory=..., resolved_config=..., adapter=..., streaming_wrapper=...)
  7. await orchestrator.initialize()
sequenceDiagram
  participant Pool as OrchestratorPool
  participant Fact as OrchestratorFactory
  participant PM as SDKPluginManager
  participant Orch as BaseOrchestrator
  Pool->>Fact: create(framework_type, mode, org_id, instance_config, resolved_config)
  Fact->>PM: load_plugins(active_plugins, instance_config)
  Fact->>Orch: orchestrator_class(...)
  Fact->>Orch: initialize()

What: SDKPluginManager.load_plugins in src/cadence/infra/plugins/plugin_manager.py.

For each string in active_plugins (e.g. org:com.example.plugin@1.0.0):

  1. PluginRef.from_ref(plugin_spec)
  2. contract = await self._resolve_contract(plugin_ref, registry)
  3. Deduplicate by (source, pid, contract.version)
  4. self._validate_plugin(contract) (SDK structure validation)
  5. bundle = await self._create_bundle_with_cache(contract, settings_resolver, plugin_ref)
  6. Store in self._bundles

PluginSettingsResolver(instance_config) is constructed once per load and passed into bundle creation.

flowchart TD
  loop[For each active_plugins string]
  ref[PluginRef.from_ref]
  contract[_resolve_contract]
  dedupe{bundle key seen?}
  validate[_validate_plugin]
  bundle[_create_bundle_with_cache]
  store[Store in _bundles]
  skip[Skip duplicate version]
  loop --> ref
  ref --> contract
  contract --> dedupe
  dedupe -->|yes| skip
  dedupe -->|no| validate
  validate --> bundle
  bundle --> store

Contract resolution — registry, filesystem, S3 sync

Section titled “Contract resolution — registry, filesystem, S3 sync”

What: PluginLoaderMixin._resolve_contract in src/cadence/infra/plugins/plugin_loader.py.

  1. registry.get_plugin_by_version(pid, version) — in-process SDK registry (e.g. already loaded in this worker).
  2. If missing → await _load_versioned_plugin_from_filesystem(plugin_ref) which uses _resolve_local_plugin_directory_find_plugin_file_ensure_plugin_dependencies_load_plugin_module (runs scan_plugin_ast before exec_module) → _extract_plugin_classPluginContract + register in registry.
  3. If still missing → PluginNotFoundError.
  4. If plugin_repo exists and S3 is enabled → ensure_local(pid, version, org_id) to align local cache with object storage; on ResourceNotFoundError, logs a warning and continues with local-only content.

_resolve_local_plugin_directory tries, in order:

  1. plugin_repo.ensure_local(...) if a store is configured (may download and extract from S3).
  2. Else tenant path tenant_plugins_root / org_id / pid / version or system path system_plugins_dir / pid / version if directories exist and are non-empty.

What: PluginStoreRepository.ensure_local in src/cadence/data/plugins/store.py.

  • If local dir exists with content → cache hit.
  • If S3 disabled and missing locally → PluginNotFoundError (when that path is used as the only source).
  • If S3 enabled → download s3_key, _extract_zip into the local cache path.
flowchart TD
  start[_resolve_contract]
  reg{registry hit?}
  fs[_load_versioned_plugin_from_filesystem]
  local[_resolve_local_plugin_directory]
  hit{local content?}
  s3{S3 enabled?}
  load[load module register contract]
  fail[PluginNotFoundError]
  start --> reg
  reg -->|yes| done[return contract]
  reg -->|no| fs
  fs --> local
  local --> hit
  hit -->|yes| load
  hit -->|no| s3
  s3 -->|try download| load
  s3 -->|cannot| fail
  load --> done

What: PluginSettingsResolver.resolve in src/cadence/infra/plugins/plugin_settings_resolver.py.

Order:

  1. Schema from get_plugin_settings_schema(plugin_class) → defaults.
  2. Overrides from instance_config["plugin_settings"][plugin_ref.to_ref()] — keys like org:pid@version.
  3. Merge {**defaults, **overrides}.
  4. decrypt_sensitive_settings(resolved, settings_schema).
  5. _validate_requiredPluginValidationError if a required key is missing.

Lookup uses PluginRef.to_ref() so instance rows and API payloads stay aligned with active_plugins strings.

sequenceDiagram
  participant R as PluginSettingsResolver
  participant S as settings schema
  participant DB as instance_config plugin_settings
  participant D as decrypt_sensitive_settings
  participant V as _validate_required
  R->>S: defaults from decorator schema
  R->>DB: overrides by PluginRef key
  R->>R: merge overrides over defaults
  R->>D: decrypt sensitive fields
  D-->>R: plaintext values
  R->>V: required keys present
  alt missing required
    V-->>R: PluginValidationError
  else ok
    R-->>R: resolved_settings to bundle
  end

What: PluginBundleBuilderMixin in src/cadence/infra/plugins/plugin_bundle_builder.py.

_create_bundle (simplified order in source):

  1. metadata = contract.plugin_class.get_metadata()
  2. agent = contract.plugin_class.create_agent()
  3. resolved_settings = settings_resolver.resolve(...)
  4. agent.initialize(resolved_settings) when supported
  5. uv_tools = agent.get_tools()orchestrator_tools via adapter
  6. _create_plugin_model when llm_config_id (etc.) appears in resolved settings
  7. LangGraph: create_tool_node(uv_tools) when adapter.framework_type == "langgraph"
  8. Return SDKPluginBundle

_create_bundle_with_cache: If bundle_cache is set and contract.is_stateless, SharedBundleCache.get_or_create keys bundles by (plugin_pid, version, settings_hash, adapter_type) where settings_hash is the first 16 hex chars of SHA-256 of sorted JSON settings (src/cadence/engine/shared_resources/bundle_cache.py). Otherwise builds a fresh bundle every time.

Stateful plugins skip sharing so agent state does not leak across instances.

{
"active_plugins": ["org:com.acme.helpdesk@1.0.0"],
"plugin_settings": {
"org:com.acme.helpdesk@1.0.0": {
"settings": [
{"key": "api_key", "value": "enc:..."},
{"key": "max_results", "value": 25},
]
}
}
}

load_plugins walks active_plugins, resolves each PluginRef, merges settings for that ref, and attaches bundles to the SDKPluginManager used when the orchestrator graph calls into plugin tools.

ScenarioWhat happens
Plugin already in SDK registry_resolve_contract returns without filesystem I/O
Not in registry, local tree presentLoaded from disk, registered, ensure_local may still sync S3
Local empty, S3 onensure_local downloads zip and extracts
Local empty, S3 offResolution fails with PluginNotFoundError when no path works
S3 object missing but files localWarning logged; local copy used
Stateless + cache + same settingsSharedBundleCache may reuse bundle
Stateful pluginAlways fresh bundle path (no cross-instance sharing)
Missing required settingPluginValidationError during resolve
ConcernFile
Hot / demand / DB loadsrc/cadence/engine/pool/pool.py, src/cadence/engine/pool/demand_pool.py
Instance config mergesrc/cadence/engine/config_builder.py
Factory pipelinesrc/cadence/engine/factory.py
Plugin manager + bundlessrc/cadence/infra/plugins/plugin_manager.py, src/cadence/infra/plugins/plugin_bundle_builder.py
Filesystem + registry loadsrc/cadence/infra/plugins/plugin_loader.py
Settings merge + decryptsrc/cadence/infra/plugins/plugin_settings_resolver.py
Stateless bundle cachesrc/cadence/engine/shared_resources/bundle_cache.py
Local / S3 pathssrc/cadence/data/plugins/store.py