@@ -27,6 +27,8 @@ typedef struct
2727 char * plan ;
2828} stack_frame ;
2929
30+ static void send_msg_by_bits (shm_mq_handle * mqh , Size nbytes , const void * data );
31+
3032/*
3133 * Get List of stack_frames as a stack of function calls starting from outermost call.
3234 * Each entry contains query text and query state in form of EXPLAIN ANALYZE output.
@@ -148,6 +150,35 @@ serialize_stack(char *dest, List *qs_stack)
148150 serialize_stack_frame (& dest , qs_frame );
149151 }
150152}
153+ // ----------------- DEBUG -----------------
154+ static void
155+ print_sent_bytes (int num , char * src , int offset )
156+ {
157+ elog (INFO , "======= SEND MSG SEGMENT START (%d bytes) =======" , num );
158+ for (int i = offset ; i < offset + num ; i ++ )
159+ elog (INFO , "SENT byte #%d = %02x" , i , (unsigned char ) * (src + i ));
160+ }
161+ // ----------------- DEBUG -----------------
162+
163+ static void
164+ send_msg_by_bits (shm_mq_handle * mqh , Size nbytes , const void * data )
165+ {
166+ int bytes_left ;
167+ int bytes_send ;
168+
169+ /* Send the expected message length */
170+ shm_mq_send (mqh , sizeof (int ), & nbytes , false);
171+
172+ // elog(INFO, "======= SEND MSG (%lu bytes) =======", nbytes);
173+ for (int offset = 0 ; offset < nbytes ; offset += bytes_send )
174+ {
175+ bytes_left = nbytes - offset ;
176+ bytes_send = (bytes_left < BUF_SIZE ) ? bytes_left : BUF_SIZE ;
177+ shm_mq_send (mqh , bytes_send , & (((unsigned char * )data )[offset ]), false);
178+ // DEBUG: print message that we just sent
179+ // print_sent_bytes(bytes_send, (char *) data, offset);
180+ }
181+ }
151182
152183/*
153184 * Send state of current query to shared queue.
@@ -207,15 +238,15 @@ SendQueryState(void)
207238 {
208239 shm_mq_msg msg = { reqid , BASE_SIZEOF_SHM_MQ_MSG , MyProc , STAT_DISABLED };
209240
210- shm_mq_send (mqh , msg .length , & msg , false );
241+ send_msg_by_bits (mqh , msg .length , & msg );
211242 }
212243
213244 /* check if backend doesn't execute any query */
214245 else if (list_length (QueryDescStack ) == 0 )
215246 {
216247 shm_mq_msg msg = { reqid , BASE_SIZEOF_SHM_MQ_MSG , MyProc , QUERY_NOT_RUNNING };
217248
218- shm_mq_send (mqh , msg .length , & msg , false );
249+ send_msg_by_bits (mqh , msg .length , & msg );
219250 }
220251
221252 /* happy path */
@@ -238,7 +269,7 @@ SendQueryState(void)
238269
239270 msg -> stack_depth = list_length (qs_stack );
240271 serialize_stack (msg -> stack , qs_stack );
241- shm_mq_send (mqh , msglen , msg , false );
272+ send_msg_by_bits (mqh , msglen , msg );
242273 }
243274 elog (DEBUG1 , "Worker %d sends response for pg_query_state to %d" , shm_mq_get_sender (mq )-> pid , shm_mq_get_receiver (mq )-> pid );
244275 DetachPeer ();
0 commit comments