Wait For Events During Sequence

I wrote some simple classes (WaitFor and CutScene) that allow you to wait for a event/input between sequences/parallels. To use create a CutScene object and use the add function to add any intervals you want. Anytime you want to pause/wait for input add a WaitFor object.

Code is under CC0 (public domain)
Just the classes

from direct.showbase.DirectObject import DirectObject

class WaitFor(DirectObject):
    def __init__(self, event, timeout=None):
        super().__init__()
        self._is_playing = False
        self.time = timeout
        self.accepting = False
        if type(event) not in [list, tuple]:
            self.accept(event, self.fire)
        else:
            for ev in event:
                self.accept(ev, self.fire)
        if timeout:
            self.add_task(self.timeout)

    def start(self):
        self._is_playing = True
        self.accepting = True

    def stop(self):
        self._is_playing = False
        self.accepting = False

    async def timeout(self, task):
        await task.pause(self.time)
        self._is_playing = False

    def fire(self):
        if self.accepting:
            self._is_playing = False
            self.accepting = False

    def is_playing(self):
        return self._is_playing


class CutScene(DirectObject):
    def __init__(self, play=False):
        super().__init__()

        self.times = None
        self.is_playing = play
        self.intervals = []
        self.current = None
        self.index = 0
        self.wait = False
        self.task = None
        if play:
            self.task = self.add_task(self.update)

    def play(self, times=1, scene_index=0):
        if not self.is_playing:
            if not self.task:
                self.add_task(self.update)
            self.is_playing = True
            self.times = times
            self.current = self.intervals[scene_index]
            self.current.start()

    def loop(self):
        self.play(-1)

    def pause(self):
        self.is_playing = False
        if self.current:
            self.current.pause()

    def add(self, interval):
        self.intervals.append(interval)

    def update(self, task):
        if self.is_playing:
            if not self.current.is_playing():
                if self.wait:
                    self.current.stop()
                self.index += 1

                if self.index == len(self.intervals):
                    if self.times <= 1:
                        self.times -= 1
                        if self.times == 0:
                            return task.done
                        else:
                            self.index = 0

                self.current = self.intervals[self.index]
                if type(self.current) is not WaitFor:
                    self.wait = False
                else:
                    self.wait = True
                self.current.start()

        return task.cont

    def clean_up(self):
        self.remove_all_tasks()
        self.ignore_all()
        for i in self.intervals:
            if type(i) is WaitFor:
                i.clean_up()
            else:
                i.finish()
        self.intervals = None

If you run the code you should see the panda in place until you press the q key.

Example

from direct.showbase.DirectObject import DirectObject

class WaitFor(DirectObject):
    def __init__(self, event, timeout=None):
        super().__init__()
        self._is_playing = False
        self.time = timeout
        self.accepting = False
        if type(event) not in [list, tuple]:
            self.accept(event, self.fire)
        else:
            for ev in event:
                self.accept(ev, self.fire)
        if timeout:
            self.add_task(self.timeout)

    def start(self):
        self._is_playing = True
        self.accepting = True

    def stop(self):
        self._is_playing = False
        self.accepting = False

    async def timeout(self, task):
        await task.pause(self.time)
        self._is_playing = False

    def fire(self):
        if self.accepting:
            self._is_playing = False
            self.accepting = False

    def is_playing(self):
        return self._is_playing

    def clean_up(self):
        self.remove_all_tasks()
        self.ignore_all()
        


class CutScene(DirectObject):
    def __init__(self, play=False):
        super().__init__()

        self.times = None
        self.is_playing = play
        self.intervals = []
        self.current = None
        self.index = 0
        self.wait = False
        if play:
            self.add_task(self.update)

    def play(self, times=1, scene_index=0):
        if not self.is_playing:
            self.add_task(self.update)
            self.is_playing = True
            self.times = times
            self.current = self.intervals[scene_index]
            self.current.start()

    def loop(self):
        self.play(-1)

    def pause(self):
        self.is_playing = False
        if self.current:
            self.current.pause()

    def add(self, interval):
        self.intervals.append(interval)

    def update(self, task):
        if self.is_playing:
            if not self.current.is_playing():
                if self.wait:
                    self.current.stop()
                self.index += 1

                if self.index == len(self.intervals):
                    if self.times <= 1:
                        self.times -= 1
                        if self.times == 0:
                            return task.done
                        else:
                            self.index = 0

                self.current = self.intervals[self.index]
                if type(self.current) is not WaitFor:
                    self.wait = False
                else:
                    self.wait = True
                self.current.start()

        return task.cont

    def clean_up(self):
        self.remove_all_tasks()
        self.ignore_all()
        for i in self.intervals:
            if type(i) is WaitFor:
                i.clean_up()
            else:
                i.finish()
        self.intervals = None


if __name__ == '__main__':
    from math import pi, sin, cos
    from direct.showbase.ShowBase import ShowBase
    from direct.task import Task
    from direct.actor.Actor import Actor
    from direct.interval.IntervalGlobal import Sequence
    from panda3d.core import Point3


    class MyApp(ShowBase):

        def __init__(self):
            ShowBase.__init__(self)

            # Disable the camera trackball controls.

            self.disableMouse()

            # Load the environment model.

            self.scene = self.loader.loadModel("models/environment")

            # Reparent the model to render.

            self.scene.reparentTo(self.render)

            # Apply scale and position transforms on the model.

            self.scene.setScale(0.25, 0.25, 0.25)

            self.scene.setPos(-8, 42, 0)

            # Add the spinCameraTask procedure to the task manager.

            self.taskMgr.add(self.spinCameraTask, "SpinCameraTask")

            # Load and transform the panda actor.

            self.pandaActor = Actor("models/panda-model",

                                    {"walk": "models/panda-walk4"})

            self.pandaActor.setScale(0.005, 0.005, 0.005)

            self.pandaActor.reparentTo(self.render)

            # Loop its animation.

            self.pandaActor.loop("walk")

            # Create the four lerp intervals needed for the panda to

            # walk back and forth.
            self.pandaActor.set_pos(Point3(0, 10, 0))

            posInterval1 = self.pandaActor.posInterval(13,

                                                       Point3(0, -10, 0),

                                                       startPos=Point3(0, 10, 0))

            posInterval2 = self.pandaActor.posInterval(13,

                                                       Point3(0, 10, 0),

                                                       startPos=Point3(0, -10, 0))

            hprInterval1 = self.pandaActor.hprInterval(3,

                                                       Point3(180, 0, 0),

                                                       startHpr=Point3(0, 0, 0))

            hprInterval2 = self.pandaActor.hprInterval(3,

                                                       Point3(0, 0, 0),

                                                       startHpr=Point3(180, 0, 0))

            # Create and play the sequence that coordinates the intervals.

            self.pandaPace = Sequence(posInterval1, hprInterval1,

                                      posInterval2, hprInterval2,

                                      name="pandaPace")

            

            self.cutscene = CutScene()
            self.cutscene.add(WaitFor("q"))
            self.cutscene.add(self.pandaPace)
            self.cutscene.add(WaitFor("q"))
            self.cutscene.loop()

        # Define a procedure to move the camera.

        def spinCameraTask(self, task):
            angleDegrees = task.time * 6.0

            angleRadians = angleDegrees * (pi / 180.0)

            self.camera.setPos(20 * sin(angleRadians), -20 * cos(angleRadians), 3)

            self.camera.setHpr(angleDegrees, 0, 0)

            return Task.cont


    app = MyApp()

    app.run()

Here is the part of the code showing how to use a coroutine to implement the same thing:

            # At the end of the __init__
            self.cutscene = self.taskMgr.add(self.run_cutscene)

        async def run_cutscene(self, task):
            await messenger.future("q")

            await self.pandaActor.posInterval(13, Point3(0, -10, 0), startPos=Point3(0, 10, 0))
            await self.pandaActor.hprInterval(3, Point3(180, 0, 0), startHpr=Point3(0, 0, 0))
            await self.pandaActor.posInterval(13, Point3(0, 10, 0), startPos=Point3(0, -10, 0))
            await self.pandaActor.hprInterval(3, Point3(0, 0, 0), startHpr=Point3(180, 0, 0))

            return task.again

Where my CutScene class doesn’t actually touch the inside of Interval, this shouldn’t cause any issues right? Because all I’m doing is creating a list of intervals and playing them one at a time.

This is how I determine if one should be played next or not, where self.current is just a interval or a WaitFor object.

    def update(self, task):
        if self.is_playing:
            if not self.current.is_playing():
                if self.wait:
                    self.current.stop() # WaitFor: stop accepting events
                self.index += 1
                
                # Check if we are out of intervals to play
                if self.index == len(self.intervals):
                    if self.times <= 1:
                        self.times -= 1
                        if self.times == 0:
                            return task.done
                        else:
                            self.index = 0

                # If we have intervals left get the next one
                self.current = self.intervals[self.index]
                if type(self.current) is not WaitFor:
                    self.wait = False
                else:
                    self.wait = True
                self.current.start()

        return task.cont

Ah, yes, if you’re not actually creating an interval that pauses for an indeterminate amount of time, then that’s totally fine.

1 Like