OpenSeeFace tracking integration

I made an integration with the OpenSeeFace tracking library.
OpenSeeFace runs as a separate process and sends the tracking data as UDP packets.
The Panda3D app reads the UDP packets and syncs the tracked points with Panda3D nodes.

I modified OpenSeeFace and extended its protocol, so you need my fork (at this commit): extend protocol with data count · kitsune-ONE-team/OpenSeeFace@1006ef9 · GitHub
The extended protocol is activated with the extra command-line switch "--protocol 2".

This is just a simple demo, so it has some flaws: it freezes rendering when there are no UDP packets.

Source code
import io
import socket
import struct
import sys

from direct.showbase.ShowBase import ShowBase

from panda3d.core import (
    PandaNode, NodePath, Quat, Point3, Vec3,
    CS_yup_right, CS_zup_right,
    GeomVertexFormat, GeomVertexData, GeomVertexWriter,
    GeomTriangles, Geom, GeomNode,
    load_prc_file_data)


def make_cube():
    """Build a white cube spanning -1..1 on every axis.

    Each of the 8 corners is stored three times (24 rows in total), once per
    adjoining face, so that every face gets its own flat normal.

    Returns:
        A ``GeomNode`` named ``'cube gnode'`` holding the cube geometry.
    """
    # The 8 corners; corner k occupies rows 3k, 3k+1 and 3k+2.
    corners = (
        (-1, -1, -1),
        (1, -1, -1),
        (1, 1, -1),
        (-1, 1, -1),
        (-1, -1, 1),
        (1, -1, 1),
        (1, 1, 1),
        (-1, 1, 1),
    )

    # For every corner, the normals of its three duplicated rows, in row order.
    corner_normals = (
        ((0, -1, 0), (-1, 0, 0), (0, 0, -1)),
        ((0, -1, 0), (1, 0, 0), (0, 0, -1)),
        ((1, 0, 0), (0, 1, 0), (0, 0, -1)),
        ((0, 1, 0), (-1, 0, 0), (0, 0, -1)),
        ((0, -1, 0), (-1, 0, 0), (0, 0, 1)),
        ((0, -1, 0), (1, 0, 0), (0, 0, 1)),
        ((1, 0, 0), (0, 1, 0), (0, 0, 1)),
        ((0, 1, 0), (-1, 0, 0), (0, 0, 1)),
    )

    # Row indices of the 12 triangles (two per face).
    triangles = (
        (0, 3, 15), (0, 15, 12),
        (4, 6, 18), (4, 18, 16),
        (7, 9, 21), (7, 21, 19),
        (10, 1, 13), (10, 13, 22),
        (2, 11, 8), (2, 8, 5),
        (14, 17, 20), (14, 20, 23),
    )

    vertex_format = GeomVertexFormat.getV3n3c4()
    vdata = GeomVertexData('cube', vertex_format, Geom.UHStatic)
    vdata.setNumRows(24)

    vertex_writer = GeomVertexWriter(vdata, 'vertex')
    normal_writer = GeomVertexWriter(vdata, 'normal')
    color_writer = GeomVertexWriter(vdata, 'color')

    for corner, face_normals in zip(corners, corner_normals):
        for normal in face_normals:
            vertex_writer.add_data3f(*corner)
            normal_writer.add_data3f(*normal)
            color_writer.add_data4f(1, 1, 1, 1)

    primitive = GeomTriangles(Geom.UHStatic)
    for triangle in triangles:
        primitive.add_vertices(*triangle)

    geom = Geom(vdata)
    geom.add_primitive(primitive)

    cube_node = GeomNode('cube gnode')
    cube_node.add_geom(geom)
    return cube_node


def unpack(fmt, packet):
    """Read and decode the next ``struct``-formatted values from a stream.

    The number of bytes to consume is derived from ``fmt`` itself via
    ``struct.calcsize``, so every valid struct format is supported (repeat
    counts, byte-order prefixes, padding) rather than only a fixed set of
    type codes.

    Args:
        fmt: A ``struct`` format string, e.g. ``'fff'``.
        packet: A binary file-like object with a ``read(size)`` method; its
            position is advanced by the size of ``fmt``.

    Returns:
        A tuple with the decoded values (a 1-tuple for single-value formats).

    Raises:
        struct.error: If ``fmt`` is invalid or the stream holds fewer bytes
            than ``fmt`` requires.
    """
    return struct.unpack(fmt, packet.read(struct.calcsize(fmt)))


class Demo(ShowBase):
    """Panda3D demo that mirrors OpenSeeFace 3D tracking points as scene nodes.

    Tracking datagrams are read from a UDP socket bound to port 11573. For
    every 3D point in a packet a small debug cube is shown below a common
    "Root Tracker" node, which itself follows the reported head rotation and
    position.
    """

    # Names of the trailing per-face float values, in the order they are read
    # from the packet.
    _FEATURE_NAMES = (
        'eye_l',
        'eye_r',
        'eyebrow_steepness_l',
        'eyebrow_updown_l',
        'eyebrow_quirk_l',
        'eyebrow_steepness_r',
        'eyebrow_updown_r',
        'eyebrow_quirk_r',
        'mouth_corner_updown_l',
        'mouth_corner_inout_l',
        'mouth_corner_updown_r',
        'mouth_corner_inout_r',
        'mouth_open',
        'mouth_wide',
    )

    def __init__(self):
        """Open the window, build the scene graph and start listening."""
        load_prc_file_data('', '''
            win-size 1280 720
            win-origin -1 -1
        ''')

        super().__init__()
        self.disable_mouse()
        self.set_background_color(0, 0, 0)

        self.cam.set_pos(50, -300, 200)

        self._container = self.render.attach_new_node(PandaNode('Container'))

        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.bind(('0.0.0.0', 11573))
        # Non-blocking: a frame without tracking data must not stall rendering.
        self._socket.setblocking(False)

        self._ibox = NodePath(PandaNode('Interaction Box'))
        self._ibox.reparent_to(self._container)
        self._ibox.set_scale(100)
        self._ibox.set_h(-90)
        self._ibox.set_r(90)

        self._tracker = NodePath(PandaNode('Root Tracker'))
        self._tracker.reparent_to(self._ibox)

        # Parallel lists: index i refers to the same tracked point in each.
        self._trackers = []
        self._targets = []
        self._debug_boxes = []

        self.accept('escape', sys.exit)
        self.accept('q', sys.exit)

        self.task_mgr.add(self._update, '_update')

    def _add_target(self):
        """Append one tracker node with its target node and debug cube."""
        i = len(self._trackers)
        tracker = NodePath(PandaNode('Tracker #{}'.format(i)))
        tracker.reparent_to(self._tracker)
        self._trackers.append(tracker)

        target = NodePath(PandaNode('Target #{}'.format(i)))
        target.reparent_to(tracker)
        self._targets.append(target)

        debug_box = NodePath(make_cube())
        debug_box.reparent_to(tracker)
        debug_box.set_scale(.01)
        self._debug_boxes.append(debug_box)

    def _remove_target(self):
        """Remove the most recently added tracker, target and debug cube.

        The nodes are detached from the scene *and* dropped from the
        bookkeeping lists; without the latter the shrink loop in ``_update``
        could never terminate. Does nothing when no target exists.
        """
        if not self._targets:
            return
        self._debug_boxes.pop().remove_node()
        self._targets.pop().remove_node()
        self._trackers.pop().remove_node()

    def _receive_latest(self):
        """Return the newest queued datagram, or ``None`` if none is waiting.

        Older queued datagrams are discarded so the scene always reflects the
        most recent tracking state.
        """
        latest = None
        while True:
            try:
                latest, _addr = self._socket.recvfrom(2048)
            except BlockingIOError:
                return latest

    @classmethod
    def _parse_packet(cls, packet):
        """Decode the first face record from a binary stream.

        Args:
            packet: Binary file-like object positioned at the start of a
                face record.

        Returns:
            A dict with the decoded fields; ``lms`` is a list of
            ``[x, y, confidence]`` and ``pts_3d`` a list of ``Point3``.

        Raises:
            struct.error: If the stream ends before the record is complete.
        """
        frame = {
            'now': unpack('d', packet)[0],
            'face_id': unpack('i', packet)[0],
            'width': unpack('f', packet)[0],
            'height': unpack('f', packet)[0],
            'eye_blink': unpack('ff', packet),  # right, left
            'success': unpack('B', packet)[0],
            'pnp_error': unpack('f', packet)[0],
            'quat': Quat(*unpack('ffff', packet)),
            'euler': Vec3(*unpack('fff', packet)),
            'pos': Point3(*unpack('fff', packet)),
        }

        # Landmarks: all confidences first, then one (y, x) pair per landmark.
        lms_count = unpack('B', packet)[0]
        confidences = [unpack('f', packet)[0] for _ in range(lms_count)]
        lms = []
        for confidence in confidences:
            y, x = unpack('ff', packet)
            lms.append([x, y, confidence])
        frame['lms'] = lms

        pts_3d_count = unpack('B', packet)[0]
        pts_3d = []
        for _ in range(pts_3d_count):
            x, y, z = unpack('fff', packet)
            pts_3d.append(Point3(x, -y, -z))
        frame['pts_3d'] = pts_3d

        frame['features'] = {
            name: unpack('f', packet)[0] for name in cls._FEATURE_NAMES
        }
        return frame

    def _update(self, task):
        """Per-frame task: apply the newest tracking packet to the scene.

        Frames without a pending packet leave the scene untouched instead of
        blocking the render loop.
        """
        data = self._receive_latest()
        if data is None:
            return task.again

        frame = self._parse_packet(io.BytesIO(data))
        points = frame['pts_3d']

        # Keep exactly one tracker per received 3D point.
        while len(self._targets) < len(points):
            self._add_target()

        while len(self._targets) > len(points):
            self._remove_target()

        euler = frame['euler']
        head_pos = frame['pos']
        self._tracker.set_hpr(euler.z, 0, euler.y)
        self._tracker.set_pos(-head_pos.x, -head_pos.y, head_pos.z)

        for tracker, point in zip(self._trackers, points):
            tracker.set_pos(point)

        return task.again


# Script entry point: create the demo application and enter its main loop.
if __name__ == '__main__':
    demo = Demo()
    demo.run()

2 Likes