@@ -265,7 +265,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
265265 TimeTzADT * timetz = NULL ;
266266 Interval * interval = NULL ;
267267 Decoderbufs__Point dp = DECODERBUFS__POINT__INIT ;
268-
268+
269269 int size = 0 ;
270270 switch (typid ) {
271271 case BOOLOID :
@@ -293,7 +293,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
293293 datum_msg -> datum_double = DatumGetFloat8 (datum );
294294 datum_msg -> datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE ;
295295 break ;
296- case CASHOID :
296+ case CASHOID :
297297 datum_msg -> datum_int64 = DatumGetCash (datum );
298298 datum_msg -> datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64 ;
299299 break ;
@@ -303,7 +303,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
303303 case BPCHAROID :
304304 case TEXTOID :
305305 case JSONOID :
306- case JSONBOID :
306+ case JSONBOID :
307307 case XMLOID :
308308 case BITOID :
309309 case VARBITOID :
@@ -320,7 +320,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
320320 } else {
321321 datum_msg -> datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH (ts );
322322 datum_msg -> datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64 ;
323- break ;
323+ break ;
324324 }
325325 case TIMESTAMPTZOID :
326326 ts = DatumGetTimestampTz (datum );
@@ -330,28 +330,28 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
330330 } else {
331331 datum_msg -> datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH (ts );
332332 datum_msg -> datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64 ;
333- break ;
333+ break ;
334334 }
335- case DATEOID :
335+ case DATEOID :
336336 /* simply get the number of days as the stored 32 bit value and convert to EPOCH */
337337 datum_msg -> datum_int32 = DATE_TO_DAYS_SINCE_EPOCH (DatumGetDateADT (datum ));
338338 datum_msg -> datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32 ;
339339 break ;
340340 case TIMEOID :
341341 datum_msg -> datum_int64 = DatumGetTimeADT (datum );
342342 datum_msg -> datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64 ;
343- break ;
343+ break ;
344344 case TIMETZOID :
345345 timetz = DatumGetTimeTzADTP (datum );
346- /* use GMT-equivalent time */
346+ /* use GMT-equivalent time */
347347 datum_msg -> datum_double = (double ) (timetz -> time + (timetz -> zone * 1000000.0 ));
348348 datum_msg -> datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE ;
349- break ;
349+ break ;
350350 case INTERVALOID :
351351 interval = DatumGetIntervalP (datum );
352- /*
353- Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
354- */
352+ /*
353+ Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
354+ */
355355 duration = interval -> time + interval -> day * (double ) USECS_PER_DAY + interval -> month * ((DAYS_PER_YEAR / (double ) MONTHS_PER_YEAR ) * USECS_PER_DAY );
356356 datum_msg -> datum_double = duration ;
357357 datum_msg -> datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE ;
@@ -394,14 +394,14 @@ static int valid_attributes_count_from(TupleDesc tupdesc) {
394394 int count = 0 ;
395395 for (natt = 0 ; natt < tupdesc -> natts ; natt ++ ) {
396396 Form_pg_attribute attr = TupleDescAttr (tupdesc , natt );
397-
397+
398398 /* skip dropped columns and system columns */
399- if (attr -> attisdropped || attr -> attnum < 0 ) {
399+ if (attr -> attisdropped || attr -> attnum < 0 ) {
400400 continue ;
401- }
402- count ++ ;
401+ }
402+ count ++ ;
403403 }
404- return count ;
404+ return count ;
405405}
406406
407407/* convert a PG tuple to an array of DatumMessage(s) */
@@ -422,19 +422,19 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
422422 Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT ;
423423
424424 attr = TupleDescAttr (tupdesc , natt );
425-
425+
426426 /* skip dropped columns and system columns */
427427 if (attr -> attisdropped || attr -> attnum < 0 ) {
428428 elog (DEBUG1 , "skipping column %d because %s" , natt + 1 , attr -> attisdropped ? "it's a dropped column" : "it's a system column" );
429429 continue ;
430- }
430+ }
431431
432- attrName = quote_identifier (NameStr (attr -> attname ));
433- elog (DEBUG1 , "processing column %d with name %s" , natt + 1 , attrName );
432+ attrName = quote_identifier (NameStr (attr -> attname ));
433+ elog (DEBUG1 , "processing column %d with name %s" , natt + 1 , attrName );
434434
435435 /* set the column name */
436436 datum_msg .column_name = (char * )attrName ;
437-
437+
438438 /* set datum from tuple */
439439 origval = heap_getattr (tuple , natt + 1 , tupdesc , & isnull );
440440
@@ -446,7 +446,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
446446 getTypeOutputInfo (attr -> atttypid , & typoutput , & typisvarlena );
447447 if (!isnull ) {
448448 if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK (origval )) {
449- // TODO: Is there a way we can handle this?
449+ datum_msg .datum_missing = true;
450+ datum_msg .datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING ;
450451 elog (DEBUG1 , "Not handling external on disk varlena at the moment." );
451452 } else if (!typisvarlena ) {
452453 set_datum_value (& datum_msg , attr -> atttypid , typoutput , origval );
@@ -455,7 +456,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
455456 set_datum_value (& datum_msg , attr -> atttypid , typoutput , val );
456457 }
457458 } else {
458- elog (DEBUG1 , "column %s is null, ignoring value" , attrName );
459+ elog (DEBUG1 , "column %s is null, ignoring value" , attrName );
459460 }
460461
461462 tmsg [valid_attr_cnt ] = palloc (sizeof (datum_msg ));
@@ -534,10 +535,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
534535 rmsg .has_transaction_id = true;
535536 rmsg .commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH (txn -> commit_time );
536537 rmsg .has_commit_time = true;
537- rmsg .table = pstrdup (quote_qualified_identifier (get_namespace_name (get_rel_namespace (RelationGetRelid (relation ))),
538+ rmsg .table = pstrdup (quote_qualified_identifier (get_namespace_name (get_rel_namespace (RelationGetRelid (relation ))),
538539 NameStr (class_form -> relname )));
539-
540-
540+
541+
541542 /* decode different operation types */
542543 switch (change -> action ) {
543544 case REORDER_BUFFER_CHANGE_INSERT :
@@ -611,7 +612,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
611612 }
612613 break ;
613614 default :
614- elog (WARNING , "unknown change action" );
615+ elog (WARNING , "unknown change action" );
615616 Assert (0 );
616617 break ;
617618 }
0 commit comments