feat: support stage config overrides via with_#2086
Conversation
Signed-off-by: Praateek <praateekm@gmail.com>
with_
…e-spec-overrides Signed-off-by: Praateek <praateekm@gmail.com>
|
@claude review |
Greptile SummaryThis PR unifies backend worker sizing under a single
Confidence Score: 4/5Safe to merge after fixing the tutorial notebook that now uses the rejected The new tutorials/synthetic/nemo_data_designer/ndd_data_generation_example.ipynb needs to be updated to Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant User
participant Stage as ProcessingStage
participant With as with_()
participant Ray as RayDataStageAdapter
participant Xenna as XennaExecutor
User->>Stage: "stage.with_(num_workers=4, ray_stage_spec={...})"
Stage->>With: deepcopy(self)
With->>With: merge ray_stage_spec (user keys win)
With->>With: "set num_workers = _num_workers_method(4)"
With-->>User: new_instance
User->>Ray: process_dataset(dataset)
Ray->>Stage: stage.num_workers() → 4
Ray->>Ray: "TaskPoolStrategy(size=4) for task stage"
Ray->>Ray: ActorPoolStrategy for actor stage
User->>Xenna: execute([stage])
Xenna->>Stage: stage.xenna_stage_spec()
Xenna->>Xenna: reject if num_workers in spec
Xenna->>Stage: stage.num_workers() → 4
Xenna->>Xenna: "StageSpec(num_workers=4, num_workers_per_node=None)"
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant User
participant Stage as ProcessingStage
participant With as with_()
participant Ray as RayDataStageAdapter
participant Xenna as XennaExecutor
User->>Stage: "stage.with_(num_workers=4, ray_stage_spec={...})"
Stage->>With: deepcopy(self)
With->>With: merge ray_stage_spec (user keys win)
With->>With: "set num_workers = _num_workers_method(4)"
With-->>User: new_instance
User->>Ray: process_dataset(dataset)
Ray->>Stage: stage.num_workers() → 4
Ray->>Ray: "TaskPoolStrategy(size=4) for task stage"
Ray->>Ray: ActorPoolStrategy for actor stage
User->>Xenna: execute([stage])
Xenna->>Stage: stage.xenna_stage_spec()
Xenna->>Xenna: reject if num_workers in spec
Xenna->>Stage: stage.num_workers() → 4
Xenna->>Xenna: "StageSpec(num_workers=4, num_workers_per_node=None)"
Reviews (6): Last reviewed commit: "Merge remote-tracking branch 'upstream/m..." | Re-trigger Greptile |
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
with_…errides Signed-off-by: Praateek <praateekm@gmail.com>
|
/ok to test 4f5d147 |
|
just out of curiosity, what happened before when num_workers() == 1 didnt apply to ray data for single-input/fanout stages? |
| def xenna_stage_spec(self) -> dict[str, Any]: | ||
| return {"num_workers_per_node": 1} | ||
| def ray_stage_spec(self) -> dict[str, object]: | ||
| return {RayStageSpecKeys.IS_FANOUT_STAGE: True} |
There was a problem hiding this comment.
qq: do we set IS_FANOUT_STAGE manually by ourselves now in the pipeline? is there any logic we should follow to set IS_FANOUT_STAGE? did we missed setting it to true for pdf partitioning before this PR?
There was a problem hiding this comment.
Yes we missed it before this PR. I know @sarahyurick is working with an external contributor to make it dynamic but i haven't taken a close look at it
There was a problem hiding this comment.
does setting IS_FANOUT_STAGE make that stage much faster? is there any benchmark result to share if possible?
|
@weijiac0619 ray-data anyway starts leniently from 1 "concurrent" worker so we never had the problem in case of single-input stages for ray data, only for xenna we had the issue |
…errides Signed-off-by: Praateek <praateekm@gmail.com> # Conflicts: # nemo_curator/stages/interleaved/pdf/nemotron_parse/partitioning.py # tests/stages/interleaved/pdf/nemotron_parse/test_stages.py
|
/ok to test c06bcda |
with_overrides forray_stage_spec,xenna_stage_spec, andnum_workers. Stage spec overrides are shallow-merged, with user-provided keys winning; a typed unset sentinel preserves the difference between omitted and explicitNone.num_workersto Ray Data task stages viaTaskPoolStrategy; actor stages continue to useActorPoolStrategy.stage.num_workers()for cluster-wide worker counts, rejectsxenna_stage_spec["num_workers"], and raises if a stage sets bothnum_workers()and Xennanum_workers_per_node.num_workersas theProcessingStagebackend worker hook and renames the ASR dataloader setting todataloader_num_workersso stage-local worker counts do not shadownum_workers().num_workers() == 1for file partitioning, URL generation, PDF partitioning, cluster-wise pairwise partitioning, audio manifest reader/writer, audio segment extraction, and ALM long-form manifest reading. PDF partitioning now also advertises itself as a Ray fanout stage.deduplication/removalstage behavior was removed; the deduplication change is limited to semantic pairwise partitioning using genericnum_workers()instead of Xenna-onlynum_workers_per_node.xenna_num_workersconstructor option as a compatibility alias, but routes it throughnum_workers()instead ofxenna_stage_spec.with_backend overrides, reservednum_workersenforcement, Xenna worker fallback/rejection, Ray Data task worker sizing, and updated singleton-stage defaults.