
Clustering Spec WIP

Aatman Vaidya edited this page Aug 30, 2024 · 9 revisions

Overview

Goal

Feluda does a good job of encapsulating Machine Learning and DSP tools. We want to define a spec that researchers can take and implement to integrate with Feluda. This entails describing the data format of the input they must send to Feluda and of the results they should expect back from Feluda.

In the past we have done so for Operators that work on a single file at a time. Though that spec is relatively undocumented, we have had some success standardizing it across all supported media types. Now, as we build a new class of Operators that process a collection of files, we should document the spec more rigorously, to enable use of the clustering features across Tattle projects and by independent researchers doing similar work.

Intended Workflow

sequenceDiagram
    Client->>EmbeddingOperator: file_1
    EmbeddingOperator->>Client: embedding_1
    Client->>EmbeddingOperator: file_2
    EmbeddingOperator->>Client: embedding_2
    Client->>EmbeddingOperator: file_3
    EmbeddingOperator->>Client: embedding_3
    Client->>ClusteringOperator: embeddings
    ClusteringOperator->>Client: clusters

The client could be a Feluda worker or any other entity that uses and orchestrates Feluda operators.
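The sequence diagram can be read as a simple client loop: one embedding call per file, then a single clustering call over all the embeddings. The sketch below assumes both operators are plain Python callables; `embed_file` and `cluster_embeddings` are hypothetical stand-ins, not Feluda's actual operator APIs.

```python
from typing import Callable, Dict, List, Sequence

Embedding = List[float]
Clusters = Dict[str, List[str]]


def orchestrate(
    files: Sequence[str],
    embed_file: Callable[[str], Embedding],
    cluster_embeddings: Callable[[Dict[str, Embedding]], Clusters],
) -> Clusters:
    """Client-side loop from the diagram (operator callables are hypothetical)."""
    embeddings: Dict[str, Embedding] = {}
    for file_id in files:
        # Client->>EmbeddingOperator: file_n, then EmbeddingOperator->>Client: embedding_n
        embeddings[file_id] = embed_file(file_id)
    # Client->>ClusteringOperator: embeddings, then ClusteringOperator->>Client: clusters
    return cluster_embeddings(embeddings)


if __name__ == "__main__":
    # Toy stand-ins: the "embedding" is the name length; clusters split on even/odd length.
    def fake_embed(name: str) -> Embedding:
        return [float(len(name))]

    def fake_cluster(embs: Dict[str, Embedding]) -> Clusters:
        clusters: Clusters = {}
        for file_id, vec in embs.items():
            clusters.setdefault(f"cluster {int(vec[0]) % 2}", []).append(file_id)
        return clusters

    # prints {'cluster 0': ['file_1', 'file_3'], 'cluster 1': ['file_22']}
    print(orchestrate(["file_1", "file_22", "file_3"], fake_embed, fake_cluster))
```

Keeping the embedding step outside the clustering operator means embeddings can be cached or stored and reused across clustering runs.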

Acceptance Criteria

  • Run using a file on disk, for experimentation and debugging
  • Run using a file on S3, when running in the cloud
  • Every operator should have a test for each of the first two criteria
  • Spec: document the input and output data structures of a clustering operator
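One way to satisfy the first two criteria with a single code path is a loader that accepts either a local path or an S3 location, so the same operator test can run against both. This is a minimal sketch, assuming S3 objects are addressed as `s3://bucket/key` and that boto3 is the S3 client in the cloud; the function names `read_bytes` and `read_json` are hypothetical.

```python
import json
from pathlib import Path
from typing import Any
from urllib.parse import urlparse


def read_bytes(location: str) -> bytes:
    """Read a file from local disk, or from S3 when given an s3://bucket/key URL."""
    parsed = urlparse(location)
    if parsed.scheme == "s3":
        # Assumption: boto3 is available in the cloud image. It is imported lazily
        # so local experimentation and debugging do not require it.
        import boto3

        s3 = boto3.client("s3")
        obj = s3.get_object(Bucket=parsed.netloc, Key=parsed.path.lstrip("/"))
        return obj["Body"].read()
    # Anything that is not an s3:// URL is treated as a path on local disk.
    return Path(location).read_bytes()


def read_json(location: str) -> Any:
    """Load a JSON document (for example a list of media items) from disk or S3."""
    return json.loads(read_bytes(location))
```

A test can then call `read_json` on a temporary local file, and the same test can be pointed at an `s3://` URL in the cloud environment.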

Questions For Aatman

  1. Let's separate embedding generation and storage from clustering.
    1. Embeddings are reusable and can be generated sequentially.
    2. This might reduce the memory consumption and operational requirements of a clustering operator.
  2. All our clustering is embedding-based, so let's namespace it as cluster_embedding_*.

Separation of Concerns

Proposed Responsibilities

worker
	read file one line at a time
		get embeddings from db
	run operator(embeddings)
	store result in a file (local and s3)
operator(embeddings)::
	return [{label_1: [post_a, post_b, post_c ...]}]
	and expose raw_output through a separate function
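The worker's responsibilities above can be sketched in Python as follows. This assumes the input file holds one post id per line and uses a plain mapping as a stand-in for the embeddings database; `run_worker` is a hypothetical name, and uploading the result file to S3 is left out.

```python
import json
from pathlib import Path
from typing import Callable, Dict, List, Mapping

Embedding = List[float]
ClusterResult = List[Dict[str, List[str]]]  # [{label_1: [post_a, post_b, ...]}]


def run_worker(
    ids_file: Path,
    embedding_store: Mapping[str, Embedding],
    operator: Callable[[Dict[str, Embedding]], ClusterResult],
    output_file: Path,
) -> ClusterResult:
    """Read ids one line at a time, fetch embeddings, run the operator, store the result.

    `embedding_store` stands in for the embeddings DB (assumption: any id -> vector
    mapping). Only the local file is written; the S3 copy is not shown.
    """
    embeddings: Dict[str, Embedding] = {}
    with ids_file.open() as f:
        for line in f:  # read the file one line at a time
            post_id = line.strip()
            if post_id:  # skip blank lines
                embeddings[post_id] = embedding_store[post_id]
    result = operator(embeddings)
    output_file.write_text(json.dumps(result))
    return result
```

Because the operator only receives embeddings, it never has to download media or compute embeddings itself, which is the separation of concerns proposed above.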

Clustering Worker

A worker is used to deploy operators at scale. It brings together all the other components of Feluda, such as the Queue and the Store.

Overview of what should happen

  • The worker first receives all the files from the client. Each file comes with some information: id, url and media type. The input of the worker is documented below.
  • Once the worker receives the files, it uses two different workflows to handle audio and video files, and does the following:
    • Step 1 - the worker now has a collection of dicts, one for each of the n video and audio files.
    • Step 2 - each media file is downloaded from its url (using the Media Factory functions for audio and video respectively).
    • Step 3 - next, the embeddings for each media file are extracted using the respective embedding operator (CLIP for video and CLAP for audio).
    • Step 4 - the clustering operator is run on the embeddings.
    • Step 5 - the t-SNE reduction operator is run on the embeddings.
    • Step 6 - the output of the clustering and reduction is sent back to the client.

Detailed breakdown of steps with input and output

Step 0 - create a folder and Dockerfile for the worker

You can refer to the media worker for the folder structure and file contents - link

  • Inside src/worker, create a new folder called clustering_media. This folder will contain all the code for the worker.
  • Next, create a Dockerfile so that you can run the worker inside a Docker image. You can replicate the Dockerfile of the media worker and adapt it to the requirements of the clustering worker. The Dockerfile should be named Dockerfile.clustering_media_worker.
    • The key change in the clustering worker's Dockerfile is installing the packages relevant to this worker. For reference, see how the media worker's Dockerfile installs the operator packages.
    • Install ffmpeg, as I think it is required for video clustering to work.
  • Create the files for the worker and add basic skeleton code, referencing the media worker. The files to create now are:
    • config.yml
    • clustering_media_worker.py
    • clustering_media_payload_writer.py

Step 1 - sending files to worker using payload writer

  • The payload writer acts as a proxy for a client: it is how we send media items to the worker during local development.
  • The payload writer sends a media item, along with information such as its id, url and media_type, to a queue. We use RabbitMQ as the message broker to send files to the worker and to receive reports back from it.

The payload writer sends a JSON message to the queue that looks like this:

[
  {
    "id": "<unique_id>",
    "path": "https://path/to/cdn/hosted/file",
    "media_type": "video"
  }
]
  • BUT, for clustering, the client (payload writer) sends multiple files, since clustering needs more than one file.
  • So instead of sending the path to a single media file, we send the path to a JSON file that contains information about all the media files. This is how the payload should now look:
[
  {
    "path": "https://path/to/cdn/hosted/JSON_file.json"
  }
]

and below is how the JSON file should look:

[
  {
    "id": "<unique_id_1>",
    "path": "https://path/to/cdn/hosted/file1",
    "media_type": "video"
  },
  {
    "id": "<unique_id_2>",
    "path": "https://path/to/cdn/hosted/file2",
    "media_type": "audio"
  },
  {
    "id": "<unique_id_3>",
    "path": "https://path/to/cdn/hosted/file3",
    "media_type": "video"
  }
]
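Before processing, the worker can parse this JSON file and reject malformed entries early. A minimal sketch, assuming only `audio` and `video` are supported media types and that ids must be unique (they are named `unique_id` above); `parse_media_manifest` is a hypothetical helper name.

```python
import json
from typing import Dict, List

REQUIRED_KEYS = ("id", "path", "media_type")
SUPPORTED_MEDIA_TYPES = {"audio", "video"}


def parse_media_manifest(text: str) -> List[Dict[str, str]]:
    """Parse the media-item JSON file and check that every entry is well formed."""
    items = json.loads(text)
    if not isinstance(items, list):
        raise ValueError("the manifest must be a JSON list of media items")
    seen_ids = set()
    for index, item in enumerate(items):
        if not isinstance(item, dict):
            raise ValueError(f"item {index} is not a JSON object")
        missing = [key for key in REQUIRED_KEYS if key not in item]
        if missing:
            raise ValueError(f"item {index} is missing keys: {missing}")
        if item["media_type"] not in SUPPORTED_MEDIA_TYPES:
            raise ValueError(f"item {index} has unsupported media_type {item['media_type']!r}")
        # Results are reported per id, so duplicate ids would be ambiguous.
        if item["id"] in seen_ids:
            raise ValueError(f"duplicate id {item['id']!r}")
        seen_ids.add(item["id"])
    return items
```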

Note

If you hand me a bunch of audio and video files, I can prepare this JSON and give it to you.

What should the queues be called?

| Worker | Index Queue | Report Queue (of Index) |
| --- | --- | --- |
| clustering_media | clustering-media-index-queue | clustering-media-report-queue |
  • The Index Queue is where the client sends the JSON file listing the media items to be processed.
  • The Report Queue is where the worker (Feluda) sends the clustering results back to the client.
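For local development, a payload writer could put the index payload on the Index Queue roughly as below. This sketch assumes the pika RabbitMQ client and a broker on `localhost`; Feluda's own queue wrapper may differ, and `durable=True` must match however the worker declares the queue, otherwise RabbitMQ rejects the declaration. The function names are hypothetical.

```python
import json

INDEX_QUEUE = "clustering-media-index-queue"


def build_index_payload(manifest_url: str) -> str:
    """Serialise the message the client puts on the Index Queue."""
    return json.dumps([{"path": manifest_url}])


def publish_index_payload(manifest_url: str, host: str = "localhost") -> None:
    """Publish the payload to RabbitMQ (assumption: pika is the client library)."""
    import pika  # imported lazily so build_index_payload works without pika installed

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    try:
        channel = connection.channel()
        # Assumption: the worker declares the same queue as durable.
        channel.queue_declare(queue=INDEX_QUEUE, durable=True)
        channel.basic_publish(
            exchange="",  # default exchange routes by queue name
            routing_key=INDEX_QUEUE,
            body=build_index_payload(manifest_url),
        )
    finally:
        connection.close()
```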

How to create the config?

  • Take a look at the config.yml file of the media worker for the format to follow. The dataclass for this config is defined in this file.

Step 2, 3, 4, 5 - coding the Worker

  • First, sort the media items by media_type (audio and video) so that each type can be processed differently.
  • Then download each media item using the Media Factory functions (VideoFactory and AudioFactory).
  • Next, compute the embeddings for each media item using the respective operator (CLIP for video, CLAP for audio).
  • Finally, perform clustering and dimensionality reduction (t-SNE).
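Steps 2 to 5 can be sketched as one function that groups items by media type and runs each group through its own download and embedding functions. The download, embed, cluster and reduce callables are hypothetical placeholders for VideoFactory/AudioFactory, the CLIP/CLAP operators, the clustering operator and the t-SNE operator. Clustering each media type separately is an assumption: CLIP and CLAP vectors come from different models, and the spec does not say how the two groups are combined.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Sequence, Tuple

Embedding = List[float]
MediaItem = Dict[str, str]
Clusters = Dict[str, List[str]]
Coords = Dict[str, List[float]]


def group_by_media_type(items: Sequence[MediaItem]) -> Dict[str, List[MediaItem]]:
    """Sort media items into per-type lists, e.g. {"audio": [...], "video": [...]}."""
    groups: Dict[str, List[MediaItem]] = defaultdict(list)
    for item in items:
        groups[item["media_type"]].append(item)
    return dict(groups)


def run_pipeline(
    items: Sequence[MediaItem],
    downloaders: Dict[str, Callable[[str], str]],
    embedders: Dict[str, Callable[[str], Embedding]],
    cluster: Callable[[Dict[str, Embedding]], Clusters],
    reduce_dims: Callable[[Dict[str, Embedding]], Coords],
) -> Dict[str, Tuple[Clusters, Coords]]:
    """Download, embed, cluster and reduce each media type (all callables hypothetical).

    downloaders: media_type -> fn(url) -> local path (stand-in for the Media Factories)
    embedders:   media_type -> fn(local path) -> vector (stand-in for CLIP/CLAP)
    """
    results: Dict[str, Tuple[Clusters, Coords]] = {}
    for media_type, group in group_by_media_type(items).items():
        download = downloaders[media_type]
        embed = embedders[media_type]
        embeddings = {item["id"]: embed(download(item["path"])) for item in group}
        # Each media type is clustered and reduced in its own embedding space.
        results[media_type] = (cluster(embeddings), reduce_dims(embeddings))
    return results
```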

Step 6 - sending the results back to Client

  • All the results are sent back to the client on the Report Queue.
  • The two key results to send back to the client are the clustering results and the t-SNE (x, y) coordinates for each file.

So the clustering results should be returned as JSON like this (which the operator already does):

[
  {
    "cluster 0": ["file-id-1", "file-id-2", "file-id-5"],
    "cluster 1": ["file-id-8", "file-id-7", "file-id-4"]
  }
]
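Clustering libraries typically return one integer label per input row, so producing this format is a grouping step. A minimal sketch, assuming `labels[i]` is the cluster of `ids[i]` (for example from a k-means style `labels_` array); `format_clusters` is a hypothetical helper, and the single-element outer list mirrors the sample above.

```python
import json
from typing import Dict, List, Sequence


def format_clusters(ids: Sequence[str], labels: Sequence[int]) -> List[Dict[str, List[str]]]:
    """Group file ids by cluster label into [{"cluster <label>": [file ids]}]."""
    if len(ids) != len(labels):
        raise ValueError("ids and labels must have the same length")
    clusters: Dict[str, List[str]] = {}
    for file_id, label in zip(ids, labels):
        clusters.setdefault(f"cluster {label}", []).append(file_id)
    return [clusters]


# prints [{"cluster 0": ["file-id-1", "file-id-2"], "cluster 1": ["file-id-8"]}]
print(json.dumps(format_clusters(["file-id-1", "file-id-8", "file-id-2"], [0, 1, 0])))
```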

and the 2-D coordinates can be returned as JSON like this:

[
  {
    "id": "<unique_id_1>",
    "t-sne-cords": [2, 5]
  },
  {
    "id": "<unique_id_2>",
    "t-sne-cords": [3, 6]
  },
  {
    "id": "<unique_id_3>",
    "t-sne-cords": [4, 7]
  }
]
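The coordinates JSON pairs each id with one row of the 2-D reduction output. A minimal sketch, assuming the reduced points come back as an n x 2 array in the same order as the ids (for instance from scikit-learn's `TSNE(n_components=2).fit_transform(...)`); `format_tsne_coords` is a hypothetical helper. Values are converted to plain floats so the result is JSON-serialisable even when the points are NumPy values.

```python
from typing import Dict, List, Sequence, Union

CoordEntry = Dict[str, Union[str, List[float]]]


def format_tsne_coords(ids: Sequence[str], points: Sequence[Sequence[float]]) -> List[CoordEntry]:
    """Pair each file id with its 2-D t-SNE point, using the "t-sne-cords" key."""
    if len(ids) != len(points):
        raise ValueError("ids and points must have the same length")
    out: List[CoordEntry] = []
    for file_id, point in zip(ids, points):
        if len(point) != 2:
            raise ValueError(f"expected a 2-D point for {file_id!r}, got {len(point)} values")
        out.append({"id": file_id, "t-sne-cords": [float(point[0]), float(point[1])]})
    return out
```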

Both result JSONs can be combined into one report by a function that looks something like this:

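import json

def make_report_indexed(clustering_results_json, t_sne_cords_json, status):
    # Bundle the clustering results and t-SNE coordinates into one report
    report = {}
    report["clustering_results"] = clustering_results_json
    report["t-sne-cords"] = t_sne_cords_json
    report["status_code"] = 200
    report["status"] = status
    # Serialise to a JSON string for the Report Queue
    return json.dumps(report)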

The output of make_report_indexed() is sent to the Report Queue.