Parallel processing of single transform #501
Open
@ptgolden

Description


Is your feature request related to a problem? Please describe.
kgx's CLI allows transforming several files at once via a process pool, but it is not possible to use multiple cores when transforming a single data file. I am transforming a large jsonl file into ntriples, and was hoping to make it faster by processing it in parallel.

Describe the solution you'd like
I have not dug into every sink, but I believe the jsonl->ntriples transform is a pure function: one jsonl record goes in, and one set of ntriples comes out. That is, the transform of any given line does not depend on any of the previous ones. This seems to make it a good candidate for parallelization. (As I said, I don't know if this is true for every source-sink pair.)
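
To illustrate the purity claim, here is a minimal sketch. `record_to_ntriples` is a hypothetical stand-in for the real per-record conversion (and it assumes records carry an `id` field); since no line depends on another, the records can be mapped across a process pool, and only output ordering needs any care.

```python
# Minimal sketch of the purity claim. record_to_ntriples is a
# hypothetical stand-in for the real jsonl -> ntriples conversion.
import json
from concurrent.futures import ProcessPoolExecutor

def record_to_ntriples(line: str) -> str:
    # One jsonl record in, one set of ntriples out, no shared state.
    record = json.loads(line)
    return f'<urn:node:{record["id"]}> <urn:prop:category> "{record.get("category", "")}" .\n'

if __name__ == "__main__":
    with open("nodes.jsonl") as f, ProcessPoolExecutor() as pool:
        # Each line is independent, so this parallel map yields the same
        # triples as a sequential loop (and in input order, since map
        # preserves ordering).
        for triples in pool.map(record_to_ntriples, f, chunksize=1000):
            print(triples, end="")
```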

Describe alternatives you've considered
It would be possible to split the file up ahead of time into N files, where N is greater than or equal to the number of available processes, and then run the CLI to process all of those split files at once. That said, if there is an opportunity to parallelize computation on a single source, it may be worth doing.
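
The splitting step itself is straightforward. Here is a rough sketch (the file name and chunk count are made up) that distributes lines round-robin across N files, which could then be fed to the CLI's existing multi-file parallelism:

```python
# Sketch of the pre-splitting workaround: distribute the lines of one
# jsonl file round-robin across N chunk files.
import itertools

N = 8  # at least the number of processes available

def split_jsonl(path: str, n: int) -> list[str]:
    chunk_paths = [f"{path}.part{i}" for i in range(n)]
    outs = [open(p, "w") for p in chunk_paths]
    try:
        with open(path) as f:
            # Round-robin is safe here because each record is independent.
            for line, out in zip(f, itertools.cycle(outs)):
                out.write(line)
    finally:
        for out in outs:
            out.close()
    return chunk_paths

# The resulting nodes.jsonl.part* files could then be handed to kgx's
# existing multi-file transform, which already uses a process pool.
split_jsonl("nodes.jsonl", N)
```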

Here is a gist containing an experiment to get this working: https://gist.github.com/ptgolden/8d836d11b9c6b2211e5f606ed6203960

What goes on in the gist is the following (a simplified sketch of the plumbing appears after this list):

  1. A thread is created that will receive ntriples and do something with them (the function currently attached just counts the triples, but the gist also contains a function that writes them to a file).
  2. N processes are created, each containing a kgx_worker function that instantiates a sink and a source.
  3. The nodes file is read line by line. After a chunk of nodes has been read, it is added to a queue. The kgx_workers read from that queue, convert each record using source.read_node, transform it using sink.write_node, and then write the result to a queue containing ntriples. The thread created in (1) consumes those ntriples.
  4. Same as (3), but with an edges file.
  5. After the nodes and edges files have been consumed, the processes and threads are sent a sentinel value to terminate them.
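
Stripped of kgx specifics, the plumbing looks roughly like this. It is a simplified sketch: `convert` is a stub standing in for the source.read_node / sink.write_node pair, the file name and chunk size are made up, and the consumer just counts triples as in the gist.

```python
# Simplified sketch of the gist's plumbing: N worker processes pull
# chunks of jsonl lines from an input queue, convert them, and push
# ntriples to an output queue that a single consumer thread drains.
import json
import multiprocessing as mp
import threading

SENTINEL = None
CHUNK_SIZE = 1000
N_WORKERS = 4

def convert(line):
    # Stub standing in for the source.read_node / sink.write_node pair.
    record = json.loads(line)
    return f'<urn:node:{record["id"]}> <urn:prop:seen> "true" .\n'

def kgx_worker(in_q, out_q):
    # Step 2: each process would instantiate its own source and sink here.
    while (chunk := in_q.get()) is not SENTINEL:
        out_q.put("".join(convert(line) for line in chunk))

def consumer(out_q):
    # Step 1: receive ntriples and do something with them (here: count).
    count = 0
    while (triples := out_q.get()) is not SENTINEL:
        count += triples.count("\n")
    print(f"{count} triples")

def main():
    in_q = mp.Queue(maxsize=16)  # bounded, so the reader can't run away
    out_q = mp.Queue()
    workers = [mp.Process(target=kgx_worker, args=(in_q, out_q))
               for _ in range(N_WORKERS)]
    for w in workers:
        w.start()
    t = threading.Thread(target=consumer, args=(out_q,))
    t.start()

    # Step 3: read the nodes file in chunks and enqueue each chunk.
    # (Step 4 would repeat this with the edges file.)
    chunk = []
    with open("nodes.jsonl") as f:
        for line in f:
            chunk.append(line)
            if len(chunk) >= CHUNK_SIZE:
                in_q.put(chunk)
                chunk = []
    if chunk:
        in_q.put(chunk)

    # Step 5: sentinels shut down the workers, then the consumer.
    for _ in workers:
        in_q.put(SENTINEL)
    for w in workers:
        w.join()
    out_q.put(SENTINEL)
    t.join()

if __name__ == "__main__":
    main()
```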

This is admittedly a bit hacky: it relies on the fact that the RDF sink stores its file handle in its constructor. That file handle is replaced with an io.BytesIO object to intercept writes. A more robust approach would likely require that a transform not assume it is reading from or writing to a file.
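
To make the trick concrete, here is the same idea on a toy sink. The class and attribute names are invented for illustration; they are not kgx's actual internals.

```python
# A toy sink that, like the RDF sink described above, opens its file
# handle in the constructor. Swapping that handle for io.BytesIO
# captures the writes in memory.
import io
import os

class ToySink:
    def __init__(self, filename: str):
        self.fh = open(filename, "wb")  # handle created up front

    def write_node(self, record: dict) -> None:
        triple = f'<urn:node:{record["id"]}> <urn:prop:x> "y" .\n'
        self.fh.write(triple.encode("utf-8"))

sink = ToySink(os.devnull)
sink.fh.close()
sink.fh = io.BytesIO()  # intercept: writes now land in memory

sink.write_node({"id": "n1"})
print(sink.fh.getvalue().decode("utf-8"), end="")
```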

Labels: enhancement (New feature or request)