@@ -125,6 +125,34 @@ public async Task UpdateTaskWorker(string taskId, string taskWorker)
125125 await UpdateEntityInTable < AzureTableTaskEntity > ( tableName , taskEntity ) ;
126126 }
127127
128+ public async Task < string ? > LookupTaskIdByExternalId ( string externalTaskId )
129+ {
130+ // get the table name
131+ var tableName = GetExternalTaskIdLookupTable ( ) ;
132+
133+ // lookup
134+ var externalIdEnties = await QueryEntitiyFromTableByPartitionKey < AzureTableTaskEntity > ( tableName , externalTaskId ) ;
135+ if ( externalIdEnties . Length == 0 )
136+ return null ;
137+
138+ return externalIdEnties . First ( ) . RowKey ;
139+ }
140+
141+ public async Task RegisterExternlIdForTask ( string taskId , string externalTaskId )
142+ {
143+ // build the task entity
144+ var externalIdForTaskEntity = new AzureTableTaskEntity ( )
145+ {
146+ PartitionKey = externalTaskId ,
147+ RowKey = taskId ,
148+ } ;
149+
150+ // get the table name
151+ var tableName = GetExternalTaskIdLookupTable ( ) ;
152+
153+ await AddEntityToTable < AzureTableTaskEntity > ( tableName , externalIdForTaskEntity ) ;
154+ }
155+
128156 public async Task < string [ ] > MergePendingMessagesIfNeeded ( DateTimeOffset flushTime , bool force , string taskKey , string [ ] messages )
129157 {
130158 // check if the cache limit is exceeded
@@ -184,6 +212,9 @@ private string GetTableName(string tableName)
184212
185213 private string GetTaskTable ( )
186214 => GetTableName ( "Tasks" ) ;
215+
216+ private string GetExternalTaskIdLookupTable ( )
217+ => GetTableName ( "TasksExternalIdLookup" ) ;
187218
188219 private string GetRunningTaskTable ( )
189220 => $ "{ _environmentPrefix } TasksRunning";
@@ -218,6 +249,21 @@ private async Task UpdateEntityInTable<T>(string tableName, T entity) where T :
218249 private async Task DeleteEntityByKeys ( string tableName , string pKey , string rowKey )
219250 => await ExecuteEntityToTableOperation ( tableName , ( TableClient tc ) => tc . DeleteEntityAsync ( pKey , rowKey , Azure . ETag . All ) ) ;
220251
252+ private async Task < T [ ] > QueryEntitiyFromTableByPartitionKey < T > ( string tableName , string partitionKey ) where T : class , ITableEntity
253+ {
254+ var result = new List < T > ( ) ;
255+
256+ await ExecuteEntityToTableOperation ( tableName , async ( TableClient tc ) =>
257+ {
258+ var entities = tc . QueryAsync < T > ( filter : $ "PartitionKey eq '{ partitionKey } '") ;
259+
260+ await foreach ( var entity in entities )
261+ result . Add ( entity ) ;
262+ } ) ;
263+
264+ return result . ToArray ( ) ;
265+ }
266+
221267 private async Task ExecuteEntityToTableOperation ( string tableName , Func < TableClient , Task > operation )
222268 {
223269 // get the table client
@@ -232,7 +278,17 @@ private async Task ExecuteEntityToTableOperation(string tableName, Func<TableCli
232278 {
233279 if ( e . ErrorCode != null && e . ErrorCode . Equals ( "TableNotFound" ) )
234280 {
235- await tableClient . CreateAsync ( ) ;
281+ try
282+ {
283+ await tableClient . CreateAsync ( ) ;
284+
285+ // double check patter because of race conditions
286+ } catch ( Azure . RequestFailedException e2 )
287+ {
288+ if ( e2 . ErrorCode != null && ! e2 . ErrorCode . Equals ( "TableAlreadyExists" ) )
289+ throw ;
290+ }
291+
236292 await operation ( tableClient ) ;
237293 }
238294 }
0 commit comments