
Add rest of data pipeline#5

Merged
r0mainK merged 3 commits into src-d:master from kafkasl:master
Jul 18, 2018

Conversation

Contributor

@kafkasl kafkasl commented Jul 16, 2018

This PR is not finished, but the code2vec model and Vocabulary2Id are pretty much done, so I thought you could start reviewing it in case something is really off.

Currently I'm getting an error when trying to save the model and I'm not sure why. Any help would be appreciated:

  File "/home/hydra/projects/code2vec/src/../src/code2vec.py", line 23, in code2vec
    .link(Vocabulary2Id(args.keep_freq, args.output)) \
  File "/usr/local/lib/python3.5/dist-packages/sourced/ml/transformers/transformer.py", line 95, in execute
    head = node(head)
  File "/home/hydra/projects/code2vec/src/transformers/vocabulary2id.py", line 28, in __call__
    
  File "/usr/local/lib/python3.5/dist-packages/modelforge/model.py", line 269, in save
    self._write_tree(tree, output)
  File "/usr/local/lib/python3.5/dist-packages/modelforge/model.py", line 285, in _write_tree
    asdf.AsdfFile(final_tree).write_to(output, all_array_compression=ARRAY_COMPRESSION)
  File "/usr/local/lib/python3.5/dist-packages/asdf/asdf.py", line 899, in write_to
    self._post_write(fd)
  File "/usr/local/lib/python3.5/dist-packages/asdf/generic_io.py", line 313, in __exit__
    self._fd.__exit__(type, value, traceback)
  File "/usr/local/lib/python3.5/dist-packages/asdf/extern/atomicfile.py", line 113, in __exit__
    self.close()
  File "/usr/local/lib/python3.5/dist-packages/asdf/extern/atomicfile.py", line 109, in close
    atomic_rename(self._tmp_filename, self._filename)
NotADirectoryError: [Errno 20] Not a directory: '/home/hydra/projects/code2vec/results/.___atomic_writeg1zpv8zx' -> '/home/hydra/projects/code2vec/src/../results/'
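For reference, the traceback suggests the atomic rename at the end of `save` is being pointed at a directory: `--output` resolves to `.../results/` rather than a file path. A minimal sketch of that failure mode (paths here are illustrative, not from the PR):

```python
import os
import tempfile

# asdf's atomic_rename ultimately calls os.rename(tmp_file, output);
# renaming a regular file onto a path that names a directory fails
workdir = tempfile.mkdtemp()
tmp_file = os.path.join(workdir, ".___atomic_write_example")
open(tmp_file, "w").close()

try:
    os.rename(tmp_file, workdir + os.sep)  # target is a directory, not a file
    rename_failed = False
except OSError:  # NotADirectoryError / IsADirectoryError depending on the path
    rename_failed = True
```

Passing a file path (e.g. `results/code2vec.asdf`) instead of the directory itself should avoid this.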

Contributor

@r0mainK r0mainK left a comment

Ok, finished this first round. Most of it is good, but it can be improved. You should mostly move the path-context-to-index logic away from the model. I haven't reviewed the Model in detail as it will change after this review; ping me when it's done on Slack and I can have a second look tonight. Overall good job, thanks for your hard work 👍

src/code2vec.py Outdated
@@ -18,8 +20,10 @@ def code2vec(args):
.link(UastRow2Document()) \

This comment was marked as resolved.

src/code2vec.py Outdated
@@ -39,6 +43,10 @@ def main():
required=False)
parser.add_argument('-w', '--max_width', type=int, default=2, help="Max path width.",
Contributor

create a subparser; let's compartmentalize the data and ML pipelines -> I'd suggest the commands be extract_features and then train_model (for now only add the first and a TODO for the second; if you have a better name please propose it)

Contributor Author

not sure about this bit. Should I create two methods, like add_extract_features(parser) and add_train_model_args(parser), and add the respective options inside them?

Contributor

@r0mainK r0mainK Jul 17, 2018

I think you can omit the second one for now, as no logic for it exists. Anyway, yeah, you can do something like:

subparsers = parser.add_subparsers(help="Commands", dest="command")
extract_parser = subparsers.add_parser(
    "extract", help="Extract features from input repositories",
    formatter_class=ArgumentDefaultsHelpFormatterNoNone)

Contributor Author

I do not know how to do this; I've pushed my attempt (which does not work), but I'm not sure how these hierarchies of parsers work. The rest should be solved.
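For reference, a minimal self-contained sketch of how argparse parser hierarchies behave (the names match the snippet suggested above; the `-o` argument is just an illustration):

```python
import argparse

parser = argparse.ArgumentParser(prog="code2vec")
subparsers = parser.add_subparsers(help="Commands", dest="command")

# arguments added to a subparser only apply when its command is selected
extract_parser = subparsers.add_parser(
    "extract", help="Extract features from input repositories")
extract_parser.add_argument("-o", "--output", required=True)

# the first positional token selects the subcommand, the rest goes to it
args = parser.parse_args(["extract", "-o", "results/features.asdf"])
print(args.command, args.output)  # -> extract results/features.asdf
```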

src/code2vec.py Outdated
required=False)
parser.add_argument('-w', '--max_width', type=int, default=2, help="Max path width.",
required=False)
parser.add_argument('-k', '--keep_freq', type=bool, default=False, help="Keep frequencies "
Contributor

*keep-freq, and initialize the parser like this to get the conversion to keep_freq:

from sourced.ml.cmd import ArgumentDefaultsHelpFormatterNoNone
...

parser = argparse.ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatterNoNone)
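A quick check of the dash-to-underscore conversion mentioned above; as a general argparse caveat (not from the review itself), `type=bool` is also a pitfall, since every non-empty string is truthy, so a flag is usually safer:

```python
import argparse

parser = argparse.ArgumentParser()
# dashes in option names become underscores in the namespace
parser.add_argument("--keep-freq", action="store_true",
                    help="Keep frequencies when building vocabularies.")

args = parser.parse_args(["--keep-freq"])
assert args.keep_freq is True   # accessed as keep_freq

# why type=bool misbehaves: bool() on any non-empty string is True
assert bool("False") is True
```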

src/code2vec.py Outdated
parser.add_argument('-k', '--keep_freq', type=bool, default=False, help="Keep frequencies "
"when building vocabularies.", required=False)
parser.add_argument('-o', '--output', type=str, default=os.getcwd(), help="Output were the "
"model will be stored.", required=False)
Contributor

*"Output path for the Code2VecFeatures model "



@register_model
class Code2Vec(Model):
Contributor

*Code2VecFeatures, we will probably have a second Model for the trained ML model

Process rows to gather values and paths with/without their frequencies.
"""

r = rows \
Contributor

rename r to rows, and cache after reduceByKey or you will do it twice, which is time-consuming; don't forget to uncache after both collects

Contributor Author

not sure how things are cached/uncached in Spark, I'm pretty new to it; could you give more details?

Contributor

sure. In Spark you can either do transformations or actions on RDDs/DataFrames/etc. Those objects are lazily evaluated, which means transformations are not applied unless an action triggers them:

r.map(lambda x: x[0]).filter(my_condition)  # does nothing 
r.map(lambda x: x[0]).filter(my_condition).collect()  # the action collect triggers transformations  

in order to not repeat parts of the pipeline, you can cache the data in RDD or on disk, here:

rows = rows \
    .flatMap(self._get_path) \
    .reduceByKey(operator.add) \
    .persist()  # nothing happens yet

values = rows.filter(lambda x: type(get_elem(x)) == str).collect()
# after this, rows is cached, so the second collect starts from
# after the reduceByKey, not before the flatMap
paths = rows.filter(lambda x: type(get_elem(x)) == tuple).collect()
rows.unpersist()  # or this will stay in memory/disk

we usually add a persist argument to select the level of persistence; for more, see here
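The laziness described above can be mimicked in plain Python with generators (an analogy only; Spark's persist additionally keeps the data in memory/disk across the cluster):

```python
calls = []

def expensive(x):
    calls.append(x)   # record every time the "transformation" actually runs
    return x * 2

data = [1, 2, 3]

# like an RDD transformation: building the generator does no work yet
lazy = (expensive(x) for x in data)
assert calls == []

# like .persist() followed by an action: materialize once, reuse the result
cached = list(lazy)
first = [x for x in cached if x > 2]    # reuses cached, no recomputation
second = [x for x in cached if x <= 2]  # same here
assert calls == [1, 2, 3]   # expensive ran exactly once per element
```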

r = r.map(lambda x: x[0])
get_elem = lambda x: x

values = r.filter(lambda x: type(get_elem(x)) == str)
Contributor

collect directly here, same for paths

self._paths = paths
self._path_contexts = path_contexts

self._value2index = {w: i for i, w in enumerate(values)}
Contributor

you should not do this here; using the mapping this way means all the computation to map values/context_paths to indexes is done in the driver, not using Spark. You can do it in different ways, but the simplest is to create the indexes after the collect, broadcast them, then use them as mappings before collecting the (doc, [path_context_1, ...]) RDD.
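A driver-side sketch of the suggested flow, with hypothetical toy data (in real code the two dicts would be wrapped in sc.broadcast before being used inside executor-side maps):

```python
# collected on the driver after the Spark actions
values = ["foo", "bar"]
paths = [("A", "B"), ("A", "C")]

# build the indexes once, after collect
value2index = {w: i for i, w in enumerate(values)}
path2index = {p: i for i, p in enumerate(paths)}

# executor-side mapping is then just dictionary lookups
path_contexts = [("doc1", ("foo", ("A", "B"), "bar"))]
indexed = [(doc, (value2index[u], path2index[p], value2index[v]))
           for doc, (u, p, v) in path_contexts]
assert indexed == [("doc1", (0, 0, 1))]
```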

"""
return rows \
    .map(self._doc2pc) \
    .reduceByKey(operator.add)
Contributor

if you want to do this, you need to map the RDD to ((doc, [pc]), 1), no? In any case, I think for now it is best not to add calculations for document/feature freqs, so simply do a distinct after the map (the distinct should be done after mapping the path-context strings to indexes, see comment above)

Contributor Author

the idea was to add the lists for each key to have (doc, [pc_1, pc_2, ...]), because _doc2pc is a single path context like (doc, [pc])

Contributor

okay, I see now. The problem is this might give you cases like (doc, [pc1, pc2, pc1, pc3, ...]), so you ought to still do a distinct before the reduceByKey
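In plain Python terms (toy data), the distinct-then-group step being asked for looks like:

```python
from collections import defaultdict

# (doc, pc) pairs as they come out of the map, with a duplicate path context
pairs = [("doc1", "pc1"), ("doc1", "pc2"), ("doc1", "pc1")]

grouped = defaultdict(list)
for doc, pc in dict.fromkeys(pairs):  # dedups while keeping first-seen order
    grouped[doc].append(pc)

# pc1 appears once, like distinct() before reduceByKey
assert grouped["doc1"] == ["pc1", "pc2"]
```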

"""
code2vec model - source code identifier embeddings.
"""
NAME = "code2vec"
Contributor

same, add features here

@kafkasl kafkasl force-pushed the master branch 3 times, most recently from e259cfe to 4c06549 Compare July 17, 2018 12:49
Contributor

@r0mainK r0mainK left a comment

Went over the subparser part; tell me if it still does not work afterwards. I think you could create a __main__.py file for parsing commands, create a cmd folder, and put the rest of the code2vec code in an extract_features file (also rename the function holding the pipeline).

src/code2vec.py Outdated
required=False)
extract_parser.add_argument('-w', '--max_width', type=int, default=2, help="Max path width.",
required=False)
extract_parser.add_argument('-k', '--keep_freq', type=bool, default=False,
Contributor

*keep-freq

src/code2vec.py Outdated
required=False)
extract_parser.add_argument('-g', '--max_length', type=int, default=5, help="Max path length.",
required=False)
extract_parser.add_argument('-w', '--max_width', type=int, default=2, help="Max path width.",
Contributor

*max-width

src/code2vec.py Outdated
required=False)
parser.add_argument('-w', '--max_width', type=int, default=2, help="Max path width.",
required=False)
extract_parser.add_argument('-g', '--max_length', type=int, default=5, help="Max path length.",
Contributor

*max-length

src/code2vec.py Outdated
parser = argparse.ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatterNoNone)

# sourced.engine args
add_repo2_args(parser)
Contributor

this should be added to the subparser

src/code2vec.py Outdated
required=False)
extract_parser.add_argument('-k', '--keep_freq', type=bool, default=False,
help="Keep frequencies when building vocabularies.", required=False)
extract_parser.add_argument('-o', '--output', type=str, default=os.getcwd(),
Contributor

take out the default value, and use required=True

values = rows.filter(lambda x: type(get_elem(x)) == str).collect()
paths = rows.filter(lambda x: type(get_elem(x)) == tuple).collect()

value2index = {w: i for i, w in enumerate(values)}
Contributor

this is a problem if keep_freq is True: in one case you get the dictionaries, but in the other you get (key, value): index. I would move the index creation to the call, use self.docfreq to potentially create the frequency dict, and pass it as an optional dict parameter for the Model that defaults to None.

return value2index, path2index

def _doc2pc(self, row: Row):
(u, path, v), doc = Vocabulary2Id._unstringify_path_context(row), row[0][1]
Contributor

nest this function in build_doc2pc, and broadcast the indexes:

def build_doc2pc(self, rows: RDD):
    value2index = self.sc.broadcast(self.value2index)
    path2index = self.sc.broadcast(self.path2index)

    def _doc2pc(row: Row):
        (u, path, v), doc = Vocabulary2Id._unstringify_path_context(row), row[0][1]
        return doc, (value2index.value[u], path2index.value[path], value2index.value[v])

    ...
    value2index.unpersist(blocking=True)
    path2index.unpersist(blocking=True)



class Vocabulary2Id(Transformer):
def __init__(self, keep_freq: bool, output: str, **kwargs):
Contributor

add sparkContext to __init__ -> self.sc used later

src/code2vec.py Outdated
.link(UastDeserializer()) \
.link(Uast2BagFeatures([UastPathsBagExtractor(args.max_length, args.max_width)])) \
.link(Collector()) \
.link(Vocabulary2Id(args.keep_freq, args.output)) \
Contributor

here add root.sc to initialize the transformer with the sparkContext

Contributor Author

I get this error:

  File "/home/hydra/projects/code2vec/src/../src/code2vec.py", line 26, in code2vec
    .link(Vocabulary2Id(root.sc, args.keep_freq, args.output)) \
AttributeError: 'Engine' object has no attribute 'sc'


def _load_tree(self, tree):
    self.construct(value2index=tree["value2index"].copy(),
                   path2index=split_strings(tree["path2index"]),
Contributor

don't use split_strings on path2index; use it on path_contexts


def _generate_tree(self):
    return {"value2index": self._value2index, "path2index": self._path2index,
            "path_contexts": self._path_contexts}
Contributor

use merge_strings here for path_contexts

Contributor

r0mainK commented Jul 18, 2018

@kafkasl added a couple more comments, you need to:

  • broadcast the indexes, it will increase Spark performance
  • rework your Model a bit to add the frequencies dict if asked for, and rework the inner methods; also add a dump method summarizing the data, check out other models to see the convention
  • rename code2vec, create main (last round of comments)

after this it should be good ^^" pls ping me if you need help!

@kafkasl kafkasl force-pushed the master branch 2 times, most recently from a3ae3e6 to 027ea18 Compare July 18, 2018 13:31
Contributor

@r0mainK r0mainK left a comment

Okay, I've added 2 comments for main (taken from sourced.ml), 1 typo, and one note on the broadcasting. Once this is done, I would like one last thing before merge: split the PR into 3 commits:

  • the first should be Add Code2VecFeatures Model and have only the model
  • the second should be Add Vocabulary2Id transformer and, likewise, have only the transformer
  • the last should be Add main and rework data pipeline and hold everything else

src/__main__.py Outdated
extract_parser.add_argument('--max-width', type=int, default=2, help="Max path width.",
required=False)
extract_parser.add_argument('-o', '--output', type=str,
help="Output path for the Code2VecFeatures mode", required=True)
Contributor

*model.

extract_parser = subparsers.add_parser("extract",
help="Extract features from input repositories",
formatter_class=ArgumentDefaultsHelpFormatterNoNone)

Contributor

add:

extract_parser.set_defaults(handler=code2vec_extract_features)

src/__main__.py Outdated

args = parser.parse_args()

code2vec_extract_features(args)
Contributor

replace this line with:

try:
    handler = args.handler
except AttributeError:
    def print_usage(_):
        parser.print_usage()

    handler = print_usage
return handler(args)
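Putting the two suggestions together, a minimal runnable sketch of the dispatch pattern (code2vec_extract_features here is a stand-in for the real handler):

```python
import argparse

def code2vec_extract_features(args):
    # stand-in for the real pipeline entry point
    return "extracting to %s" % args.output

parser = argparse.ArgumentParser(prog="code2vec")
subparsers = parser.add_subparsers(help="Commands", dest="command")
extract_parser = subparsers.add_parser("extract")
extract_parser.add_argument("-o", "--output", required=True)
extract_parser.set_defaults(handler=code2vec_extract_features)

args = parser.parse_args(["extract", "-o", "results"])
try:
    handler = args.handler
except AttributeError:
    # no subcommand given: fall back to printing usage
    def print_usage(_):
        parser.print_usage()
    handler = print_usage

result = handler(args)
```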

:param value2index_freq: value -> (id, freq)
:param path2index_freq: path -> (id, freq)
"""

Contributor

you are broadcasting frequencies uselessly here; better to use a dict comprehension beforehand:

value2index = self.sc.broadcast({key: idx for key, (idx, _) in value2index_freq.items()})
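The stripping step on its own, with hypothetical toy data (the frequencies stay on the driver; only the index mapping gets broadcast):

```python
# collected mapping: value -> (index, frequency)
value2index_freq = {"foo": (0, 12), "bar": (1, 3)}

# keep only the index before broadcasting
value2index = {key: idx for key, (idx, _) in value2index_freq.items()}

assert value2index == {"foo": 0, "bar": 1}
```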

kafkasl added 3 commits July 18, 2018 16:33
Signed-off-by: Pol Alvarez Vecino <[email protected]>
Signed-off-by: Pol Alvarez Vecino <[email protected]>
Signed-off-by: Pol Alvarez Vecino <[email protected]>
@r0mainK r0mainK changed the title WIP: Added Vocabulary builder and model Add rest of data pipeline Jul 18, 2018
@r0mainK r0mainK merged commit 19d8c02 into src-d:master Jul 18, 2018