-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapi.go
More file actions
213 lines (178 loc) · 8.16 KB
/
api.go
File metadata and controls
213 lines (178 loc) · 8.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package klevdb
import (
"errors"
"fmt"
"time"
"github.com/klev-dev/klevdb/pkg/index"
"github.com/klev-dev/klevdb/pkg/message"
"github.com/klev-dev/klevdb/pkg/segment"
)
const (
	// OffsetOldest represents the smallest offset still available.
	// Use it to consume all messages, starting at the first available one.
	OffsetOldest = message.OffsetOldest
	// OffsetNewest represents the offset that will be used for the next publish.
	// Use it to consume only new messages.
	OffsetNewest = message.OffsetNewest
	// OffsetInvalid is the offset returned when an error is detected.
	OffsetInvalid = message.OffsetInvalid
)
// Message is an alias of message.Message, re-exported from this package.
// It is the element type published to and consumed from a Log.
type Message = message.Message
// InvalidMessage is the message value returned when an error has occurred.
// It is the same value as message.Invalid.
var InvalidMessage = message.Invalid
// ErrInvalidOffset is returned when the offset attribute is invalid or out of bounds.
var ErrInvalidOffset = message.ErrInvalidOffset
// ErrNotFound is returned when the offset, key or timestamp is not found.
var ErrNotFound = message.ErrNotFound
// ErrNoIndex is returned when a key or timestamp lookup is attempted,
// but the log does not include an index on them.
var ErrNoIndex = errors.New("no index")
// ErrReadonly is returned when attempting to modify (e.g. publish to or delete from)
// a log that is opened in readonly mode.
var ErrReadonly = errors.New("log opened in readonly mode")
// Stats is an alias of segment.Stats. It is the value returned by the
// package-level Stat function and by Log.Stat.
type Stats = segment.Stats
// Options holds the settings of a store.
// The directory-level helpers in this file (Stat, Check, Recover, Migrate)
// read only the KeyIndex and TimeIndex fields.
type Options struct {
	// CreateDirs, when set, will try to create all directories.
	CreateDirs bool
	// Readonly opens the store in readonly mode.
	Readonly bool
	// KeyIndex indexes message keys, enabling GetByKey and OffsetByKey.
	// This setting must not change after the store is first created; changing it will return ErrCorrupted.
	KeyIndex bool
	// TimeIndex indexes message times, enabling GetByTime and OffsetByTime.
	// This setting must not change after the store is first created; changing it will return ErrCorrupted.
	TimeIndex bool
	// AutoSync forces a filesystem sync after each Publish.
	AutoSync bool
	// Rollover is the segment size at which the log rolls over to a new segment. Defaults to 1MB.
	Rollover int64
	// Check verifies the integrity of the head segment before opening it for reading/writing.
	Check bool
	// Recover recovers any good prefix from the head segment before opening it for reading/writing.
	// If the whole segment is readable and passes the integrity checks, Recover is a noop.
	// If both Check and Recover are set, Open will directly try to recover the segment in read-write mode.
	Recover bool
	// Version specifies which format version new segments use and how
	// existing segments are handled on rewrite and open; see VersionOptions.
	Version VersionOptions
}
// Version pairs a message format version with an index format version.
// Both fields are unexported; use the predeclared values (V1, V2, VLast).
// Values are comparable with ==, which Migrate relies on to detect the zero value.
type Version struct {
	// messages is the message format version.
	messages message.Version
	// index is the index format version.
	index index.Version
}
var (
	// vUnknown is the zero Version; Migrate rejects it as "not specified".
	vUnknown = Version{}
	// V1 combines message.V1 with index.V1.
	V1 = Version{message.V1, index.V1}
	// V2 combines message.V2 with index.V2.
	V2 = Version{message.V2, index.V2}
	// VLast currently equals V2 (presumably the latest supported version — confirm when adding versions).
	VLast = V2
)
// VersionOptions controls which format version segments are written in.
type VersionOptions struct {
	// NewSegmentsVersion indicates which version new segments will use.
	NewSegmentsVersion Version
	// KeepRewriteVersion, when true, makes rewriting a segment (on delete)
	// keep the original segment version.
	KeepRewriteVersion bool
	// EagerVersionMigrate, when true, makes open rewrite all segments
	// with NewSegmentsVersion.
	EagerVersionMigrate bool
}
// Log is the interface of an opened log: it allows publishing, consuming,
// looking up and deleting messages, as well as inspecting and maintaining
// the underlying storage.
type Log interface {
	// Publish appends messages to the log.
	// It returns the offset of the next message to be appended.
	// The offset set on each input message is ignored; it is set to the actual offset.
	// If the time of a message is 0, it is set to the current UTC time.
	Publish(messages []Message) (nextOffset int64, err error)
	// NextOffset returns the offset of the next message to be published.
	NextOffset() (nextOffset int64, err error)
	// Consume retrieves messages from the log, starting at the offset.
	// It returns nextOffset, which can be used as the offset of the next Consume call.
	// If offset == OffsetOldest, the first message will be the oldest
	// message still available on the log. If the log is empty,
	// it will return no error and nextOffset will be 0.
	// If offset == OffsetNewest, no actual messages will be returned,
	// but nextOffset will be set to the offset that will be used
	// for the next Publish call.
	// If offset is before the first available on the log, or is after
	// NextOffset, it returns ErrInvalidOffset.
	// If the exact offset is already deleted, it will start consuming
	// from the next available offset.
	// Consume is allowed to return no messages, but with increasing nextOffset,
	// in case messages between offset and nextOffset have been deleted.
	// The returned nextOffset is always bigger than offset, unless we are
	// caught up to the head of the log, in which case they are equal.
	// NOTE(review): maxCount presumably caps the number of returned messages — confirm.
	Consume(offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)
	// ConsumeByKey is similar to Consume, but only returns messages matching the key.
	ConsumeByKey(key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)
	// Get retrieves a single message, by its offset.
	// If offset == OffsetOldest, it returns the first message on the log.
	// If offset == OffsetNewest, it returns the last message on the log.
	// If offset is before the first available on the log, or is after
	// NextOffset, it returns ErrInvalidOffset.
	// If the log is empty, it returns ErrInvalidOffset.
	// If the exact offset has been deleted, it returns ErrNotFound.
	Get(offset int64) (message Message, err error)
	// GetByKey retrieves the last message in the log for this key.
	// If no such message is found, it returns ErrNotFound.
	GetByKey(key []byte) (message Message, err error)
	// OffsetByKey retrieves the offset of the last message in the log for this key.
	// If no such message is found, it returns ErrNotFound.
	OffsetByKey(key []byte) (offset int64, err error)
	// GetByTime retrieves the first message after start time.
	// If start time is after all messages in the log, it returns ErrNotFound.
	GetByTime(start time.Time) (message Message, err error)
	// OffsetByTime retrieves the offset and the time of the first message after start time.
	// If start time is after all messages in the log, it returns ErrNotFound.
	OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error)
	// Delete tries to delete a set of messages, identified by their offsets,
	// from the log and returns the amount of storage deleted.
	// It does not guarantee that it will delete all requested messages;
	// it returns the list of messages that were actually deleted.
	Delete(offsets map[int64]struct{}) (deletedMessages []Message, deletedSize int64, err error)
	// Size returns the amount of storage a message occupies in the
	// NewSegmentsVersion format (see VersionOptions), plus the index overhead.
	// For logs with mixed V1/V2 segments this may differ from the actual
	// on-disk size of messages stored in older segments.
	Size(m Message) int64
	// Stat returns log stats, like disk space and number of messages.
	Stat() (Stats, error)
	// Backup takes a backup snapshot of this log to another location.
	Backup(dir string) error
	// Sync forces persisting data to the disk. It returns the nextOffset
	// at the time of the Sync, so clients can determine what portion
	// of the log is now durable.
	Sync() (nextOffset int64, err error)
	// GC releases any unused resources associated with this log.
	// NOTE(review): unusedFor presumably is the minimum idle time before a
	// resource is considered unused — confirm against the implementation.
	GC(unusedFor time.Duration) error
	// Close closes the log.
	Close() error
}
// Stat reports the stats of the store directory dir, without opening the store.
// Of opts, only TimeIndex and KeyIndex are read; they are forwarded to
// segment.StatDir as the index parameters.
func Stat(dir string, opts Options) (Stats, error) {
	params := index.Params{
		Keys:  opts.KeyIndex,
		Times: opts.TimeIndex,
	}
	return segment.StatDir(dir, params)
}
// Backup backs up the store directory src to another location dst, without
// opening the store. It delegates to segment.BackupDir and returns its error.
func Backup(src, dst string) error {
	return segment.BackupDir(src, dst)
}
// Check runs an integrity check on the store directory dir, without opening
// the store. Of opts, only TimeIndex and KeyIndex are read; they are
// forwarded to segment.CheckDir as the index parameters.
func Check(dir string, opts Options) error {
	params := index.Params{
		Keys:  opts.KeyIndex,
		Times: opts.TimeIndex,
	}
	return segment.CheckDir(dir, params)
}
// Recover rewrites the storage in dir to include all messages prior to the
// first one that fails an integrity check. Of opts, only TimeIndex and
// KeyIndex are read; they are forwarded to segment.RecoverDir as the index
// parameters.
func Recover(dir string, opts Options) error {
	params := index.Params{
		Keys:  opts.KeyIndex,
		Times: opts.TimeIndex,
	}
	return segment.RecoverDir(dir, params)
}
// Migrate rewrites all segments in dir with the given options and a concrete
// version. The version must be specified: passing the zero Version returns an
// error before segment.MigrateDir is called. Of opts, only TimeIndex and
// KeyIndex are read; they are forwarded as the index parameters.
func Migrate(dir string, opts Options, version Version) error {
	// Reject the zero value up front: there is no format to migrate to.
	if version == vUnknown {
		return fmt.Errorf("migrate: version must be specified (e.g. klevdb.V2)")
	}

	params := index.Params{
		Keys:  opts.KeyIndex,
		Times: opts.TimeIndex,
	}
	return segment.MigrateDir(dir, version.messages, version.index, params)
}