stream_in function

Streaming JSON input/output

The stream_in and stream_out functions implement line-by-line processing of JSON data over a connection, such as a socket, URL, file or pipe. JSON streaming requires the ndjson format, which differs slightly from the format used by fromJSON() and toJSON(); see Details.

stream_in(con, handler = NULL, pagesize = 500, verbose = TRUE, ...)

stream_out(x, con = stdout(), pagesize = 500, verbose = TRUE, prefix = "", ...)

Arguments

  • con: a connection object. If the connection is not open, stream_in and stream_out will automatically open and later close (and destroy) the connection. See details.
  • handler: a custom function that is called on each page of JSON data. If not specified, the default handler stores all pages and binds them into a single data frame that will be returned by stream_in. See details.
  • pagesize: number of lines to read/write from/to the connection per iteration.
  • verbose: print some information on what is going on.
  • ...: arguments for fromJSON() and toJSON() that control JSON formatting/parsing where applicable. Use with caution.
  • x: object to be streamed out. Currently only data frames are supported.
  • prefix: string to write before each line (use "\u001e" to write RFC 7464 text sequences).
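As a rough illustration of the pagesize and prefix arguments, the sketch below writes a few rows of the built-in iris data set to a temporary file with the record separator "\u001e" in front of every line. The choice of iris, the temporary file and the variable names are assumptions made for this example only.

```r
library(jsonlite)

# Write three rows to a temporary file, one page of two rows and one of one row,
# with the RFC 7464 record separator in front of every line.
tmp <- tempfile()
stream_out(iris[1:3, ], file(tmp), pagesize = 2, prefix = "\u001e", verbose = FALSE)

# Inspect the raw lines that were written.
lines <- readLines(tmp)
unlink(tmp)

length(lines)                 # one line per row, regardless of pagesize
startsWith(lines, "\u001e")   # every line begins with the prefix
```

The page size only controls how many rows are converted and written per iteration; it does not change the content of the output.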

Returns

The stream_out function always returns NULL. When no custom handler is specified, stream_in returns a data frame of all pages bound together. When a custom handler function is specified, stream_in always returns NULL.

Details

Because parsing huge JSON strings is difficult and inefficient, JSON streaming is done using lines of minified JSON records, a.k.a. ndjson. This is pretty standard: JSON databases such as MongoDB use the same format to import/export datasets. Note that this means that the total stream combined is not valid JSON itself; only the individual lines are. Also note that because line breaks are used as separators, prettified JSON is not permitted: the JSON lines must be minified. In this respect, the format is a bit different from fromJSON() and toJSON(), where all lines are part of a single JSON structure with optional line breaks.
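The difference between the two formats can be sketched as follows. The example assumes the built-in iris data set and a temporary file; the variable names are illustrative only.

```r
library(jsonlite)

x <- iris[1:3, ]

# toJSON() produces a single JSON document: one array holding all rows.
array_json <- toJSON(x)

# stream_out() writes ndjson: one minified JSON object per line.
tmp <- tempfile()
stream_out(x, file(tmp), verbose = FALSE)
lines <- readLines(tmp)
unlink(tmp)

length(array_json)   # a single string containing the whole array
length(lines)        # one line per row of x

# Each individual line is valid JSON and can be parsed on its own.
first <- fromJSON(lines[1])
names(first)
```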

The handler is a callback function which is called for each page (batch) of JSON data with exactly one argument (usually a data frame with pagesize rows). If handler is missing or NULL, a default handler is used which stores all intermediate pages of data, and at the very end binds all pages together into one single data frame that is returned by stream_in. When a custom handler function is specified, stream_in does not store any intermediate results and always returns NULL. It is then up to the handler to process or store data pages. A handler function that does not store intermediate results in memory (for example by writing output to another connection) results in a pipeline that can process an unlimited amount of data. See example.
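A minimal sketch of a custom handler, assuming the built-in iris data set written to a temporary file. The handler name count_rows and the accumulator page_rows are hypothetical; the handler only records the size of each page rather than keeping the data.

```r
library(jsonlite)

# Write a small data set to a temporary ndjson file (150 rows).
tmp <- tempfile()
stream_out(iris, file(tmp), verbose = FALSE)

# Hypothetical handler: note how many rows each page has, then discard the page.
page_rows <- integer(0)
count_rows <- function(df) {
  page_rows <<- c(page_rows, nrow(df))
}

# With a custom handler, stream_in() keeps no intermediate results.
res <- stream_in(file(tmp), handler = count_rows, pagesize = 60, verbose = FALSE)
unlink(tmp)

page_rows     # 60 60 30
is.null(res)  # TRUE
```

Because the handler discards each page after inspecting it, memory use stays bounded by the page size rather than by the size of the file.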

Note that a vector of JSON strings already in R can be parsed with stream_in by creating a connection to it with textConnection().
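For instance, a character vector holding one JSON record per element could be parsed as in the sketch below. The records themselves are made up for illustration.

```r
library(jsonlite)

# Hypothetical records: one minified JSON object per vector element.
json_lines <- c(
  '{"id":1,"name":"a"}',
  '{"id":2,"name":"b"}'
)

# A text connection is already open for reading when it is created,
# so stream_in() does not close it; close it explicitly afterwards.
con <- textConnection(json_lines)
df <- stream_in(con, verbose = FALSE)
close(con)

nrow(df)   # 2
names(df)
```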

If a connection is not opened yet, stream_in and stream_out will automatically open and later close the connection. Because R destroys connections when they are closed, they cannot be reused. To use a single connection for multiple calls to stream_in or stream_out, it needs to be opened beforehand. See example.
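Reusing one connection across several calls might look like the following sketch, which assumes the built-in iris data set and a temporary file. The connection is opened explicitly, so stream_out leaves it open between calls and the caller closes it at the end.

```r
library(jsonlite)

tmp <- tempfile()

# Open the connection up front so that it survives multiple stream_out() calls.
con <- file(tmp, open = "wb")
stream_out(iris[1:3, ], con, verbose = FALSE)
stream_out(iris[4:6, ], con, verbose = FALSE)
close(con)

# Reading back uses a fresh, unopened connection, which stream_in()
# opens and closes by itself.
combined <- stream_in(file(tmp), verbose = FALSE)
unlink(tmp)

nrow(combined)   # 6
```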

Examples

# compare formats
x <- iris[1:3,]
toJSON(x)
stream_out(x)

# Trivial example
mydata <- stream_in(url("https://jeroen.github.io/data/iris.json"))

## Not run:
# stream large dataset to file and back
library(nycflights13)
stream_out(flights, file(tmp <- tempfile()))
flights2 <- stream_in(file(tmp))
unlink(tmp)
all.equal(flights2, as.data.frame(flights))

# stream over HTTP
diamonds2 <- stream_in(url("https://jeroen.github.io/data/diamonds.json"))

# stream over HTTP with gzip compression
flights3 <- stream_in(gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights3, as.data.frame(flights))

# stream over HTTPS (HTTP+SSL) via curl
library(curl)
flights4 <- stream_in(gzcon(curl("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights4, as.data.frame(flights))

# or alternatively:
flights5 <- stream_in(gzcon(pipe("curl https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights5, as.data.frame(flights))

# Full JSON IO stream from URL to file connection.
# Calculate delays for flights over 1000 miles in batches of 5k
library(dplyr)
con_in <- gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz"))
con_out <- file(tmp <- tempfile(), open = "wb")
stream_in(con_in, handler = function(df){
  df <- dplyr::filter(df, distance > 1000)
  df <- dplyr::mutate(df, delta = dep_delay - arr_delay)
  stream_out(df, con_out, pagesize = 1000)
}, pagesize = 5000)
close(con_out)

# stream it back in
mydata <- stream_in(file(tmp))
nrow(mydata)
unlink(tmp)

# Data from http://openweathermap.org/current#bulk
# Each row contains a nested data frame.
daily14 <- stream_in(gzcon(url("http://78.46.48.103/sample/daily_14.json.gz")), pagesize=50)
subset(daily14, city$name == "Berlin")$data[[1]]

# Or with dplyr:
library(dplyr)
daily14f <- flatten(daily14)
filter(daily14f, city.name == "Berlin")$data[[1]]

# Stream import large data from zip file
tmp <- tempfile()
download.file("http://jsonstudio.com/wp-content/uploads/2014/02/companies.zip", tmp)
companies <- stream_in(unz(tmp, "companies.json"))

## End(Not run)

References

MongoDB export format: https://www.mongodb.com/docs/database-tools/mongoexport/

Documentation for the JSON Lines text file format: https://jsonlines.org/

See Also

fromJSON(), read_json()

  • Maintainer: Jeroen Ooms
  • License: MIT + file LICENSE
  • Last published: 2025-03-27