55use Bavix \Entry \Services \BulkService ;
66use Bavix \Entry \Jobs \BulkWriter ;
77use Illuminate \Console \Command ;
8+ use Illuminate \Contracts \Cache \LockTimeoutException ;
9+ use Illuminate \Support \Facades \Cache ;
810
911class BulkWrite extends Command
1012{
@@ -29,23 +31,32 @@ class BulkWrite extends Command
2931 */
3032 public function handle (): void
3133 {
32- $ batchSize = \config ('entry.batchSize ' , 10000 );
33- $ keys = app (BulkService::class)->keys ();
34- foreach ($ keys as $ key ) {
35- [$ bulkName , $ class ] = \explode (': ' , $ key , 2 );
36- $ chunkIterator = app (BulkService::class)
37- ->chunkIterator ($ batchSize , $ key );
38-
39- foreach ($ chunkIterator as $ bulkData ) {
40- foreach ($ bulkData as $ itemKey => $ itemValue ) {
41- $ bulkData [$ itemKey ] = \json_decode ($ itemValue , true );
34+ $ lock = Cache::lock (__CLASS__ , 120 );
35+ try {
36+ $ lock ->block (1 );
37+ // Lock acquired after waiting maximum of second...
38+ $ batchSize = \config ('entry.batchSize ' , 10000 );
39+ $ keys = app (BulkService::class)->keys ();
40+ foreach ($ keys as $ key ) {
41+ [$ bulkName , $ class ] = \explode (': ' , $ key , 2 );
42+ $ chunkIterator = app (BulkService::class)
43+ ->chunkIterator ($ batchSize , $ key );
44+
45+ foreach ($ chunkIterator as $ bulkData ) {
46+ foreach ($ bulkData as $ itemKey => $ itemValue ) {
47+ $ bulkData [$ itemKey ] = \json_decode ($ itemValue , true );
48+ }
49+
50+ $ queueName = \config ('entry.queueName ' , 'default ' );
51+ $ job = new BulkWriter (new $ class , $ bulkData );
52+ $ job ->onQueue ($ queueName );
53+ \dispatch ($ job );
4254 }
43-
44- $ queueName = \config ('entry.queueName ' , 'default ' );
45- $ job = new BulkWriter (new $ class , $ bulkData );
46- $ job ->onQueue ($ queueName );
47- \dispatch ($ job );
4855 }
56+ } catch (LockTimeoutException $ timeoutException ) {
57+ // Unable to acquire lock...
58+ } finally {
59+ optional ($ lock )->release ();
4960 }
5061 }
5162
0 commit comments