Description
**Is your feature request related to a problem? Please describe.**
`kgx`'s CLI allows transforming several files at once via a process pool, but it is not possible to use multiple cores when transforming a single data file. I am transforming a large jsonl file into ntriples, and was hoping to make it faster by processing it in parallel.
**Describe the solution you'd like**
I have not dug into every sink, but I believe the jsonl->ntriples transform is a pure function: one jsonl record goes in, and one set of ntriples comes out. That is, transforming a given line does not rely on any of the previously transformed lines. This seems to make it a good candidate for parallelization. (As I said, I don't know whether this holds for every source-sink pair.)
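To illustrate what I mean by "pure" at the record level, here is a simplified sketch (this is not kgx's actual implementation; the property IRIs and record shape are made up):

```python
import json

# Hypothetical, simplified record-to-triples mapping: each jsonl node
# record is converted on its own, with no state shared between records.
def node_to_ntriples(line):
    record = json.loads(line)
    subj = f"<https://example.org/{record['id']}>"
    triples = []
    for key, value in record.items():
        if key == "id":
            continue
        pred = f"<https://example.org/property/{key}>"
        triples.append(f'{subj} {pred} "{value}" .')
    return triples

line = '{"id": "X:1", "name": "gene A", "category": "biolink:Gene"}'
print(node_to_ntriples(line))
```

Because each call depends only on its input line, the lines can be handed out to workers in any order.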
**Describe alternatives you've considered**
It would be possible to split the file ahead of time into N files, where N >= the number of available processes, and then run the CLI on all of those split files at once. That said, if there is an opportunity to parallelize computation on a single source, it may be worth doing.
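The ahead-of-time split could look something like this (a sketch, not part of kgx; `split_jsonl` is a name I made up). Round-robin assignment keeps the chunks balanced without a first pass to count lines, and per-record independence means the ordering within a chunk doesn't matter:

```python
from pathlib import Path

def split_jsonl(path, n_parts, out_dir):
    """Split a jsonl file into n_parts round-robin chunk files."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    outs = [open(out_dir / f"part_{i}.jsonl", "w") for i in range(n_parts)]
    try:
        with open(path) as fh:
            for i, line in enumerate(fh):
                # Line i goes to chunk i mod n_parts.
                outs[i % n_parts].write(line)
    finally:
        for fh in outs:
            fh.close()
    return [out_dir / f"part_{i}.jsonl" for i in range(n_parts)]
```

The resulting chunk files could then all be passed to the existing CLI at once so its process pool picks them up.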
Here is a gist containing an experiment to get this working: https://gist.github.com/ptgolden/8d836d11b9c6b2211e5f606ed6203960
What goes on is the following:
1. A thread is created which will receive ntriples and do something with them (the function currently attached just counts the triples, but the gist also contains a function that writes them to a file).
2. N processes are created, each running a `kgx_worker` function that instantiates a sink and a source.
3. A nodes file is read line by line. After a chunk of nodes has been read, it is added to a queue. The `kgx_worker` processes read from that queue, convert each record using `source.read_node`, transform it using `sink.write_node`, and write the resulting ntriples to a second queue. The thread created in (1) consumes those ntriples.
4. The same as (3), but with an edges file.
5. After the nodes and edges files have been consumed, the processes and threads are sent a sentinel value to terminate them.
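The queue topology above can be sketched in a self-contained way like this. An uppercase "transform" stands in for the `source.read_node`/`sink.write_node` pair, and all names here are placeholders rather than kgx's API:

```python
import multiprocessing as mp
import threading

SENTINEL = None

def worker(in_q, out_q):
    # Stand-in for kgx_worker: the real version would run each record
    # in the chunk through source.read_node and sink.write_node.
    while True:
        chunk = in_q.get()
        if chunk is SENTINEL:
            out_q.put(SENTINEL)  # tell the consumer this worker is done
            break
        out_q.put([line.upper() for line in chunk])

def run_pipeline(lines, n_workers=2, chunk_size=2):
    in_q, out_q = mp.Queue(), mp.Queue()
    results = []

    def consume():
        # Stand-in for the ntriples-consuming thread; here it just
        # collects results (the gist counts triples or writes a file).
        done = 0
        while done < n_workers:
            item = out_q.get()
            if item is SENTINEL:
                done += 1
            else:
                results.extend(item)

    consumer = threading.Thread(target=consume)
    consumer.start()
    procs = [mp.Process(target=worker, args=(in_q, out_q))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    # Feed the input in chunks, then one sentinel per worker.
    for i in range(0, len(lines), chunk_size):
        in_q.put(lines[i:i + chunk_size])
    for _ in range(n_workers):
        in_q.put(SENTINEL)
    for p in procs:
        p.join()
    consumer.join()
    return sorted(results)

if __name__ == "__main__":
    print(run_pipeline(["a", "b", "c", "d", "e"]))
```

Sorting at the end is only there to make the output deterministic; with independent records the order does not otherwise matter.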
This is admittedly a bit hacky: it relies on the fact that the RDF sink stores its file handle in its constructor. That file handle is replaced by an `io.BytesIO` object to intercept writes. A more robust approach would likely require the transform not to assume that it is reading from or writing to a file.
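The interception itself looks roughly like this. A toy sink stands in for kgx's RDF sink, and the `FH` attribute name is an assumption about where the handle is stored:

```python
import io

class ToySink:
    # Stand-in for an RDF sink that stores its file handle at
    # construction time (the attribute name is illustrative).
    def __init__(self, handle):
        self.FH = handle

    def write_node(self, record):
        self.FH.write(f"{record}\n".encode())

# Hand the sink an in-memory buffer instead of a real file so that
# writes can be captured and forwarded to a queue.
buffer = io.BytesIO()
sink = ToySink(buffer)
sink.write_node("<s> <p> <o> .")
captured = buffer.getvalue()
buffer.seek(0)
buffer.truncate(0)  # reset between chunks so each read is fresh
print(captured)  # → b'<s> <p> <o> .\n'
```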