From 034cdd9e8dba283f32568b0fe7f642e3ea7a71ce Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Wed, 10 Dec 2025 17:19:17 +0100 Subject: [PATCH 01/12] WIP receive chat message from HPB for now, directly passed to ChatActivity and just log it without further processing Signed-off-by: Marcel Hibbe --- .../com/nextcloud/talk/chat/ChatActivity.kt | 10 ++++++ .../signaling/ConversationMessageNotifier.kt | 8 +++++ .../signaling/SignalingMessageReceiver.java | 18 ++++++---- .../talk/webrtc/WebSocketInstance.kt | 36 +++++++++++++++++-- 4 files changed, 64 insertions(+), 8 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt index 90f20bce462..b891f018d1d 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt @@ -155,6 +155,7 @@ import com.nextcloud.talk.messagesearch.MessageSearchActivity import com.nextcloud.talk.models.ExternalSignalingServer import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.json.capabilities.SpreedCapability +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.chat.ReadStatus import com.nextcloud.talk.models.json.conversations.ConversationEnums import com.nextcloud.talk.models.json.participants.Participant @@ -475,6 +476,15 @@ class ChatActivity : updateTypingIndicator() } } + + override fun onMessageReceived(chatMessage: ChatMessageJson?) { + Log.d( + TAG, + "received message in ChatActivity. This is the chat message received via HPB. It would be " + + "nicer to receive it in the ViewModel or Repository directly. Otherwise it needs to be passed into it" + + " from here..." + ) + } } override fun onCreate(savedInstanceState: Bundle?) { diff --git a/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt b/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt index 9bd2408abe9..b975d9789f6 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt +++ b/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt @@ -6,6 +6,7 @@ */ package com.nextcloud.talk.signaling +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.signaling.SignalingMessageReceiver.ConversationMessageListener internal class ConversationMessageNotifier { @@ -29,6 +30,13 @@ internal class ConversationMessageNotifier { } } + @Synchronized + fun notifyMessageReceived(chatMessage: ChatMessageJson) { + for (listener in ArrayList(conversationMessageListeners)) { + listener.onMessageReceived(chatMessage) + } + } + fun notifyStopTyping(userId: String?, sessionId: String?) { for (listener in ArrayList(conversationMessageListeners)) { listener.onStopTyping(userId, sessionId) diff --git a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java index 397ba7b55ce..f753eec32a1 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java +++ b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java @@ -6,6 +6,7 @@ */ package com.nextcloud.talk.signaling; +import com.nextcloud.talk.models.json.chat.ChatMessageJson; import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter; import com.nextcloud.talk.models.json.converters.EnumParticipantTypeConverter; import com.nextcloud.talk.models.json.participants.Participant; @@ -156,12 +157,8 @@ public interface CallParticipantMessageListener { void onUnshareScreen(); } - /** - * Listener for conversation messages. - */ - public interface ConversationMessageListener { - void onStartTyping(String userId, String session); - void onStopTyping(String userId,String session); + protected void processChatMessageWebSocketMessage(ChatMessageJson chatMessage) { + conversationMessageNotifier.notifyMessageReceived(chatMessage); } /** @@ -541,6 +538,15 @@ protected void processCallWebSocketMessage(CallWebSocketMessage callWebSocketMes } } + /** + * Listener for conversation messages. + */ + public interface ConversationMessageListener { + void onStartTyping(String userId, String session); + void onStopTyping(String userId,String session); + void onMessageReceived(ChatMessageJson chatMessage); + } + protected void processSignalingMessage(NCSignalingMessage signalingMessage) { // Note that in the internal signaling server message "data" is the String representation of a JSON // object, although it is already decoded when used here. diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt index 81b784726e3..999aa2f1de5 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt +++ b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt @@ -17,6 +17,7 @@ import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.sharedA import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.events.NetworkEvent import com.nextcloud.talk.events.WebSocketCommunicationEvent +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.participants.Participant import com.nextcloud.talk.models.json.participants.Participant.ActorType import com.nextcloud.talk.models.json.signaling.NCSignalingMessage @@ -41,6 +42,7 @@ import okio.ByteString import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.Subscribe import org.greenrobot.eventbus.ThreadMode +import org.json.JSONObject import java.io.IOException import java.lang.Thread.sleep import javax.inject.Inject @@ -196,7 +198,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU when (target) { Globals.TARGET_ROOM -> { if ("message" == eventOverallWebSocketMessage.eventMap!!["type"]) { - processRoomMessageMessage(eventOverallWebSocketMessage) + processRoomMessageMessage(eventOverallWebSocketMessage, text) } else if ("join" == eventOverallWebSocketMessage.eventMap!!["type"]) { processRoomJoinMessage(eventOverallWebSocketMessage) } else if ("leave" == eventOverallWebSocketMessage.eventMap!!["type"]) { @@ -217,7 +219,25 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU } } - private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) { + private fun processRoomMessageMessage( + eventOverallWebSocketMessage: EventOverallWebSocketMessage, + jsonString: String + ) { + fun parseChatMessage(jsonString: String): ChatMessageJson? { + return try { + val root = JSONObject(jsonString) + val eventObj = root.optJSONObject("event") ?: return null + val messageObj = eventObj.optJSONObject("message") ?: return null + val dataObj = messageObj.optJSONObject("data") ?: return null + val chatObj = dataObj.optJSONObject("chat") ?: return null + val commentObj = chatObj.optJSONObject("comment") ?: return null + + LoganSquare.parse(commentObj.toString(), ChatMessageJson::class.java) + } catch (e: Exception) { + null + } + } + val messageHashMap = eventOverallWebSocketMessage.eventMap?.get("message") as Map<*, *>? if (messageHashMap != null && messageHashMap.containsKey("data")) { @@ -231,6 +251,13 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU refreshChatHashMap[BundleKeys.KEY_INTERNAL_USER_ID] = (conversationUser.id!!).toString() eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap)) } + + if (chatMap != null && chatMap.containsKey("comment")) { + val chatMessage = parseChatMessage(jsonString) + chatMessage?.let { + signalingMessageReceiver.process(it) + } + } } else if (dataHashMap != null && dataHashMap.containsKey("recording")) { val recordingMap = dataHashMap["recording"] as Map<*, *>? if (recordingMap != null && recordingMap.containsKey("status")) { @@ -481,6 +508,11 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU processSignalingMessage(message?.ncSignalingMessage) } } + + fun process(message: ChatMessageJson) { + processChatMessageWebSocketMessage(message) + Log.d(TAG, "processing Received chat message") + } } inner class ExternalSignalingMessageSender : SignalingMessageSender { From d0db3a8c0f2f901c1d412491d23ca0e3d28fcefc Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Thu, 11 Dec 2025 19:40:10 +0100 Subject: [PATCH 02/12] prepare refactor Signed-off-by: Marcel Hibbe --- .../signaling/SignalingMessageReceiver.java | 19 +++++---- .../talk/webrtc/WebSocketInstance.kt | 42 ++++++++++--------- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java index f753eec32a1..e9d80d04154 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java +++ b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java @@ -158,6 +158,7 @@ public interface CallParticipantMessageListener { } protected void processChatMessageWebSocketMessage(ChatMessageJson chatMessage) { + // TODO: pass in string an parse it as json here... conversationMessageNotifier.notifyMessageReceived(chatMessage); } @@ -289,6 +290,15 @@ protected void processEvent(Map eventMap) { } } + /** + * Listener for conversation messages. + */ + public interface ConversationMessageListener { + void onStartTyping(String userId, String session); + void onStopTyping(String userId,String session); + void onMessageReceived(ChatMessageJson chatMessage); + } + private void processSwitchToEvent(Map eventMap) { // Message schema: // { @@ -538,15 +548,6 @@ protected void processCallWebSocketMessage(CallWebSocketMessage callWebSocketMes } } - /** - * Listener for conversation messages. - */ - public interface ConversationMessageListener { - void onStartTyping(String userId, String session); - void onStopTyping(String userId,String session); - void onMessageReceived(ChatMessageJson chatMessage); - } - protected void processSignalingMessage(NCSignalingMessage signalingMessage) { // Note that in the internal signaling server message "data" is the String representation of a JSON // object, although it is already decoded when used here. diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt index 999aa2f1de5..0a4993bdc4f 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt +++ b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt @@ -223,20 +223,20 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU eventOverallWebSocketMessage: EventOverallWebSocketMessage, jsonString: String ) { - fun parseChatMessage(jsonString: String): ChatMessageJson? { - return try { - val root = JSONObject(jsonString) - val eventObj = root.optJSONObject("event") ?: return null - val messageObj = eventObj.optJSONObject("message") ?: return null - val dataObj = messageObj.optJSONObject("data") ?: return null - val chatObj = dataObj.optJSONObject("chat") ?: return null - val commentObj = chatObj.optJSONObject("comment") ?: return null - - LoganSquare.parse(commentObj.toString(), ChatMessageJson::class.java) - } catch (e: Exception) { - null - } - } + // fun parseChatMessage(jsonString: String): ChatMessageJson? { + // return try { + // val root = JSONObject(jsonString) + // val eventObj = root.optJSONObject("event") ?: return null + // val messageObj = eventObj.optJSONObject("message") ?: return null + // val dataObj = messageObj.optJSONObject("data") ?: return null + // val chatObj = dataObj.optJSONObject("chat") ?: return null + // val commentObj = chatObj.optJSONObject("comment") ?: return null + // + // LoganSquare.parse(commentObj.toString(), ChatMessageJson::class.java) + // } catch (e: Exception) { + // null + // } + // } val messageHashMap = eventOverallWebSocketMessage.eventMap?.get("message") as Map<*, *>? @@ -252,12 +252,13 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap)) } - if (chatMap != null && chatMap.containsKey("comment")) { - val chatMessage = parseChatMessage(jsonString) - chatMessage?.let { - signalingMessageReceiver.process(it) - } - } + // if (chatMap != null && chatMap.containsKey("comment")) { + // TODO: pass string through.. + // val chatMessage = parseChatMessage(jsonString) + // chatMessage?.let { + // signalingMessageReceiver.process(it) + // } + // } } else if (dataHashMap != null && dataHashMap.containsKey("recording")) { val recordingMap = dataHashMap["recording"] as Map<*, *>? if (recordingMap != null && recordingMap.containsKey("status")) { @@ -510,6 +511,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU } fun process(message: ChatMessageJson) { + // TODO: pass string through.. processChatMessageWebSocketMessage(message) Log.d(TAG, "processing Received chat message") } From 7aaca47cead49f32d84807d4090509a14ff45b98 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Fri, 12 Dec 2025 11:24:35 +0100 Subject: [PATCH 03/12] Rename .java to .kt Signed-off-by: Marcel Hibbe --- ...{SignalingMessageReceiver.java => SignalingMessageReceiver.kt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename app/src/main/java/com/nextcloud/talk/signaling/{SignalingMessageReceiver.java => SignalingMessageReceiver.kt} (100%) diff --git a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt similarity index 100% rename from app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java rename to app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt From 1fe0b377d677193df356ac920ba67ae5f034f320 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Fri, 12 Dec 2025 11:24:35 +0100 Subject: [PATCH 04/12] migrate SignalingMessageReceiver to kotlin Signed-off-by: Marcel Hibbe --- .../nextcloud/talk/activities/CallActivity.kt | 11 +- .../talk/activities/ParticipantHandler.kt | 2 +- .../com/nextcloud/talk/chat/ChatActivity.kt | 22 +- .../signaling/SignalingMessageReceiver.kt | 505 +++++++++--------- .../talk/webrtc/WebSocketInstance.kt | 7 +- 5 files changed, 265 insertions(+), 282 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt index 095178fefbf..7cbbdde62fa 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt @@ -287,7 +287,10 @@ class CallActivity : CallBaseActivity() { private var isBreakoutRoom = false private val localParticipantMessageListener = LocalParticipantMessageListener { token -> switchToRoomToken = token - hangup(true, false) + hangup( + shutDownView = true, + endCallForAll = false + ) } private val offerMessageListener = OfferMessageListener { sessionId, roomType, sdp, nick -> getOrCreatePeerConnectionWrapperForSessionIdAndType( @@ -1919,7 +1922,7 @@ class CallActivity : CallBaseActivity() { when (messageType) { "usersInRoom" -> - internalSignalingMessageReceiver.process(signaling.messageWrapper as List?>?) + internalSignalingMessageReceiver.process(signaling.messageWrapper as List>) "message" -> { val ncSignalingMessage = LoganSquare.parse( @@ -2735,11 +2738,11 @@ class CallActivity : CallBaseActivity() { * All listeners are called in the main thread. */ private class InternalSignalingMessageReceiver : SignalingMessageReceiver() { - fun process(users: List?>?) { + fun process(users: List>) { processUsersInRoom(users) } - fun process(message: NCSignalingMessage?) { + fun process(message: NCSignalingMessage) { processSignalingMessage(message) } } diff --git a/app/src/main/java/com/nextcloud/talk/activities/ParticipantHandler.kt b/app/src/main/java/com/nextcloud/talk/activities/ParticipantHandler.kt index 62bd61e781c..09d681bd306 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/ParticipantHandler.kt +++ b/app/src/main/java/com/nextcloud/talk/activities/ParticipantHandler.kt @@ -139,7 +139,7 @@ class ParticipantHandler( _uiState.update { it.copy(raisedHand = state) } } - override fun onReaction(reaction: String?) { + override fun onReaction(reaction: String) { Log.d(TAG, "onReaction") } diff --git a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt index b891f018d1d..0d7b1d9b8dd 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt @@ -428,15 +428,15 @@ class ChatActivity : var callStarted = false - private val localParticipantMessageListener = object : SignalingMessageReceiver.LocalParticipantMessageListener { - override fun onSwitchTo(token: String?) { - if (token != null) { - if (CallActivity.active) { - Log.d(TAG, "CallActivity is running. Ignore to switch chat in ChatActivity...") - } else { - switchToRoom(token, false, false) - } - } + private val localParticipantMessageListener = SignalingMessageReceiver.LocalParticipantMessageListener { token -> + if (CallActivity.active) { + Log.d(TAG, "CallActivity is running. Ignore to switch chat in ChatActivity...") + } else { + switchToRoom( + token = token, + startCallAfterRoomSwitch = false, + isVoiceOnlyCall = false + ) } } @@ -481,8 +481,8 @@ class ChatActivity : Log.d( TAG, "received message in ChatActivity. This is the chat message received via HPB. It would be " + - "nicer to receive it in the ViewModel or Repository directly. Otherwise it needs to be passed into it" + - " from here..." + "nicer to receive it in the ViewModel or Repository directly. " + + "Otherwise it needs to be passed into it from here..." ) } } diff --git a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt index e9d80d04154..37fea124c7d 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt +++ b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt @@ -4,263 +4,271 @@ * SPDX-FileCopyrightText: 2022 Daniel Calviño Sánchez * SPDX-License-Identifier: GPL-3.0-or-later */ -package com.nextcloud.talk.signaling; - -import com.nextcloud.talk.models.json.chat.ChatMessageJson; -import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter; -import com.nextcloud.talk.models.json.converters.EnumParticipantTypeConverter; -import com.nextcloud.talk.models.json.participants.Participant; -import com.nextcloud.talk.models.json.signaling.NCIceCandidate; -import com.nextcloud.talk.models.json.signaling.NCMessagePayload; -import com.nextcloud.talk.models.json.signaling.NCSignalingMessage; -import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +package com.nextcloud.talk.signaling + +import com.nextcloud.talk.models.json.chat.ChatMessageJson +import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter +import com.nextcloud.talk.models.json.converters.EnumParticipantTypeConverter +import com.nextcloud.talk.models.json.participants.Participant +import com.nextcloud.talk.models.json.signaling.NCSignalingMessage +import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage +import kotlin.Any +import kotlin.Int +import kotlin.Long +import kotlin.RuntimeException +import kotlin.String +import kotlin.toString /** * Hub to register listeners for signaling messages of different kinds. - *

+ * * In general, if a listener is added while an event is being handled the new listener will not receive that event. * An exception to that is adding a WebRtcMessageListener when handling an offer in an OfferMessageListener; in that * case the "onOffer()" method of the WebRtcMessageListener will be called for that same offer. - *

+ * * Similarly, if a listener is removed while an event is being handled the removed listener will still receive that * event. Again the exception is removing a WebRtcMessageListener when handling an offer in an OfferMessageListener; in * that case the "onOffer()" method of the WebRtcMessageListener will not be called for that offer. - *

+ * + * * Adding and removing listeners, as well as notifying them is internally synchronized. This should be kept in mind * if listeners are added or removed when handling an event to prevent deadlocks (nevertheless, just adding or * removing a listener in the same thread handling the event is fine, and in most cases it will be fine too if done * in a different thread, as long as the notifier thread is not forced to wait until the listener is added or removed). - *

+ * * SignalingMessageReceiver does not fetch the signaling messages itself; subclasses must fetch them and then call * the appropriate protected methods to process the messages and notify the listeners. */ -public abstract class SignalingMessageReceiver { +abstract class SignalingMessageReceiver { + private val enumActorTypeConverter = EnumActorTypeConverter() - private final EnumActorTypeConverter enumActorTypeConverter = new EnumActorTypeConverter(); + private val participantListMessageNotifier = ParticipantListMessageNotifier() - private final ParticipantListMessageNotifier participantListMessageNotifier = new ParticipantListMessageNotifier(); + private val localParticipantMessageNotifier = LocalParticipantMessageNotifier() - private final LocalParticipantMessageNotifier localParticipantMessageNotifier = new LocalParticipantMessageNotifier(); + private val callParticipantMessageNotifier = CallParticipantMessageNotifier() - private final CallParticipantMessageNotifier callParticipantMessageNotifier = new CallParticipantMessageNotifier(); + private val conversationMessageNotifier = ConversationMessageNotifier() - private final ConversationMessageNotifier conversationMessageNotifier = new ConversationMessageNotifier(); + private val offerMessageNotifier = OfferMessageNotifier() - private final OfferMessageNotifier offerMessageNotifier = new OfferMessageNotifier(); - - private final WebRtcMessageNotifier webRtcMessageNotifier = new WebRtcMessageNotifier(); + private val webRtcMessageNotifier = WebRtcMessageNotifier() /** * Listener for participant list messages. - *

+ * * The messages are implicitly bound to the room currently joined in the signaling server; listeners are expected * to know the current room. */ - public interface ParticipantListMessageListener { - + interface ParticipantListMessageListener { /** * List of all the participants in the room. - *

+ * * This message is received only when the internal signaling server is used. - *

+ * * The message is received periodically, and the participants may not have been modified since the last message. - *

+ * * Only the following participant properties are set: * - inCall * - lastPing * - sessionId * - userId (if the participant is not a guest) - *

+ * * "participantPermissions" is provided in the message (since Talk 13), but not currently set in the * participant. "publishingPermissions" was provided instead in Talk 12, but it was not used anywhere, so it is * ignored. * * @param participants all the participants (users and guests) in the room */ - void onUsersInRoom(List participants); + fun onUsersInRoom(participants: MutableList?) /** * List of all the participants in the call or the room (depending on what triggered the event). - *

+ * * This message is received only when the external signaling server is used. - *

+ * * The message is received when any participant changed, although what changed is not provided and should be * derived from the difference with previous messages. The list of participants may include only the * participants in the call (including those that just left it and thus triggered the event) or all the * participants currently in the room (participants in the room but not currently active, that is, without a * session, are not included). - *

+ * * Only the following participant properties are set: * - inCall * - lastPing * - sessionId * - type * - userId (if the participant is not a guest) - *

+ * * "nextcloudSessionId" is provided in the message (when the "inCall" property of any participant changed), but * not currently set in the participant. - *

+ * * "participantPermissions" is provided in the message (since Talk 13), but not currently set in the * participant. "publishingPermissions" was provided instead in Talk 12, but it was not used anywhere, so it is * ignored. * * @param participants all the participants (users and guests) in the room */ - void onParticipantsUpdate(List participants); + fun onParticipantsUpdate(participants: MutableList?) /** * Update of the properties of all the participants in the room. - *

+ * * This message is received only when the external signaling server is used. * * @param inCall the new value of the inCall property */ - void onAllParticipantsUpdate(long inCall); + fun onAllParticipantsUpdate(inCall: Long) } /** * Listener for local participant messages. - *

+ * * The messages are implicitly bound to the local participant (or, rather, its session); listeners are expected * to know the local participant. - *

+ * * The messages are related to the conversation, so the local participant may or may not be in a call when they * are received. */ - public interface LocalParticipantMessageListener { + fun interface LocalParticipantMessageListener { /** * Request for the client to switch to the given conversation. - *

+ * * This message is received only when the external signaling server is used. * * @param token the token of the conversation to switch to. */ - void onSwitchTo(String token); + fun onSwitchTo(token: String) } /** * Listener for call participant messages. - *

+ * + * * The messages are bound to a specific call participant (or, rather, session), so each listener is expected to * handle messages only for a single call participant. - *

+ * + * * Although "unshareScreen" is technically bound to a specific peer connection it is instead treated as a general * message on the call participant. */ - public interface CallParticipantMessageListener { - void onRaiseHand(boolean state, long timestamp); - void onReaction(String reaction); - void onUnshareScreen(); + interface CallParticipantMessageListener { + fun onRaiseHand(state: Boolean, timestamp: Long) + fun onReaction(reaction: String) + fun onUnshareScreen() } - protected void processChatMessageWebSocketMessage(ChatMessageJson chatMessage) { - // TODO: pass in string an parse it as json here... - conversationMessageNotifier.notifyMessageReceived(chatMessage); + protected fun processChatMessageWebSocketMessage(chatMessage: ChatMessageJson) { + conversationMessageNotifier.notifyMessageReceived(chatMessage) } /** * Listener for WebRTC offers. - *

+ * + * * Unlike the WebRtcMessageListener, which is bound to a specific peer connection, an OfferMessageListener listens * to all offer messages, no matter which peer connection they are bound to. This can be used, for example, to * create a new peer connection when a remote offer for which there is no previous connection is received. - *

+ * + * * When an offer is received all OfferMessageListeners are notified before any WebRtcMessageListener is notified. */ - public interface OfferMessageListener { - void onOffer(String sessionId, String roomType, String sdp, String nick); + fun interface OfferMessageListener { + fun onOffer(sessionId: String?, roomType: String, sdp: String?, nick: String?) } /** * Listener for WebRTC messages. - *

+ * + * * The messages are bound to a specific peer connection, so each listener is expected to handle messages only for * a single peer connection. */ - public interface WebRtcMessageListener { - void onOffer(String sdp, String nick); - void onAnswer(String sdp, String nick); - void onCandidate(String sdpMid, int sdpMLineIndex, String sdp); - void onEndOfCandidates(); + interface WebRtcMessageListener { + fun onOffer(sdp: String, nick: String?) + fun onAnswer(sdp: String, nick: String?) + fun onCandidate(sdpMid: String, sdpMLineIndex: Int, sdp: String) + fun onEndOfCandidates() } /** * Adds a listener for participant list messages. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will be notified just once. * * @param listener the ParticipantListMessageListener */ - public void addListener(ParticipantListMessageListener listener) { - participantListMessageNotifier.addListener(listener); + fun addListener(listener: ParticipantListMessageListener?) { + participantListMessageNotifier.addListener(listener) } - public void removeListener(ParticipantListMessageListener listener) { - participantListMessageNotifier.removeListener(listener); + fun removeListener(listener: ParticipantListMessageListener?) { + participantListMessageNotifier.removeListener(listener) } /** * Adds a listener for local participant messages. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will be notified just once. * * @param listener the LocalParticipantMessageListener */ - public void addListener(LocalParticipantMessageListener listener) { - localParticipantMessageNotifier.addListener(listener); + fun addListener(listener: LocalParticipantMessageListener?) { + localParticipantMessageNotifier.addListener(listener) } - public void removeListener(LocalParticipantMessageListener listener) { - localParticipantMessageNotifier.removeListener(listener); + fun removeListener(listener: LocalParticipantMessageListener?) { + localParticipantMessageNotifier.removeListener(listener) } /** * Adds a listener for call participant messages. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will no longer be notified * for the messages from the previous session ID. * * @param listener the CallParticipantMessageListener * @param sessionId the ID of the session that messages come from */ - public void addListener(CallParticipantMessageListener listener, String sessionId) { - callParticipantMessageNotifier.addListener(listener, sessionId); + fun addListener(listener: CallParticipantMessageListener?, sessionId: String?) { + callParticipantMessageNotifier.addListener(listener, sessionId) } - public void removeListener(CallParticipantMessageListener listener) { - callParticipantMessageNotifier.removeListener(listener); + fun removeListener(listener: CallParticipantMessageListener?) { + callParticipantMessageNotifier.removeListener(listener) } - public void addListener(ConversationMessageListener listener) { - conversationMessageNotifier.addListener(listener); + fun addListener(listener: ConversationMessageListener?) { + conversationMessageNotifier.addListener(listener) } - public void removeListener(ConversationMessageListener listener) { - conversationMessageNotifier.removeListener(listener); + fun removeListener(listener: ConversationMessageListener) { + conversationMessageNotifier.removeListener(listener) } /** * Adds a listener for all offer messages. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will be notified just once. * * @param listener the OfferMessageListener */ - public void addListener(OfferMessageListener listener) { - offerMessageNotifier.addListener(listener); + fun addListener(listener: OfferMessageListener?) { + offerMessageNotifier.addListener(listener) } - public void removeListener(OfferMessageListener listener) { - offerMessageNotifier.removeListener(listener); + fun removeListener(listener: OfferMessageListener?) { + offerMessageNotifier.removeListener(listener) } /** * Adds a listener for WebRTC messages from the given session ID and room type. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will no longer be notified * for the messages from the previous session ID or room type. * @@ -268,38 +276,38 @@ public abstract class SignalingMessageReceiver { * @param sessionId the ID of the session that messages come from * @param roomType the room type that messages come from */ - public void addListener(WebRtcMessageListener listener, String sessionId, String roomType) { - webRtcMessageNotifier.addListener(listener, sessionId, roomType); + fun addListener(listener: WebRtcMessageListener?, sessionId: String?, roomType: String?) { + webRtcMessageNotifier.addListener(listener, sessionId, roomType) } - public void removeListener(WebRtcMessageListener listener) { - webRtcMessageNotifier.removeListener(listener); + fun removeListener(listener: WebRtcMessageListener?) { + webRtcMessageNotifier.removeListener(listener) } - protected void processEvent(Map eventMap) { - if ("room".equals(eventMap.get("target")) && "switchto".equals(eventMap.get("type"))) { - processSwitchToEvent(eventMap); + fun processEvent(eventMap: Map?) { + if ("room" == eventMap?.get("target") && "switchto" == eventMap["type"]) { + processSwitchToEvent(eventMap) - return; + return } - if ("participants".equals(eventMap.get("target")) && "update".equals(eventMap.get("type"))) { - processUpdateEvent(eventMap); + if ("participants" == eventMap?.get("target") && "update" == eventMap["type"]) { + processUpdateEvent(eventMap) - return; + return } } /** * Listener for conversation messages. */ - public interface ConversationMessageListener { - void onStartTyping(String userId, String session); - void onStopTyping(String userId,String session); - void onMessageReceived(ChatMessageJson chatMessage); + interface ConversationMessageListener { + fun onStartTyping(userId: String?, session: String?) + fun onStopTyping(userId: String?, session: String?) + fun onMessageReceived(chatMessage: ChatMessageJson?) } - private void processSwitchToEvent(Map eventMap) { + private fun processSwitchToEvent(eventMap: Map?) { // Message schema: // { // "type": "event", @@ -312,58 +320,58 @@ public abstract class SignalingMessageReceiver { // }, // } - Map switchToMap; + val switchToMap: Map? try { - switchToMap = (Map) eventMap.get("switchto"); - } catch (RuntimeException e) { + switchToMap = eventMap?.get("switchto") as Map? + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } if (switchToMap == null) { // Broken message, this should not happen. - return; + return } - String token; + val token: String? try { - token = switchToMap.get("roomid").toString(); - } catch (RuntimeException e) { + token = switchToMap["roomid"].toString() + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } - localParticipantMessageNotifier.notifySwitchTo(token); + localParticipantMessageNotifier.notifySwitchTo(token) } - private void processUpdateEvent(Map eventMap) { - Map updateMap; + private fun processUpdateEvent(eventMap: Map?) { + val updateMap: Map? try { - updateMap = (Map) eventMap.get("update"); - } catch (RuntimeException e) { + updateMap = eventMap?.get("update") as Map? + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } if (updateMap == null) { // Broken message, this should not happen. - return; + return } - if (updateMap.get("all") != null && Boolean.parseBoolean(updateMap.get("all").toString())) { - processAllParticipantsUpdate(updateMap); + if (updateMap["all"] != null && updateMap["all"].toString().toBoolean()) { + processAllParticipantsUpdate(updateMap) - return; + return } - if (updateMap.get("users") != null) { - processParticipantsUpdate(updateMap); + if (updateMap["users"] != null) { + processParticipantsUpdate(updateMap) - return; + return } } - private void processAllParticipantsUpdate(Map updateMap) { + private fun processAllParticipantsUpdate(updateMap: Map) { // Message schema: // { // "type": "event", @@ -381,18 +389,18 @@ public abstract class SignalingMessageReceiver { // Note that "incall" in participants->update is all in lower case when the message applies to all participants, // even if it is "inCall" when the message provides separate properties for each participant. - long inCall; + val inCall: Long try { - inCall = Long.parseLong(updateMap.get("incall").toString()); - } catch (RuntimeException e) { + inCall = updateMap["incall"].toString().toLong() + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } - participantListMessageNotifier.notifyAllParticipantsUpdate(inCall); + participantListMessageNotifier.notifyAllParticipantsUpdate(inCall) } - private void processParticipantsUpdate(Map updateMap) { + private fun processParticipantsUpdate(updateMap: Map) { // Message schema: // { // "type": "event", @@ -423,34 +431,34 @@ public abstract class SignalingMessageReceiver { // Note that "userId" in participants->update comes from the Nextcloud server, so it is "userId"; in other // messages, like room->join, it comes directly from the external signaling server, so it is "userid" instead. - List> users; + val users: List>? try { - users = (List>) updateMap.get("users"); - } catch (RuntimeException e) { + users = updateMap["users"] as List>? + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } if (users == null) { // Broken message, this should not happen. - return; + return } - List participants = new ArrayList<>(users.size()); + val participants: MutableList = ArrayList(users.size) - for (Map user: users) { + for (user in users) { try { - participants.add(getParticipantFromMessageMap(user)); - } catch (RuntimeException e) { + participants.add(getParticipantFromMessageMap(user)) + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } } - participantListMessageNotifier.notifyParticipantsUpdate(participants); + participantListMessageNotifier.notifyParticipantsUpdate(participants) } - protected void processUsersInRoom(List> users) { + fun processUsersInRoom(users: List>) { // Message schema: // { // "type": "usersInRoom", @@ -469,23 +477,25 @@ public abstract class SignalingMessageReceiver { // ], // } - List participants = new ArrayList<>(users.size()); + val participants: MutableList = ArrayList(users.size) - for (Map user: users) { + for (user in users) { + val nullSafeUserMap = user as? Map ?: return try { - participants.add(getParticipantFromMessageMap(user)); - } catch (RuntimeException e) { + participants.add(getParticipantFromMessageMap(nullSafeUserMap)) + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } } - participantListMessageNotifier.notifyUsersInRoom(participants); + participantListMessageNotifier.notifyUsersInRoom(participants) } /** * Creates and initializes a Participant from the data in the given map. - *

+ * + * * Maps from internal and external signaling server messages can be used. Nevertheless, besides the differences * between the messages and the optional properties, it is expected that the message is correct and the given data * is parseable. Broken messages (for example, a string instead of an integer for "inCall" or a missing @@ -494,70 +504,73 @@ public abstract class SignalingMessageReceiver { * @param participantMap the map with the participant data * @return the Participant */ - private Participant getParticipantFromMessageMap(Map participantMap) { - Participant participant = new Participant(); + private fun getParticipantFromMessageMap(participantMap: Map): Participant { + val participant = Participant() - participant.setInCall(Long.parseLong(participantMap.get("inCall").toString())); - participant.setLastPing(Long.parseLong(participantMap.get("lastPing").toString())); - participant.setSessionId(participantMap.get("sessionId").toString()); + participant.inCall = participantMap["inCall"].toString().toLong() + participant.lastPing = participantMap["lastPing"].toString().toLong() + participant.sessionId = participantMap["sessionId"].toString() - if (participantMap.get("userId") != null && !participantMap.get("userId").toString().isEmpty()) { - participant.setUserId(participantMap.get("userId").toString()); + if (participantMap["userId"] != null && !participantMap["userId"].toString().isEmpty()) { + participant.userId = participantMap["userId"].toString() } - if (participantMap.get("internal") != null && Boolean.parseBoolean(participantMap.get("internal").toString())) { - participant.setInternal(Boolean.TRUE); + if (participantMap["internal"] != null && participantMap["internal"].toString().toBoolean()) { + participant.internal = true } - if (participantMap.get("actorType") != null && !participantMap.get("actorType").toString().isEmpty()) { - participant.setActorType(enumActorTypeConverter.getFromString(participantMap.get("actorType").toString())); + if (participantMap["actorType"] != null && !participantMap["actorType"].toString().isEmpty()) { + participant.actorType = enumActorTypeConverter.getFromString(participantMap["actorType"].toString()) } - if (participantMap.get("actorId") != null && !participantMap.get("actorId").toString().isEmpty()) { - participant.setActorId(participantMap.get("actorId").toString()); + if (participantMap["actorId"] != null && !participantMap["actorId"].toString().isEmpty()) { + participant.actorId = participantMap["actorId"].toString() } // Only in external signaling messages - if (participantMap.get("participantType") != null) { - int participantTypeInt = Integer.parseInt(participantMap.get("participantType").toString()); + if (participantMap["participantType"] != null) { + val participantTypeInt = participantMap["participantType"].toString().toInt() - EnumParticipantTypeConverter converter = new EnumParticipantTypeConverter(); - participant.setType(converter.getFromInt(participantTypeInt)); + val converter = EnumParticipantTypeConverter() + participant.type = converter.getFromInt(participantTypeInt) } - return participant; + return participant } - protected void processCallWebSocketMessage(CallWebSocketMessage callWebSocketMessage) { - - NCSignalingMessage signalingMessage = callWebSocketMessage.getNcSignalingMessage(); + protected fun processCallWebSocketMessage(callWebSocketMessage: CallWebSocketMessage) { + val signalingMessage = callWebSocketMessage.ncSignalingMessage - if (callWebSocketMessage.getSenderWebSocketMessage() != null && signalingMessage != null) { - String type = signalingMessage.getType(); + if (callWebSocketMessage.senderWebSocketMessage != null && signalingMessage != null) { + val type = signalingMessage.type - String userId = callWebSocketMessage.getSenderWebSocketMessage().getUserid(); - String sessionId = signalingMessage.getFrom(); + val userId = callWebSocketMessage.senderWebSocketMessage!!.userid + val sessionId = signalingMessage.from - if ("startedTyping".equals(type)) { - conversationMessageNotifier.notifyStartTyping(userId, sessionId); + if ("startedTyping" == type) { + conversationMessageNotifier.notifyStartTyping(userId, sessionId) } - if ("stoppedTyping".equals(type)) { - conversationMessageNotifier.notifyStopTyping(userId, sessionId); + if ("stoppedTyping" == type) { + conversationMessageNotifier.notifyStopTyping(userId, sessionId) } } } - protected void processSignalingMessage(NCSignalingMessage signalingMessage) { + fun processSignalingMessage(signalingMessage: NCSignalingMessage?) { + if (signalingMessage == null) { + return + } + // Note that in the internal signaling server message "data" is the String representation of a JSON // object, although it is already decoded when used here. - String type = signalingMessage.getType(); + val type = signalingMessage.type - String sessionId = signalingMessage.getFrom(); - String roomType = signalingMessage.getRoomType(); + val sessionId = signalingMessage.from + val roomType = signalingMessage.roomType - if ("raiseHand".equals(type)) { + if ("raiseHand" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -595,26 +608,16 @@ public abstract class SignalingMessageReceiver { // }, // } - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return + val state = payload.state ?: return + val timestamp = payload.timestamp ?: return - Boolean state = payload.getState(); - Long timestamp = payload.getTimestamp(); + callParticipantMessageNotifier.notifyRaiseHand(sessionId, state, timestamp) - if (state == null || timestamp == null) { - // Broken message, this should not happen. - return; - } - - callParticipantMessageNotifier.notifyRaiseHand(sessionId, state, timestamp); - - return; + return } - if ("reaction".equals(type)) { + if ("reaction" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -648,27 +651,19 @@ public abstract class SignalingMessageReceiver { // }, // } - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return - String reaction = payload.getReaction(); - if (reaction == null) { - // Broken message, this should not happen. - return; - } + val reaction = payload.reaction ?: return - callParticipantMessageNotifier.notifyReaction(sessionId, reaction); + callParticipantMessageNotifier.notifyReaction(sessionId, reaction) - return; + return } // "unshareScreen" messages are directly sent to the screen peer connection when the internal signaling // server is used, and to the room when the external signaling server is used. However, the (relevant) data // of the received message ("from" and "type") is the same in both cases. - if ("unshareScreen".equals(type)) { + if ("unshareScreen" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -697,12 +692,12 @@ public abstract class SignalingMessageReceiver { // }, // } - callParticipantMessageNotifier.notifyUnshareScreen(sessionId); + callParticipantMessageNotifier.notifyUnshareScreen(sessionId) - return; + return } - if ("offer".equals(type)) { + if ("offer" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -741,43 +736,35 @@ public abstract class SignalingMessageReceiver { // }, // } - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return - String sdp = payload.getSdp(); - String nick = payload.getNick(); + val sdp = payload.sdp + val nick = payload.nick // If "processSignalingMessage" is called with two offers from two different threads it is possible, // although extremely unlikely, that the WebRtcMessageListeners for the second offer are notified before the // WebRtcMessageListeners for the first offer. This should not be a problem, though, so for simplicity // the statements are not synchronized. - offerMessageNotifier.notifyOffer(sessionId, roomType, sdp, nick); - webRtcMessageNotifier.notifyOffer(sessionId, roomType, sdp, nick); + offerMessageNotifier.notifyOffer(sessionId, roomType, sdp, nick) + webRtcMessageNotifier.notifyOffer(sessionId, roomType, sdp, nick) - return; + return } - if ("answer".equals(type)) { + if ("answer" == type) { // Message schema: same as offers, but with type "answer". - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return - String sdp = payload.getSdp(); - String nick = payload.getNick(); + val sdp = payload.sdp + val nick = payload.nick - webRtcMessageNotifier.notifyAnswer(sessionId, roomType, sdp, nick); + webRtcMessageNotifier.notifyAnswer(sessionId, roomType, sdp, nick) - return; + return } - if ("candidate".equals(type)) { + if ("candidate" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -821,31 +808,25 @@ public abstract class SignalingMessageReceiver { // }, // } - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return - NCIceCandidate ncIceCandidate = payload.getIceCandidate(); - if (ncIceCandidate == null) { - // Broken message, this should not happen. - return; - } + val ncIceCandidate = payload.iceCandidate ?: return - webRtcMessageNotifier.notifyCandidate(sessionId, - roomType, - ncIceCandidate.getSdpMid(), - ncIceCandidate.getSdpMLineIndex(), - ncIceCandidate.getCandidate()); + webRtcMessageNotifier.notifyCandidate( + sessionId, + roomType, + ncIceCandidate.sdpMid, + ncIceCandidate.sdpMLineIndex, + ncIceCandidate.candidate + ) - return; + return } - if ("endOfCandidates".equals(type)) { - webRtcMessageNotifier.notifyEndOfCandidates(sessionId, roomType); + if ("endOfCandidates" == type) { + webRtcMessageNotifier.notifyEndOfCandidates(sessionId, roomType) - return; + return } } } diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt index 0a4993bdc4f..377c7687c53 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt +++ b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt @@ -42,7 +42,6 @@ import okio.ByteString import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.Subscribe import org.greenrobot.eventbus.ThreadMode -import org.json.JSONObject import java.io.IOException import java.lang.Thread.sleep import javax.inject.Inject @@ -253,7 +252,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU } // if (chatMap != null && chatMap.containsKey("comment")) { - // TODO: pass string through.. + // pass string through.. // val chatMessage = parseChatMessage(jsonString) // chatMessage?.let { // signalingMessageReceiver.process(it) @@ -496,7 +495,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU * stays connected, but it may change whenever it is connected again. */ private class ExternalSignalingMessageReceiver : SignalingMessageReceiver() { - fun process(eventMap: Map?) { + fun process(eventMap: Map?) { processEvent(eventMap) } @@ -511,7 +510,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU } fun process(message: ChatMessageJson) { - // TODO: pass string through.. + // pass string through.. processChatMessageWebSocketMessage(message) Log.d(TAG, "processing Received chat message") } From 8f55b5b1f5d90d5bf62c4a125a7c33306e58ef92 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Fri, 12 Dec 2025 11:57:48 +0100 Subject: [PATCH 05/12] parse ChatMessage in SignalingMessageReceiver Signed-off-by: Marcel Hibbe --- .../signaling/SignalingMessageReceiver.kt | 43 ++++++++++++----- .../talk/webrtc/WebSocketInstance.kt | 46 +++++-------------- 2 files changed, 43 insertions(+), 46 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt index 37fea124c7d..d11c431d6e0 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt +++ b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt @@ -6,12 +6,14 @@ */ package com.nextcloud.talk.signaling +import com.bluelinelabs.logansquare.LoganSquare import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter import com.nextcloud.talk.models.json.converters.EnumParticipantTypeConverter import com.nextcloud.talk.models.json.participants.Participant import com.nextcloud.talk.models.json.signaling.NCSignalingMessage import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage +import org.json.JSONObject import kotlin.Any import kotlin.Int import kotlin.Long @@ -158,8 +160,13 @@ abstract class SignalingMessageReceiver { fun onUnshareScreen() } - protected fun processChatMessageWebSocketMessage(chatMessage: ChatMessageJson) { - conversationMessageNotifier.notifyMessageReceived(chatMessage) + /** + * Listener for conversation messages. + */ + interface ConversationMessageListener { + fun onStartTyping(userId: String?, session: String?) + fun onStopTyping(userId: String?, session: String?) + fun onMessageReceived(chatMessage: ChatMessageJson?) } /** @@ -298,15 +305,6 @@ abstract class SignalingMessageReceiver { } } - /** - * Listener for conversation messages. - */ - interface ConversationMessageListener { - fun onStartTyping(userId: String?, session: String?) - fun onStopTyping(userId: String?, session: String?) - fun onMessageReceived(chatMessage: ChatMessageJson?) - } - private fun processSwitchToEvent(eventMap: Map?) { // Message schema: // { @@ -344,6 +342,29 @@ abstract class SignalingMessageReceiver { localParticipantMessageNotifier.notifySwitchTo(token) } + protected fun processChatMessageWebSocketMessage(jsonString: String) { + fun parseChatMessage(jsonString: String): ChatMessageJson? { + return try { + val root = JSONObject(jsonString) + val eventObj = root.optJSONObject("event") ?: return null + val messageObj = eventObj.optJSONObject("message") ?: return null + val dataObj = messageObj.optJSONObject("data") ?: return null + val chatObj = dataObj.optJSONObject("chat") ?: return null + val commentObj = chatObj.optJSONObject("comment") ?: return null + + LoganSquare.parse(commentObj.toString(), ChatMessageJson::class.java) + } catch (e: Exception) { + null + } + } + + val chatMessage = parseChatMessage(jsonString) + + chatMessage?.let { + conversationMessageNotifier.notifyMessageReceived(it) + } + } + private fun processUpdateEvent(eventMap: Map?) { val updateMap: Map? try { diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt index 377c7687c53..9e6a3f2fec6 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt +++ b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt @@ -17,7 +17,6 @@ import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.sharedA import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.events.NetworkEvent import com.nextcloud.talk.events.WebSocketCommunicationEvent -import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.participants.Participant import com.nextcloud.talk.models.json.participants.Participant.ActorType import com.nextcloud.talk.models.json.signaling.NCSignalingMessage @@ -184,7 +183,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU ncSignalingMessage.from = callWebSocketMessage.senderWebSocketMessage!!.sessionId } - signalingMessageReceiver.process(callWebSocketMessage) + signalingMessageReceiver.processChatMessage(callWebSocketMessage) } } @@ -203,11 +202,11 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU } else if ("leave" == eventOverallWebSocketMessage.eventMap!!["type"]) { processRoomLeaveMessage(eventOverallWebSocketMessage) } - signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap) + signalingMessageReceiver.processChatMessage(eventOverallWebSocketMessage.eventMap) } Globals.TARGET_PARTICIPANTS -> - signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap) + signalingMessageReceiver.processChatMessage(eventOverallWebSocketMessage.eventMap) else -> Log.i(TAG, "Received unknown/ignored event target: $target") @@ -218,25 +217,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU } } - private fun processRoomMessageMessage( - eventOverallWebSocketMessage: EventOverallWebSocketMessage, - jsonString: String - ) { - // fun parseChatMessage(jsonString: String): ChatMessageJson? { - // return try { - // val root = JSONObject(jsonString) - // val eventObj = root.optJSONObject("event") ?: return null - // val messageObj = eventObj.optJSONObject("message") ?: return null - // val dataObj = messageObj.optJSONObject("data") ?: return null - // val chatObj = dataObj.optJSONObject("chat") ?: return null - // val commentObj = chatObj.optJSONObject("comment") ?: return null - // - // LoganSquare.parse(commentObj.toString(), ChatMessageJson::class.java) - // } catch (e: Exception) { - // null - // } - // } - + private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage, text: String) { val messageHashMap = eventOverallWebSocketMessage.eventMap?.get("message") as Map<*, *>? if (messageHashMap != null && messageHashMap.containsKey("data")) { @@ -251,13 +232,9 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap)) } - // if (chatMap != null && chatMap.containsKey("comment")) { - // pass string through.. - // val chatMessage = parseChatMessage(jsonString) - // chatMessage?.let { - // signalingMessageReceiver.process(it) - // } - // } + if (chatMap != null && chatMap.containsKey("comment")) { + signalingMessageReceiver.processChatMessage(text) + } } else if (dataHashMap != null && dataHashMap.containsKey("recording")) { val recordingMap = dataHashMap["recording"] as Map<*, *>? if (recordingMap != null && recordingMap.containsKey("status")) { @@ -495,11 +472,11 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU * stays connected, but it may change whenever it is connected again. */ private class ExternalSignalingMessageReceiver : SignalingMessageReceiver() { - fun process(eventMap: Map?) { + fun processChatMessage(eventMap: Map?) { processEvent(eventMap) } - fun process(message: CallWebSocketMessage?) { + fun processChatMessage(message: CallWebSocketMessage?) { if (message?.ncSignalingMessage?.type == "startedTyping" || message?.ncSignalingMessage?.type == "stoppedTyping" ) { @@ -509,9 +486,8 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU } } - fun process(message: ChatMessageJson) { - // pass string through.. - processChatMessageWebSocketMessage(message) + fun processChatMessage(jsonString: String) { + processChatMessageWebSocketMessage(jsonString) Log.d(TAG, "processing Received chat message") } } From d2862f08414fd41f50fa9858aef555e73fbebfd0 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Fri, 12 Dec 2025 15:00:17 +0100 Subject: [PATCH 06/12] show chat messages from signaling (temp. disabled longpolling for now) known issues: mark chat as read not working (have to use separate endpoint to mark as read) Signed-off-by: Marcel Hibbe --- .../com/nextcloud/talk/chat/ChatActivity.kt | 4 ++- .../talk/chat/data/ChatMessageRepository.kt | 3 ++ .../network/OfflineFirstChatRepository.kt | 32 +++++++++++++++++-- .../talk/chat/viewmodels/ChatViewModel.kt | 5 +++ .../signaling/ConversationMessageNotifier.kt | 2 +- .../signaling/SignalingMessageReceiver.kt | 2 +- 6 files changed, 43 insertions(+), 5 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt index 0d7b1d9b8dd..4a528ee0735 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt @@ -477,7 +477,9 @@ class ChatActivity : } } - override fun onMessageReceived(chatMessage: ChatMessageJson?) { + override fun onChatMessageReceived(chatMessage: ChatMessageJson) { + chatViewModel.onSignalingChatMessageReceived(chatMessage) + Log.d( TAG, "received message in ChatActivity. This is the chat message received via HPB. It would be " + diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt index 31ab7cb3854..2f7d1ef9669 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt @@ -12,6 +12,7 @@ import com.nextcloud.talk.chat.data.io.LifecycleAwareManager import com.nextcloud.talk.chat.data.model.ChatMessage import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.models.domain.ConversationModel +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow @@ -117,4 +118,6 @@ interface ChatMessageRepository : LifecycleAwareManager { suspend fun sendUnsentChatMessages(credentials: String, url: String) suspend fun deleteTempMessage(chatMessage: ChatMessage) + + fun onSignalingChatMessageReceived(chatMessage: ChatMessageJson) } diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt index 11fb8e44975..58143b5bad6 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt @@ -49,6 +49,8 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import java.io.IOException import javax.inject.Inject +import kotlin.collections.any +import kotlin.collections.map @Suppress("LargeClass", "TooManyFunctions") class OfflineFirstChatRepository @Inject constructor( @@ -203,7 +205,8 @@ class OfflineFirstChatRepository @Inject constructor( handleMessagesFromDb(newestMessageIdFromDb) - initMessagePolling(newestMessageIdFromDb) + // temp disabled to test only signaling + // initMessagePolling(newestMessageIdFromDb) } private suspend fun handleMessagesFromDb(newestMessageIdFromDb: Long) { @@ -355,6 +358,9 @@ class OfflineFirstChatRepository @Inject constructor( updateUiForLastCommonRead() + // getNewestMessageIdFromChatBlocks wont work for insurance calls. we dont want newest message + // but only the newest message that came from sync (not from signaling) + // -> create new var to save newest message from sync (set for initial and long polling requests) val newestMessage = chatBlocksDao.getNewestMessageIdFromChatBlocks( internalConversationId, threadId @@ -504,6 +510,7 @@ class OfflineFirstChatRepository @Inject constructor( Log.d(TAG, "Starting online request for single message (e.g. a reply)") sync(bundle) } + // we cant just expect here that sync succeeded?? return chatDao.getChatMessageForConversation( internalConversationId, messageId @@ -676,7 +683,7 @@ class OfflineFirstChatRepository @Inject constructor( newestMessageId = newestMessageIdForNewChatBlock, hasHistory = hasHistory ) - chatBlocksDao.upsertChatBlock(newChatBlock) // crash when no conversation thread exists! + chatBlocksDao.upsertChatBlock(newChatBlock) updateBlocks(newChatBlock) return chatMessagesFromSyncToProcess @@ -1026,6 +1033,27 @@ class OfflineFirstChatRepository @Inject constructor( _removeMessageFlow.emit(chatMessage) } + override fun onSignalingChatMessageReceived(chatMessage: ChatMessageJson) { + scope.launch { + val chatMessageEntities = updateMessagesData( + chatMessagesJson = listOf(chatMessage), + blockContainingQueriedMessage = null, + lookIntoFuture = true, + hasHistory = true + ) + + val chatMessages = chatMessageEntities.map(ChatMessageEntity::asModel) + + handleNewAndTempMessages( + receivedChatMessages = chatMessages, + lookIntoFuture = true, + showUnreadMessagesMarker = false + ) + + updateUiForLastCommonRead() + } + } + @Suppress("Detekt.TooGenericExceptionCaught") override suspend fun addTemporaryMessage( message: CharSequence, diff --git a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt index 1ab528e5034..786bf35109a 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt @@ -35,6 +35,7 @@ import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.domain.ReactionAddedModel import com.nextcloud.talk.models.domain.ReactionDeletedModel import com.nextcloud.talk.models.json.capabilities.SpreedCapability +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import com.nextcloud.talk.models.json.conversations.RoomOverall import com.nextcloud.talk.models.json.generic.GenericOverall @@ -132,6 +133,10 @@ class ChatViewModel @Inject constructor( mediaPlayerManager.handleOnStop() } + fun onSignalingChatMessageReceived(chatMessage: ChatMessageJson) { + chatRepository.onSignalingChatMessageReceived(chatMessage) + } + val backgroundPlayUIFlow = mediaPlayerManager.backgroundPlayUIFlow val mediaPlayerSeekbarObserver: Flow diff --git a/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt b/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt index b975d9789f6..5b5e4dbf242 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt +++ b/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt @@ -33,7 +33,7 @@ internal class ConversationMessageNotifier { @Synchronized fun notifyMessageReceived(chatMessage: ChatMessageJson) { for (listener in ArrayList(conversationMessageListeners)) { - listener.onMessageReceived(chatMessage) + listener.onChatMessageReceived(chatMessage) } } diff --git a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt index d11c431d6e0..cd531eba1e3 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt +++ b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt @@ -166,7 +166,7 @@ abstract class SignalingMessageReceiver { interface ConversationMessageListener { fun onStartTyping(userId: String?, session: String?) fun onStopTyping(userId: String?, session: String?) - fun onMessageReceived(chatMessage: ChatMessageJson?) + fun onChatMessageReceived(chatMessage: ChatMessageJson) } /** From 333e910556ae77424551325bdca3c7c87b2b41ea Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Fri, 12 Dec 2025 16:15:20 +0100 Subject: [PATCH 07/12] WIP mark chat as read wip: Maybe trigger it when the user scrolled to the bottom on his own. E.g. when a scrolling listener tells "scrolling is finished" we check if it's scrolled to the bottom and then send the mark as read also todo: mark as read "when the user is sending the app to the background or navigates "back to conversation list" and the last message a user read (saw on screen) is a higher id then the previous known readmarker" Signed-off-by: Marcel Hibbe --- .../com/nextcloud/talk/chat/ChatActivity.kt | 19 +++++++++++++++++++ .../talk/chat/viewmodels/ChatViewModel.kt | 4 ++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt index 4a528ee0735..e0c8256f5d3 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt @@ -1464,6 +1464,10 @@ class ChatActivity : if (isScrolledToBottom()) { binding.unreadMessagesPopup.visibility = View.GONE binding.scrollDownButton.visibility = View.GONE + + (adapter?.items?.getOrNull(0)?.item as? ChatMessage)?.jsonMessageId?.let { + markAsRead(it) + } } else { if (binding.unreadMessagesPopup.isShown) { binding.scrollDownButton.visibility = View.GONE @@ -3943,9 +3947,24 @@ class ChatActivity : } } + private fun markAsRead(messageId: Int) { + chatViewModel.setChatReadMarker( + credentials!!, + ApiUtils.getUrlForChatReadMarker( + ApiUtils.getChatApiVersion(spreedCapabilities, intArrayOf(ApiUtils.API_V1)), + conversationUser?.baseUrl!!, + roomToken + ), + messageId + ) + } + fun markAsUnread(message: IMessage?) { val chatMessage = message as ChatMessage? if (chatMessage!!.previousMessageId > NO_PREVIOUS_MESSAGE_ID) { + // previousMessageId is taken to mark chat as unread even when "chat-unread" capability is not available + // It should be checked if "chat-unread" capability is available and then use + // https://nextcloud-talk.readthedocs.io/en/latest/chat/#mark-chat-as-unread chatViewModel.setChatReadMarker( credentials!!, ApiUtils.getUrlForChatReadMarker( diff --git a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt index 786bf35109a..7a71aa68fe7 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt @@ -564,8 +564,8 @@ class ChatViewModel @Inject constructor( }) } - fun setChatReadMarker(credentials: String, url: String, previousMessageId: Int) { - chatNetworkDataSource.setChatReadMarker(credentials, url, previousMessageId) + fun setChatReadMarker(credentials: String, url: String, lastReadMessage: Int) { + chatNetworkDataSource.setChatReadMarker(credentials, url, lastReadMessage) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : Observer { From 6c94163af163cdacaa0d7bef2a0ad3622a194033 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Wed, 17 Dec 2025 12:38:00 +0100 Subject: [PATCH 08/12] implement new readMessage behavior known issues: does not work well together with val weHaveAtLeastTheLastReadMessage = newestMessageIdFromDb >= conversationModel.lastReadMessage.toLong() for now. (Messages can be missing on initial load) Signed-off-by: Marcel Hibbe --- .../messages/IncomingTextMessageViewHolder.kt | 7 +- .../OutcomingTextMessageViewHolder.kt | 7 +- .../com/nextcloud/talk/chat/ChatActivity.kt | 120 ++++++++++-------- .../network/OfflineFirstChatRepository.kt | 10 +- .../talk/chat/viewmodels/ChatViewModel.kt | 27 +++- 5 files changed, 107 insertions(+), 64 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/adapters/messages/IncomingTextMessageViewHolder.kt b/app/src/main/java/com/nextcloud/talk/adapters/messages/IncomingTextMessageViewHolder.kt index b1722de2d42..99442632ff7 100644 --- a/app/src/main/java/com/nextcloud/talk/adapters/messages/IncomingTextMessageViewHolder.kt +++ b/app/src/main/java/com/nextcloud/talk/adapters/messages/IncomingTextMessageViewHolder.kt @@ -10,6 +10,7 @@ package com.nextcloud.talk.adapters.messages import android.content.Context +import android.text.SpannableStringBuilder import android.util.Log import android.util.TypedValue import android.view.View @@ -150,10 +151,10 @@ class IncomingTextMessageViewHolder(itemView: View, payload: Any) : binding.messageAuthor.visibility = View.GONE } binding.messageText.setTextSize(TypedValue.COMPLEX_UNIT_PX, textSize) - binding.messageText.text = processedMessageText + // binding.messageText.text = processedMessageText // just for debugging: - // binding.messageText.text = - // SpannableStringBuilder(processedMessageText).append(" (" + message.jsonMessageId + ")") + binding.messageText.text = + SpannableStringBuilder(processedMessageText).append(" (" + message.jsonMessageId + ")") } else { binding.checkboxContainer.visibility = View.VISIBLE binding.messageText.visibility = View.GONE diff --git a/app/src/main/java/com/nextcloud/talk/adapters/messages/OutcomingTextMessageViewHolder.kt b/app/src/main/java/com/nextcloud/talk/adapters/messages/OutcomingTextMessageViewHolder.kt index b68f816d94c..b5ae9a8c246 100644 --- a/app/src/main/java/com/nextcloud/talk/adapters/messages/OutcomingTextMessageViewHolder.kt +++ b/app/src/main/java/com/nextcloud/talk/adapters/messages/OutcomingTextMessageViewHolder.kt @@ -10,6 +10,7 @@ package com.nextcloud.talk.adapters.messages import android.content.Context +import android.text.SpannableStringBuilder import android.util.Log import android.util.TypedValue import android.view.View @@ -163,10 +164,10 @@ class OutcomingTextMessageViewHolder(itemView: View) : binding.messageTime.layoutParams = layoutParams viewThemeUtils.platform.colorTextView(binding.messageText, ColorRole.ON_SURFACE_VARIANT) - binding.messageText.text = processedMessageText + // binding.messageText.text = processedMessageText // just for debugging: - // binding.messageText.text = - // SpannableStringBuilder(processedMessageText).append(" (" + message.jsonMessageId + ")") + binding.messageText.text = + SpannableStringBuilder(processedMessageText).append(" (" + message.jsonMessageId + ")") } else { binding.messageText.visibility = View.GONE binding.checkboxContainer.visibility = View.VISIBLE diff --git a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt index e0c8256f5d3..311d32054f2 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt @@ -230,6 +230,7 @@ import io.reactivex.disposables.Disposable import io.reactivex.schedulers.Schedulers import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -250,6 +251,8 @@ import java.util.Locale import java.util.concurrent.ExecutionException import javax.inject.Inject import kotlin.math.roundToInt +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.launchIn @Suppress("TooManyFunctions") @AutoInjector(NextcloudTalkApplication::class) @@ -660,6 +663,7 @@ class ChatActivity : this.lifecycle.removeObserver(chatViewModel) } + @OptIn(FlowPreview::class) @SuppressLint("NotifyDataSetChanged", "SetTextI18n", "ResourceAsColor") @Suppress("LongMethod") private fun initObservers() { @@ -939,7 +943,7 @@ class ChatActivity : val id = state.msg.ocs!!.data!!.parentMessage!!.id.toString() val index = adapter?.getMessagePositionById(id) ?: 0 - val message = adapter?.items?.get(index)?.item as ChatMessage + val message = adapter?.items?.get(index)?.item as? ChatMessage setMessageAsDeleted(message) } @@ -998,43 +1002,46 @@ class ChatActivity : } } - this.lifecycleScope.launch { - chatViewModel.getMessageFlow - .onEach { triple -> - val lookIntoFuture = triple.first - val setUnreadMessagesMarker = triple.second - var chatMessageList = triple.third - - chatMessageList = handleSystemMessages(chatMessageList) - chatMessageList = handleThreadMessages(chatMessageList) - if (chatMessageList.isEmpty()) { - return@onEach - } + chatViewModel.getMessageFlow + .onEach { triple -> + val lookIntoFuture = triple.first + val setUnreadMessagesMarker = triple.second + var chatMessageList = triple.third - determinePreviousMessageIds(chatMessageList) + chatMessageList = handleSystemMessages(chatMessageList) + chatMessageList = handleThreadMessages(chatMessageList) + if (chatMessageList.isEmpty()) { + return@onEach + } - handleExpandableSystemMessages(chatMessageList) + determinePreviousMessageIds(chatMessageList) - if (ChatMessage.SystemMessageType.CLEARED_CHAT == chatMessageList[0].systemMessageType) { - adapter?.clear() - adapter?.notifyDataSetChanged() - } - - if (lookIntoFuture) { - Log.d(TAG, "chatMessageList.size in getMessageFlow:" + chatMessageList.size) - processMessagesFromTheFuture(chatMessageList, setUnreadMessagesMarker) - } else { - processMessagesNotFromTheFuture(chatMessageList) - collapseSystemMessages() - } - - processExpiredMessages() - processCallStartedMessages() + handleExpandableSystemMessages(chatMessageList) + if (ChatMessage.SystemMessageType.CLEARED_CHAT == chatMessageList[0].systemMessageType) { + adapter?.clear() adapter?.notifyDataSetChanged() } - .collect() - } + + if (lookIntoFuture) { + Log.d(TAG, "chatMessageList.size in getMessageFlow:" + chatMessageList.size) + processMessagesFromTheFuture(chatMessageList, setUnreadMessagesMarker) + } else { + processMessagesNotFromTheFuture(chatMessageList) + collapseSystemMessages() + } + + processExpiredMessages() + processCallStartedMessages() + + adapter?.notifyDataSetChanged() + } + .debounce(300) + .onEach { + advanceLocalLastReadMessageIfNeeded() + } + .launchIn(lifecycleScope) + this.lifecycleScope.launch { chatViewModel.getRemoveMessageFlow @@ -1461,13 +1468,11 @@ class ChatActivity : super.onScrollStateChanged(recyclerView, newState) if (newState == AbsListView.OnScrollListener.SCROLL_STATE_IDLE) { + advanceLocalLastReadMessageIfNeeded() + updateRemoteLastReadMessageIfNeeded() if (isScrolledToBottom()) { binding.unreadMessagesPopup.visibility = View.GONE binding.scrollDownButton.visibility = View.GONE - - (adapter?.items?.getOrNull(0)?.item as? ChatMessage)?.jsonMessageId?.let { - markAsRead(it) - } } else { if (binding.unreadMessagesPopup.isShown) { binding.scrollDownButton.visibility = View.GONE @@ -2759,9 +2764,36 @@ class ChatActivity : if (mentionAutocomplete != null && mentionAutocomplete!!.isPopupShowing) { mentionAutocomplete?.dismissPopup() } + updateRemoteLastReadMessageIfNeeded() + adapter = null } + private fun advanceLocalLastReadMessageIfNeeded() { + val position = layoutManager?.findFirstVisibleItemPosition() + position?.let { + // Casting could fail if it's not a chatMessage. It should not matter as the function is triggered often + // enough. If it's a problem, either improve or wait for migration to Jetpack Compose. + val message = adapter?.items?.getOrNull(it)?.item as? ChatMessage + message?.jsonMessageId?.let { messageId -> + chatViewModel.advanceLocalLastReadMessageIfNeeded(messageId) + } + } + } + + private fun updateRemoteLastReadMessageIfNeeded() { + val url = ApiUtils.getUrlForChatReadMarker( + ApiUtils.getChatApiVersion(spreedCapabilities, intArrayOf(ApiUtils.API_V1)), + conversationUser.baseUrl!!, + roomToken + ) + + chatViewModel.updateRemoteLastReadMessageIfNeeded( + credentials = credentials!!, + url = url + ) + } + private fun isActivityNotChangingConfigurations(): Boolean = !isChangingConfigurations private fun isNotInCall(): Boolean = @@ -3198,7 +3230,7 @@ class ChatActivity : it.item is ChatMessage && (it.item as ChatMessage).id == messageId } if (messagePosition >= 0) { - val currentItem = adapter?.items?.get(messagePosition)?.item + val currentItem = adapter?.items?.getOrNull(messagePosition)?.item if (currentItem is ChatMessage && currentItem.id == messageId) { return Pair(currentItem, messagePosition) } else { @@ -3947,25 +3979,13 @@ class ChatActivity : } } - private fun markAsRead(messageId: Int) { - chatViewModel.setChatReadMarker( - credentials!!, - ApiUtils.getUrlForChatReadMarker( - ApiUtils.getChatApiVersion(spreedCapabilities, intArrayOf(ApiUtils.API_V1)), - conversationUser?.baseUrl!!, - roomToken - ), - messageId - ) - } - fun markAsUnread(message: IMessage?) { val chatMessage = message as ChatMessage? if (chatMessage!!.previousMessageId > NO_PREVIOUS_MESSAGE_ID) { // previousMessageId is taken to mark chat as unread even when "chat-unread" capability is not available // It should be checked if "chat-unread" capability is available and then use // https://nextcloud-talk.readthedocs.io/en/latest/chat/#mark-chat-as-unread - chatViewModel.setChatReadMarker( + chatViewModel.setChatReadMessage( credentials!!, ApiUtils.getUrlForChatReadMarker( ApiUtils.getChatApiVersion(spreedCapabilities, intArrayOf(ApiUtils.API_V1)), diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt index 58143b5bad6..f78e4d58abb 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt @@ -157,6 +157,8 @@ class OfflineFirstChatRepository @Inject constructor( Log.d(TAG, "newestMessageIdFromDb: $newestMessageIdFromDb") val weAlreadyHaveSomeOfflineMessages = newestMessageIdFromDb > 0 + + // do not take the conversationModel.lastReadMessage as it could have been updated val weHaveAtLeastTheLastReadMessage = newestMessageIdFromDb >= conversationModel.lastReadMessage.toLong() Log.d(TAG, "weAlreadyHaveSomeOfflineMessages:$weAlreadyHaveSomeOfflineMessages") Log.d(TAG, "weHaveAtLeastTheLastReadMessage:$weHaveAtLeastTheLastReadMessage") @@ -187,7 +189,6 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture = false, timeout = 0, includeLastKnown = true, - setReadMarker = true, lastKnown = null ) withNetworkParams.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap) @@ -291,7 +292,6 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture = false, timeout = 0, includeLastKnown = false, - setReadMarker = true, lastKnown = beforeMessageId.toInt() ) withNetworkParams.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap) @@ -319,7 +319,6 @@ class OfflineFirstChatRepository @Inject constructor( // initially no messages but someone writes us in the first 30 seconds. timeout = 0, includeLastKnown = false, - setReadMarker = true, lastKnown = initialMessageId.toInt() ) @@ -371,7 +370,6 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture = true, timeout = 30, includeLastKnown = false, - setReadMarker = true, lastKnown = newestMessage ) @@ -462,7 +460,6 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture: Boolean, timeout: Int, includeLastKnown: Boolean, - setReadMarker: Boolean, lastKnown: Int?, limit: Int = DEFAULT_MESSAGES_LIMIT ): HashMap { @@ -484,7 +481,7 @@ class OfflineFirstChatRepository @Inject constructor( fieldMap["limit"] = limit fieldMap["lookIntoFuture"] = if (lookIntoFuture) 1 else 0 - fieldMap["setReadMarker"] = if (setReadMarker) 1 else 0 + fieldMap["setReadMarker"] = 0 return fieldMap } @@ -501,7 +498,6 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture = false, timeout = 0, includeLastKnown = true, - setReadMarker = false, lastKnown = messageId.toInt(), limit = 1 ) diff --git a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt index 7a71aa68fe7..0404cb628a3 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt @@ -94,6 +94,10 @@ class ChatViewModel @Inject constructor( lateinit var currentUser: User + private var localLastReadMessage: Int = 0 + + private lateinit var currentConversation: ConversationModel + private val mediaPlayerManager: MediaPlayerManager = MediaPlayerManager.sharedInstance(appPreferences) lateinit var currentLifeCycleFlag: LifeCycleFlag val disposableSet = mutableSetOf() @@ -307,7 +311,10 @@ class ChatViewModel @Inject constructor( } fun updateConversation(currentConversation: ConversationModel) { + this.currentConversation = currentConversation chatRepository.updateConversation(currentConversation) + + advanceLocalLastReadMessageIfNeeded(currentConversation.lastReadMessage) } fun getRoom(user: User, token: String) { @@ -564,7 +571,25 @@ class ChatViewModel @Inject constructor( }) } - fun setChatReadMarker(credentials: String, url: String, lastReadMessage: Int) { + fun advanceLocalLastReadMessageIfNeeded(messageId: Int) { + if (localLastReadMessage < messageId) { + localLastReadMessage = messageId + } + } + + /** + * Please use with caution to not spam the server + */ + fun updateRemoteLastReadMessageIfNeeded(credentials: String, url: String) { + if (localLastReadMessage > currentConversation.lastReadMessage) { + setChatReadMessage(credentials, url, localLastReadMessage) + } + } + + /** + * Please use with caution to not spam the server + */ + fun setChatReadMessage(credentials: String, url: String, lastReadMessage: Int) { chatNetworkDataSource.setChatReadMarker(credentials, url, lastReadMessage) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) From 4ca77d789a2c8fab63da2144b37f8080b5b2dab3 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Wed, 17 Dec 2025 12:46:55 +0100 Subject: [PATCH 09/12] identify problem for load of initial messages (add comment) Signed-off-by: Marcel Hibbe --- .../talk/chat/data/network/OfflineFirstChatRepository.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt index f78e4d58abb..8052449815a 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt @@ -158,7 +158,6 @@ class OfflineFirstChatRepository @Inject constructor( val weAlreadyHaveSomeOfflineMessages = newestMessageIdFromDb > 0 - // do not take the conversationModel.lastReadMessage as it could have been updated val weHaveAtLeastTheLastReadMessage = newestMessageIdFromDb >= conversationModel.lastReadMessage.toLong() Log.d(TAG, "weAlreadyHaveSomeOfflineMessages:$weAlreadyHaveSomeOfflineMessages") Log.d(TAG, "weHaveAtLeastTheLastReadMessage:$weHaveAtLeastTheLastReadMessage") @@ -169,6 +168,9 @@ class OfflineFirstChatRepository @Inject constructor( "Initial online request is skipped because offline messages are up to date" + " until lastReadMessage" ) + + // This is a problem! No long polling should be done when we have the HPB. How to initially get the + // messages newer than TheLastReadMessage? Log.d(TAG, "For messages newer than lastRead, lookIntoFuture will load them.") } else { if (!weAlreadyHaveSomeOfflineMessages) { From 81ef9b2234c38c24c67a13e64d342244a0c51790 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Wed, 17 Dec 2025 15:20:51 +0100 Subject: [PATCH 10/12] handle loading of initial messages depending on HPB existence Signed-off-by: Marcel Hibbe --- .../com/nextcloud/talk/chat/ChatActivity.kt | 10 ++++-- .../talk/chat/data/ChatMessageRepository.kt | 2 +- .../network/OfflineFirstChatRepository.kt | 32 +++++++++++++------ .../talk/chat/viewmodels/ChatViewModel.kt | 5 +-- 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt index 311d32054f2..2c4f02fdce8 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt @@ -830,7 +830,9 @@ class ChatActivity : chatViewModel.loadMessages( withCredentials = credentials!!, - withUrl = urlForChatting + withUrl = urlForChatting, + hasHighPerformanceBackend = + WebSocketConnectionHelper.getWebSocketInstanceForUser(conversationUser) != null ) } else { Log.w( @@ -1042,7 +1044,6 @@ class ChatActivity : } .launchIn(lifecycleScope) - this.lifecycleScope.launch { chatViewModel.getRemoveMessageFlow .onEach { @@ -2764,6 +2765,10 @@ class ChatActivity : if (mentionAutocomplete != null && mentionAutocomplete!!.isPopupShowing) { mentionAutocomplete?.dismissPopup() } + + // TODO: when updating remote last read message in onPause, there is a race condition with loading conversations + // for conversation list. It may or may not include info about the sent last read message... + // -> save this field offline in conversation? updateRemoteLastReadMessageIfNeeded() adapter = null @@ -2920,6 +2925,7 @@ class ChatActivity : private fun setupWebsocket() { if (currentConversation == null || conversationUser == null) { + Log.e(TAG, "setupWebsocket: currentConversation or conversationUser is null") return } diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt index 2f7d1ef9669..0b1023ffc46 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt @@ -50,7 +50,7 @@ interface ChatMessageRepository : LifecycleAwareManager { fun updateConversation(conversationModel: ConversationModel) - fun initScopeAndLoadInitialMessages(withNetworkParams: Bundle) + fun initScopeAndLoadInitialMessages(withNetworkParams: Bundle, hasHighPerformanceBackend: Boolean) /** * Loads messages from local storage. If the messages are not found, then it diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt index 8052449815a..215d27e4d53 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt @@ -140,12 +140,15 @@ class OfflineFirstChatRepository @Inject constructor( this.conversationModel = conversationModel } - override fun initScopeAndLoadInitialMessages(withNetworkParams: Bundle) { + override fun initScopeAndLoadInitialMessages(withNetworkParams: Bundle, hasHighPerformanceBackend: Boolean) { scope = CoroutineScope(Dispatchers.IO) - loadInitialMessages(withNetworkParams) + loadInitialMessages( + withNetworkParams, + hasHighPerformanceBackend + ) } - private fun loadInitialMessages(withNetworkParams: Bundle): Job = + private fun loadInitialMessages(withNetworkParams: Bundle, hasHighPerformanceBackend: Boolean): Job = scope.launch { Log.d(TAG, "---- loadInitialMessages ------------") newXChatLastCommonRead = conversationModel.lastCommonReadMessage @@ -161,19 +164,27 @@ class OfflineFirstChatRepository @Inject constructor( val weHaveAtLeastTheLastReadMessage = newestMessageIdFromDb >= conversationModel.lastReadMessage.toLong() Log.d(TAG, "weAlreadyHaveSomeOfflineMessages:$weAlreadyHaveSomeOfflineMessages") Log.d(TAG, "weHaveAtLeastTheLastReadMessage:$weHaveAtLeastTheLastReadMessage") + Log.d(TAG, "hasHighPerformanceBackend:$hasHighPerformanceBackend") - if (weAlreadyHaveSomeOfflineMessages && weHaveAtLeastTheLastReadMessage) { + if (weAlreadyHaveSomeOfflineMessages && weHaveAtLeastTheLastReadMessage && !hasHighPerformanceBackend) { Log.d( TAG, "Initial online request is skipped because offline messages are up to date" + " until lastReadMessage" ) - // This is a problem! No long polling should be done when we have the HPB. How to initially get the - // messages newer than TheLastReadMessage? - Log.d(TAG, "For messages newer than lastRead, lookIntoFuture will load them.") + // For messages newer than lastRead, lookIntoFuture will load them. + // We must only end up here when NO HPB is used! + // If a HPB is used, longPolling is not available to handle loading of newer messages. + // When a HPB is used the initial request must be made. } else { - if (!weAlreadyHaveSomeOfflineMessages) { + if (hasHighPerformanceBackend) { + Log.d( + TAG, + "An online request for newest 100 messages is made because HPB is used (No long " + + "polling available to catch up with messages newer than last read.)" + ) + } else if (!weAlreadyHaveSomeOfflineMessages) { Log.d(TAG, "An online request for newest 100 messages is made because offline chat is empty") if (networkMonitor.isOnline.value.not()) { _generalUIFlow.emit(ChatActivity.NO_OFFLINE_MESSAGES_FOUND) @@ -208,8 +219,9 @@ class OfflineFirstChatRepository @Inject constructor( handleMessagesFromDb(newestMessageIdFromDb) - // temp disabled to test only signaling - // initMessagePolling(newestMessageIdFromDb) + if (!hasHighPerformanceBackend) { + initMessagePolling(newestMessageIdFromDb) + } } private suspend fun handleMessagesFromDb(newestMessageIdFromDb: Long) { diff --git a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt index 0404cb628a3..3289e8760f4 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt @@ -508,12 +508,13 @@ class ChatViewModel @Inject constructor( } } - fun loadMessages(withCredentials: String, withUrl: String) { + fun loadMessages(withCredentials: String, withUrl: String, hasHighPerformanceBackend: Boolean) { val bundle = Bundle() bundle.putString(BundleKeys.KEY_CHAT_URL, withUrl) bundle.putString(BundleKeys.KEY_CREDENTIALS, withCredentials) chatRepository.initScopeAndLoadInitialMessages( - withNetworkParams = bundle + withNetworkParams = bundle, + hasHighPerformanceBackend = hasHighPerformanceBackend ) } From 70b9a457ce6bd30d44fbad9ae66aee66df1daa92 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Wed, 17 Dec 2025 17:44:26 +0100 Subject: [PATCH 11/12] minor renaming and prepare insurance request Signed-off-by: Marcel Hibbe --- .../talk/chat/data/ChatMessageRepository.kt | 2 +- .../network/OfflineFirstChatRepository.kt | 31 +++++++++++++------ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt index 0b1023ffc46..c3d3b01c562 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt @@ -71,7 +71,7 @@ interface ChatMessageRepository : LifecycleAwareManager { * the database with the server and emits the new messages to [messageFlow], * else it simply retries after timeout. */ - fun initMessagePolling(initialMessageId: Long): Job + fun initLongPolling(initialMessageId: Long): Job /** * Gets a individual message. diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt index 215d27e4d53..5e943ec343c 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt @@ -208,7 +208,7 @@ class OfflineFirstChatRepository @Inject constructor( withNetworkParams.putString(BundleKeys.KEY_ROOM_TOKEN, conversationModel.token) Log.d(TAG, "Starting online request for initial loading") - val chatMessageEntities = sync(withNetworkParams) + val chatMessageEntities = getMessages(withNetworkParams) if (chatMessageEntities == null) { Log.e(TAG, "initial loading of messages failed") } @@ -220,7 +220,9 @@ class OfflineFirstChatRepository @Inject constructor( handleMessagesFromDb(newestMessageIdFromDb) if (!hasHighPerformanceBackend) { - initMessagePolling(newestMessageIdFromDb) + initLongPolling(newestMessageIdFromDb) + } else { + initRepeatingInsuranceRequest(newestMessageIdFromDb) } } @@ -314,16 +316,16 @@ class OfflineFirstChatRepository @Inject constructor( if (loadFromServer) { Log.d(TAG, "Starting online request for loadMoreMessages") - sync(withNetworkParams) + getMessages(withNetworkParams) } showMessagesBefore(internalConversationId, beforeMessageId, DEFAULT_MESSAGES_LIMIT) updateUiForLastCommonRead() } - override fun initMessagePolling(initialMessageId: Long): Job = + override fun initLongPolling(initialMessageId: Long): Job = scope.launch { - Log.d(TAG, "---- initMessagePolling ------------") + Log.d(TAG, "---- initLongPolling ------------") Log.d(TAG, "newestMessage: $initialMessageId") @@ -345,11 +347,11 @@ class OfflineFirstChatRepository @Inject constructor( Thread.sleep(HALF_SECOND) } else { // sync database with server - // (This is a long blocking call because long polling (lookIntoFuture) is set) + // (This is a long blocking call because long polling (lookIntoFuture and timeout) is set) networkParams.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap) Log.d(TAG, "Starting online request for long polling") - val resultsFromSync = sync(networkParams) + val resultsFromSync = getMessages(networkParams) if (!resultsFromSync.isNullOrEmpty()) { val chatMessages = resultsFromSync.map(ChatMessageEntity::asModel) @@ -392,6 +394,17 @@ class OfflineFirstChatRepository @Inject constructor( } } + private fun initRepeatingInsuranceRequest(initialMessageId: Long) { + scope.launch { + Log.d(TAG, "---- initRepeatingInsuranceRequest ------------") + + Log.d(TAG, "newestMessage: $initialMessageId") + + + + } + } + private suspend fun handleNewAndTempMessages( receivedChatMessages: List, lookIntoFuture: Boolean, @@ -518,7 +531,7 @@ class OfflineFirstChatRepository @Inject constructor( bundle.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap) Log.d(TAG, "Starting online request for single message (e.g. a reply)") - sync(bundle) + getMessages(bundle) } // we cant just expect here that sync succeeded?? return chatDao.getChatMessageForConversation( @@ -596,7 +609,7 @@ class OfflineFirstChatRepository @Inject constructor( return null } - private suspend fun sync(bundle: Bundle): List? { + private suspend fun getMessages(bundle: Bundle): List? { if (!networkMonitor.isOnline.value) { Log.d(TAG, "Device is offline, can't load chat messages from server") return null From f50377e59937b5fe013dc7403b233362326d69a9 Mon Sep 17 00:00:00 2001 From: Marcel Hibbe Date: Wed, 17 Dec 2025 19:40:17 +0100 Subject: [PATCH 12/12] migrate request to pull messages to flow/coroutine Signed-off-by: Marcel Hibbe --- .../java/com/nextcloud/talk/api/NcApi.java | 12 - .../com/nextcloud/talk/api/NcApiCoroutines.kt | 8 + .../data/network/ChatNetworkDataSource.kt | 8 +- .../network/OfflineFirstChatRepository.kt | 221 ++++++++++-------- .../chat/data/network/RetrofitChatNetwork.kt | 5 +- .../talk/chat/domain/ChatPullResult.kt | 21 ++ 6 files changed, 163 insertions(+), 112 deletions(-) create mode 100644 app/src/main/java/com/nextcloud/talk/chat/domain/ChatPullResult.kt diff --git a/app/src/main/java/com/nextcloud/talk/api/NcApi.java b/app/src/main/java/com/nextcloud/talk/api/NcApi.java index 6b79ae80091..b539670fbde 100644 --- a/app/src/main/java/com/nextcloud/talk/api/NcApi.java +++ b/app/src/main/java/com/nextcloud/talk/api/NcApi.java @@ -324,18 +324,6 @@ Observable> setPassword2(@Header("Authorization") Strin Observable getRoomCapabilities(@Header("Authorization") String authorization, @Url String url); - /* - QueryMap items are as follows: - - "lookIntoFuture": int (0 or 1), - - "limit" : int, range 100-200, - - "timeout": used with look into future, 30 default, 60 at most - - "lastKnownMessageId", int, use one from X-Chat-Last-Given - */ - @GET - Observable> pullChatMessages(@Header("Authorization") String authorization, - @Url String url, - @QueryMap Map fields); - /* Fieldmap items are as follows: - "message": , diff --git a/app/src/main/java/com/nextcloud/talk/api/NcApiCoroutines.kt b/app/src/main/java/com/nextcloud/talk/api/NcApiCoroutines.kt index f3f5324787b..c6710a3c8bd 100644 --- a/app/src/main/java/com/nextcloud/talk/api/NcApiCoroutines.kt +++ b/app/src/main/java/com/nextcloud/talk/api/NcApiCoroutines.kt @@ -27,6 +27,7 @@ import com.nextcloud.talk.models.json.threads.ThreadsOverall import com.nextcloud.talk.models.json.userAbsence.UserAbsenceOverall import okhttp3.MultipartBody import okhttp3.RequestBody +import retrofit2.Response import retrofit2.http.Body import retrofit2.http.DELETE import retrofit2.http.Field @@ -323,4 +324,11 @@ interface NcApiCoroutines { @GET suspend fun status(@Header("Authorization") authorization: String, @Url url: String): StatusOverall + + @GET + suspend fun pullChatMessages( + @Header("Authorization") authorization: String, + @Url url: String, + @QueryMap fields: Map + ): Response } diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/ChatNetworkDataSource.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/ChatNetworkDataSource.kt index 5dcdfe3b618..e77407b4700 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/ChatNetworkDataSource.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/ChatNetworkDataSource.kt @@ -10,6 +10,7 @@ import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.json.capabilities.SpreedCapability import com.nextcloud.talk.models.json.chat.ChatMessageJson +import com.nextcloud.talk.models.json.chat.ChatOverall import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import com.nextcloud.talk.models.json.conversations.RoomOverall import com.nextcloud.talk.models.json.generic.GenericOverall @@ -63,7 +64,12 @@ interface ChatNetworkDataSource { threadTitle: String? ): ChatOverallSingleMessage - fun pullChatMessages(credentials: String, url: String, fieldMap: HashMap): Observable> + suspend fun pullChatMessages( + credentials: String, + url: String, + fieldMap: HashMap + ): Response + fun deleteChatMessage(credentials: String, url: String): Observable fun createRoom(credentials: String, url: String, map: Map): Observable fun setChatReadMarker(credentials: String, url: String, previousMessageId: Int): Observable diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt index 5e943ec343c..eb0936c3bfa 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt @@ -13,6 +13,7 @@ import android.util.Log import com.nextcloud.talk.chat.ChatActivity import com.nextcloud.talk.chat.data.ChatMessageRepository import com.nextcloud.talk.chat.data.model.ChatMessage +import com.nextcloud.talk.chat.domain.ChatPullResult import com.nextcloud.talk.data.database.dao.ChatBlocksDao import com.nextcloud.talk.data.database.dao.ChatMessagesDao import com.nextcloud.talk.data.database.mappers.asEntity @@ -25,14 +26,11 @@ import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.extensions.toIntOrZero import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.json.chat.ChatMessageJson -import com.nextcloud.talk.models.json.chat.ChatOverall import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter import com.nextcloud.talk.models.json.participants.Participant import com.nextcloud.talk.utils.bundle.BundleKeys import com.nextcloud.talk.utils.message.SendMessageUtils -import io.reactivex.android.schedulers.AndroidSchedulers -import io.reactivex.schedulers.Schedulers import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -44,9 +42,11 @@ import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import retrofit2.HttpException import java.io.IOException import javax.inject.Inject import kotlin.collections.any @@ -540,74 +540,91 @@ class OfflineFirstChatRepository @Inject constructor( ).map(ChatMessageEntity::asModel) } - @Suppress("UNCHECKED_CAST", "MagicNumber", "Detekt.TooGenericExceptionCaught") - private fun getMessagesFromServer(bundle: Bundle): Pair>? { + fun pullMessagesFlow(bundle: Bundle): Flow = flow { val fieldMap = bundle.getSerializable(BundleKeys.KEY_FIELD_MAP) as HashMap - var attempts = 1 + while (attempts < 5) { - Log.d(TAG, "message limit: " + fieldMap["limit"]) - try { - val result = network.pullChatMessages(credentials, urlForChatting, fieldMap) - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .map { it -> - when (it.code()) { - HTTP_CODE_OK -> { - Log.d(TAG, "getMessagesFromServer HTTP_CODE_OK") - newXChatLastCommonRead = it.headers()["X-Chat-Last-Common-Read"]?.let { - Integer.parseInt(it) - } - - return@map Pair( - HTTP_CODE_OK, - (it.body() as ChatOverall).ocs!!.data!! - ) - } - - HTTP_CODE_NOT_MODIFIED -> { - Log.d(TAG, "getMessagesFromServer HTTP_CODE_NOT_MODIFIED") - - return@map Pair( - HTTP_CODE_NOT_MODIFIED, - listOf() - ) - } - - HTTP_CODE_PRECONDITION_FAILED -> { - Log.d(TAG, "getMessagesFromServer HTTP_CODE_PRECONDITION_FAILED") - - return@map Pair( - HTTP_CODE_PRECONDITION_FAILED, - listOf() - ) - } - - else -> { - return@map Pair( - HTTP_CODE_PRECONDITION_FAILED, - listOf() - ) - } - } + runCatching { + network.pullChatMessages(credentials, urlForChatting, fieldMap) + }.fold( + onSuccess = { response -> + val result = when (response.code()) { + HTTP_CODE_OK -> ChatPullResult.Success( + messages = response.body()?.ocs?.data.orEmpty(), + lastCommonRead = response.headers()["X-Chat-Last-Common-Read"]?.toInt() + ) + HTTP_CODE_NOT_MODIFIED -> ChatPullResult.NotModified + HTTP_CODE_PRECONDITION_FAILED -> ChatPullResult.PreconditionFailed + else -> ChatPullResult.Error(HttpException(response)) } - .blockingSingle() - return result - } catch (e: Exception) { - Log.e(TAG, "Something went wrong when pulling chat messages (attempt: $attempts)", e) - attempts++ - val newMessageLimit = when (attempts) { - 2 -> 50 - 3 -> 10 - else -> 5 + emit(result) + return@flow + }, + onFailure = { e -> + Log.e(TAG, "Attempt $attempts failed", e) + attempts++ + fieldMap["limit"] = when (attempts) { 2 -> 50; 3 -> 10; else -> 5 } } - fieldMap["limit"] = newMessageLimit - } + ) } - Log.e(TAG, "All attempts to get messages from server failed") - return null - } + + emit(ChatPullResult.Error(IllegalStateException("All attempts failed"))) + }.flowOn(Dispatchers.IO) + + + + // private suspend fun getMessages(bundle: Bundle): List? { + // if (!networkMonitor.isOnline.value) { + // Log.d(TAG, "Device is offline, can't load chat messages from server") + // return null + // } + // + // val result = pullMessagesFlow(bundle) + // if (result == null) { + // Log.d(TAG, "No result from server") + // return null + // } + // + // var chatMessagesFromSync: List? = null + // + // val fieldMap = bundle.getSerializable(BundleKeys.KEY_FIELD_MAP) as HashMap + // val queriedMessageId = fieldMap["lastKnownMessageId"] + // val lookIntoFuture = fieldMap["lookIntoFuture"] == 1 + // + // val statusCode = result.first + // + // val hasHistory = getHasHistory(statusCode, lookIntoFuture) + // + // Log.d( + // TAG, + // "internalConv=$internalConversationId statusCode=$statusCode lookIntoFuture=$lookIntoFuture " + + // "hasHistory=$hasHistory " + + // "queriedMessageId=$queriedMessageId" + // ) + // + // val blockContainingQueriedMessage: ChatBlockEntity? = getBlockOfMessage(queriedMessageId) + // + // if (blockContainingQueriedMessage != null && !hasHistory) { + // blockContainingQueriedMessage.hasHistory = false + // chatBlocksDao.upsertChatBlock(blockContainingQueriedMessage) + // Log.d(TAG, "End of chat was reached so hasHistory=false is set") + // } + // + // if (result.second.isNotEmpty()) { + // chatMessagesFromSync = updateMessagesData( + // result.second, + // blockContainingQueriedMessage, + // lookIntoFuture, + // hasHistory + // ) + // } else { + // Log.d(TAG, "no data is updated...") + // } + // + // return chatMessagesFromSync + // } private suspend fun getMessages(bundle: Bundle): List? { if (!networkMonitor.isOnline.value) { @@ -615,49 +632,59 @@ class OfflineFirstChatRepository @Inject constructor( return null } - val result = getMessagesFromServer(bundle) - if (result == null) { - Log.d(TAG, "No result from server") - return null - } - - var chatMessagesFromSync: List? = null - val fieldMap = bundle.getSerializable(BundleKeys.KEY_FIELD_MAP) as HashMap val queriedMessageId = fieldMap["lastKnownMessageId"] val lookIntoFuture = fieldMap["lookIntoFuture"] == 1 - val statusCode = result.first + val result = pullMessagesFlow(bundle).first() - val hasHistory = getHasHistory(statusCode, lookIntoFuture) + return when (result) { - Log.d( - TAG, - "internalConv=$internalConversationId statusCode=$statusCode lookIntoFuture=$lookIntoFuture " + - "hasHistory=$hasHistory " + - "queriedMessageId=$queriedMessageId" - ) + is ChatPullResult.Success -> { + val hasHistory = getHasHistory(HTTP_CODE_OK, lookIntoFuture) - val blockContainingQueriedMessage: ChatBlockEntity? = getBlockOfMessage(queriedMessageId) + Log.d( + TAG, + "internalConv=$internalConversationId statusCode=${HTTP_CODE_OK} lookIntoFuture=$lookIntoFuture " + + "hasHistory=$hasHistory queriedMessageId=$queriedMessageId" + ) - if (blockContainingQueriedMessage != null && !hasHistory) { - blockContainingQueriedMessage.hasHistory = false - chatBlocksDao.upsertChatBlock(blockContainingQueriedMessage) - Log.d(TAG, "End of chat was reached so hasHistory=false is set") - } + val blockContainingQueriedMessage: ChatBlockEntity? = getBlockOfMessage(queriedMessageId) - if (result.second.isNotEmpty()) { - chatMessagesFromSync = updateMessagesData( - result.second, - blockContainingQueriedMessage, - lookIntoFuture, - hasHistory - ) - } else { - Log.d(TAG, "no data is updated...") - } + blockContainingQueriedMessage?.takeIf { !hasHistory }?.apply { + this.hasHistory = false + chatBlocksDao.upsertChatBlock(this) + Log.d(TAG, "End of chat reached, set hasHistory=false") + } + + if (result.messages.isNotEmpty()) { + updateMessagesData( + result.messages, + blockContainingQueriedMessage, + lookIntoFuture, + hasHistory + ) + } else { + Log.d(TAG, "No new messages to update") + null + } + } - return chatMessagesFromSync + is ChatPullResult.NotModified -> { + Log.d(TAG, "Server returned NOT_MODIFIED, nothing to update") + null + } + + is ChatPullResult.PreconditionFailed -> { + Log.d(TAG, "Server returned PRECONDITION_FAILED, nothing to update") + null + } + + is ChatPullResult.Error -> { + Log.e(TAG, "Error pulling messages from server", result.throwable) + null + } + } } private suspend fun OfflineFirstChatRepository.updateMessagesData( diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/RetrofitChatNetwork.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/RetrofitChatNetwork.kt index 6bb6836cafe..8ad2f882008 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/RetrofitChatNetwork.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/RetrofitChatNetwork.kt @@ -12,6 +12,7 @@ import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.json.capabilities.SpreedCapability import com.nextcloud.talk.models.json.chat.ChatMessageJson +import com.nextcloud.talk.models.json.chat.ChatOverall import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import com.nextcloud.talk.models.json.conversations.RoomOverall import com.nextcloud.talk.models.json.generic.GenericOverall @@ -158,11 +159,11 @@ class RetrofitChatNetwork(private val ncApi: NcApi, private val ncApiCoroutines: threadTitle ) - override fun pullChatMessages( + override suspend fun pullChatMessages( credentials: String, url: String, fieldMap: HashMap - ): Observable> = ncApi.pullChatMessages(credentials, url, fieldMap).map { it } + ): Response = ncApiCoroutines.pullChatMessages(credentials, url, fieldMap) override fun deleteChatMessage(credentials: String, url: String): Observable = ncApi.deleteChatMessage(credentials, url).map { diff --git a/app/src/main/java/com/nextcloud/talk/chat/domain/ChatPullResult.kt b/app/src/main/java/com/nextcloud/talk/chat/domain/ChatPullResult.kt new file mode 100644 index 00000000000..99b8ac1c5a1 --- /dev/null +++ b/app/src/main/java/com/nextcloud/talk/chat/domain/ChatPullResult.kt @@ -0,0 +1,21 @@ +/* + * Nextcloud Talk - Android Client + * + * SPDX-FileCopyrightText: 2025 Your Name + * SPDX-License-Identifier: GPL-3.0-or-later + */ + +package com.nextcloud.talk.chat.domain + +import com.nextcloud.talk.models.json.chat.ChatMessageJson + +sealed class ChatPullResult { + data class Success( + val messages: List, + val lastCommonRead: Int? + ) : ChatPullResult() + + object NotModified : ChatPullResult() + object PreconditionFailed : ChatPullResult() + data class Error(val throwable: Throwable) : ChatPullResult() +}