Cloning a celery chain










0















I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it]) however it keeps complaining about not having enough arguments.



I have broken this down using the below



in a file named tasks.py



@app.task
def add(x,y):
return x+y


and then from the python shell



>>> from tasks import add
>>> chain=add.s()|add.s(1)
>>> chain
magic_carpet.celery.add() | add(1)
>>> chain.args
()
>>> chain.delay(2,2)
<AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
>>> cloned_chain=chain.clone(args=(2,))
>>> cloned_chain.args
()
>>> cloned_chain.delay(2)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
return self.apply_async(partial_args, partial_kwargs)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
dict(self.options, **options) if options else self.options))
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
first_task.apply_async(**options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
return _apply(args, kwargs, **options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
check_arguments(*(args or ()), **(kwargs or ))
TypeError: add() missing 1 required positional argument: 'y'
>>>


obviously, clone isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain class has the clone method implemented documented as



>>> from celery.canvas import _chain
>>> help(_chain.clone)

Help on function clone in module celery.canvas:

clone(self, *args, **kwargs)
Create a copy of this signature.

Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with
existing options.


Reading the celery source I see nothing obvious that would cause this.



Currently running Celery 4.2.1 and Python 3.6.6



Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?










share|improve this question




























    0















    I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it]) however it keeps complaining about not having enough arguments.



    I have broken this down using the below



    in a file named tasks.py



    @app.task
    def add(x,y):
    return x+y


    and then from the python shell



    >>> from tasks import add
    >>> chain=add.s()|add.s(1)
    >>> chain
    magic_carpet.celery.add() | add(1)
    >>> chain.args
    ()
    >>> chain.delay(2,2)
    <AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
    >>> cloned_chain=chain.clone(args=(2,))
    >>> cloned_chain.args
    ()
    >>> cloned_chain.delay(2)
    Traceback (most recent call last):
    File "<console>", line 1, in <module>
    File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
    return self.apply_async(partial_args, partial_kwargs)
    File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
    dict(self.options, **options) if options else self.options))
    File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
    first_task.apply_async(**options)
    File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
    return _apply(args, kwargs, **options)
    File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
    check_arguments(*(args or ()), **(kwargs or ))
    TypeError: add() missing 1 required positional argument: 'y'
    >>>


    obviously, clone isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain class has the clone method implemented documented as



    >>> from celery.canvas import _chain
    >>> help(_chain.clone)

    Help on function clone in module celery.canvas:

    clone(self, *args, **kwargs)
    Create a copy of this signature.

    Arguments:
    args (Tuple): Partial args to be prepended to the existing args.
    kwargs (Dict): Partial kwargs to be merged with existing kwargs.
    options (Dict): Partial options to be merged with
    existing options.


    Reading the celery source I see nothing obvious that would cause this.



    Currently running Celery 4.2.1 and Python 3.6.6



    Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?










    share|improve this question


























      0












      0








      0








      I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it]) however it keeps complaining about not having enough arguments.



      I have broken this down using the below



      in a file named tasks.py



      @app.task
      def add(x,y):
      return x+y


      and then from the python shell



      >>> from tasks import add
      >>> chain=add.s()|add.s(1)
      >>> chain
      magic_carpet.celery.add() | add(1)
      >>> chain.args
      ()
      >>> chain.delay(2,2)
      <AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
      >>> cloned_chain=chain.clone(args=(2,))
      >>> cloned_chain.args
      ()
      >>> cloned_chain.delay(2)
      Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
      return self.apply_async(partial_args, partial_kwargs)
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
      dict(self.options, **options) if options else self.options))
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
      first_task.apply_async(**options)
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
      return _apply(args, kwargs, **options)
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
      check_arguments(*(args or ()), **(kwargs or ))
      TypeError: add() missing 1 required positional argument: 'y'
      >>>


      obviously, clone isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain class has the clone method implemented documented as



      >>> from celery.canvas import _chain
      >>> help(_chain.clone)

      Help on function clone in module celery.canvas:

      clone(self, *args, **kwargs)
      Create a copy of this signature.

      Arguments:
      args (Tuple): Partial args to be prepended to the existing args.
      kwargs (Dict): Partial kwargs to be merged with existing kwargs.
      options (Dict): Partial options to be merged with
      existing options.


      Reading the celery source I see nothing obvious that would cause this.



      Currently running Celery 4.2.1 and Python 3.6.6



      Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?










      share|improve this question
















      I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it]) however it keeps complaining about not having enough arguments.



      I have broken this down using the below



      in a file named tasks.py



      @app.task
      def add(x,y):
      return x+y


      and then from the python shell



      >>> from tasks import add
      >>> chain=add.s()|add.s(1)
      >>> chain
      magic_carpet.celery.add() | add(1)
      >>> chain.args
      ()
      >>> chain.delay(2,2)
      <AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
      >>> cloned_chain=chain.clone(args=(2,))
      >>> cloned_chain.args
      ()
      >>> cloned_chain.delay(2)
      Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
      return self.apply_async(partial_args, partial_kwargs)
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
      dict(self.options, **options) if options else self.options))
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
      first_task.apply_async(**options)
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
      return _apply(args, kwargs, **options)
      File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
      check_arguments(*(args or ()), **(kwargs or ))
      TypeError: add() missing 1 required positional argument: 'y'
      >>>


      obviously, clone isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain class has the clone method implemented documented as



      >>> from celery.canvas import _chain
      >>> help(_chain.clone)

      Help on function clone in module celery.canvas:

      clone(self, *args, **kwargs)
      Create a copy of this signature.

      Arguments:
      args (Tuple): Partial args to be prepended to the existing args.
      kwargs (Dict): Partial kwargs to be merged with existing kwargs.
      options (Dict): Partial options to be merged with
      existing options.


      Reading the celery source I see nothing obvious that would cause this.



      Currently running Celery 4.2.1 and Python 3.6.6



      Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?







      celery python-3.6 celery-canvas






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 23 '18 at 7:52







      Bjorn Harpe

















      asked Nov 14 '18 at 6:42









      Bjorn HarpeBjorn Harpe

      9519




      9519






















          1 Answer
          1






          active

          oldest

          votes


















          0














          So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.



          My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.



          my clone method currently supports cloning tasks and chains, though adding support for groups would be a trivial extension



          def clone_signature(sig, args=(), kwargs=(), **opts):
          if sig.subtask_type and sig.subtask_type != "chain":
          raise NotImplementedError(
          "Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
          )
          clone = sig.clone()
          if hasattr(clone, "tasks"):
          t = clone.tasks[0]
          else:
          t = clone
          args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
          t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
          return clone





          share|improve this answer






















            Your Answer






            StackExchange.ifUsing("editor", function ()
            StackExchange.using("externalEditor", function ()
            StackExchange.using("snippets", function ()
            StackExchange.snippets.init();
            );
            );
            , "code-snippets");

            StackExchange.ready(function()
            var channelOptions =
            tags: "".split(" "),
            id: "1"
            ;
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function()
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled)
            StackExchange.using("snippets", function()
            createEditor();
            );

            else
            createEditor();

            );

            function createEditor()
            StackExchange.prepareEditor(
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader:
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            ,
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            );



            );













            draft saved

            draft discarded


















            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53294450%2fcloning-a-celery-chain%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            0














            So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.



            My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.



            my clone method currently supports cloning tasks and chains, though adding support for groups would be a trivial extension



            def clone_signature(sig, args=(), kwargs=(), **opts):
            if sig.subtask_type and sig.subtask_type != "chain":
            raise NotImplementedError(
            "Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
            )
            clone = sig.clone()
            if hasattr(clone, "tasks"):
            t = clone.tasks[0]
            else:
            t = clone
            args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
            t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
            return clone





            share|improve this answer



























              0














              So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.



              My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.



              my clone method currently supports cloning tasks and chains, though adding support for groups would be a trivial extension



              def clone_signature(sig, args=(), kwargs=(), **opts):
              if sig.subtask_type and sig.subtask_type != "chain":
              raise NotImplementedError(
              "Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
              )
              clone = sig.clone()
              if hasattr(clone, "tasks"):
              t = clone.tasks[0]
              else:
              t = clone
              args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
              t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
              return clone





              share|improve this answer

























                0












                0








                0







                So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.



                My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.



                my clone method currently supports cloning tasks and chains, though adding support for groups would be a trivial extension



                def clone_signature(sig, args=(), kwargs=(), **opts):
                if sig.subtask_type and sig.subtask_type != "chain":
                raise NotImplementedError(
                "Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
                )
                clone = sig.clone()
                if hasattr(clone, "tasks"):
                t = clone.tasks[0]
                else:
                t = clone
                args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
                t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
                return clone





                share|improve this answer













                So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.



                My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.



                my clone method currently supports cloning tasks and chains, though adding support for groups would be a trivial extension



                def clone_signature(sig, args=(), kwargs=(), **opts):
                if sig.subtask_type and sig.subtask_type != "chain":
                raise NotImplementedError(
                "Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
                )
                clone = sig.clone()
                if hasattr(clone, "tasks"):
                t = clone.tasks[0]
                else:
                t = clone
                args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
                t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
                return clone






                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 23 '18 at 7:30









                Bjorn HarpeBjorn Harpe

                9519




                9519





























                    draft saved

                    draft discarded
















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid


                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.

                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function ()
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53294450%2fcloning-a-celery-chain%23new-answer', 'question_page');

                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    這個網誌中的熱門文章

                    Barbados

                    How to read a connectionString WITH PROVIDER in .NET Core?

                    Node.js Script on GitHub Pages or Amazon S3