The stream_in and stream_out functions implement line-by-line processing of JSON data over a connection, such as a socket, URL, file or pipe. JSON streaming requires the ndjson format, which differs slightly from fromJSON() and toJSON(); see Details.
con: a connection object. If the connection is not open, stream_in and stream_out will automatically open and later close (and destroy) the connection. See Details.
handler: a custom function that is called on each page of JSON data. If not specified, the default handler stores all pages and binds them into a single data frame that will be returned by stream_in. See Details.
pagesize: number of lines to read/write from/to the connection per iteration.
verbose: whether to print progress information while reading/writing.
...: arguments for fromJSON() and toJSON() that control JSON formatting/parsing where applicable. Use with caution.
x: object to be streamed out. Currently only data frames are supported.
prefix: string to write before each line (use "\u001e" to write RFC 7464 text sequences).
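As a quick sketch of the writer arguments (the data frame here is arbitrary; output goes to the console):

# Write RFC 7464 text sequences in batches of two lines, with progress messages
x <- data.frame(a = 1:4)
stream_out(x, prefix = "\u001e", pagesize = 2, verbose = TRUE)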
Returns
The stream_out function always returns NULL. When no custom handler is specified, stream_in returns a single data frame with all pages bound together. When a custom handler function is specified, stream_in always returns NULL.
Details
Because parsing huge JSON strings is difficult and inefficient, JSON streaming is done using lines of minified JSON records, a.k.a. ndjson. This is fairly standard: JSON databases such as MongoDB use the same format to import/export datasets. Note that this means the stream as a whole is not valid JSON; only the individual lines are. Also note that because line breaks are used as separators, prettified JSON is not permitted: the JSON lines must be minified. In this respect the format differs from fromJSON() and toJSON(), where all lines are part of a single JSON structure with optional line breaks.
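For instance, the same three records serialize differently under the two models (this mirrors the "compare formats" example below):

x <- iris[1:3, ]
toJSON(x)      # a single JSON array spanning the whole object
stream_out(x)  # ndjson: one minified JSON record per line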
The handler is a callback function which is called for each page (batch) of JSON data with exactly one argument (usually a data frame with pagesize rows). If handler is missing or NULL, a default handler is used which stores all intermediate pages of data, and at the very end binds all pages together into one single data frame that is returned by stream_in. When a custom handler function is specified, stream_in does not store any intermediate results and always returns NULL. It is then up to the handler to process or store data pages. A handler function that does not store intermediate results in memory (for example by writing output to another connection) results in a pipeline that can process an unlimited amount of data. See example.
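For instance, a minimal custom handler that only counts rows per page (counting stands in for real per-page processing, such as writing each page to another connection):

total <- 0
stream_in(url("https://jeroen.github.io/data/iris.json"), handler = function(df) {
  total <<- total + nrow(df)  # process this page; nothing is accumulated
}, pagesize = 50)
total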
Note that a vector of JSON strings already in R can be parsed with stream_in by creating a connection to it with textConnection().
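A minimal sketch (the JSON strings here are made up):

json <- c('{"x": 1}', '{"x": 2}', '{"x": 3}')
stream_in(textConnection(json))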
If a connection is not yet open, stream_in and stream_out will automatically open and later close the connection. Because R destroys connections when they are closed, they cannot be reused. To use a single connection for multiple calls to stream_in or stream_out, it needs to be opened beforehand. See example.
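For instance, to write two batches to the same file, open the connection beforehand (a sketch using a temporary file):

con <- file(tmp <- tempfile(), open = "wb")  # open once, reuse across calls
stream_out(iris[1:3, ], con)
stream_out(iris[4:6, ], con)   # same connection: records are appended
close(con)
stream_in(file(tmp))           # reads back all six records
unlink(tmp)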
Examples
# compare formats
x <- iris[1:3, ]
toJSON(x)
stream_out(x)

# Trivial example
mydata <- stream_in(url("https://jeroen.github.io/data/iris.json"))

## Not run:
# stream large dataset to file and back
library(nycflights13)
stream_out(flights, file(tmp <- tempfile()))
flights2 <- stream_in(file(tmp))
unlink(tmp)
all.equal(flights2, as.data.frame(flights))

# stream over HTTP
diamonds2 <- stream_in(url("https://jeroen.github.io/data/diamonds.json"))

# stream over HTTP with gzip compression
flights3 <- stream_in(gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights3, as.data.frame(flights))

# stream over HTTPS (HTTP+SSL) via curl
library(curl)
flights4 <- stream_in(gzcon(curl("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights4, as.data.frame(flights))

# or alternatively:
flights5 <- stream_in(gzcon(pipe("curl https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights5, as.data.frame(flights))

# Full JSON IO stream from URL to file connection.
# Calculate delays for flights over 1000 miles in batches of 5k
library(dplyr)
con_in <- gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz"))
con_out <- file(tmp <- tempfile(), open = "wb")
stream_in(con_in, handler = function(df) {
  df <- dplyr::filter(df, distance > 1000)
  df <- dplyr::mutate(df, delta = dep_delay - arr_delay)
  stream_out(df, con_out, pagesize = 1000)
}, pagesize = 5000)
close(con_out)

# stream it back in
mydata <- stream_in(file(tmp))
nrow(mydata)
unlink(tmp)

# Data from http://openweathermap.org/current#bulk
# Each row contains a nested data frame.
daily14 <- stream_in(gzcon(url("http://78.46.48.103/sample/daily_14.json.gz")), pagesize = 50)
subset(daily14, city$name == "Berlin")$data[[1]]

# Or with dplyr:
library(dplyr)
daily14f <- flatten(daily14)
filter(daily14f, city.name == "Berlin")$data[[1]]

# Stream import large data from zip file
tmp <- tempfile()
download.file("http://jsonstudio.com/wp-content/uploads/2014/02/companies.zip", tmp)
companies <- stream_in(unz(tmp, "companies.json"))
## End(Not run)