Skip to content

Commit 4c91847

Browse files
committed
Fix valgrind leaks (with minor refactor)
1 parent 896799d commit 4c91847

File tree

9 files changed

+70
-128
lines changed

9 files changed

+70
-128
lines changed

src/DAG/dag.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,10 @@ void Dag_PopulateOp(RAI_DagOp *currentOp, void *rctx, RedisModuleString **inkeys
897897
currentOp->devicestr = currentOp->sctx->script->devicestr;
898898
}
899899

900+
// todo: temporary patch to fix leak, need refactor
901+
array_free(currentOp->inkeys);
902+
array_free(currentOp->outkeys);
903+
900904
currentOp->inkeys = inkeys;
901905
currentOp->outkeys = outkeys;
902906
currentOp->runkey = runkey;

src/DAG/dag_parser.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,8 @@ int DAG_CommandParser(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, b
395395
sprintf(buf, "%04d", *instance);
396396
RedisModuleString *mangled_key = RedisModule_CreateStringFromString(NULL, key);
397397
RedisModule_StringAppendBuffer(NULL, mangled_key, buf, strlen(buf));
398-
399398
AI_dictAdd(mangled_persisted, (void *)mangled_key, (void *)1);
399+
RedisModule_FreeString(NULL, mangled_key);
400400
entry = AI_dictNext(iter);
401401
}
402402
AI_dictReleaseIterator(iter);

src/background_workers.c

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -391,14 +391,11 @@ void *RedisAI_Run_ThreadMain(void *arg) {
391391

392392
// Run is over, now iterate over the run info structs in the batch
393393
// and see if any error was generated
394-
bool first_dag_error = false;
395394
for (long long i = 0; i < array_len(batch_rinfo); i++) {
396395
RedisAI_RunInfo *rinfo = batch_rinfo[i];
397396
// We record that there was an error for later on
398397
run_error = __atomic_load_n(rinfo->dagError, __ATOMIC_RELAXED);
399-
if (i == 0 && run_error == 1) {
400-
first_dag_error = true;
401-
}
398+
402399
// If there was an error and the reference count for the dag
403400
// has gone to zero and the client is still around, we unblock
404401
if (run_error) {
@@ -413,37 +410,35 @@ void *RedisAI_Run_ThreadMain(void *arg) {
413410
__atomic_add_fetch(rinfo->dagCompleteOpCount, 1, __ATOMIC_RELAXED);
414411
}
415412
}
416-
if (first_dag_error) {
417-
run_queue_len = queueLength(run_queue_info->run_queue);
418-
continue;
419-
}
420413
}
421414

422415
// We initialize variables where we'll store the fact that, after the current
423416
// run, all ops for the device or all ops in the dag could be complete. This
424417
// way we can avoid placing the op back on the queue if there's nothing left
425418
// to do.
426-
RedisModule_Assert(run_error == 0);
427-
int device_complete_after_run = RedisAI_DagDeviceComplete(batch_rinfo[0]);
428-
int dag_complete_after_run = RedisAI_DagComplete(batch_rinfo[0]);
429-
430-
long long dagRefCount = -1;
431-
RedisAI_RunInfo *orig;
432-
if (device_complete == 1 || device_complete_after_run == 1) {
433-
RedisAI_RunInfo *evicted_rinfo = (RedisAI_RunInfo *)(evicted_items[0]->value);
434-
orig = evicted_rinfo->orig_copy;
435-
// We decrease and get the reference count for the DAG.
436-
dagRefCount = RAI_DagRunInfoFreeShallowCopy(evicted_rinfo);
437-
}
419+
int device_complete_after_run;
420+
if (run_error == 0) {
421+
device_complete_after_run = RedisAI_DagDeviceComplete(batch_rinfo[0]);
422+
int dag_complete_after_run = RedisAI_DagComplete(batch_rinfo[0]);
423+
424+
long long dagRefCount = -1;
425+
RedisAI_RunInfo *orig;
426+
if (device_complete == 1 || device_complete_after_run == 1) {
427+
RedisAI_RunInfo *evicted_rinfo = (RedisAI_RunInfo *)(evicted_items[0]->value);
428+
orig = evicted_rinfo->orig_copy;
429+
// We decrease and get the reference count for the DAG.
430+
dagRefCount = RAI_DagRunInfoFreeShallowCopy(evicted_rinfo);
431+
}
438432

439-
// If the DAG was complete, then it's time to unblock the client
440-
if (do_unblock == 1 || dag_complete_after_run == 1) {
433+
// If the DAG was complete, then it's time to unblock the client
434+
if (do_unblock == 1 || dag_complete_after_run == 1) {
441435

442-
// If the reference count for the DAG is zero and the client is still around,
443-
// then we actually unblock the client
444-
if (dagRefCount == 0) {
445-
RedisAI_OnFinishCtx *finish_ctx = orig;
446-
orig->OnFinish(finish_ctx, orig->private_data);
436+
// If the reference count for the DAG is zero and the client is still around,
437+
// then we actually unblock the client
438+
if (dagRefCount == 0) {
439+
RedisAI_OnFinishCtx *finish_ctx = orig;
440+
orig->OnFinish(finish_ctx, orig->private_data);
441+
}
447442
}
448443
}
449444

@@ -499,8 +494,7 @@ void *RedisAI_Run_ThreadMain(void *arg) {
499494

500495
// If there's nothing else to do for the DAG in the current worker or if an error
501496
// occurred in any worker, we just move on
502-
if (device_complete == 1 || device_complete_after_run == 1 || do_unblock == 1 ||
503-
run_error == 1) {
497+
if (device_complete == 1 || device_complete_after_run == 1 || do_unblock == 1) {
504498
for (long long i = 0; i < array_len(evicted_items); i++) {
505499
RedisModule_Free(evicted_items[i]);
506500
}

src/command_parser.c

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ int ParseModelRunCommand(RedisAI_RunInfo *rinfo, RedisModuleCtx *ctx, RedisModul
159159
return REDISMODULE_OK;
160160

161161
cleanup:
162+
if (error.detail) {
163+
RedisModule_Free(error.detail);
164+
RedisModule_Free(error.detail_oneline);
165+
}
162166
for (size_t i = 0; i < array_len(inkeys); i++) {
163167
RedisModule_FreeString(NULL, inkeys[i]);
164168
}
@@ -309,17 +313,18 @@ int ParseScriptRunCommand(RedisAI_RunInfo *rinfo, RedisModuleCtx *ctx, RedisModu
309313
if (_ScriptRunCtx_SetParams(ctx, inkeys, outkeys, sctx) == REDISMODULE_ERR)
310314
goto cleanup;
311315
}
312-
if (RAI_InitDagOp(&currentOp) == REDISMODULE_ERR) {
313-
RedisModule_ReplyWithError(
314-
ctx, "ERR Unable to allocate the memory and initialise the RAI_dagOp structure");
315-
goto cleanup;
316-
}
316+
RAI_InitDagOp(&currentOp);
317+
317318
currentOp->commandType = REDISAI_DAG_CMD_SCRIPTRUN;
318319
Dag_PopulateOp(currentOp, sctx, inkeys, outkeys, runkey);
319320
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);
320321
return REDISMODULE_OK;
321322

322323
cleanup:
324+
if (error.detail) {
325+
RedisModule_Free(error.detail);
326+
RedisModule_Free(error.detail_oneline);
327+
}
323328
for (size_t i = 0; i < array_len(inkeys); i++) {
324329
RedisModule_FreeString(NULL, inkeys[i]);
325330
}

src/err.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ void RAI_SetError(RAI_Error *err, RAI_ErrorCode code, const char *detail) {
5656
int RAI_InitError(RAI_Error **result) {
5757
RAI_Error *err;
5858
err = (RAI_Error *)RedisModule_Calloc(1, sizeof(RAI_Error));
59-
if (!err) {
60-
return REDISMODULE_ERR;
61-
}
6259
err->code = 0;
6360
err->detail = NULL;
6461
err->detail_oneline = NULL;

src/run_info.c

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -41,37 +41,21 @@ AI_dictType AI_dictTypeTensorVals = {
4141
int RAI_InitDagOp(RAI_DagOp **result) {
4242
RAI_DagOp *dagOp;
4343
dagOp = (RAI_DagOp *)RedisModule_Calloc(1, sizeof(RAI_DagOp));
44-
if (!dagOp) {
45-
return REDISMODULE_ERR;
46-
}
44+
4745
dagOp->commandType = REDISAI_DAG_CMD_NONE;
4846
dagOp->runkey = NULL;
4947
dagOp->inkeys = (RedisModuleString **)array_new(RedisModuleString *, 1);
50-
if (!(dagOp->inkeys)) {
51-
return REDISMODULE_ERR;
52-
}
5348
dagOp->outkeys = (RedisModuleString **)array_new(RedisModuleString *, 1);
54-
if (!(dagOp->outkeys)) {
55-
return REDISMODULE_ERR;
56-
}
5749
dagOp->outTensors = (RAI_Tensor **)array_new(RAI_Tensor *, 1);
58-
if (!(dagOp->outTensors)) {
59-
return REDISMODULE_ERR;
60-
}
6150
dagOp->mctx = NULL;
6251
dagOp->sctx = NULL;
6352
dagOp->devicestr = NULL;
6453
dagOp->duration_us = 0;
6554
dagOp->result = -1;
6655
RAI_InitError(&dagOp->err);
67-
if (!(dagOp->err)) {
68-
return REDISMODULE_ERR;
69-
}
7056
dagOp->argv = (RedisModuleString **)array_new(RedisModuleString *, 1);
71-
if (!(dagOp->argv)) {
72-
return REDISMODULE_ERR;
73-
}
7457
dagOp->argc = 0;
58+
7559
*result = dagOp;
7660
return REDISMODULE_OK;
7761
}
@@ -85,56 +69,34 @@ int RAI_InitDagOp(RAI_DagOp **result) {
8569
int RAI_InitRunInfo(RedisAI_RunInfo **result) {
8670
RedisAI_RunInfo *rinfo;
8771
rinfo = (RedisAI_RunInfo *)RedisModule_Calloc(1, sizeof(RedisAI_RunInfo));
88-
if (!rinfo) {
89-
return REDISMODULE_ERR;
90-
}
72+
9173
rinfo->dagTensorsContext = AI_dictCreate(&AI_dictTypeTensorVals, NULL);
92-
if (!(rinfo->dagTensorsContext)) {
93-
return REDISMODULE_ERR;
94-
}
9574
rinfo->dagTensorsLoadedContext = AI_dictCreate(&AI_dictTypeHeapRStrings, NULL);
96-
if (!(rinfo->dagTensorsLoadedContext)) {
97-
return REDISMODULE_ERR;
98-
}
9975
rinfo->dagTensorsPersistedContext = AI_dictCreate(&AI_dictTypeHeapRStrings, NULL);
100-
if (!(rinfo->dagTensorsPersistedContext)) {
101-
return REDISMODULE_ERR;
102-
}
76+
10377
rinfo->dagOps = (RAI_DagOp **)array_new(RAI_DagOp *, 1);
104-
if (!(rinfo->dagOps)) {
105-
return REDISMODULE_ERR;
106-
}
107-
rinfo->dagDeviceOps = (RAI_DagOp **)array_new(RAI_DagOp *, 1);
108-
if (!(rinfo->dagDeviceOps)) {
109-
return REDISMODULE_ERR;
110-
}
11178
rinfo->dagError = RedisModule_Calloc(1, sizeof(int));
11279
RAI_InitError(&rinfo->err);
11380
rinfo->dagLock = RedisModule_Alloc(sizeof(pthread_rwlock_t));
114-
rinfo->dagRefCount = RedisModule_Alloc(sizeof(long long));
115-
*(rinfo->dagRefCount) = 0;
81+
rinfo->dagRefCount = RedisModule_Calloc(1, sizeof(long long));
11682
rinfo->dagOpCount = 0;
11783
rinfo->dagCompleteOpCount = RedisModule_Calloc(1, sizeof(long long));
11884
rinfo->dagDeviceOpCount = 0;
11985
rinfo->dagDeviceCompleteOpCount = 0;
12086
rinfo->orig_copy = rinfo;
12187
pthread_rwlock_init(rinfo->dagLock, NULL);
12288
rinfo->timedOut = RedisModule_Calloc(1, sizeof(int));
89+
12390
*result = rinfo;
12491
return REDISMODULE_OK;
12592
}
12693

12794
int RAI_ShallowCopyDagRunInfo(RedisAI_RunInfo **result, RedisAI_RunInfo *src) {
12895
RedisAI_RunInfo *rinfo;
129-
rinfo = (RedisAI_RunInfo *)RedisModule_Calloc(1, sizeof(RedisAI_RunInfo));
130-
if (!rinfo) {
131-
return REDISMODULE_ERR;
132-
}
96+
rinfo = (RedisAI_RunInfo *)RedisModule_Alloc(sizeof(RedisAI_RunInfo));
13397
memcpy(rinfo, src, sizeof(RedisAI_RunInfo));
98+
13499
rinfo->dagDeviceOps = (RAI_DagOp **)array_new(RAI_DagOp *, 1);
135-
if (!(rinfo->dagDeviceOps)) {
136-
return REDISMODULE_ERR;
137-
}
138100
(*rinfo->dagRefCount)++;
139101
rinfo->dagDeviceOpCount = 0;
140102
rinfo->dagDeviceCompleteOpCount = 0;
@@ -180,7 +142,6 @@ void RAI_FreeDagOp(RAI_DagOp *dagOp) {
180142
}
181143
array_free(dagOp->outkeys);
182144
}
183-
184145
RedisModule_Free(dagOp);
185146
}
186147
}

src/tensor.c

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -59,47 +59,38 @@ DLDataType RAI_TensorDataTypeFromString(const char *typestr) {
5959

6060
static size_t Tensor_DataTypeSize(DLDataType dtype) { return dtype.bits / 8; }
6161

62-
int Tensor_DataTypeStr(DLDataType dtype, char **dtypestr) {
62+
int Tensor_DataTypeStr(DLDataType dtype, char *dtypestr) {
6363
int result = REDISMODULE_ERR;
64-
*dtypestr = RedisModule_Calloc(8, sizeof(char));
64+
6565
if (dtype.code == kDLFloat) {
6666
if (dtype.bits == 32) {
67-
strcpy(*dtypestr, RAI_DATATYPE_STR_FLOAT);
67+
strcpy(dtypestr, RAI_DATATYPE_STR_FLOAT);
6868
result = REDISMODULE_OK;
6969
} else if (dtype.bits == 64) {
70-
strcpy(*dtypestr, RAI_DATATYPE_STR_DOUBLE);
70+
strcpy(dtypestr, RAI_DATATYPE_STR_DOUBLE);
7171
result = REDISMODULE_OK;
72-
} else {
73-
RedisModule_Free(*dtypestr);
74-
*dtypestr = NULL;
7572
}
7673
} else if (dtype.code == kDLInt) {
7774
if (dtype.bits == 8) {
78-
strcpy(*dtypestr, RAI_DATATYPE_STR_INT8);
75+
strcpy(dtypestr, RAI_DATATYPE_STR_INT8);
7976
result = REDISMODULE_OK;
8077
} else if (dtype.bits == 16) {
81-
strcpy(*dtypestr, RAI_DATATYPE_STR_INT16);
78+
strcpy(dtypestr, RAI_DATATYPE_STR_INT16);
8279
result = REDISMODULE_OK;
8380
} else if (dtype.bits == 32) {
84-
strcpy(*dtypestr, RAI_DATATYPE_STR_INT32);
81+
strcpy(dtypestr, RAI_DATATYPE_STR_INT32);
8582
result = REDISMODULE_OK;
8683
} else if (dtype.bits == 64) {
87-
strcpy(*dtypestr, RAI_DATATYPE_STR_INT64);
84+
strcpy(dtypestr, RAI_DATATYPE_STR_INT64);
8885
result = REDISMODULE_OK;
89-
} else {
90-
RedisModule_Free(*dtypestr);
91-
*dtypestr = NULL;
9286
}
9387
} else if (dtype.code == kDLUInt) {
9488
if (dtype.bits == 8) {
95-
strcpy(*dtypestr, RAI_DATATYPE_STR_UINT8);
89+
strcpy(dtypestr, RAI_DATATYPE_STR_UINT8);
9690
result = REDISMODULE_OK;
9791
} else if (dtype.bits == 16) {
98-
strcpy(*dtypestr, RAI_DATATYPE_STR_UINT16);
92+
strcpy(dtypestr, RAI_DATATYPE_STR_UINT16);
9993
result = REDISMODULE_OK;
100-
} else {
101-
RedisModule_Free(*dtypestr);
102-
*dtypestr = NULL;
10394
}
10495
}
10596
return result;
@@ -195,8 +186,9 @@ static void RAI_Tensor_RdbSave(RedisModuleIO *io, void *value) {
195186
static void RAI_Tensor_AofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) {
196187
RAI_Tensor *tensor = (RAI_Tensor *)value;
197188

198-
char *dtypestr = NULL;
199-
Tensor_DataTypeStr(RAI_TensorDataType(tensor), &dtypestr);
189+
char dtypestr[8];
190+
const int status = Tensor_DataTypeStr(RAI_TensorDataType(tensor), dtypestr);
191+
RedisModule_Assert(status == REDISMODULE_OK);
200192

201193
char *data = RAI_TensorData(tensor);
202194
long long size = RAI_TensorByteSize(tensor);
@@ -212,8 +204,6 @@ static void RAI_Tensor_AofRewrite(RedisModuleIO *aof, RedisModuleString *key, vo
212204

213205
RedisModule_EmitAOF(aof, "AI.TENSORSET", "scvcb", key, dtypestr, dims, ndims, "BLOB", data,
214206
size);
215-
216-
RedisModule_Free(dtypestr);
217207
}
218208

219209
static void RAI_Tensor_DTFree(void *value) { RAI_TensorFree(value); }
@@ -756,10 +746,9 @@ int RAI_getTensorFromLocalContext(RedisModuleCtx *ctx, AI_dict *localContextDict
756746
void RedisAI_ReplicateTensorSet(RedisModuleCtx *ctx, RedisModuleString *key, RAI_Tensor *t) {
757747
long long ndims = RAI_TensorNumDims(t);
758748

759-
char *dtypestr = NULL;
760-
Tensor_DataTypeStr(RAI_TensorDataType(t), &dtypestr);
761-
762-
assert(dtypestr);
749+
char dtypestr[8];
750+
const int status = Tensor_DataTypeStr(RAI_TensorDataType(t), dtypestr);
751+
RedisModule_Assert(status == REDISMODULE_OK);
763752

764753
char *data = RAI_TensorData(t);
765754
long long size = RAI_TensorByteSize(t);
@@ -776,8 +765,6 @@ void RedisAI_ReplicateTensorSet(RedisModuleCtx *ctx, RedisModuleString *key, RAI
776765
for (long long i = 0; i < ndims; i++) {
777766
RedisModule_FreeString(ctx, dims[i]);
778767
}
779-
780-
RedisModule_Free(dtypestr);
781768
}
782769

783770
int RAI_parseTensorSetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, RAI_Tensor **t,
@@ -1054,19 +1041,18 @@ int RAI_parseTensorGetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
10541041

10551042
const long long ndims = RAI_TensorNumDims(t);
10561043

1057-
char *dtypestr = NULL;
1058-
const int dtypestr_result = Tensor_DataTypeStr(RAI_TensorDataType(t), &dtypestr);
1044+
char dtypestr[8];
1045+
const int dtypestr_result = Tensor_DataTypeStr(RAI_TensorDataType(t), dtypestr);
10591046
if (dtypestr_result == REDISMODULE_ERR) {
10601047
RedisModule_ReplyWithError(ctx, "ERR unsupported dtype");
10611048
return -1;
10621049
}
10631050

10641051
RedisModule_ReplyWithArray(ctx, resplen);
1065-
10661052
RedisModule_ReplyWithCString(ctx, "dtype");
10671053
RedisModule_ReplyWithCString(ctx, dtypestr);
1068-
10691054
RedisModule_ReplyWithCString(ctx, "shape");
1055+
10701056
RedisModule_ReplyWithArray(ctx, ndims);
10711057
for (long long i = 0; i < ndims; i++) {
10721058
const long long dim = RAI_TensorDim(t, i);

src/tensor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ DLDataType RAI_TensorDataTypeFromString(const char *dataType);
184184
* DLDataType
185185
* @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
186186
*/
187-
int Tensor_DataTypeStr(DLDataType dtype, char **dtypestr);
187+
int Tensor_DataTypeStr(DLDataType dtype, char *dtypestr);
188188

189189
/**
190190
* Frees the memory of the RAI_Tensor when the tensor reference count reaches 0.

0 commit comments

Comments
 (0)