From a802e5a21dd521257518b0dc83d4458245c4bed7 Mon Sep 17 00:00:00 2001
From: Jeffrey Lovitz
Date: Wed, 18 Dec 2019 14:53:19 -0500
Subject: [PATCH 01/15] Change directory structure

---
 bulk_insert/__init__.py                      |  0
 bulk_insert.py => bulk_insert/bulk_insert.py |  0
 setup.py                                     | 17 +++++++++++++++++
 3 files changed, 17 insertions(+)
 create mode 100644 bulk_insert/__init__.py
 rename bulk_insert.py => bulk_insert/bulk_insert.py (100%)
 create mode 100644 setup.py

diff --git a/bulk_insert/__init__.py b/bulk_insert/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/bulk_insert.py b/bulk_insert/bulk_insert.py
similarity index 100%
rename from bulk_insert.py
rename to bulk_insert/bulk_insert.py
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..1acf928
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,17 @@
+
+from setuptools import setup, find_packages
+setup(
+    name='redisgraph-bulk-loader',
+    version='0.9dev',
+
+    description='RedisGraph Bulk Import Tool',
+    url='https://github.com/redisgraph/redisgraph-bulk-loader',
+    packages=find_packages(),
+    install_requires=['redis', 'click'],
+    classifiers=[
+        'Intended Audience :: Developers',
+        'License :: OSI Approved :: BSD License',
+        'Programming Language :: Python :: 3.0',
+        'Topic :: Database'
+    ]
+)

From 6e8e44197ea9227de68daea50bcf0378876cad21 Mon Sep 17 00:00:00 2001
From: Jeffrey Lovitz
Date: Wed, 18 Dec 2019 15:30:34 -0500
Subject: [PATCH 02/15] WIP

---
 .gitignore                   |  30 +++
 bulk_insert/__init__.py      |   5 +
 bulk_insert/bulk_insert.py   | 353 ++---------------------------------
 bulk_insert/configs.py       |  16 ++
 bulk_insert/entity_file.py   | 122 ++++++++++++
 bulk_insert/label.py         |  55 ++++++
 bulk_insert/module_vars.py   |  10 +
 bulk_insert/query_buffer.py  |  56 ++++++
 bulk_insert/relation_type.py |  60 ++++++
 9 files changed, 373 insertions(+), 334 deletions(-)
 create mode 100644 bulk_insert/configs.py
 create mode 100644 bulk_insert/entity_file.py
 create mode 100644 bulk_insert/label.py
 create mode 100644 bulk_insert/module_vars.py
 create mode 100644 bulk_insert/query_buffer.py
 create mode 100644 bulk_insert/relation_type.py

diff --git a/.gitignore b/.gitignore
index 722d5e7..89c4502 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,31 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
 .vscode
+
diff --git a/bulk_insert/__init__.py b/bulk_insert/__init__.py
index e69de29..989c1b9 100644
--- a/bulk_insert/__init__.py
+++ b/bulk_insert/__init__.py
@@ -0,0 +1,5 @@
+from .configs import Configs
+from .label import Label
+from .relation_type import RelationType
+
+from .query_buffer import QueryBuffer
diff --git a/bulk_insert/bulk_insert.py b/bulk_insert/bulk_insert.py
index 10b4786..6155884 100644
--- a/bulk_insert/bulk_insert.py
+++ b/bulk_insert/bulk_insert.py
@@ -8,335 +8,27 @@
 from timeit import default_timer as timer
 import redis
 import click
-
-# Global variables
-CONFIGS = None # thresholds for batching Redis queries
-NODE_DICT = {} # global node dictionary
-TOP_NODE_ID = 0 # next ID to assign to a node
-QUERY_BUF = None # Buffer for query being constructed
-QUOTING = None
-
-FIELD_TYPES = None
-
-# Custom error class for invalid inputs
-class CSVError(Exception):
-    pass
-
-# Official enum support varies widely between 2.7 and 3.x, so we'll use a custom class
-class Type:
-    NULL = 0
-    BOOL = 1
-    NUMERIC = 2
-    STRING = 3
-
-# User-configurable thresholds for when to send queries to Redis
-class Configs(object):
-    def __init__(self, max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges):
-        # Maximum number of tokens per query
-        # 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so
-        # that we can safely ignore tokens that aren't binary strings
-        # ("GRAPH.BULK", "BEGIN", graph name, counts)
-        self.max_token_count = min(max_token_count, 1024 * 1023)
-        # Maximum size in bytes per query
-        self.max_buffer_size = max_buffer_size * 1000000
-        # Maximum size in bytes per token
-        # 512 megabytes is a hard-coded Redis maximum
-        self.max_token_size = min(max_token_size * 1000000, 512 * 1000000)
-
-        self.skip_invalid_nodes = skip_invalid_nodes
-        self.skip_invalid_edges = skip_invalid_edges
-
-# QueryBuffer is the class that processes input CSVs and emits their binary formats to the Redis client.
-class QueryBuffer(object):
-    def __init__(self, graphname, client):
-        # Redis client and data for each query
-        self.client = client
-
-        # Sizes for buffer currently being constructed
-        self.redis_token_count = 0
-        self.buffer_size = 0
-
-        # The first query should include a "BEGIN" token
-        self.graphname = graphname
-        self.initial_query = True
-
-        self.node_count = 0
-        self.relation_count = 0
-
-        self.labels = [] # List containing all pending Label objects
-        self.reltypes = [] # List containing all pending RelationType objects
-
-        self.nodes_created = 0 # Total number of nodes created
-        self.relations_created = 0 # Total number of relations created
-
-    # Send all pending inserts to Redis
-    def send_buffer(self):
-        # Do nothing if we have no entities
-        if self.node_count == 0 and self.relation_count == 0:
-            return
-
-        args = [self.node_count, self.relation_count, len(self.labels), len(self.reltypes)] + self.labels + self.reltypes
-        # Prepend a "BEGIN" token if this is the first query
-        if self.initial_query:
-            args.insert(0, "BEGIN")
-            self.initial_query = False
-
-        result = self.client.execute_command("GRAPH.BULK", self.graphname, *args)
-        stats = result.split(', '.encode())
-        self.nodes_created += int(stats[0].split(' '.encode())[0])
-        self.relations_created += int(stats[1].split(' '.encode())[0])
-
-        self.clear_buffer()
-
-    # Delete all entities that have been inserted
-    def clear_buffer(self):
-        self.redis_token_count = 0
-        self.buffer_size = 0
-
-        # All constructed entities have been inserted, so clear buffers
-        self.node_count = 0
-        self.relation_count = 0
-        del self.labels[:]
-        del self.reltypes[:]
-
-    def report_completion(self, runtime):
-        print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds"
-              % (self.graphname, self.nodes_created, self.relations_created, runtime))
-
-# Superclass for label and relation CSV files
-class EntityFile(object):
-    def __init__(self, filename, separator):
-        # The label or relation type string is the basename of the file
-        self.entity_str = os.path.splitext(os.path.basename(filename))[0]
-        # Input file handling
-        self.infile = io.open(filename, 'rt')
-        # Initialize CSV reader that ignores leading whitespace in each field
-        # and does not modify input quote characters
-        self.reader = csv.reader(self.infile, delimiter=separator, skipinitialspace=True, quoting=QUOTING)
-
-        self.prop_offset = 0 # Starting index of properties in row
-        self.prop_count = 0 # Number of properties per entity
-
-        self.packed_header = b''
-        self.binary_entities = []
-        self.binary_size = 0 # size of binary token
-        self.count_entities() # number of entities/row in file.
-
-    # Count number of rows in file.
-    def count_entities(self):
-        self.entities_count = 0
-        self.entities_count = sum(1 for line in self.infile)
-        # discard header row
-        self.entities_count -= 1
-        # seek back
-        self.infile.seek(0)
-        return self.entities_count
-
-    # Simple input validations for each row of a CSV file
-    def validate_row(self, expected_col_count, row):
-        # Each row should have the same number of fields
-        if len(row) != expected_col_count:
-            raise CSVError("%s:%d Expected %d columns, encountered %d ('%s')"
-                           % (self.infile.name, self.reader.line_num, expected_col_count, len(row), ','.join(row)))
-
-    # If part of a CSV file was sent to Redis, delete the processed entities and update the binary size
-    def reset_partial_binary(self):
-        self.binary_entities = []
-        self.binary_size = len(self.packed_header)
-
-    # Convert property keys from a CSV file header into a binary string
-    def pack_header(self, header):
-        prop_count = len(header) - self.prop_offset
-        # String format
-        entity_bytes = self.entity_str.encode()
-        fmt = "=%dsI" % (len(entity_bytes) + 1) # Unaligned native, entity name, count of properties
-        args = [entity_bytes, prop_count]
-        for p in header[self.prop_offset:]:
-            prop = p.encode()
-            fmt += "%ds" % (len(prop) + 1) # encode string with a null terminator
-            args.append(prop)
-        return struct.pack(fmt, *args)
-
-    # Convert a list of properties into a binary string
-    def pack_props(self, line):
-        props = []
-        for num, field in enumerate(line[self.prop_offset:]):
-            field_type_idx = self.prop_offset+num
-            try:
-                FIELD_TYPES[self.entity_str][field_type_idx]
-            except:
-                props.append(prop_to_binary(field, None))
-            else:
-                props.append(prop_to_binary(field, FIELD_TYPES[self.entity_str][field_type_idx]))
-        return b''.join(p for p in props)
-
-    def to_binary(self):
-        return self.packed_header + b''.join(self.binary_entities)
-
-# Handler class for processing label csv files.
-class Label(EntityFile):
-    def __init__(self, infile, separator):
-        super(Label, self).__init__(infile, separator)
-        expected_col_count = self.process_header()
-        self.process_entities(expected_col_count)
-        self.infile.close()
-
-    def process_header(self):
-        # Header format:
-        # node identifier (which may be a property key), then all other property keys
-        header = next(self.reader)
-        expected_col_count = len(header)
-        # If identifier field begins with an underscore, don't add it as a property.
-        if header[0][0] == '_':
-            self.prop_offset = 1
-        self.packed_header = self.pack_header(header)
-        self.binary_size += len(self.packed_header)
-        return expected_col_count
-
-    def process_entities(self, expected_col_count):
-        global NODE_DICT
-        global TOP_NODE_ID
-        global QUERY_BUF
-
-        entities_created = 0
-        with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
-            for row in reader:
-                self.validate_row(expected_col_count, row)
-                # Add identifier->ID pair to dictionary if we are building relations
-                if NODE_DICT is not None:
-                    if row[0] in NODE_DICT:
-                        sys.stderr.write("Node identifier '%s' was used multiple times - second occurrence at %s:%d\n"
-                                         % (row[0], self.infile.name, self.reader.line_num))
-                        if CONFIGS.skip_invalid_nodes is False:
-                            exit(1)
-                    NODE_DICT[row[0]] = TOP_NODE_ID
-                    TOP_NODE_ID += 1
-                row_binary = self.pack_props(row)
-                row_binary_len = len(row_binary)
-                # If the addition of this entity will make the binary token grow too large,
-                # send the buffer now.
-                if self.binary_size + row_binary_len > CONFIGS.max_token_size:
-                    QUERY_BUF.labels.append(self.to_binary())
-                    QUERY_BUF.send_buffer()
-                    self.reset_partial_binary()
-                    # Push the label onto the query buffer again, as there are more entities to process.
-                    QUERY_BUF.labels.append(self.to_binary())
-
-                QUERY_BUF.node_count += 1
-                entities_created += 1
-                self.binary_size += row_binary_len
-                self.binary_entities.append(row_binary)
-        QUERY_BUF.labels.append(self.to_binary())
-        print("%d nodes created with label '%s'" % (entities_created, self.entity_str))
-
-# Handler class for processing relation csv files.
-class RelationType(EntityFile):
-    def __init__(self, infile, separator):
-        super(RelationType, self).__init__(infile, separator)
-        expected_col_count = self.process_header()
-        self.process_entities(expected_col_count)
-        self.infile.close()
-
-    def process_header(self):
-        # Header format:
-        # source identifier, dest identifier, properties[0..n]
-        header = next(self.reader)
-        # Assume rectangular CSVs
-        expected_col_count = len(header)
-        self.prop_count = expected_col_count - 2
-        if self.prop_count < 0:
-            raise CSVError("Relation file '%s' should have at least 2 elements in header line."
-                           % (self.infile.name))
-
-        self.prop_offset = 2
-        self.packed_header = self.pack_header(header) # skip src and dest identifiers
-        self.binary_size += len(self.packed_header)
-        return expected_col_count
-
-    def process_entities(self, expected_col_count):
-        entities_created = 0
-        with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
-            for row in reader:
-                self.validate_row(expected_col_count, row)
-                try:
-                    src = NODE_DICT[row[0]]
-                    dest = NODE_DICT[row[1]]
-                except KeyError as e:
-                    print("Relationship specified a non-existent identifier. src: %s; dest: %s" % (row[0], row[1]))
-                    if CONFIGS.skip_invalid_edges is False:
-                        raise e
-                    continue
-                fmt = "=QQ" # 8-byte unsigned ints for src and dest
-                row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
-                row_binary_len = len(row_binary)
-                # If the addition of this entity will make the binary token grow too large,
-                # send the buffer now.
-                if self.binary_size + row_binary_len > CONFIGS.max_token_size:
-                    QUERY_BUF.reltypes.append(self.to_binary())
-                    QUERY_BUF.send_buffer()
-                    self.reset_partial_binary()
-                    # Push the reltype onto the query buffer again, as there are more entities to process.
-                    QUERY_BUF.reltypes.append(self.to_binary())
-
-                QUERY_BUF.relation_count += 1
-                entities_created += 1
-                self.binary_size += row_binary_len
-                self.binary_entities.append(row_binary)
-        QUERY_BUF.reltypes.append(self.to_binary())
-        print("%d relations created for type '%s'" % (entities_created, self.entity_str))
-
-# Convert a single CSV property field into a binary stream.
-# Supported property types are string, numeric, boolean, and NULL.
-# type is either Type.NUMERIC, Type.BOOL or Type.STRING, and explicitly sets the value to this type if possible
-def prop_to_binary(prop_val, type):
-    # All format strings start with an unsigned char to represent our Type enum
-    format_str = "=B"
-    if prop_val is None:
-        # An empty field indicates a NULL property
-        return struct.pack(format_str, Type.NULL)
-
-    # If field can be cast to a float, allow it
-    if type == None or type == Type.NUMERIC:
-        try:
-            numeric_prop = float(prop_val)
-            if not math.isnan(numeric_prop) and not math.isinf(numeric_prop): # Don't accept non-finite values.
-                return struct.pack(format_str + "d", Type.NUMERIC, numeric_prop)
-        except:
-            pass
-
-    if type == None or type == Type.BOOL:
-        # If field is 'false' or 'true', it is a boolean
-        if prop_val.lower() == 'false':
-            return struct.pack(format_str + '?', Type.BOOL, False)
-        elif prop_val.lower() == 'true':
-            return struct.pack(format_str + '?', Type.BOOL, True)
-
-    if type == None or type == Type.STRING:
-        # If we've reached this point, the property is a string
-        encoded_str = str.encode(prop_val) # struct.pack requires bytes objects as arguments
-        # Encoding len+1 adds a null terminator to the string
-        format_str += "%ds" % (len(encoded_str) + 1)
-        return struct.pack(format_str, Type.STRING, encoded_str)
-
-    ## if it hasn't returned by this point, it is trying to set it to a type that it can't adopt
-    raise Exception("unable to parse [" + prop_val + "] with type ["+repr(type)+"]")
+from configs import Configs
+from query_buffer import QueryBuffer
+from label import Label
+from relation_type import RelationType
+import module_vars
 
 # For each node input file, validate contents and convert to binary format.
 # If any buffer limits have been reached, flush all enqueued inserts to Redis.
 def process_entity_csvs(cls, csvs, separator):
-    global QUERY_BUF
     for in_csv in csvs:
         # Build entity descriptor from input CSV
         entity = cls(in_csv, separator)
         added_size = entity.binary_size
         # Check to see if the addition of this data will exceed the buffer's capacity
-        if (QUERY_BUF.buffer_size + added_size >= CONFIGS.max_buffer_size
-                or QUERY_BUF.redis_token_count + len(entity.binary_entities) >= CONFIGS.max_token_count):
+        if (module_vars.QUERY_BUF.buffer_size + added_size >= module_vars.CONFIGS.max_buffer_size
+                or module_vars.QUERY_BUF.redis_token_count + len(entity.binary_entities) >= module_vars.CONFIGS.max_token_count):
             # Send and flush the buffer if appropriate
-            QUERY_BUF.send_buffer()
+            module_vars.QUERY_BUF.send_buffer()
         # Add binary data to list and update all counts
-        QUERY_BUF.redis_token_count += len(entity.binary_entities)
-        QUERY_BUF.buffer_size += added_size
+        module_vars.QUERY_BUF.redis_token_count += len(entity.binary_entities)
+        module_vars.QUERY_BUF.buffer_size += added_size
 
 # Command-line arguments
 @click.command()
@@ -360,26 +52,19 @@ def process_entity_csvs(cls, csvs, separator):
 def bulk_insert(graph, host, port, password, nodes, relations, separator, max_token_count,
                 max_buffer_size, max_token_size, quote, field_types, skip_invalid_nodes, skip_invalid_edges):
-    global CONFIGS
-    global NODE_DICT
-    global TOP_NODE_ID
-    global QUERY_BUF
-    global QUOTING
-    global FIELD_TYPES
-
     if sys.version_info[0] < 3:
         raise Exception("Python 3 is required for the RedisGraph bulk loader.")
 
     if field_types is not None:
         try:
-            FIELD_TYPES = json.loads(field_types)
+            module_vars.FIELD_TYPES = json.loads(field_types)
         except:
             raise Exception("Problem parsing field-types. Use the format {