Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions src/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ function File(source;
pool::Union{Bool, Real}=0.1,
strict::Bool=false,
silencewarnings::Bool=false,
invalidrow=nothing,
invalidcell=nothing,
debug::Bool=false,
parsingdebug::Bool=false,)

Expand Down Expand Up @@ -241,15 +243,15 @@ function File(source;
# if a column type if promoted to string, the values are stored in the corresponding `tape` instead of `poslen`
if threaded === true
# multithread
rows, tapes, refs, typecodes, intsentinels = multithreadparse(typecodes, buf, datapos, len, options, coloptions, rowsguess, pool, ncols, typemap, limit, debug)
rows, tapes, refs, typecodes, intsentinels = multithreadparse(typecodes, buf, datapos, len, options, coloptions, rowsguess, pool, ncols, typemap, limit, invalidrow, invalidcell, debug)
finalrows = sum(rows)
else
intsentinels = fill(INT_SENTINEL, ncols)
tapes, poslens = allocate(rowsguess, ncols, typecodes)
refs = Vector{Dict{String, UInt64}}(undef, ncols)
lastrefs = zeros(UInt64, ncols)
t = Base.time()
rows, tapes, poslens = parsetape(Val(transpose), ncols, gettypecodes(typemap), tapes, poslens, buf, datapos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, debug, options, coloptions)
rows, tapes, poslens = parsetape(Val(transpose), ncols, gettypecodes(typemap), tapes, poslens, buf, datapos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, invalidrow, invalidcell, debug, options, coloptions)
finalrows = rows
debug && println("time for initial parsing to tape: $(Base.time() - t)")
end
Expand Down Expand Up @@ -284,7 +286,7 @@ function File(source;
return File{something(threaded, false)}(h.name, h.names, finaltypes, finalrows, ncols, columns, lookup)
end

function multithreadparse(typecodes, buf, datapos, len, options, coloptions, rowsguess, pool, ncols, typemap, limit, debug)
function multithreadparse(typecodes, buf, datapos, len, options, coloptions, rowsguess, pool, ncols, typemap, limit, invalidrow, invalidcell, debug)
N = Threads.nthreads()
chunksize = div(len - datapos, N)
ranges = [datapos, (datapos + chunksize * i for i = 1:N)...]
Expand All @@ -310,7 +312,7 @@ function multithreadparse(typecodes, buf, datapos, len, options, coloptions, row
tl_intsentinels = fill(INT_SENTINEL, ncols)
tl_datapos = ranges[i]
tl_len = ranges[i + 1] - (i != N)
tl_rows, tl_tapes, tl_poslens = parsetape(Val(false), ncols, gettypecodes(typemap), tl_tapes, tl_poslens, buf, tl_datapos, tl_len, limit, Int64[], pool, tl_refs, tl_lastrefs, rowchunkguess, perthreadtypecodes[i], tl_intsentinels, debug, options, coloptions)
tl_rows, tl_tapes, tl_poslens = parsetape(Val(false), ncols, gettypecodes(typemap), tl_tapes, tl_poslens, buf, tl_datapos, tl_len, limit, Int64[], pool, tl_refs, tl_lastrefs, rowchunkguess, perthreadtypecodes[i], tl_intsentinels, invalidrow, invalidcell, debug, options, coloptions)
debug && println("thread = $(Threads.threadid()): time for parsing: $(Base.time() - tt)")
perthreadrows[i] = tl_rows
perthreadtapes[i] = tl_tapes
Expand Down Expand Up @@ -446,13 +448,13 @@ end # @static if VERSION >= v"1.3-DEV"
return perthreadrows, perthreadtapes, refs, typecodes, intsentinels
end

function parsetape(TR::Val{transpose}, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, debug, options::Parsers.Options{ignorerepeated}, coloptions) where {transpose, ignorerepeated}
function parsetape(TR::Val{transpose}, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, invalidrow, invalidcell, debug, options::Parsers.Options{ignorerepeated}, coloptions) where {transpose, ignorerepeated}
row = 0
startpos = pos
if pos <= len && len > 0
while row < limit
row += 1
pos = parserow(row, TR, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, debug, options, coloptions)
pos = parserow(row, TR, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, invalidrow, invalidcell, debug, options, coloptions)
pos > len && break
# if our initial row estimate was too few, we need to reallocate our tapes/poslens to read the rest of the file
if row + 1 > rowsguess
Expand Down Expand Up @@ -490,10 +492,10 @@ end
@noinline notenoughcolumns(cols, ncols, row) = println("thread = $(Threads.threadid()) warning: only found $cols / $ncols columns on data row: $row. Filling remaining columns with `missing`")
@noinline toomanycolumns(cols, row) = println("thread = $(Threads.threadid()) warning: parsed expected $cols columns, but didn't reach end of line on data row: $row. Ignoring any extra columns on this row")
@noinline stricterror(T, buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) error parsing $T on row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code))"))
@noinline warning(T, buf, pos, len, code, row, col) = println("thread = $(Threads.threadid()) warning: error parsing $T on row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code))")
@noinline warning(T, cell, code, row, col) = println("thread = $(Threads.threadid()) warning: error parsing $T on row = $row, col = $col: \"$cell\", error=$(Parsers.codes(code))")
@noinline fatalerror(buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) fatal error, encountered an invalidly quoted field while parsing on row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code)), check your `quotechar` arguments or manually fix the field in the file itself"))

@inline function parserow(row, ::Val{transpose}, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, debug, options::Parsers.Options{ignorerepeated}, coloptions) where {transpose, ignorerepeated}
@inline function parserow(row, ::Val{transpose}, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, invalidrow, invalidcell, debug, options::Parsers.Options{ignorerepeated}, coloptions) where {transpose, ignorerepeated}
for col = 1:ncols
if transpose
@inbounds pos = positions[col]
Expand All @@ -509,17 +511,17 @@ end
elseif type === MISSINGTYPE
pos, code = detect(tape, buf, pos, len, opts, row, col, typemap, pool, refs, lastrefs, intsentinels, debug, typecodes, poslens)
elseif type === INT
pos, code = parseint!(T, tape, buf, pos, len, opts, row, col, typecodes, poslens, intsentinels)
pos, code = parseint!(T, tape, buf, pos, len, opts, row, col, typecodes, poslens, intsentinels, invalidcell)
elseif type === FLOAT
pos, code = parsevalue!(Float64, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
pos, code = parsevalue!(Float64, T, tape, buf, pos, len, opts, row, col, typecodes, poslens, invalidcell)
elseif type === DATE
pos, code = parsevalue!(Date, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
pos, code = parsevalue!(Date, T, tape, buf, pos, len, opts, row, col, typecodes, poslens, invalidcell)
elseif type === DATETIME
pos, code = parsevalue!(DateTime, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
pos, code = parsevalue!(DateTime, T, tape, buf, pos, len, opts, row, col, typecodes, poslens, invalidcell)
elseif type === TIME
pos, code = parsevalue!(Time, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
pos, code = parsevalue!(Time, T, tape, buf, pos, len, opts, row, col, typecodes, poslens, invalidcell)
elseif type === BOOL
pos, code = parsevalue!(Bool, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
pos, code = parsevalue!(Bool, T, tape, buf, pos, len, opts, row, col, typecodes, poslens, invalidcell)
elseif type === POOL
pos, code = parsepooled!(T, tape, buf, pos, len, opts, row, col, rowsguess, pool, refs, lastrefs, typecodes, poslens)
else # STRING
Expand All @@ -530,7 +532,11 @@ end
else
if col < ncols
if Parsers.newline(code) || pos > len
options.silencewarnings || notenoughcolumns(col, ncols, row)
if invalidrow !== nothing
invalidrow(row)
elseif !options.silencewarnings
notenoughcolumns(col, ncols, row)
end
for j = (col + 1):ncols
# put in dummy missing values on the tape for missing columns
if !usermissing(typecodes[j])
Expand All @@ -549,7 +555,11 @@ end
end
else
if pos <= len && !Parsers.newline(code)
options.silencewarnings || toomanycolumns(ncols, row)
if invalidrow !== nothing
invalidrow(row)
elseif !options.silencewarnings
toomanycolumns(ncols, row)
end
# ignore the rest of the line
pos = skiptorow(buf, pos, len, options.oq, options.e, options.cq, 1, 2)
end
Expand Down Expand Up @@ -692,7 +702,7 @@ function detect(tape, buf, pos, len, options, row, col, typemap, pool, refs, las
return pos + tlen, code
end

function parseint!(T, tape, buf, pos, len, options, row, col, typecodes, poslens, intsentinels)
function parseint!(T, tape, buf, pos, len, options, row, col, typecodes, poslens, intsentinels, invalidcell)
x, code, vpos, vlen, tlen = Parsers.xparse(Int64, buf, pos, len, options)
if code > 0
if !Parsers.sentinel(code)
Expand Down Expand Up @@ -734,7 +744,11 @@ function parseint!(T, tape, buf, pos, len, options, row, col, typecodes, poslens
end
if user(T)
if !options.strict
options.silencewarnings || warning(Int64, buf, pos, tlen, code, row, col)
if invalidcell !== nothing
invalidcell(Int64, String(buf[pos:pos+tlen-1]), code, row, col)
elseif !options.silencewarnings
warning(Int64, String(buf[pos:pos+tlen-1]), code, row, col)
end
@inbounds typecodes[col] = T | MISSING
@inbounds tape[row] = uint64(intsentinels[col])
else
Expand Down Expand Up @@ -765,7 +779,7 @@ function parseint!(T, tape, buf, pos, len, options, row, col, typecodes, poslens
return pos + tlen, code
end

function parsevalue!(::Type{type}, T, tape, buf, pos, len, options, row, col, typecodes, poslens) where {type}
function parsevalue!(::Type{type}, T, tape, buf, pos, len, options, row, col, typecodes, poslens, invalidcell) where {type}
x, code, vpos, vlen, tlen = Parsers.xparse(type, buf, pos, len, options)
if code > 0
if !Parsers.sentinel(code)
Expand All @@ -784,8 +798,12 @@ function parsevalue!(::Type{type}, T, tape, buf, pos, len, options, row, col, ty
end
if user(T)
if !options.strict
if invalidcell !== nothing
invalidcell(Int64, String(buf[pos:pos+tlen-1]), code, row, col)
elseif !options.silencewarnings
warning(type, String(buf[pos:pos+tlen-1]), code, row, col)
end
code |= Parsers.SENTINEL
options.silencewarnings || warning(type, buf, pos, tlen, code, row, col)
@inbounds typecodes[col] = T | MISSING
@inbounds tape[row] = sentinelvalue(type)
else
Expand Down
12 changes: 9 additions & 3 deletions src/rows.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# no automatic type inference is done, but types are allowed to be passed
# for as many columns as desired; `CSV.detect(row, i)` can also be used to
# use the same inference logic used in `CSV.File` for determing a cell's typed value
struct Rows{transpose, O, IO}
struct Rows{transpose, O, IO, F, F2}
name::String
names::Vector{Symbol} # only includes "select"ed columns
types::Vector{Type} # only includes "select"ed columns
Expand All @@ -20,6 +20,8 @@ struct Rows{transpose, O, IO}
reusebuffer::Bool
tapes::Vector{Vector{UInt64}}
intsentinels::Vector{Int64}
invalidrow::F
invalidcell::F2
lookup::Dict{Symbol, Int}
end

Expand Down Expand Up @@ -111,6 +113,8 @@ function Rows(source;
pool::Union{Bool, Real}=0.1,
strict::Bool=false,
silencewarnings::Bool=false,
invalidrow=nothing,
invalidcell=nothing,
debug::Bool=false,
parsingdebug::Bool=false,
reusebuffer::Bool=false,
Expand All @@ -124,7 +128,7 @@ function Rows(source;
deleteat!(types, h.todrop)
deleteat!(columnmap, h.todrop)
lookup = Dict(nm=>i for (i, nm) in enumerate(h.names))
return Rows{transpose, typeof(h.options), typeof(h.buf)}(
return Rows{transpose, typeof(h.options), typeof(h.buf), typeof(invalidrow), typeof(invalidcell)}(
h.name,
h.names,
types,
Expand All @@ -142,6 +146,8 @@ function Rows(source;
reusebuffer,
tapes,
fill(INT_SENTINEL, h.cols),
invalidrow,
invalidcell,
lookup
)
end
Expand All @@ -161,7 +167,7 @@ const EMPTY_LASTREFS = UInt64[]
(pos > len || row > r.limit) && return nothing
pos > len && return nothing
tapes = r.reusebuffer ? r.tapes : [Vector{UInt64}(undef, usermissing(r.typecodes[i]) ? 0 : 1) for i = 1:r.cols]
pos = parserow(1, Val(transpose), r.cols, EMPTY_TYPEMAP, tapes, EMPTY_POSLENS, r.buf, pos, len, r.limit, r.positions, 0.0, EMPTY_REFS, EMPTY_LASTREFS, 0, r.typecodes, r.intsentinels, false, r.options, r.coloptions)
pos = parserow(1, Val(transpose), r.cols, EMPTY_TYPEMAP, tapes, EMPTY_POSLENS, r.buf, pos, len, r.limit, r.positions, 0.0, EMPTY_REFS, EMPTY_LASTREFS, 0, r.typecodes, r.intsentinels, r.invalidrow, r.invalidcell, false, r.options, r.coloptions)
intsentinels = r.reusebuffer ? r.intsentinels : copy(r.intsentinels)
return Row2(r.names, r.types, r.columnmap, r.typecodes, r.lookup, tapes, r.buf, r.e, r.options, r.coloptions, intsentinels), (pos, len, row + 1)
end
Expand Down
9 changes: 9 additions & 0 deletions test/basics.jl
Original file line number Diff line number Diff line change
Expand Up @@ -403,4 +403,13 @@ f = CSV.File(IOBuffer("time,date,datetime\n10:00:00.0,04/16/2020,2020-04-16 23:1
@test f[1].date == Dates.Date(2020, 4, 16)
@test f[1].datetime == Dates.DateTime(2020, 4, 16, 23, 14)

# manual invalid cell and invalid row callbacks
invalidcells = []
invalidrows = []
f = CSV.File(IOBuffer("a,b,c\n1.0,\nhey,2,3\n"), types=Dict(1=>Float64), invalidrow=row->push!(invalidrows, row), invalidcell=(args...)->push!(invalidcells, args))
@test length(invalidrows) == 1
@test invalidrows[1] == 1
@test length(invalidcells) == 1
@test invalidcells[1] == (Int64, "hey,", -32630, 2, 1)

end