# tf.data as the default data pipeline

StellarGraph has made great choices in adopting Keras and TensorFlow 2.0 as its underlying technology. However, although pandas and NumPy are relatively easy to use, they don’t offer the full-blown, production-grade versatility of tf.data, which is now established as the canonical data pipeline for TensorFlow.

I’m working on an R&D project for cybersecurity which is very much interested in StellarGraph. Academics are fine with pandas, but the “industrial” side of the collaboration considers that the absence of tf.data is a no-go for the adoption of StellarGraph in the real world. Any insights or strategies for upgrading from pandas to tf.data?

Thanks for getting in touch!

It sounds like you’re interested in tf.data for preprocessing graph data from files/database/… to create a StellarGraph object. Is that accurate?

If so, I think there’s a somewhat fundamental conceptual mismatch that means many of the benefits of tf.data don’t apply. In particular, tf.data seems optimised to be a pipeline that lazily yields pieces of data (possibly repeating) transiently for training/predicting in a model, and they’re discarded before getting the next element. However, graph algorithms typically require these to be far more accessible:

• the edge list needs to be fully realised, to be able to do operations like “get the neighbours of node X” or even “create an adjacency matrix”
• any features need to be fully realised, to be able to do operations like “get the features of node Y”

It would be extremely inefficient to have to repeatedly stream the whole dataset for each query.
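
As a rough illustration of this point (a minimal sketch, not StellarGraph's internals; the toy edge list and both helper functions are hypothetical), compare answering a neighbour query by scanning a stream of edges with answering it from an index that was materialised once:

```python
from collections import defaultdict

# Hypothetical toy edge list; in practice this would come from files or a database.
edges = [("a", "b"), ("a", "c"), ("b", "c"), ("c", "d")]


def neighbours_by_streaming(edge_iter, node):
    """Answer one query by scanning every edge: O(number of edges) per query."""
    found = set()
    for src, dst in edge_iter:
        if src == node:
            found.add(dst)
        elif dst == node:
            found.add(src)
    return found


def build_adjacency(edge_iter):
    """Materialise the edge list once into an in-memory index (undirected)."""
    adjacency = defaultdict(set)
    for src, dst in edge_iter:
        adjacency[src].add(dst)
        adjacency[dst].add(src)
    return adjacency


# One pass to build the index, then each lookup is a dictionary access.
adjacency = build_adjacency(edges)
neighbours_of_c = adjacency["c"]

# The streaming version gives the same answer, but re-reads all edges every time.
streamed_neighbours_of_c = neighbours_by_streaming(iter(edges), "c")
```

Sampling-based algorithms issue such queries for many nodes per batch, so the per-query scan is what makes a purely streamed edge list impractical.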

That is, using tf.data would be slightly deceiving from a “data pipeline is production-grade” perspective, because internally StellarGraph would eagerly load the whole dataset into memory anyway.

It would allow using some of the tf.data preprocessing tools, but I think that can be achieved “manually” too. In particular, one could use tf.data for loading and transforming and then pull into memory and build the required input for a StellarGraph as a very final step. In its simplest form, this might look something like:

import numpy as np
import pandas as pd

ds = ...  # a tf.data.Dataset containing batches of feature arrays
numpy_arr = np.concatenate(list(ds.as_numpy_iterator()), axis=0)  # pull every batch into memory
df = pd.DataFrame(numpy_arr)


More complicated datasets could be handled by generalising this.

There’s various pieces of active and inactive work in this space too:

Could you speak a bit more about the pieces of tf.data that are most interesting/important for your use case? We might be able to find an appropriate middle ground.

Thanks for the reply. I must say I haven’t looked “under the hood” of the StellarGraph algorithms, but my understanding from your answer is that no variant of StellarGraph can function without loading the entirety of the data into memory. It is precisely that limitation which is of concern to us.

Another aspect which I’m not sure I understand is that there is nothing that pandas does that tf.data cannot do, with the added advantage that the latter is fully scalable and readily integrable into the TensorFlow suite. It isn’t obvious to me either why certain operations like the construction of adjacency matrices or the “full realization” of the features preclude a batched, piecewise approach. If anything, that may require an algorithmic adjustment which will open the floodgates of scalability. (Admittedly easier said than done.)

A little bit about our use case: We’re working with attributed node classification on heterogeneous and dynamic networks. I.e., we have terabytes of attributed nodes each corresponding to entities of various types (IP addresses, domain names, ISPs, files, e-mail addresses, etc.). These nodes interact with each other over time. E.g. domain X is queried by IP Y to be hosted at ISP Z etc. Some entities are labeled and we’d like to propagate the labels as the interactions unfold. E.g., if a malicious domain X is hosted at IP Y, then, based on the attributes of X and Y and their respective histories, we can infer a maliciousness score on Y and classify it as “ransomware” or “spyware” etc. The dynamic and heterogeneous nature of this problem is challenging enough, and we’re still grappling with that. But from an infrastructure point of view, what is certain is that all this data cannot be loaded in memory via pandas.

Thanks for sharing a bit more about your use case; detecting spyware and ransomware sounds interesting!

To work with graphs that can’t fit into memory, we now support using GraphSAGE on a graph stored in Neo4j; these demos show you how to get started: https://stellargraph.readthedocs.io/en/stable/demos/connector/neo4j/index.html. Unfortunately, this still requires the node features to be loaded into memory, but many graphs with timestamped edges like yours have much more edge data than node data, so this might be sufficient. Out of curiosity, how much node data and how much edge data do you have?

We’re actively working on Neo4j support, and our next release will support looking up features stored in Neo4j and more algorithms! What algorithms are you interested in trying on your dataset? We might be able to prioritise adding Neo4j implementations for these.

“there is nothing that pandas does that tf.data cannot do”

This is an interesting point. Unfortunately, there are some things that TensorFlow doesn’t do well. I’ll talk about why we haven’t used tf.data for either in-memory or on-disk datasets.

• tf.data for on-disk datasets: tf.data is optimized for streaming data, which works great for standard ML workloads like computer vision and NLP, but graph traversal requires lookups of neighbours, neighbours of neighbours, etc., which is a tricky problem to solve optimally. tf.data doesn’t have the right tools for this yet, and any solution we provide would basically be a graph database. That’s why we leverage Neo4j to efficiently traverse graphs on disk.

• tf.data for in-memory datasets: most of our algorithms are based on graph traversal, which requires fast neighbour lookups. We would really like to exploit tf.data's parallelism here, but TensorFlow's slice operation is catastrophically slow, 300x slower than NumPy to be exact, which negates any performance benefit: https://github.com/tensorflow/tensorflow/issues/39750

That being said, we love contributions, and if you have any ideas for efficiently looking up data on disk (or anything else), please file an issue or even a PR: https://github.com/stellargraph/stellargraph/blob/develop/CONTRIBUTING.md#proposing-a-new-feature

(Thanks for waiting over our weekend.)

Yeah, some algorithms are designed to train scalably using subgraphs of large graphs. For instance, Cluster-GCN, and algorithms for “knowledge graphs” (basically graphs defined by “triples” of (source node, edge type, target node) without node features or even node types) via PyTorch BigGraph (not (yet) implemented in StellarGraph).

Note that the latter are somewhat more similar to traditional algorithms, in that each edge/triple can be processed in more isolation, without doing too many random access queries to get node or edge attributes (because there are none).

Thanks for giving so much info!

I’m curious about a few things that might guide the best path forward:

• Echoing @kieranr's question, is most of the data in the graph data itself (e.g. node identifiers and the edges between them) or in the node attributes or in the edge attributes?
• What format is the data stored in? Is it already clustered/grouped in any form? If it isn’t clustered and there are non-trivial/large node attributes, streaming becomes quite hard, because any edge could require the attributes of any node, and if those nodes are just stored on disk, doing efficient random access of the node attribute data to fetch them hits the “implement a database” problem @kieranr is hinting at. In addition, even just computing subgraphs to process with a Cluster-GCN-style algorithm requires a significant amount of work.

Distributed systems are hard, as are “databases”, and so StellarGraph is currently focusing our limited engineering resources on:

• in-memory graphs, given one can fit a lot of data into memory currently (e.g. EC2 offers machines with terabytes of RAM)
• connections to existing graph databases, in particular Neo4j

Expanding on this, we have recently added experimental Cluster-GCN on Neo4j support.

Thanks for elaborating. I may well be getting ahead of myself with my insistence on tf.data since we’re still at the prototyping stage. However, what remains certain is that our “guardians of the production temple” will dismiss any PoC that is tied to in-memory operations.

Most of the data sits with the nodes.

The data is stored as design matrices per node type. I.e., we’ll have a design matrix for IP addresses, another for domain names etc. As those nodes interact with each other through time as N-partite “events”, the design matrices are updated to reflect those interactions. Let’s say nodes x, y and z interact in a given context; their respective entries in the design matrices need to be updated to reflect that interaction. I was hoping (perhaps naively) that the read-write overhead of the attributes of x, y and z during this particular 3-partite event would be manageable, certainly compared to having the full design matrices of every single node type loaded into memory.

My apologies if I’m not clear. We’re still grappling with the relative novelty of these tools/concepts.

Fair enough. It’s unfortunate that the random access nature of graphs makes any sort of out-of-memory support a huge step up from doing things in-memory!

Thanks.

Hm, I’m not quite understanding the correspondence to a graph. Does the “design matrix” for each type have a column for every other node? For instance, if there are 3 IP addresses, 4 domain names and 5 ISPs, does the IP address matrix have 3 rows and 4 + 5 = 9 columns?

Random access to disk is very expensive (orders of magnitude more than random access to RAM), so needing to do so for every node feature access is likely to have significant overhead, no matter the size of the data being read. You may already have an intuition for the various costs here, but, if not, https://colin-scott.github.io/personal_website/research/interactive_latency.html has a good comparison of memory vs. SSD vs. disk.

Of course, the increased latency of disk might be worth it, compared to paying for terabytes of RAM, but it’s something to keep in mind. (And operating systems are able to cache (parts of?) files in memory to hide this cost.)

No, because that would be tantamount to dragging around an unwieldy adjacency matrix of sorts.

This is actually the crux of our conceptual (as opposed to implementational) investigation. Let’s say we have three categories C_1, C_2, C_3 (e.g., IPs, domains, ISPs). Each category has its own design matrix that stores the attributes of all instances of that category. The design matrix of C_k has dimension N_k \times M_k, where N_k is the number of instances c_k^{(j)}, with j \in \{1, \cdots, N_k \}, of category C_k and M_k is the dimension of its attribute space. So far there is no mention of anything “graph”.

Then comes the fact that at time t a subset of nodes, say, \{ c_3^{(7)}, c_2^{(4)}, c_2^{(13)} \} get to interact and “infect” each other to various degrees. The same happens at t+1 with a different subset of nodes and so on. One possibility is that each of these interaction types (not instances) can be appended as a column in the design matrices of the protagonist nodes. It is then those columns that could act as graph features in a manner somewhat akin to, say, the aggregation feature in GraphSAGE.
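
To make the setup concrete, here is a minimal sketch of one way it could be laid out with NumPy. Everything specific is an assumption made for illustration: the matrix sizes, the two interaction-type names, the choice of a simple per-type counter as the appended column, and the `record_event` helper.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical sizes: category k -> (N_k instances, M_k attributes).
shapes = {1: (5, 3), 2: (20, 4), 3: (10, 2)}
design = {k: rng.normal(size=shape) for k, shape in shapes.items()}

# Hypothetical interaction types; one extra counter column per type, per category.
interaction_types = ["dns_query", "hosting"]
counts = {k: np.zeros((n, len(interaction_types))) for k, (n, _) in shapes.items()}


def record_event(event_type, participants):
    """Increment the counter column of `event_type` for every participating node.

    `participants` is a list of (category, zero-based row index) pairs.
    """
    col = interaction_types.index(event_type)
    for category, row in participants:
        counts[category][row, col] += 1


# The event at time t, {c_3^(7), c_2^(4), c_2^(13)}, written with zero-based rows.
record_event("hosting", [(3, 6), (2, 3), (2, 12)])

# Original attributes plus the appended interaction-type columns, per category.
features = {k: np.hstack([design[k], counts[k]]) for k in design}
```

Only the rows of the participating nodes are touched by an event, which is the read-write pattern described earlier in the thread; how those rows would be fetched efficiently from disk is left open here.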

As I said in the OP, our problem is

1. the classification of
2. attributed nodes,
3. in a dynamic (both edge- and node-wise),
4. heterogeneous graph,

where the dynamic aspect is essentially a timestamped sequence of “interactions” between the various nodes. Here, “interactions” are meant to encapsulate any a priori black-boxed context where a subset of interconnected nodes influence each other so as to update their labels or attributes. An interaction is therefore broader than the mere instantiation of a (bipartite) edge.

Did I add to the confusion?

Indeed, I was a little concerned that it’d be unwieldy and inefficient.

Ah, ok, so the design matrices are the non-graph features associated with each node? I understand.

Hm, this sounds like it’s most naturally modelled as a “hyperedge” that connects more than two vertices. I don’t know of extensive research for these forms of graphs (there’s nothing yet implemented in StellarGraph that handles this sort of structure natively).

Another option you may’ve already considered would be to have explicit event nodes (such as e_t for the event at time t), and have edges from each of c_3^{(7)}, c_2^{(4)} and c_2^{(13)} to e_t. (I guess this is somewhere between the aggregated “interaction type” form and the hyperedge form.) One may have to increase (~double) the network aggregation depth to have it “see” the same nodes.
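
A minimal sketch of the event-node idea, assuming events arrive as (time, participants) pairs; the node identifiers and the `events_to_edges` helper are made up for illustration:

```python
# Hypothetical timestamped events: (time, participating node ids).
events = [
    (0, ["isp_7", "domain_4", "domain_13"]),
    (1, ["ip_2", "domain_4"]),
]


def events_to_edges(events):
    """Replace each n-ary event with one event node and n ordinary edges."""
    event_nodes, edges = [], []
    for time, participants in events:
        event_id = f"event_{time}"
        event_nodes.append(event_id)
        edges.extend((node, event_id) for node in participants)
    return event_nodes, edges


event_nodes, edges = events_to_edges(events)
```

Two participants of the same event are now two hops apart (participant, event node, participant) rather than one, which is why the aggregation depth roughly doubles.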

I think I understand your problem better, but I’m not sure I can give much specific advice for how to handle it with StellarGraph, given it looks like there’s still a lot of modelling questions and potentially the hard-to-surmount in-memory issue.

(Thanks again for waiting over the weekend!)

@huon Thanks for the feedback. I’ve spent the past couple of weeks formalizing the problem on paper (not on the initial tf.data issue, but rather the conceptual questions that arose later in this thread). I was wondering if there is anybody in particular within the StellarGraph team whom I could reach out to in order to discuss a potential collaboration, or at least get some feedback from?

What I have is unpublished work so far, so I’d be reluctant to share it directly to the forum, although the idea is to eventually turn it into a publication if it were to gain traction.