
DataJoint pipeline-merging design

 2 years ago
source link: https://gist.github.com/ttngu207/734856c592759d4bfc0c6949fefbcb80

#%% md

# DataJoint pipeline design - on "merging" of pipelines

For DataJoint users working with existing pipelines or designing new ones, a common design question is how to join or merge different "branches" of a pipeline at one common node/table.

To elaborate, suppose your workflow has multiple sources of data that need to go through different processing/analysis routines. These routines ultimately arrive at a point where the data format is identical, so the data can be processed downstream in the same manner.

In this notebook, we will go through one approach to address this design question.

#%% md

## Let's consider one example scenario

To be more concrete, let me start with an example. Let's say that we are interested in tracking the position of an animal during a freely behaving experiment. Ultimately, we want the `(x, y)` coordinates of the animal over time. In this example, for one experimental session, we track the animal's position using one of the two methods below:

1. Placing a marker on the body of the animal and tracking this marker with a set of cameras
2. Using a computer vision approach to analyse the position of the animal from the video recording of a camera

With each of the two tracking methods above, the processing and analysis will be different, and being DataJoint users, we'll design a set of tables to define the processing/analysis routine for each method.

So there will likely be two pipeline branches running in parallel, which need to be merged at the point where the extraction of `(x, y)` coordinates over time is complete. This is because a set of downstream analyses will be run on the extracted animal position, regardless of which method produced the tracking for a particular experimental session.

#%% md

## The pipeline for this scenario

Let's put together an example DataJoint pipeline describing this scenario

#%%

import datajoint as dj
import numpy as np
import hashlib
import uuid

#%%

dj.conn()

#%%

schema = dj.Schema('ttngu207_pipeline_merging')

#%%

@schema
class Session(dj.Manual):
    definition = """
    animal_name: varchar(16)
    session_number: int
    """

#%%

@schema
class MethodOneTrackingRaw(dj.Imported):
    definition = """
    -> Session
    ---
    tracking_data: longblob
    """


@schema
class MethodOneProcessing(dj.Computed):
    definition = """
    -> MethodOneTrackingRaw
    ---
    tracking_data: longblob
    """


@schema
class MethodOneTracking(dj.Computed):
    definition = """
    -> MethodOneProcessing
    ---
    x: longblob
    y: longblob
    t: longblob
    """

#%%

@schema
class MethodTwoTrackingRaw(dj.Imported):
    definition = """
    -> Session
    ---
    tracking_data: longblob
    """


@schema
class MethodTwoProcessing(dj.Computed):
    definition = """
    -> MethodTwoTrackingRaw
    ---
    tracking_data: longblob
    """


@schema
class FilterParam(dj.Lookup):
    definition = """
    param_id: int
    ---
    sigma: float
    """
    contents = [(0, 1), (1, 10)]


@schema
class MethodTwoFiltering(dj.Computed):
    definition = """
    -> MethodTwoProcessing
    -> FilterParam
    ---
    filtered_tracking_data: longblob
    """


@schema
class MethodTwoTracking(dj.Computed):
    definition = """
    -> MethodTwoFiltering
    ---
    x: longblob
    y: longblob
    t: longblob
    """

#%%

dj.Diagram(schema)

#%% md

## How to "merge" these two branches?

The next step in our pipeline is to run a number of analysis routines on the animal position data, using the `x, y, t` arrays as inputs. And we don't particularly care if the animal position data from a session is from method one or two, as long as we can work with the `x`, `y` and `t` arrays.
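
As an illustration of such a routine, here is a minimal sketch of a speed calculation that needs only the `x`, `y` and `t` arrays. The function name `speed_from_track` is hypothetical and not part of the pipeline, and plain Python lists stand in for the stored arrays so the snippet runs without a database.

```python
import math


def speed_from_track(x, y, t):
    """Return the speed over each sampling interval (len(t) - 1 values)."""
    if not (len(x) == len(y) == len(t)):
        raise ValueError("x, y and t must have the same length")
    speeds = []
    for i in range(1, len(t)):
        dt = t[i] - t[i - 1]
        if dt <= 0:
            raise ValueError("t must be strictly increasing")
        # Distance travelled between consecutive samples, divided by elapsed time
        speeds.append(math.hypot(x[i] - x[i - 1], y[i] - y[i - 1]) / dt)
    return speeds


# The call is the same whichever tracking method produced the arrays.
print(speed_from_track([0.0, 3.0, 3.0], [0.0, 4.0, 4.0], [0.0, 1.0, 3.0]))
# prints [5.0, 0.0]
```

Nothing in this function refers to method one or method two, which is the property we want the downstream tables to have.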

Here, I will propose a table-merging design approach. I opt for the term "merging" to describe this design to avoid confusion with DataJoint's `join`.

Consider the design below:

#%%

@schema
class MergedTracking(dj.Manual):
    definition = """
    merged_tracking: uuid
    ---
    -> [nullable] MethodOneTracking
    -> [nullable] MethodTwoTracking
    """


@schema
class Speed(dj.Computed):
    definition = """
    -> MergedTracking
    ---
    speed: longblob
    """

#%%

dj.Diagram(schema)

#%% md

In the prototype above, the `MergedTracking` table is a `dj.Manual` table allowing for the merging of the two different branches of tracking data. The primary key is a single uuid-type attribute, and the non-primary attributes are nullable foreign keys to the tables to be merged.

One uuid entry here should uniquely specify one "tracking" for this session, either method one ***or*** method two. The keyword is ***or***: exactly one of the foreign keys must be non-null, and all the others must be null.
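
The "exactly one" rule can be checked before anything is written to the merging table. Below is a minimal sketch of that check, assuming the member tables' contents are available as plain lists of primary-key dictionaries. The helper names `matching_members` and `require_single_member` and the mock contents are hypothetical, and no database is involved.

```python
def matching_members(row, members):
    """Return the names of member tables that hold an entry matching `row`.

    `members` maps a table name to a list of primary-key dicts (a stand-in
    for table contents). Attributes of `row` that a table does not have are
    ignored, similar to restricting a DataJoint table by a dictionary.
    """
    return [
        name
        for name, keys in members.items()
        if any(all(key[a] == v for a, v in row.items() if a in key) for key in keys)
    ]


def require_single_member(row, members):
    """Return the one member table holding `row`; raise if there is not exactly one."""
    found = matching_members(row, members)
    if len(found) != 1:
        raise ValueError(f"Expected exactly one member table for {row}, found {found}")
    return found[0]


# Hypothetical contents: two sessions tracked with each method
members = {
    "MethodOneTracking": [
        {"animal_name": "subject1", "session_number": 1},
        {"animal_name": "subject1", "session_number": 2},
    ],
    "MethodTwoTracking": [
        {"animal_name": "subject1", "session_number": 3, "param_id": 0},
        {"animal_name": "subject1", "session_number": 4, "param_id": 0},
    ],
}

print(require_single_member({"animal_name": "subject1", "session_number": 3}, members))
# prints MethodTwoTracking
```

A key found in no member table, or in more than one, raises a `ValueError` instead of silently producing an ambiguous entry.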

This design allows merging tables from different branches of the pipeline (or from different pipelines) and is fairly easy to extend. For example, if a third tracking method, `MethodThreeTracking`, is added in the future, it can be included in `MergedTracking` by adding `-> [nullable] MethodThreeTracking` as another non-primary attribute and calling `MergedTracking.alter()` to update the table definition. Whether `alter()` can apply a new foreign key may depend on the DataJoint version, so this step is worth trying on a scratch schema first.

#%% md

## What's the catch?

There are a few caveats in this design; the two major ones are listed below. However, I'd say these are inconveniences rather than design flaws or drawbacks.

1. `UUID`-type primary attribute. Because `MergedTracking` has a single primary attribute of type `uuid`, there is something of a "disconnection" between this merging table and the tables upstream. The connection to upstream is established only through the non-primary foreign keys. This causes three inconveniences:
    + You always have to do a `join (*)` with this table in queries
    + It cannot be a `dj.Imported` or `dj.Computed` table, since DataJoint's native `autopopulate` would not work
    + `.insert()` is hard to use, as the `uuid` has to be generated somehow
2. From the database perspective, this table design does not guarantee mutual exclusivity of the member tables to be merged. Going purely by the table definition, one can have an entry in `MergedTracking` with both `MethodOneTracking` and `MethodTwoTracking` present, violating the "***or***" intention.

#%% md

To mitigate the caveats above, we can override the `.insert()` method to:

1. auto-generate the ***uuid***
2. ensure mutual exclusivity of the member tables to be merged
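
For the first point, one option is to derive the uuid from the entry itself by hashing its attribute names and values in sorted order. The standalone sketch below shows that the result does not depend on the order of the dictionary's keys, so the same entry always gets the same uuid. MD5 serves here only as a fingerprint, not for security, and the example entries are made up.

```python
import hashlib
import uuid


def dict_to_uuid(key):
    """Hash a dictionary's items, in sorted key order, into a UUID."""
    hashed = hashlib.md5()
    for k, v in sorted(key.items()):
        hashed.update(str(k).encode())
        if v is not None:  # null attributes contribute only their name
            hashed.update(str(v).encode())
    return uuid.UUID(hex=hashed.hexdigest())


# Same entry, different key order
a = dict_to_uuid({"animal_name": "subject1", "session_number": 1, "param_id": None})
b = dict_to_uuid({"param_id": None, "session_number": 1, "animal_name": "subject1"})
# A different session
c = dict_to_uuid({"animal_name": "subject1", "session_number": 2, "param_id": None})

print(a == b, a == c)
# prints True False
```

A deterministic uuid also means that preparing the same entry twice yields the same primary key rather than a second, different one.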

#%%

@schema
class MergedTracking(dj.Manual):
    definition = """
    merged_tracking: uuid
    ---
    -> [nullable] MethodOneTracking
    -> [nullable] MethodTwoTracking
    """

    @classmethod
    def insert(cls, rows, **kwargs):
        """
        :param rows: An iterable where an element is a dictionary.
        """

        def dict_to_uuid(key):
            """
            Given a dictionary `key`, returns a hash string as UUID
            """
            hashed = hashlib.md5()
            for k, v in sorted(key.items()):
                hashed.update(str(k).encode())
                if v is not None:
                    hashed.update(str(v).encode())
            return uuid.UUID(hex=hashed.hexdigest())

        try:
            for r in iter(rows):
                assert isinstance(r, dict), 'Input "rows" must be a list of dictionaries'
        except TypeError:
            raise TypeError('Input "rows" must be a list of dictionaries')

        parents = cls.parents(as_objects=True)
        entries = []
        for row in rows:
            key, null_attrs = {}, {}
            for parent in parents:
                if parent & row:
                    if not key:
                        key = (parent & row).fetch1('KEY')
                    else:
                        raise ValueError(f'Entry exists on more than one parent table - Entry: {row}')
                else:
                    null_attrs = {**null_attrs, **{k: None for k in parent.primary_key}}
            entry = {**null_attrs, **key}
            entries.append({**entry, cls.primary_key[0]: dict_to_uuid(entry)})

        super().insert(cls(), entries, **kwargs)

#%% md

## Pipeline in action

#%% md

#### First, let's populate these tables with some mock data

Let's create 4 sessions with mock data.

Sessions 1 and 2 will use method one, and sessions 3 and 4 will use method two for tracking.

#%%

Session.insert([('subject1', 1), ('subject1', 2), ('subject1', 3), ('subject1', 4)])

#%%

MethodOneTrackingRaw.insert(
    [('subject1', 1, np.random.randn(10)), ('subject1', 2, np.random.randn(10))],
    allow_direct_insert=True,
)
MethodOneProcessing.insert(
    [('subject1', 1, np.random.randn(10)), ('subject1', 2, np.random.randn(10))],
    allow_direct_insert=True,
)

#%%

MethodTwoTrackingRaw.insert(
    [('subject1', 3, np.random.randn(10)), ('subject1', 4, np.random.randn(10))],
    allow_direct_insert=True,
)
MethodTwoProcessing.insert(
    [('subject1', 3, np.random.randn(10)), ('subject1', 4, np.random.randn(10))],
    allow_direct_insert=True,
)

#%%

MethodTwoFiltering.insert(
    [('subject1', 3, 0, np.random.randn(10)), ('subject1', 4, 0, np.random.randn(10))],
    allow_direct_insert=True,
)

#%%

MethodOneTracking.insert(
    [
        ('subject1', 1, np.random.randn(10), np.random.randn(10), np.arange(10)),
        ('subject1', 2, np.random.randn(10), np.random.randn(10), np.arange(10)),
    ],
    allow_direct_insert=True,
)

#%%

MethodTwoTracking.insert(
    [
        ('subject1', 3, 0, np.random.randn(10), np.random.randn(10), np.arange(10)),
        ('subject1', 4, 0, np.random.randn(10), np.random.randn(10), np.arange(10)),
    ],
    allow_direct_insert=True,
)

#%%

MethodOneTracking()

#%%

MethodTwoTracking()

#%% md

#### Now, let's generate the corresponding entries in the `MergedTracking` table

#%%

method_one_entries = MethodOneTracking.fetch('KEY')
method_two_entries = MethodTwoTracking.fetch('KEY')

#%%

MergedTracking.insert(method_one_entries)

#%%

MergedTracking()

#%%

MergedTracking.insert(method_two_entries)

