Skip to content

Commit cc2fbc0

Browse files
BigYellowHammerandsel
authored andcommitted
Added settings flag exit_after_read that let to terminate the pipeline once it red all files. Closes #212
Fixes #240
1 parent fcea343 commit cc2fbc0

File tree

13 files changed

+159
-6
lines changed

13 files changed

+159
-6
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 4.1.16
2+
- Added configuration setting exit_after_read to read to EOF and terminate
3+
the input [#240](https://github.com/logstash-plugins/logstash-input-file/pull/240)
4+
15
## 4.1.15
26
- Fixed bug in conversion of sincedb_clean_after setting [#257](https://github.com/logstash-plugins/logstash-input-file/pull/257)
37

docs/index.asciidoc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ see <<plugins-{type}s-{plugin}-string_duration,string_duration>> for the details
168168
| <<plugins-{type}s-{plugin}-delimiter>> |<<string,string>>|No
169169
| <<plugins-{type}s-{plugin}-discover_interval>> |<<number,number>>|No
170170
| <<plugins-{type}s-{plugin}-exclude>> |<<array,array>>|No
171+
| <<plugins-{type}s-{plugin}-exit_after_read>> |<<boolean,boolean>>|No
171172
| <<plugins-{type}s-{plugin}-file_chunk_count>> |<<number,number>>|No
172173
| <<plugins-{type}s-{plugin}-file_chunk_size>> |<<number,number>>|No
173174
| <<plugins-{type}s-{plugin}-file_completed_action>> |<<string,string>>, one of `["delete", "log", "log_and_delete"]`|No
@@ -241,6 +242,19 @@ In Tail mode, you might want to exclude gzipped files:
241242
[source,ruby]
242243
exclude => "*.gz"
243244

245+
[id="plugins-{type}s-{plugin}-exit_after_read"]
246+
===== `exit_after_read`
247+
248+
* Value type is <<boolean,boolean>>
249+
* Default value is `false`
250+
251+
This option can be used in `read` mode to enforce closing all watchers when file gets read.
252+
Can be used in situation when content of the file is static and won't change during execution.
253+
When set to `true` it also disables active discovery of the files - only files that were in
254+
the directories when process was started will be read.
255+
It supports `sincedb` entries. When file was processed once, then modified - next run will only
256+
read newly added entries.
257+
244258
[id="plugins-{type}s-{plugin}-file_chunk_count"]
245259
===== `file_chunk_count`
246260

lib/filewatch/read_mode/processor.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,21 @@ def process_active(watched_files)
103103
else
104104
read_file(watched_file)
105105
end
106+
107+
if @settings.exit_after_read
108+
common_detach_when_allread(watched_file)
109+
end
106110
# handlers take care of closing and unwatching
107111
end
108112
end
109113

114+
def common_detach_when_allread(watched_file)
115+
watched_file.unwatch
116+
watched_file.listener.reading_completed
117+
deletable_filepaths << watched_file.path
118+
logger.trace("Whole file read: #{watched_file.path}, removing from collection")
119+
end
120+
110121
def common_deleted_reaction(watched_file, action)
111122
# file has gone away or we can't read it anymore.
112123
watched_file.unwatch

lib/filewatch/settings.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class Settings
88
attr_reader :exclude, :start_new_files_at, :file_chunk_count, :file_chunk_size
99
attr_reader :sincedb_path, :sincedb_write_interval, :sincedb_expiry_duration
1010
attr_reader :file_sort_by, :file_sort_direction
11+
attr_reader :exit_after_read
1112

1213
def self.from_options(opts)
1314
new.add_options(opts)
@@ -50,6 +51,7 @@ def add_options(opts)
5051
@sincedb_expiry_duration = @opts.fetch(:sincedb_clean_after)
5152
@file_sort_by = @opts[:file_sort_by]
5253
@file_sort_direction = @opts[:file_sort_direction]
54+
@exit_after_read = @opts[:exit_after_read]
5355
self
5456
end
5557

lib/filewatch/watch.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ def subscribe(observer, sincedb_collection)
4343
reset_quit
4444
until quit?
4545
iterate_on_state
46+
# Don't discover new files when files to read are known at the beginning
4647
break if quit?
4748
sincedb_collection.write_if_requested
4849
glob += 1
49-
if glob == interval
50+
if glob == interval && !@settings.exit_after_read
5051
discover
5152
glob = 0
5253
end
@@ -76,7 +77,10 @@ def quit
7677
end
7778

7879
def quit?
79-
@quit.true?
80+
if @settings.exit_after_read
81+
@exit = @watched_files_collection.empty?
82+
end
83+
@quit.true? || @exit
8084
end
8185

8286
private

lib/logstash/inputs/file.rb

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ class File < LogStash::Inputs::Base
222222
# perhaps path + asc will help to achieve the goal of controlling the order of file ingestion
223223
config :file_sort_direction, :validate => ["asc", "desc"], :default => "asc"
224224

225+
# When in 'read' mode - this option is closing all file watchers when EOF is hit
226+
# This option also disables discovery of new/changes files. It works only on files found at the beginning
227+
# Sincedb still works, if you run LS once again after doing some changes - only new values will be read
228+
config :exit_after_read, :validate => :boolean, :default => false
229+
225230
public
226231

227232
class << self
@@ -260,6 +265,7 @@ def register
260265
:file_chunk_size => @file_chunk_size,
261266
:file_sort_by => @file_sort_by,
262267
:file_sort_direction => @file_sort_direction,
268+
:exit_after_read => @exit_after_read,
263269
}
264270

265271
@completed_file_handlers = []
@@ -280,7 +286,7 @@ def register
280286
raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"")
281287
end
282288
end
283-
289+
284290
@filewatch_config[:sincedb_path] = @sincedb_path
285291

286292
@filewatch_config[:start_new_files_at] = @start_position.to_sym
@@ -301,6 +307,9 @@ def register
301307
end
302308

303309
if tail_mode?
310+
if @exit_after_read
311+
raise ArgumentError.new('The "exit_after_read" setting only works when the "mode" is set to "read"')
312+
end
304313
@watcher_class = FileWatch::ObservingTail
305314
else
306315
@watcher_class = FileWatch::ObservingRead

lib/logstash/inputs/file_listener.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ def eof
2121
def error
2222
end
2323

24+
def reading_completed
25+
end
26+
2427
def timed_out
2528
input.codec.evict(path)
2629
end

logstash-input-file.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-file'
4-
s.version = '4.1.15'
4+
s.version = '4.1.16'
55
s.licenses = ['Apache-2.0']
66
s.summary = "Streams events from files"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/filewatch/reading_spec.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,65 @@ module FileWatch
147147
end
148148
end
149149

150+
context "when watching a directory with files using exit_after_read" do
151+
let(:opts) { super.merge(:exit_after_read => true, :max_open_files => 2) }
152+
let(:file_path3) { ::File.join(directory, "3.log") }
153+
let(:file_path4) { ::File.join(directory, "4.log") }
154+
let(:file_path5) { ::File.join(directory, "5.log") }
155+
let(:lines) { [] }
156+
let(:observer) { TestObserver.new(lines) }
157+
let(:listener3) { observer.listener_for(file_path3) }
158+
let(:file_path6) { ::File.join(directory, "6.log") }
159+
let(:listener6) { observer.listener_for(file_path6) }
160+
161+
it "the file is read" do
162+
File.open(file_path3, "w") { |file| file.write("line1\nline2\n") }
163+
reading.watch_this(watch_dir)
164+
reading.subscribe(observer)
165+
expect(listener3.lines).to eq(["line1", "line2"])
166+
end
167+
it "multiple files are read" do
168+
File.open(file_path3, "w") { |file| file.write("line1\nline2\n") }
169+
File.open(file_path4, "w") { |file| file.write("line3\nline4\n") }
170+
reading.watch_this(watch_dir)
171+
reading.subscribe(observer)
172+
expect(listener3.lines.sort).to eq(["line1", "line2", "line3", "line4"])
173+
end
174+
it "multiple files are read even if max_open_files is smaller then number of files" do
175+
File.open(file_path3, "w") { |file| file.write("line1\nline2\n") }
176+
File.open(file_path4, "w") { |file| file.write("line3\nline4\n") }
177+
File.open(file_path5, "w") { |file| file.write("line5\nline6\n") }
178+
reading.watch_this(watch_dir)
179+
reading.subscribe(observer)
180+
expect(listener3.lines.sort).to eq(["line1", "line2", "line3", "line4", "line5", "line6"])
181+
end
182+
it "file as marked as reading_completed" do
183+
File.open(file_path3, "w") { |file| file.write("line1\nline2\n") }
184+
reading.watch_this(watch_dir)
185+
reading.subscribe(observer)
186+
expect(listener3.calls).to eq([:open, :accept, :accept, :eof, :delete, :reading_completed])
187+
end
188+
it "sincedb works correctly" do
189+
File.open(file_path3, "w") { |file| file.write("line1\nline2\n") }
190+
reading.watch_this(watch_dir)
191+
reading.subscribe(observer)
192+
sincedb_record_fields = File.read(sincedb_path).split(" ")
193+
position_field_index = 3
194+
expect(sincedb_record_fields[position_field_index]).to eq("12")
195+
end
196+
it "does not include new files added after start" do
197+
File.open(file_path3, "w") { |file| file.write("line1\nline2\n") }
198+
reading.watch_this(watch_dir)
199+
reading.subscribe(observer)
200+
File.open(file_path6, "w") { |file| file.write("foob\nbar\n") }
201+
expect(listener3.lines).to eq(["line1", "line2"])
202+
expect(listener3.calls).to eq([:open, :accept, :accept, :eof, :delete, :reading_completed])
203+
expect(listener6.calls).to eq([])
204+
205+
end
206+
207+
end
208+
150209
describe "reading fixtures" do
151210
let(:directory) { FIXTURE_DIR }
152211
let(:actions) do

spec/filewatch/spec_helper.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ def eof
152152
def timed_out
153153
@calls << :timed_out
154154
end
155+
156+
def reading_completed
157+
@calls << :reading_completed
158+
end
155159
end
156160

157161
attr_reader :listeners

0 commit comments

Comments
 (0)