Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
python_requires=">=3.8",
version="0.0.0.beta45",
packages=find_packages(),
package_data={"llmengine": ["py.typed"]},
package_data={"llmengine": ["py.typed"]}, # type: ignore[arg-type]
)
92 changes: 52 additions & 40 deletions model-engine/model_engine_server/core/celery/celery_autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,57 +80,69 @@ async def list_deployments(apps_api) -> Dict[Tuple[str, str], CeleryAutoscalerPa
from model_engine_server.common.config import hmi_config

endpoint_namespace = hmi_config.endpoint_namespace
# Also scan "default" so non-launch celery deployments (e.g. nucleus workers) are still
# autoscaled. Previously the autoscaler scanned every namespace; #770 scoped it to a single
# namespace for startup speed but inadvertently dropped these.
# Dedupe to avoid double-scanning if endpoint_namespace itself is "default".
namespaces_to_scan = list(dict.fromkeys([endpoint_namespace, "default"]))
celery_deployments_params = {}
namespace_start_time = time.time()
deployments = await apps_api.list_namespaced_deployment(namespace=endpoint_namespace)
logger.info(
f"list_namespaced_deployment in {endpoint_namespace} took {time.time() - namespace_start_time} seconds"
)
for deployment in deployments.items:
deployment_name = deployment.metadata.name
annotations = deployment.metadata.annotations

if not annotations:
for namespace_name in namespaces_to_scan:
namespace_start_time = time.time()
try:
deployments = await apps_api.list_namespaced_deployment(namespace=namespace_name)
except ApiException as exc:
# Don't let a failure in one namespace (e.g. missing RBAC) wipe out scaling for the
# other. Log and move on; the next iteration of the outer loop will retry.
logger.error(f"Failed to list deployments in namespace {namespace_name}: {exc}")
continue
Comment on lines +93 to 97

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Persistent ERROR spam when RBAC for "default" is not granted

If the autoscaler pod lacks RBAC permission to list deployments in the "default" namespace (which is a likely production config where only endpoint_namespace access was ever granted), every loop iteration (~3 s) will emit an ERROR-level log for the failed namespace. Over time this floods log pipelines and may mask real errors. Consider logging at WARNING level, or tracking the error and only re-logging after a backoff interval.

Prompt To Fix With AI
This is a comment left during a code review.
Path: model-engine/model_engine_server/core/celery/celery_autoscaler.py
Line: 93-97

Comment:
**Persistent ERROR spam when RBAC for "default" is not granted**

If the autoscaler pod lacks RBAC permission to list deployments in the `"default"` namespace (which is a likely production config where only `endpoint_namespace` access was ever granted), every loop iteration (~3 s) will emit an ERROR-level log for the failed namespace. Over time this floods log pipelines and may mask real errors. Consider logging at WARNING level, or tracking the error and only re-logging after a backoff interval.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex

logger.info(
f"list_namespaced_deployment in {namespace_name} took {time.time() - namespace_start_time} seconds"
)
for deployment in deployments.items:
deployment_name = deployment.metadata.name
annotations = deployment.metadata.annotations

# Parse parameters
params = {}
if not annotations:
continue

if "celery.scaleml.autoscaler/broker" in annotations:
deployment_broker = annotations["celery.scaleml.autoscaler/broker"]
else:
deployment_broker = ELASTICACHE_REDIS_BROKER
# Parse parameters
params = {}

if deployment_broker != autoscaler_broker:
logger.debug(
f"Skipping deployment {deployment_name}; deployment's broker {deployment_broker} is not {autoscaler_broker}"
)
continue
if "celery.scaleml.autoscaler/broker" in annotations:
deployment_broker = annotations["celery.scaleml.autoscaler/broker"]
else:
deployment_broker = ELASTICACHE_REDIS_BROKER

for f in dataclasses.fields(CeleryAutoscalerParams):
k = f.name
v = annotations.get(f"celery.scaleml.autoscaler/{stringcase.camelcase(k)}")
if not v:
if deployment_broker != autoscaler_broker:
logger.debug(
f"Skipping deployment {deployment_name}; deployment's broker {deployment_broker} is not {autoscaler_broker}"
)
continue

try:
if k == "task_visibility":
v = TaskVisibility.from_name(v)
v = f.type(v)
except (ValueError, KeyError):
logger.exception(f"Unable to convert {f.name}: {v} to {f.type}")
for f in dataclasses.fields(CeleryAutoscalerParams):
k = f.name
v = annotations.get(f"celery.scaleml.autoscaler/{stringcase.camelcase(k)}")
if not v:
continue

params[k] = v
try:
if k == "task_visibility":
v = TaskVisibility.from_name(v)
v = f.type(v)
except (ValueError, KeyError):
logger.exception(f"Unable to convert {f.name}: {v} to {f.type}")

try:
celery_autoscaler_params = CeleryAutoscalerParams(**params)
except TypeError:
logger.debug(
f"Missing params, skipping deployment : {deployment_name} in {endpoint_namespace}"
)
continue
params[k] = v

try:
celery_autoscaler_params = CeleryAutoscalerParams(**params)
except TypeError:
logger.debug(
f"Missing params, skipping deployment : {deployment_name} in {namespace_name}"
)
continue

celery_deployments_params[(deployment_name, endpoint_namespace)] = celery_autoscaler_params
celery_deployments_params[(deployment_name, namespace_name)] = celery_autoscaler_params

return celery_deployments_params

Expand Down