Cloning a celery chain
I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it])
however it keeps complaining about not having enough arguments.
I have broken this down using the below
in a file named tasks.py
@app.task
def add(x,y):
return x+y
and then from the python shell
>>> from tasks import add
>>> chain=add.s()|add.s(1)
>>> chain
magic_carpet.celery.add() | add(1)
>>> chain.args
()
>>> chain.delay(2,2)
<AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
>>> cloned_chain=chain.clone(args=(2,))
>>> cloned_chain.args
()
>>> cloned_chain.delay(2)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
return self.apply_async(partial_args, partial_kwargs)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
dict(self.options, **options) if options else self.options))
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
first_task.apply_async(**options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
return _apply(args, kwargs, **options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
check_arguments(*(args or ()), **(kwargs or ))
TypeError: add() missing 1 required positional argument: 'y'
>>>
obviously, clone
isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain
class has the clone method implemented documented as
>>> from celery.canvas import _chain
>>> help(_chain.clone)
Help on function clone in module celery.canvas:
clone(self, *args, **kwargs)
Create a copy of this signature.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with
existing options.
Reading the celery source I see nothing obvious that would cause this.
Currently running Celery 4.2.1 and Python 3.6.6
Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?
celery python-3.6 celery-canvas
add a comment |
I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it])
however it keeps complaining about not having enough arguments.
I have broken this down using the below
in a file named tasks.py
@app.task
def add(x,y):
return x+y
and then from the python shell
>>> from tasks import add
>>> chain=add.s()|add.s(1)
>>> chain
magic_carpet.celery.add() | add(1)
>>> chain.args
()
>>> chain.delay(2,2)
<AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
>>> cloned_chain=chain.clone(args=(2,))
>>> cloned_chain.args
()
>>> cloned_chain.delay(2)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
return self.apply_async(partial_args, partial_kwargs)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
dict(self.options, **options) if options else self.options))
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
first_task.apply_async(**options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
return _apply(args, kwargs, **options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
check_arguments(*(args or ()), **(kwargs or ))
TypeError: add() missing 1 required positional argument: 'y'
>>>
obviously, clone
isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain
class has the clone method implemented documented as
>>> from celery.canvas import _chain
>>> help(_chain.clone)
Help on function clone in module celery.canvas:
clone(self, *args, **kwargs)
Create a copy of this signature.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with
existing options.
Reading the celery source I see nothing obvious that would cause this.
Currently running Celery 4.2.1 and Python 3.6.6
Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?
celery python-3.6 celery-canvas
add a comment |
I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it])
however it keeps complaining about not having enough arguments.
I have broken this down using the below
in a file named tasks.py
@app.task
def add(x,y):
return x+y
and then from the python shell
>>> from tasks import add
>>> chain=add.s()|add.s(1)
>>> chain
magic_carpet.celery.add() | add(1)
>>> chain.args
()
>>> chain.delay(2,2)
<AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
>>> cloned_chain=chain.clone(args=(2,))
>>> cloned_chain.args
()
>>> cloned_chain.delay(2)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
return self.apply_async(partial_args, partial_kwargs)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
dict(self.options, **options) if options else self.options))
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
first_task.apply_async(**options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
return _apply(args, kwargs, **options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
check_arguments(*(args or ()), **(kwargs or ))
TypeError: add() missing 1 required positional argument: 'y'
>>>
obviously, clone
isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain
class has the clone method implemented documented as
>>> from celery.canvas import _chain
>>> help(_chain.clone)
Help on function clone in module celery.canvas:
clone(self, *args, **kwargs)
Create a copy of this signature.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with
existing options.
Reading the celery source I see nothing obvious that would cause this.
Currently running Celery 4.2.1 and Python 3.6.6
Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?
celery python-3.6 celery-canvas
I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it])
however it keeps complaining about not having enough arguments.
I have broken this down using the below
in a file named tasks.py
@app.task
def add(x,y):
return x+y
and then from the python shell
>>> from tasks import add
>>> chain=add.s()|add.s(1)
>>> chain
magic_carpet.celery.add() | add(1)
>>> chain.args
()
>>> chain.delay(2,2)
<AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
>>> cloned_chain=chain.clone(args=(2,))
>>> cloned_chain.args
()
>>> cloned_chain.delay(2)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
return self.apply_async(partial_args, partial_kwargs)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
dict(self.options, **options) if options else self.options))
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
first_task.apply_async(**options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
return _apply(args, kwargs, **options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
check_arguments(*(args or ()), **(kwargs or ))
TypeError: add() missing 1 required positional argument: 'y'
>>>
obviously, clone
isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain
class has the clone method implemented documented as
>>> from celery.canvas import _chain
>>> help(_chain.clone)
Help on function clone in module celery.canvas:
clone(self, *args, **kwargs)
Create a copy of this signature.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with
existing options.
Reading the celery source I see nothing obvious that would cause this.
Currently running Celery 4.2.1 and Python 3.6.6
Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?
celery python-3.6 celery-canvas
celery python-3.6 celery-canvas
edited Nov 23 '18 at 7:52
Bjorn Harpe
asked Nov 14 '18 at 6:42
Bjorn HarpeBjorn Harpe
9519
9519
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.
My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.
my clone method currently supports cloning tasks
and chains
, though adding support for groups
would be a trivial extension
def clone_signature(sig, args=(), kwargs=(), **opts):
if sig.subtask_type and sig.subtask_type != "chain":
raise NotImplementedError(
"Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
)
clone = sig.clone()
if hasattr(clone, "tasks"):
t = clone.tasks[0]
else:
t = clone
args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
return clone
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53294450%2fcloning-a-celery-chain%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.
My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.
my clone method currently supports cloning tasks
and chains
, though adding support for groups
would be a trivial extension
def clone_signature(sig, args=(), kwargs=(), **opts):
if sig.subtask_type and sig.subtask_type != "chain":
raise NotImplementedError(
"Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
)
clone = sig.clone()
if hasattr(clone, "tasks"):
t = clone.tasks[0]
else:
t = clone
args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
return clone
add a comment |
So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.
My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.
my clone method currently supports cloning tasks
and chains
, though adding support for groups
would be a trivial extension
def clone_signature(sig, args=(), kwargs=(), **opts):
if sig.subtask_type and sig.subtask_type != "chain":
raise NotImplementedError(
"Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
)
clone = sig.clone()
if hasattr(clone, "tasks"):
t = clone.tasks[0]
else:
t = clone
args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
return clone
add a comment |
So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.
My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.
my clone method currently supports cloning tasks
and chains
, though adding support for groups
would be a trivial extension
def clone_signature(sig, args=(), kwargs=(), **opts):
if sig.subtask_type and sig.subtask_type != "chain":
raise NotImplementedError(
"Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
)
clone = sig.clone()
if hasattr(clone, "tasks"):
t = clone.tasks[0]
else:
t = clone
args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
return clone
So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.
My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.
my clone method currently supports cloning tasks
and chains
, though adding support for groups
would be a trivial extension
def clone_signature(sig, args=(), kwargs=(), **opts):
if sig.subtask_type and sig.subtask_type != "chain":
raise NotImplementedError(
"Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
)
clone = sig.clone()
if hasattr(clone, "tasks"):
t = clone.tasks[0]
else:
t = clone
args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
return clone
answered Nov 23 '18 at 7:30
Bjorn HarpeBjorn Harpe
9519
9519
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53294450%2fcloning-a-celery-chain%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown