diff --git a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt index 095178fefbf..7cbbdde62fa 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt @@ -287,7 +287,10 @@ class CallActivity : CallBaseActivity() { private var isBreakoutRoom = false private val localParticipantMessageListener = LocalParticipantMessageListener { token -> switchToRoomToken = token - hangup(true, false) + hangup( + shutDownView = true, + endCallForAll = false + ) } private val offerMessageListener = OfferMessageListener { sessionId, roomType, sdp, nick -> getOrCreatePeerConnectionWrapperForSessionIdAndType( @@ -1919,7 +1922,7 @@ class CallActivity : CallBaseActivity() { when (messageType) { "usersInRoom" -> - internalSignalingMessageReceiver.process(signaling.messageWrapper as List?>?) + internalSignalingMessageReceiver.process(signaling.messageWrapper as List>) "message" -> { val ncSignalingMessage = LoganSquare.parse( @@ -2735,11 +2738,11 @@ class CallActivity : CallBaseActivity() { * All listeners are called in the main thread. */ private class InternalSignalingMessageReceiver : SignalingMessageReceiver() { - fun process(users: List?>?) { + fun process(users: List>) { processUsersInRoom(users) } - fun process(message: NCSignalingMessage?) { + fun process(message: NCSignalingMessage) { processSignalingMessage(message) } } diff --git a/app/src/main/java/com/nextcloud/talk/activities/ParticipantHandler.kt b/app/src/main/java/com/nextcloud/talk/activities/ParticipantHandler.kt index 62bd61e781c..09d681bd306 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/ParticipantHandler.kt +++ b/app/src/main/java/com/nextcloud/talk/activities/ParticipantHandler.kt @@ -139,7 +139,7 @@ class ParticipantHandler( _uiState.update { it.copy(raisedHand = state) } } - override fun onReaction(reaction: String?) { + override fun onReaction(reaction: String) { Log.d(TAG, "onReaction") } diff --git a/app/src/main/java/com/nextcloud/talk/adapters/messages/IncomingTextMessageViewHolder.kt b/app/src/main/java/com/nextcloud/talk/adapters/messages/IncomingTextMessageViewHolder.kt index b1722de2d42..99442632ff7 100644 --- a/app/src/main/java/com/nextcloud/talk/adapters/messages/IncomingTextMessageViewHolder.kt +++ b/app/src/main/java/com/nextcloud/talk/adapters/messages/IncomingTextMessageViewHolder.kt @@ -10,6 +10,7 @@ package com.nextcloud.talk.adapters.messages import android.content.Context +import android.text.SpannableStringBuilder import android.util.Log import android.util.TypedValue import android.view.View @@ -150,10 +151,10 @@ class IncomingTextMessageViewHolder(itemView: View, payload: Any) : binding.messageAuthor.visibility = View.GONE } binding.messageText.setTextSize(TypedValue.COMPLEX_UNIT_PX, textSize) - binding.messageText.text = processedMessageText + // binding.messageText.text = processedMessageText // just for debugging: - // binding.messageText.text = - // SpannableStringBuilder(processedMessageText).append(" (" + message.jsonMessageId + ")") + binding.messageText.text = + SpannableStringBuilder(processedMessageText).append(" (" + message.jsonMessageId + ")") } else { binding.checkboxContainer.visibility = View.VISIBLE binding.messageText.visibility = View.GONE diff --git a/app/src/main/java/com/nextcloud/talk/adapters/messages/OutcomingTextMessageViewHolder.kt b/app/src/main/java/com/nextcloud/talk/adapters/messages/OutcomingTextMessageViewHolder.kt index b68f816d94c..b5ae9a8c246 100644 --- a/app/src/main/java/com/nextcloud/talk/adapters/messages/OutcomingTextMessageViewHolder.kt +++ b/app/src/main/java/com/nextcloud/talk/adapters/messages/OutcomingTextMessageViewHolder.kt @@ -10,6 +10,7 @@ package com.nextcloud.talk.adapters.messages import android.content.Context +import android.text.SpannableStringBuilder import android.util.Log import android.util.TypedValue import android.view.View @@ -163,10 +164,10 @@ class OutcomingTextMessageViewHolder(itemView: View) : binding.messageTime.layoutParams = layoutParams viewThemeUtils.platform.colorTextView(binding.messageText, ColorRole.ON_SURFACE_VARIANT) - binding.messageText.text = processedMessageText + // binding.messageText.text = processedMessageText // just for debugging: - // binding.messageText.text = - // SpannableStringBuilder(processedMessageText).append(" (" + message.jsonMessageId + ")") + binding.messageText.text = + SpannableStringBuilder(processedMessageText).append(" (" + message.jsonMessageId + ")") } else { binding.messageText.visibility = View.GONE binding.checkboxContainer.visibility = View.VISIBLE diff --git a/app/src/main/java/com/nextcloud/talk/api/NcApi.java b/app/src/main/java/com/nextcloud/talk/api/NcApi.java index 6b79ae80091..b539670fbde 100644 --- a/app/src/main/java/com/nextcloud/talk/api/NcApi.java +++ b/app/src/main/java/com/nextcloud/talk/api/NcApi.java @@ -324,18 +324,6 @@ Observable> setPassword2(@Header("Authorization") Strin Observable getRoomCapabilities(@Header("Authorization") String authorization, @Url String url); - /* - QueryMap items are as follows: - - "lookIntoFuture": int (0 or 1), - - "limit" : int, range 100-200, - - "timeout": used with look into future, 30 default, 60 at most - - "lastKnownMessageId", int, use one from X-Chat-Last-Given - */ - @GET - Observable> pullChatMessages(@Header("Authorization") String authorization, - @Url String url, - @QueryMap Map fields); - /* Fieldmap items are as follows: - "message": , diff --git a/app/src/main/java/com/nextcloud/talk/api/NcApiCoroutines.kt b/app/src/main/java/com/nextcloud/talk/api/NcApiCoroutines.kt index f3f5324787b..c6710a3c8bd 100644 --- a/app/src/main/java/com/nextcloud/talk/api/NcApiCoroutines.kt +++ b/app/src/main/java/com/nextcloud/talk/api/NcApiCoroutines.kt @@ -27,6 +27,7 @@ import com.nextcloud.talk.models.json.threads.ThreadsOverall import com.nextcloud.talk.models.json.userAbsence.UserAbsenceOverall import okhttp3.MultipartBody import okhttp3.RequestBody +import retrofit2.Response import retrofit2.http.Body import retrofit2.http.DELETE import retrofit2.http.Field @@ -323,4 +324,11 @@ interface NcApiCoroutines { @GET suspend fun status(@Header("Authorization") authorization: String, @Url url: String): StatusOverall + + @GET + suspend fun pullChatMessages( + @Header("Authorization") authorization: String, + @Url url: String, + @QueryMap fields: Map + ): Response } diff --git a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt index 90f20bce462..2c4f02fdce8 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt @@ -155,6 +155,7 @@ import com.nextcloud.talk.messagesearch.MessageSearchActivity import com.nextcloud.talk.models.ExternalSignalingServer import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.json.capabilities.SpreedCapability +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.chat.ReadStatus import com.nextcloud.talk.models.json.conversations.ConversationEnums import com.nextcloud.talk.models.json.participants.Participant @@ -229,6 +230,7 @@ import io.reactivex.disposables.Disposable import io.reactivex.schedulers.Schedulers import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -249,6 +251,8 @@ import java.util.Locale import java.util.concurrent.ExecutionException import javax.inject.Inject import kotlin.math.roundToInt +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.launchIn @Suppress("TooManyFunctions") @AutoInjector(NextcloudTalkApplication::class) @@ -427,15 +431,15 @@ class ChatActivity : var callStarted = false - private val localParticipantMessageListener = object : SignalingMessageReceiver.LocalParticipantMessageListener { - override fun onSwitchTo(token: String?) { - if (token != null) { - if (CallActivity.active) { - Log.d(TAG, "CallActivity is running. Ignore to switch chat in ChatActivity...") - } else { - switchToRoom(token, false, false) - } - } + private val localParticipantMessageListener = SignalingMessageReceiver.LocalParticipantMessageListener { token -> + if (CallActivity.active) { + Log.d(TAG, "CallActivity is running. Ignore to switch chat in ChatActivity...") + } else { + switchToRoom( + token = token, + startCallAfterRoomSwitch = false, + isVoiceOnlyCall = false + ) } } @@ -475,6 +479,17 @@ class ChatActivity : updateTypingIndicator() } } + + override fun onChatMessageReceived(chatMessage: ChatMessageJson) { + chatViewModel.onSignalingChatMessageReceived(chatMessage) + + Log.d( + TAG, + "received message in ChatActivity. This is the chat message received via HPB. It would be " + + "nicer to receive it in the ViewModel or Repository directly. " + + "Otherwise it needs to be passed into it from here..." + ) + } } override fun onCreate(savedInstanceState: Bundle?) { @@ -648,6 +663,7 @@ class ChatActivity : this.lifecycle.removeObserver(chatViewModel) } + @OptIn(FlowPreview::class) @SuppressLint("NotifyDataSetChanged", "SetTextI18n", "ResourceAsColor") @Suppress("LongMethod") private fun initObservers() { @@ -814,7 +830,9 @@ class ChatActivity : chatViewModel.loadMessages( withCredentials = credentials!!, - withUrl = urlForChatting + withUrl = urlForChatting, + hasHighPerformanceBackend = + WebSocketConnectionHelper.getWebSocketInstanceForUser(conversationUser) != null ) } else { Log.w( @@ -927,7 +945,7 @@ class ChatActivity : val id = state.msg.ocs!!.data!!.parentMessage!!.id.toString() val index = adapter?.getMessagePositionById(id) ?: 0 - val message = adapter?.items?.get(index)?.item as ChatMessage + val message = adapter?.items?.get(index)?.item as? ChatMessage setMessageAsDeleted(message) } @@ -986,43 +1004,45 @@ class ChatActivity : } } - this.lifecycleScope.launch { - chatViewModel.getMessageFlow - .onEach { triple -> - val lookIntoFuture = triple.first - val setUnreadMessagesMarker = triple.second - var chatMessageList = triple.third - - chatMessageList = handleSystemMessages(chatMessageList) - chatMessageList = handleThreadMessages(chatMessageList) - if (chatMessageList.isEmpty()) { - return@onEach - } - - determinePreviousMessageIds(chatMessageList) - - handleExpandableSystemMessages(chatMessageList) + chatViewModel.getMessageFlow + .onEach { triple -> + val lookIntoFuture = triple.first + val setUnreadMessagesMarker = triple.second + var chatMessageList = triple.third - if (ChatMessage.SystemMessageType.CLEARED_CHAT == chatMessageList[0].systemMessageType) { - adapter?.clear() - adapter?.notifyDataSetChanged() - } + chatMessageList = handleSystemMessages(chatMessageList) + chatMessageList = handleThreadMessages(chatMessageList) + if (chatMessageList.isEmpty()) { + return@onEach + } - if (lookIntoFuture) { - Log.d(TAG, "chatMessageList.size in getMessageFlow:" + chatMessageList.size) - processMessagesFromTheFuture(chatMessageList, setUnreadMessagesMarker) - } else { - processMessagesNotFromTheFuture(chatMessageList) - collapseSystemMessages() - } + determinePreviousMessageIds(chatMessageList) - processExpiredMessages() - processCallStartedMessages() + handleExpandableSystemMessages(chatMessageList) + if (ChatMessage.SystemMessageType.CLEARED_CHAT == chatMessageList[0].systemMessageType) { + adapter?.clear() adapter?.notifyDataSetChanged() } - .collect() - } + + if (lookIntoFuture) { + Log.d(TAG, "chatMessageList.size in getMessageFlow:" + chatMessageList.size) + processMessagesFromTheFuture(chatMessageList, setUnreadMessagesMarker) + } else { + processMessagesNotFromTheFuture(chatMessageList) + collapseSystemMessages() + } + + processExpiredMessages() + processCallStartedMessages() + + adapter?.notifyDataSetChanged() + } + .debounce(300) + .onEach { + advanceLocalLastReadMessageIfNeeded() + } + .launchIn(lifecycleScope) this.lifecycleScope.launch { chatViewModel.getRemoveMessageFlow @@ -1449,6 +1469,8 @@ class ChatActivity : super.onScrollStateChanged(recyclerView, newState) if (newState == AbsListView.OnScrollListener.SCROLL_STATE_IDLE) { + advanceLocalLastReadMessageIfNeeded() + updateRemoteLastReadMessageIfNeeded() if (isScrolledToBottom()) { binding.unreadMessagesPopup.visibility = View.GONE binding.scrollDownButton.visibility = View.GONE @@ -2743,9 +2765,40 @@ class ChatActivity : if (mentionAutocomplete != null && mentionAutocomplete!!.isPopupShowing) { mentionAutocomplete?.dismissPopup() } + + // TODO: when updating remote last read message in onPause, there is a race condition with loading conversations + // for conversation list. It may or may not include info about the sent last read message... + // -> save this field offline in conversation? + updateRemoteLastReadMessageIfNeeded() + adapter = null } + private fun advanceLocalLastReadMessageIfNeeded() { + val position = layoutManager?.findFirstVisibleItemPosition() + position?.let { + // Casting could fail if it's not a chatMessage. It should not matter as the function is triggered often + // enough. If it's a problem, either improve or wait for migration to Jetpack Compose. + val message = adapter?.items?.getOrNull(it)?.item as? ChatMessage + message?.jsonMessageId?.let { messageId -> + chatViewModel.advanceLocalLastReadMessageIfNeeded(messageId) + } + } + } + + private fun updateRemoteLastReadMessageIfNeeded() { + val url = ApiUtils.getUrlForChatReadMarker( + ApiUtils.getChatApiVersion(spreedCapabilities, intArrayOf(ApiUtils.API_V1)), + conversationUser.baseUrl!!, + roomToken + ) + + chatViewModel.updateRemoteLastReadMessageIfNeeded( + credentials = credentials!!, + url = url + ) + } + private fun isActivityNotChangingConfigurations(): Boolean = !isChangingConfigurations private fun isNotInCall(): Boolean = @@ -2872,6 +2925,7 @@ class ChatActivity : private fun setupWebsocket() { if (currentConversation == null || conversationUser == null) { + Log.e(TAG, "setupWebsocket: currentConversation or conversationUser is null") return } @@ -3182,7 +3236,7 @@ class ChatActivity : it.item is ChatMessage && (it.item as ChatMessage).id == messageId } if (messagePosition >= 0) { - val currentItem = adapter?.items?.get(messagePosition)?.item + val currentItem = adapter?.items?.getOrNull(messagePosition)?.item if (currentItem is ChatMessage && currentItem.id == messageId) { return Pair(currentItem, messagePosition) } else { @@ -3934,7 +3988,10 @@ class ChatActivity : fun markAsUnread(message: IMessage?) { val chatMessage = message as ChatMessage? if (chatMessage!!.previousMessageId > NO_PREVIOUS_MESSAGE_ID) { - chatViewModel.setChatReadMarker( + // previousMessageId is taken to mark chat as unread even when "chat-unread" capability is not available + // It should be checked if "chat-unread" capability is available and then use + // https://nextcloud-talk.readthedocs.io/en/latest/chat/#mark-chat-as-unread + chatViewModel.setChatReadMessage( credentials!!, ApiUtils.getUrlForChatReadMarker( ApiUtils.getChatApiVersion(spreedCapabilities, intArrayOf(ApiUtils.API_V1)), diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt index 31ab7cb3854..c3d3b01c562 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt @@ -12,6 +12,7 @@ import com.nextcloud.talk.chat.data.io.LifecycleAwareManager import com.nextcloud.talk.chat.data.model.ChatMessage import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.models.domain.ConversationModel +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow @@ -49,7 +50,7 @@ interface ChatMessageRepository : LifecycleAwareManager { fun updateConversation(conversationModel: ConversationModel) - fun initScopeAndLoadInitialMessages(withNetworkParams: Bundle) + fun initScopeAndLoadInitialMessages(withNetworkParams: Bundle, hasHighPerformanceBackend: Boolean) /** * Loads messages from local storage. If the messages are not found, then it @@ -70,7 +71,7 @@ interface ChatMessageRepository : LifecycleAwareManager { * the database with the server and emits the new messages to [messageFlow], * else it simply retries after timeout. */ - fun initMessagePolling(initialMessageId: Long): Job + fun initLongPolling(initialMessageId: Long): Job /** * Gets a individual message. @@ -117,4 +118,6 @@ interface ChatMessageRepository : LifecycleAwareManager { suspend fun sendUnsentChatMessages(credentials: String, url: String) suspend fun deleteTempMessage(chatMessage: ChatMessage) + + fun onSignalingChatMessageReceived(chatMessage: ChatMessageJson) } diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/ChatNetworkDataSource.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/ChatNetworkDataSource.kt index 5dcdfe3b618..e77407b4700 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/ChatNetworkDataSource.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/ChatNetworkDataSource.kt @@ -10,6 +10,7 @@ import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.json.capabilities.SpreedCapability import com.nextcloud.talk.models.json.chat.ChatMessageJson +import com.nextcloud.talk.models.json.chat.ChatOverall import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import com.nextcloud.talk.models.json.conversations.RoomOverall import com.nextcloud.talk.models.json.generic.GenericOverall @@ -63,7 +64,12 @@ interface ChatNetworkDataSource { threadTitle: String? ): ChatOverallSingleMessage - fun pullChatMessages(credentials: String, url: String, fieldMap: HashMap): Observable> + suspend fun pullChatMessages( + credentials: String, + url: String, + fieldMap: HashMap + ): Response + fun deleteChatMessage(credentials: String, url: String): Observable fun createRoom(credentials: String, url: String, map: Map): Observable fun setChatReadMarker(credentials: String, url: String, previousMessageId: Int): Observable diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt index 11fb8e44975..eb0936c3bfa 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt @@ -13,6 +13,7 @@ import android.util.Log import com.nextcloud.talk.chat.ChatActivity import com.nextcloud.talk.chat.data.ChatMessageRepository import com.nextcloud.talk.chat.data.model.ChatMessage +import com.nextcloud.talk.chat.domain.ChatPullResult import com.nextcloud.talk.data.database.dao.ChatBlocksDao import com.nextcloud.talk.data.database.dao.ChatMessagesDao import com.nextcloud.talk.data.database.mappers.asEntity @@ -25,14 +26,11 @@ import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.extensions.toIntOrZero import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.json.chat.ChatMessageJson -import com.nextcloud.talk.models.json.chat.ChatOverall import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter import com.nextcloud.talk.models.json.participants.Participant import com.nextcloud.talk.utils.bundle.BundleKeys import com.nextcloud.talk.utils.message.SendMessageUtils -import io.reactivex.android.schedulers.AndroidSchedulers -import io.reactivex.schedulers.Schedulers import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -44,11 +42,15 @@ import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import retrofit2.HttpException import java.io.IOException import javax.inject.Inject +import kotlin.collections.any +import kotlin.collections.map @Suppress("LargeClass", "TooManyFunctions") class OfflineFirstChatRepository @Inject constructor( @@ -138,12 +140,15 @@ class OfflineFirstChatRepository @Inject constructor( this.conversationModel = conversationModel } - override fun initScopeAndLoadInitialMessages(withNetworkParams: Bundle) { + override fun initScopeAndLoadInitialMessages(withNetworkParams: Bundle, hasHighPerformanceBackend: Boolean) { scope = CoroutineScope(Dispatchers.IO) - loadInitialMessages(withNetworkParams) + loadInitialMessages( + withNetworkParams, + hasHighPerformanceBackend + ) } - private fun loadInitialMessages(withNetworkParams: Bundle): Job = + private fun loadInitialMessages(withNetworkParams: Bundle, hasHighPerformanceBackend: Boolean): Job = scope.launch { Log.d(TAG, "---- loadInitialMessages ------------") newXChatLastCommonRead = conversationModel.lastCommonReadMessage @@ -155,19 +160,31 @@ class OfflineFirstChatRepository @Inject constructor( Log.d(TAG, "newestMessageIdFromDb: $newestMessageIdFromDb") val weAlreadyHaveSomeOfflineMessages = newestMessageIdFromDb > 0 + val weHaveAtLeastTheLastReadMessage = newestMessageIdFromDb >= conversationModel.lastReadMessage.toLong() Log.d(TAG, "weAlreadyHaveSomeOfflineMessages:$weAlreadyHaveSomeOfflineMessages") Log.d(TAG, "weHaveAtLeastTheLastReadMessage:$weHaveAtLeastTheLastReadMessage") + Log.d(TAG, "hasHighPerformanceBackend:$hasHighPerformanceBackend") - if (weAlreadyHaveSomeOfflineMessages && weHaveAtLeastTheLastReadMessage) { + if (weAlreadyHaveSomeOfflineMessages && weHaveAtLeastTheLastReadMessage && !hasHighPerformanceBackend) { Log.d( TAG, "Initial online request is skipped because offline messages are up to date" + " until lastReadMessage" ) - Log.d(TAG, "For messages newer than lastRead, lookIntoFuture will load them.") + + // For messages newer than lastRead, lookIntoFuture will load them. + // We must only end up here when NO HPB is used! + // If a HPB is used, longPolling is not available to handle loading of newer messages. + // When a HPB is used the initial request must be made. } else { - if (!weAlreadyHaveSomeOfflineMessages) { + if (hasHighPerformanceBackend) { + Log.d( + TAG, + "An online request for newest 100 messages is made because HPB is used (No long " + + "polling available to catch up with messages newer than last read.)" + ) + } else if (!weAlreadyHaveSomeOfflineMessages) { Log.d(TAG, "An online request for newest 100 messages is made because offline chat is empty") if (networkMonitor.isOnline.value.not()) { _generalUIFlow.emit(ChatActivity.NO_OFFLINE_MESSAGES_FOUND) @@ -185,14 +202,13 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture = false, timeout = 0, includeLastKnown = true, - setReadMarker = true, lastKnown = null ) withNetworkParams.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap) withNetworkParams.putString(BundleKeys.KEY_ROOM_TOKEN, conversationModel.token) Log.d(TAG, "Starting online request for initial loading") - val chatMessageEntities = sync(withNetworkParams) + val chatMessageEntities = getMessages(withNetworkParams) if (chatMessageEntities == null) { Log.e(TAG, "initial loading of messages failed") } @@ -203,7 +219,11 @@ class OfflineFirstChatRepository @Inject constructor( handleMessagesFromDb(newestMessageIdFromDb) - initMessagePolling(newestMessageIdFromDb) + if (!hasHighPerformanceBackend) { + initLongPolling(newestMessageIdFromDb) + } else { + initRepeatingInsuranceRequest(newestMessageIdFromDb) + } } private suspend fun handleMessagesFromDb(newestMessageIdFromDb: Long) { @@ -288,7 +308,6 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture = false, timeout = 0, includeLastKnown = false, - setReadMarker = true, lastKnown = beforeMessageId.toInt() ) withNetworkParams.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap) @@ -297,16 +316,16 @@ class OfflineFirstChatRepository @Inject constructor( if (loadFromServer) { Log.d(TAG, "Starting online request for loadMoreMessages") - sync(withNetworkParams) + getMessages(withNetworkParams) } showMessagesBefore(internalConversationId, beforeMessageId, DEFAULT_MESSAGES_LIMIT) updateUiForLastCommonRead() } - override fun initMessagePolling(initialMessageId: Long): Job = + override fun initLongPolling(initialMessageId: Long): Job = scope.launch { - Log.d(TAG, "---- initMessagePolling ------------") + Log.d(TAG, "---- initLongPolling ------------") Log.d(TAG, "newestMessage: $initialMessageId") @@ -316,7 +335,6 @@ class OfflineFirstChatRepository @Inject constructor( // initially no messages but someone writes us in the first 30 seconds. timeout = 0, includeLastKnown = false, - setReadMarker = true, lastKnown = initialMessageId.toInt() ) @@ -329,11 +347,11 @@ class OfflineFirstChatRepository @Inject constructor( Thread.sleep(HALF_SECOND) } else { // sync database with server - // (This is a long blocking call because long polling (lookIntoFuture) is set) + // (This is a long blocking call because long polling (lookIntoFuture and timeout) is set) networkParams.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap) Log.d(TAG, "Starting online request for long polling") - val resultsFromSync = sync(networkParams) + val resultsFromSync = getMessages(networkParams) if (!resultsFromSync.isNullOrEmpty()) { val chatMessages = resultsFromSync.map(ChatMessageEntity::asModel) @@ -355,6 +373,9 @@ class OfflineFirstChatRepository @Inject constructor( updateUiForLastCommonRead() + // getNewestMessageIdFromChatBlocks wont work for insurance calls. we dont want newest message + // but only the newest message that came from sync (not from signaling) + // -> create new var to save newest message from sync (set for initial and long polling requests) val newestMessage = chatBlocksDao.getNewestMessageIdFromChatBlocks( internalConversationId, threadId @@ -365,7 +386,6 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture = true, timeout = 30, includeLastKnown = false, - setReadMarker = true, lastKnown = newestMessage ) @@ -374,6 +394,17 @@ class OfflineFirstChatRepository @Inject constructor( } } + private fun initRepeatingInsuranceRequest(initialMessageId: Long) { + scope.launch { + Log.d(TAG, "---- initRepeatingInsuranceRequest ------------") + + Log.d(TAG, "newestMessage: $initialMessageId") + + + + } + } + private suspend fun handleNewAndTempMessages( receivedChatMessages: List, lookIntoFuture: Boolean, @@ -456,7 +487,6 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture: Boolean, timeout: Int, includeLastKnown: Boolean, - setReadMarker: Boolean, lastKnown: Int?, limit: Int = DEFAULT_MESSAGES_LIMIT ): HashMap { @@ -478,7 +508,7 @@ class OfflineFirstChatRepository @Inject constructor( fieldMap["limit"] = limit fieldMap["lookIntoFuture"] = if (lookIntoFuture) 1 else 0 - fieldMap["setReadMarker"] = if (setReadMarker) 1 else 0 + fieldMap["setReadMarker"] = 0 return fieldMap } @@ -495,139 +525,166 @@ class OfflineFirstChatRepository @Inject constructor( lookIntoFuture = false, timeout = 0, includeLastKnown = true, - setReadMarker = false, lastKnown = messageId.toInt(), limit = 1 ) bundle.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap) Log.d(TAG, "Starting online request for single message (e.g. a reply)") - sync(bundle) + getMessages(bundle) } + // we cant just expect here that sync succeeded?? return chatDao.getChatMessageForConversation( internalConversationId, messageId ).map(ChatMessageEntity::asModel) } - @Suppress("UNCHECKED_CAST", "MagicNumber", "Detekt.TooGenericExceptionCaught") - private fun getMessagesFromServer(bundle: Bundle): Pair>? { + fun pullMessagesFlow(bundle: Bundle): Flow = flow { val fieldMap = bundle.getSerializable(BundleKeys.KEY_FIELD_MAP) as HashMap - var attempts = 1 + while (attempts < 5) { - Log.d(TAG, "message limit: " + fieldMap["limit"]) - try { - val result = network.pullChatMessages(credentials, urlForChatting, fieldMap) - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .map { it -> - when (it.code()) { - HTTP_CODE_OK -> { - Log.d(TAG, "getMessagesFromServer HTTP_CODE_OK") - newXChatLastCommonRead = it.headers()["X-Chat-Last-Common-Read"]?.let { - Integer.parseInt(it) - } - - return@map Pair( - HTTP_CODE_OK, - (it.body() as ChatOverall).ocs!!.data!! - ) - } - - HTTP_CODE_NOT_MODIFIED -> { - Log.d(TAG, "getMessagesFromServer HTTP_CODE_NOT_MODIFIED") - - return@map Pair( - HTTP_CODE_NOT_MODIFIED, - listOf() - ) - } - - HTTP_CODE_PRECONDITION_FAILED -> { - Log.d(TAG, "getMessagesFromServer HTTP_CODE_PRECONDITION_FAILED") - - return@map Pair( - HTTP_CODE_PRECONDITION_FAILED, - listOf() - ) - } - - else -> { - return@map Pair( - HTTP_CODE_PRECONDITION_FAILED, - listOf() - ) - } - } + runCatching { + network.pullChatMessages(credentials, urlForChatting, fieldMap) + }.fold( + onSuccess = { response -> + val result = when (response.code()) { + HTTP_CODE_OK -> ChatPullResult.Success( + messages = response.body()?.ocs?.data.orEmpty(), + lastCommonRead = response.headers()["X-Chat-Last-Common-Read"]?.toInt() + ) + HTTP_CODE_NOT_MODIFIED -> ChatPullResult.NotModified + HTTP_CODE_PRECONDITION_FAILED -> ChatPullResult.PreconditionFailed + else -> ChatPullResult.Error(HttpException(response)) } - .blockingSingle() - return result - } catch (e: Exception) { - Log.e(TAG, "Something went wrong when pulling chat messages (attempt: $attempts)", e) - attempts++ - val newMessageLimit = when (attempts) { - 2 -> 50 - 3 -> 10 - else -> 5 + emit(result) + return@flow + }, + onFailure = { e -> + Log.e(TAG, "Attempt $attempts failed", e) + attempts++ + fieldMap["limit"] = when (attempts) { 2 -> 50; 3 -> 10; else -> 5 } } - fieldMap["limit"] = newMessageLimit - } + ) } - Log.e(TAG, "All attempts to get messages from server failed") - return null - } - private suspend fun sync(bundle: Bundle): List? { + emit(ChatPullResult.Error(IllegalStateException("All attempts failed"))) + }.flowOn(Dispatchers.IO) + + + + // private suspend fun getMessages(bundle: Bundle): List? { + // if (!networkMonitor.isOnline.value) { + // Log.d(TAG, "Device is offline, can't load chat messages from server") + // return null + // } + // + // val result = pullMessagesFlow(bundle) + // if (result == null) { + // Log.d(TAG, "No result from server") + // return null + // } + // + // var chatMessagesFromSync: List? = null + // + // val fieldMap = bundle.getSerializable(BundleKeys.KEY_FIELD_MAP) as HashMap + // val queriedMessageId = fieldMap["lastKnownMessageId"] + // val lookIntoFuture = fieldMap["lookIntoFuture"] == 1 + // + // val statusCode = result.first + // + // val hasHistory = getHasHistory(statusCode, lookIntoFuture) + // + // Log.d( + // TAG, + // "internalConv=$internalConversationId statusCode=$statusCode lookIntoFuture=$lookIntoFuture " + + // "hasHistory=$hasHistory " + + // "queriedMessageId=$queriedMessageId" + // ) + // + // val blockContainingQueriedMessage: ChatBlockEntity? = getBlockOfMessage(queriedMessageId) + // + // if (blockContainingQueriedMessage != null && !hasHistory) { + // blockContainingQueriedMessage.hasHistory = false + // chatBlocksDao.upsertChatBlock(blockContainingQueriedMessage) + // Log.d(TAG, "End of chat was reached so hasHistory=false is set") + // } + // + // if (result.second.isNotEmpty()) { + // chatMessagesFromSync = updateMessagesData( + // result.second, + // blockContainingQueriedMessage, + // lookIntoFuture, + // hasHistory + // ) + // } else { + // Log.d(TAG, "no data is updated...") + // } + // + // return chatMessagesFromSync + // } + + private suspend fun getMessages(bundle: Bundle): List? { if (!networkMonitor.isOnline.value) { Log.d(TAG, "Device is offline, can't load chat messages from server") return null } - val result = getMessagesFromServer(bundle) - if (result == null) { - Log.d(TAG, "No result from server") - return null - } - - var chatMessagesFromSync: List? = null - val fieldMap = bundle.getSerializable(BundleKeys.KEY_FIELD_MAP) as HashMap val queriedMessageId = fieldMap["lastKnownMessageId"] val lookIntoFuture = fieldMap["lookIntoFuture"] == 1 - val statusCode = result.first + val result = pullMessagesFlow(bundle).first() - val hasHistory = getHasHistory(statusCode, lookIntoFuture) + return when (result) { - Log.d( - TAG, - "internalConv=$internalConversationId statusCode=$statusCode lookIntoFuture=$lookIntoFuture " + - "hasHistory=$hasHistory " + - "queriedMessageId=$queriedMessageId" - ) + is ChatPullResult.Success -> { + val hasHistory = getHasHistory(HTTP_CODE_OK, lookIntoFuture) + + Log.d( + TAG, + "internalConv=$internalConversationId statusCode=${HTTP_CODE_OK} lookIntoFuture=$lookIntoFuture " + + "hasHistory=$hasHistory queriedMessageId=$queriedMessageId" + ) - val blockContainingQueriedMessage: ChatBlockEntity? = getBlockOfMessage(queriedMessageId) + val blockContainingQueriedMessage: ChatBlockEntity? = getBlockOfMessage(queriedMessageId) - if (blockContainingQueriedMessage != null && !hasHistory) { - blockContainingQueriedMessage.hasHistory = false - chatBlocksDao.upsertChatBlock(blockContainingQueriedMessage) - Log.d(TAG, "End of chat was reached so hasHistory=false is set") - } + blockContainingQueriedMessage?.takeIf { !hasHistory }?.apply { + this.hasHistory = false + chatBlocksDao.upsertChatBlock(this) + Log.d(TAG, "End of chat reached, set hasHistory=false") + } - if (result.second.isNotEmpty()) { - chatMessagesFromSync = updateMessagesData( - result.second, - blockContainingQueriedMessage, - lookIntoFuture, - hasHistory - ) - } else { - Log.d(TAG, "no data is updated...") - } + if (result.messages.isNotEmpty()) { + updateMessagesData( + result.messages, + blockContainingQueriedMessage, + lookIntoFuture, + hasHistory + ) + } else { + Log.d(TAG, "No new messages to update") + null + } + } + + is ChatPullResult.NotModified -> { + Log.d(TAG, "Server returned NOT_MODIFIED, nothing to update") + null + } + + is ChatPullResult.PreconditionFailed -> { + Log.d(TAG, "Server returned PRECONDITION_FAILED, nothing to update") + null + } - return chatMessagesFromSync + is ChatPullResult.Error -> { + Log.e(TAG, "Error pulling messages from server", result.throwable) + null + } + } } private suspend fun OfflineFirstChatRepository.updateMessagesData( @@ -676,7 +733,7 @@ class OfflineFirstChatRepository @Inject constructor( newestMessageId = newestMessageIdForNewChatBlock, hasHistory = hasHistory ) - chatBlocksDao.upsertChatBlock(newChatBlock) // crash when no conversation thread exists! + chatBlocksDao.upsertChatBlock(newChatBlock) updateBlocks(newChatBlock) return chatMessagesFromSyncToProcess @@ -1026,6 +1083,27 @@ class OfflineFirstChatRepository @Inject constructor( _removeMessageFlow.emit(chatMessage) } + override fun onSignalingChatMessageReceived(chatMessage: ChatMessageJson) { + scope.launch { + val chatMessageEntities = updateMessagesData( + chatMessagesJson = listOf(chatMessage), + blockContainingQueriedMessage = null, + lookIntoFuture = true, + hasHistory = true + ) + + val chatMessages = chatMessageEntities.map(ChatMessageEntity::asModel) + + handleNewAndTempMessages( + receivedChatMessages = chatMessages, + lookIntoFuture = true, + showUnreadMessagesMarker = false + ) + + updateUiForLastCommonRead() + } + } + @Suppress("Detekt.TooGenericExceptionCaught") override suspend fun addTemporaryMessage( message: CharSequence, diff --git a/app/src/main/java/com/nextcloud/talk/chat/data/network/RetrofitChatNetwork.kt b/app/src/main/java/com/nextcloud/talk/chat/data/network/RetrofitChatNetwork.kt index 6bb6836cafe..8ad2f882008 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/data/network/RetrofitChatNetwork.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/data/network/RetrofitChatNetwork.kt @@ -12,6 +12,7 @@ import com.nextcloud.talk.data.user.model.User import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.json.capabilities.SpreedCapability import com.nextcloud.talk.models.json.chat.ChatMessageJson +import com.nextcloud.talk.models.json.chat.ChatOverall import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import com.nextcloud.talk.models.json.conversations.RoomOverall import com.nextcloud.talk.models.json.generic.GenericOverall @@ -158,11 +159,11 @@ class RetrofitChatNetwork(private val ncApi: NcApi, private val ncApiCoroutines: threadTitle ) - override fun pullChatMessages( + override suspend fun pullChatMessages( credentials: String, url: String, fieldMap: HashMap - ): Observable> = ncApi.pullChatMessages(credentials, url, fieldMap).map { it } + ): Response = ncApiCoroutines.pullChatMessages(credentials, url, fieldMap) override fun deleteChatMessage(credentials: String, url: String): Observable = ncApi.deleteChatMessage(credentials, url).map { diff --git a/app/src/main/java/com/nextcloud/talk/chat/domain/ChatPullResult.kt b/app/src/main/java/com/nextcloud/talk/chat/domain/ChatPullResult.kt new file mode 100644 index 00000000000..99b8ac1c5a1 --- /dev/null +++ b/app/src/main/java/com/nextcloud/talk/chat/domain/ChatPullResult.kt @@ -0,0 +1,21 @@ +/* + * Nextcloud Talk - Android Client + * + * SPDX-FileCopyrightText: 2025 Your Name + * SPDX-License-Identifier: GPL-3.0-or-later + */ + +package com.nextcloud.talk.chat.domain + +import com.nextcloud.talk.models.json.chat.ChatMessageJson + +sealed class ChatPullResult { + data class Success( + val messages: List, + val lastCommonRead: Int? + ) : ChatPullResult() + + object NotModified : ChatPullResult() + object PreconditionFailed : ChatPullResult() + data class Error(val throwable: Throwable) : ChatPullResult() +} diff --git a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt index 1ab528e5034..3289e8760f4 100644 --- a/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt +++ b/app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt @@ -35,6 +35,7 @@ import com.nextcloud.talk.models.domain.ConversationModel import com.nextcloud.talk.models.domain.ReactionAddedModel import com.nextcloud.talk.models.domain.ReactionDeletedModel import com.nextcloud.talk.models.json.capabilities.SpreedCapability +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage import com.nextcloud.talk.models.json.conversations.RoomOverall import com.nextcloud.talk.models.json.generic.GenericOverall @@ -93,6 +94,10 @@ class ChatViewModel @Inject constructor( lateinit var currentUser: User + private var localLastReadMessage: Int = 0 + + private lateinit var currentConversation: ConversationModel + private val mediaPlayerManager: MediaPlayerManager = MediaPlayerManager.sharedInstance(appPreferences) lateinit var currentLifeCycleFlag: LifeCycleFlag val disposableSet = mutableSetOf() @@ -132,6 +137,10 @@ class ChatViewModel @Inject constructor( mediaPlayerManager.handleOnStop() } + fun onSignalingChatMessageReceived(chatMessage: ChatMessageJson) { + chatRepository.onSignalingChatMessageReceived(chatMessage) + } + val backgroundPlayUIFlow = mediaPlayerManager.backgroundPlayUIFlow val mediaPlayerSeekbarObserver: Flow @@ -302,7 +311,10 @@ class ChatViewModel @Inject constructor( } fun updateConversation(currentConversation: ConversationModel) { + this.currentConversation = currentConversation chatRepository.updateConversation(currentConversation) + + advanceLocalLastReadMessageIfNeeded(currentConversation.lastReadMessage) } fun getRoom(user: User, token: String) { @@ -496,12 +508,13 @@ class ChatViewModel @Inject constructor( } } - fun loadMessages(withCredentials: String, withUrl: String) { + fun loadMessages(withCredentials: String, withUrl: String, hasHighPerformanceBackend: Boolean) { val bundle = Bundle() bundle.putString(BundleKeys.KEY_CHAT_URL, withUrl) bundle.putString(BundleKeys.KEY_CREDENTIALS, withCredentials) chatRepository.initScopeAndLoadInitialMessages( - withNetworkParams = bundle + withNetworkParams = bundle, + hasHighPerformanceBackend = hasHighPerformanceBackend ) } @@ -559,8 +572,26 @@ class ChatViewModel @Inject constructor( }) } - fun setChatReadMarker(credentials: String, url: String, previousMessageId: Int) { - chatNetworkDataSource.setChatReadMarker(credentials, url, previousMessageId) + fun advanceLocalLastReadMessageIfNeeded(messageId: Int) { + if (localLastReadMessage < messageId) { + localLastReadMessage = messageId + } + } + + /** + * Please use with caution to not spam the server + */ + fun updateRemoteLastReadMessageIfNeeded(credentials: String, url: String) { + if (localLastReadMessage > currentConversation.lastReadMessage) { + setChatReadMessage(credentials, url, localLastReadMessage) + } + } + + /** + * Please use with caution to not spam the server + */ + fun setChatReadMessage(credentials: String, url: String, lastReadMessage: Int) { + chatNetworkDataSource.setChatReadMarker(credentials, url, lastReadMessage) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : Observer { diff --git a/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt b/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt index 9bd2408abe9..5b5e4dbf242 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt +++ b/app/src/main/java/com/nextcloud/talk/signaling/ConversationMessageNotifier.kt @@ -6,6 +6,7 @@ */ package com.nextcloud.talk.signaling +import com.nextcloud.talk.models.json.chat.ChatMessageJson import com.nextcloud.talk.signaling.SignalingMessageReceiver.ConversationMessageListener internal class ConversationMessageNotifier { @@ -29,6 +30,13 @@ internal class ConversationMessageNotifier { } } + @Synchronized + fun notifyMessageReceived(chatMessage: ChatMessageJson) { + for (listener in ArrayList(conversationMessageListeners)) { + listener.onChatMessageReceived(chatMessage) + } + } + fun notifyStopTyping(userId: String?, sessionId: String?) { for (listener in ArrayList(conversationMessageListeners)) { listener.onStopTyping(userId, sessionId) diff --git a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt similarity index 64% rename from app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java rename to app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt index 397ba7b55ce..cd531eba1e3 100644 --- a/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.java +++ b/app/src/main/java/com/nextcloud/talk/signaling/SignalingMessageReceiver.kt @@ -4,265 +4,278 @@ * SPDX-FileCopyrightText: 2022 Daniel Calviño Sánchez * SPDX-License-Identifier: GPL-3.0-or-later */ -package com.nextcloud.talk.signaling; - -import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter; -import com.nextcloud.talk.models.json.converters.EnumParticipantTypeConverter; -import com.nextcloud.talk.models.json.participants.Participant; -import com.nextcloud.talk.models.json.signaling.NCIceCandidate; -import com.nextcloud.talk.models.json.signaling.NCMessagePayload; -import com.nextcloud.talk.models.json.signaling.NCSignalingMessage; -import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +package com.nextcloud.talk.signaling + +import com.bluelinelabs.logansquare.LoganSquare +import com.nextcloud.talk.models.json.chat.ChatMessageJson +import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter +import com.nextcloud.talk.models.json.converters.EnumParticipantTypeConverter +import com.nextcloud.talk.models.json.participants.Participant +import com.nextcloud.talk.models.json.signaling.NCSignalingMessage +import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage +import org.json.JSONObject +import kotlin.Any +import kotlin.Int +import kotlin.Long +import kotlin.RuntimeException +import kotlin.String +import kotlin.toString /** * Hub to register listeners for signaling messages of different kinds. - *

+ * * In general, if a listener is added while an event is being handled the new listener will not receive that event. * An exception to that is adding a WebRtcMessageListener when handling an offer in an OfferMessageListener; in that * case the "onOffer()" method of the WebRtcMessageListener will be called for that same offer. - *

+ * * Similarly, if a listener is removed while an event is being handled the removed listener will still receive that * event. Again the exception is removing a WebRtcMessageListener when handling an offer in an OfferMessageListener; in * that case the "onOffer()" method of the WebRtcMessageListener will not be called for that offer. - *

+ * + * * Adding and removing listeners, as well as notifying them is internally synchronized. This should be kept in mind * if listeners are added or removed when handling an event to prevent deadlocks (nevertheless, just adding or * removing a listener in the same thread handling the event is fine, and in most cases it will be fine too if done * in a different thread, as long as the notifier thread is not forced to wait until the listener is added or removed). - *

+ * * SignalingMessageReceiver does not fetch the signaling messages itself; subclasses must fetch them and then call * the appropriate protected methods to process the messages and notify the listeners. */ -public abstract class SignalingMessageReceiver { - - private final EnumActorTypeConverter enumActorTypeConverter = new EnumActorTypeConverter(); +abstract class SignalingMessageReceiver { + private val enumActorTypeConverter = EnumActorTypeConverter() - private final ParticipantListMessageNotifier participantListMessageNotifier = new ParticipantListMessageNotifier(); + private val participantListMessageNotifier = ParticipantListMessageNotifier() - private final LocalParticipantMessageNotifier localParticipantMessageNotifier = new LocalParticipantMessageNotifier(); + private val localParticipantMessageNotifier = LocalParticipantMessageNotifier() - private final CallParticipantMessageNotifier callParticipantMessageNotifier = new CallParticipantMessageNotifier(); + private val callParticipantMessageNotifier = CallParticipantMessageNotifier() - private final ConversationMessageNotifier conversationMessageNotifier = new ConversationMessageNotifier(); + private val conversationMessageNotifier = ConversationMessageNotifier() - private final OfferMessageNotifier offerMessageNotifier = new OfferMessageNotifier(); + private val offerMessageNotifier = OfferMessageNotifier() - private final WebRtcMessageNotifier webRtcMessageNotifier = new WebRtcMessageNotifier(); + private val webRtcMessageNotifier = WebRtcMessageNotifier() /** * Listener for participant list messages. - *

+ * * The messages are implicitly bound to the room currently joined in the signaling server; listeners are expected * to know the current room. */ - public interface ParticipantListMessageListener { - + interface ParticipantListMessageListener { /** * List of all the participants in the room. - *

+ * * This message is received only when the internal signaling server is used. - *

+ * * The message is received periodically, and the participants may not have been modified since the last message. - *

+ * * Only the following participant properties are set: * - inCall * - lastPing * - sessionId * - userId (if the participant is not a guest) - *

+ * * "participantPermissions" is provided in the message (since Talk 13), but not currently set in the * participant. "publishingPermissions" was provided instead in Talk 12, but it was not used anywhere, so it is * ignored. * * @param participants all the participants (users and guests) in the room */ - void onUsersInRoom(List participants); + fun onUsersInRoom(participants: MutableList?) /** * List of all the participants in the call or the room (depending on what triggered the event). - *

+ * * This message is received only when the external signaling server is used. - *

+ * * The message is received when any participant changed, although what changed is not provided and should be * derived from the difference with previous messages. The list of participants may include only the * participants in the call (including those that just left it and thus triggered the event) or all the * participants currently in the room (participants in the room but not currently active, that is, without a * session, are not included). - *

+ * * Only the following participant properties are set: * - inCall * - lastPing * - sessionId * - type * - userId (if the participant is not a guest) - *

+ * * "nextcloudSessionId" is provided in the message (when the "inCall" property of any participant changed), but * not currently set in the participant. - *

+ * * "participantPermissions" is provided in the message (since Talk 13), but not currently set in the * participant. "publishingPermissions" was provided instead in Talk 12, but it was not used anywhere, so it is * ignored. * * @param participants all the participants (users and guests) in the room */ - void onParticipantsUpdate(List participants); + fun onParticipantsUpdate(participants: MutableList?) /** * Update of the properties of all the participants in the room. - *

+ * * This message is received only when the external signaling server is used. * * @param inCall the new value of the inCall property */ - void onAllParticipantsUpdate(long inCall); + fun onAllParticipantsUpdate(inCall: Long) } /** * Listener for local participant messages. - *

+ * * The messages are implicitly bound to the local participant (or, rather, its session); listeners are expected * to know the local participant. - *

+ * * The messages are related to the conversation, so the local participant may or may not be in a call when they * are received. */ - public interface LocalParticipantMessageListener { + fun interface LocalParticipantMessageListener { /** * Request for the client to switch to the given conversation. - *

+ * * This message is received only when the external signaling server is used. * * @param token the token of the conversation to switch to. */ - void onSwitchTo(String token); + fun onSwitchTo(token: String) } /** * Listener for call participant messages. - *

+ * + * * The messages are bound to a specific call participant (or, rather, session), so each listener is expected to * handle messages only for a single call participant. - *

+ * + * * Although "unshareScreen" is technically bound to a specific peer connection it is instead treated as a general * message on the call participant. */ - public interface CallParticipantMessageListener { - void onRaiseHand(boolean state, long timestamp); - void onReaction(String reaction); - void onUnshareScreen(); + interface CallParticipantMessageListener { + fun onRaiseHand(state: Boolean, timestamp: Long) + fun onReaction(reaction: String) + fun onUnshareScreen() } /** * Listener for conversation messages. */ - public interface ConversationMessageListener { - void onStartTyping(String userId, String session); - void onStopTyping(String userId,String session); + interface ConversationMessageListener { + fun onStartTyping(userId: String?, session: String?) + fun onStopTyping(userId: String?, session: String?) + fun onChatMessageReceived(chatMessage: ChatMessageJson) } /** * Listener for WebRTC offers. - *

+ * + * * Unlike the WebRtcMessageListener, which is bound to a specific peer connection, an OfferMessageListener listens * to all offer messages, no matter which peer connection they are bound to. This can be used, for example, to * create a new peer connection when a remote offer for which there is no previous connection is received. - *

+ * + * * When an offer is received all OfferMessageListeners are notified before any WebRtcMessageListener is notified. */ - public interface OfferMessageListener { - void onOffer(String sessionId, String roomType, String sdp, String nick); + fun interface OfferMessageListener { + fun onOffer(sessionId: String?, roomType: String, sdp: String?, nick: String?) } /** * Listener for WebRTC messages. - *

+ * + * * The messages are bound to a specific peer connection, so each listener is expected to handle messages only for * a single peer connection. */ - public interface WebRtcMessageListener { - void onOffer(String sdp, String nick); - void onAnswer(String sdp, String nick); - void onCandidate(String sdpMid, int sdpMLineIndex, String sdp); - void onEndOfCandidates(); + interface WebRtcMessageListener { + fun onOffer(sdp: String, nick: String?) + fun onAnswer(sdp: String, nick: String?) + fun onCandidate(sdpMid: String, sdpMLineIndex: Int, sdp: String) + fun onEndOfCandidates() } /** * Adds a listener for participant list messages. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will be notified just once. * * @param listener the ParticipantListMessageListener */ - public void addListener(ParticipantListMessageListener listener) { - participantListMessageNotifier.addListener(listener); + fun addListener(listener: ParticipantListMessageListener?) { + participantListMessageNotifier.addListener(listener) } - public void removeListener(ParticipantListMessageListener listener) { - participantListMessageNotifier.removeListener(listener); + fun removeListener(listener: ParticipantListMessageListener?) { + participantListMessageNotifier.removeListener(listener) } /** * Adds a listener for local participant messages. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will be notified just once. * * @param listener the LocalParticipantMessageListener */ - public void addListener(LocalParticipantMessageListener listener) { - localParticipantMessageNotifier.addListener(listener); + fun addListener(listener: LocalParticipantMessageListener?) { + localParticipantMessageNotifier.addListener(listener) } - public void removeListener(LocalParticipantMessageListener listener) { - localParticipantMessageNotifier.removeListener(listener); + fun removeListener(listener: LocalParticipantMessageListener?) { + localParticipantMessageNotifier.removeListener(listener) } /** * Adds a listener for call participant messages. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will no longer be notified * for the messages from the previous session ID. * * @param listener the CallParticipantMessageListener * @param sessionId the ID of the session that messages come from */ - public void addListener(CallParticipantMessageListener listener, String sessionId) { - callParticipantMessageNotifier.addListener(listener, sessionId); + fun addListener(listener: CallParticipantMessageListener?, sessionId: String?) { + callParticipantMessageNotifier.addListener(listener, sessionId) } - public void removeListener(CallParticipantMessageListener listener) { - callParticipantMessageNotifier.removeListener(listener); + fun removeListener(listener: CallParticipantMessageListener?) { + callParticipantMessageNotifier.removeListener(listener) } - public void addListener(ConversationMessageListener listener) { - conversationMessageNotifier.addListener(listener); + fun addListener(listener: ConversationMessageListener?) { + conversationMessageNotifier.addListener(listener) } - public void removeListener(ConversationMessageListener listener) { - conversationMessageNotifier.removeListener(listener); + fun removeListener(listener: ConversationMessageListener) { + conversationMessageNotifier.removeListener(listener) } /** * Adds a listener for all offer messages. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will be notified just once. * * @param listener the OfferMessageListener */ - public void addListener(OfferMessageListener listener) { - offerMessageNotifier.addListener(listener); + fun addListener(listener: OfferMessageListener?) { + offerMessageNotifier.addListener(listener) } - public void removeListener(OfferMessageListener listener) { - offerMessageNotifier.removeListener(listener); + fun removeListener(listener: OfferMessageListener?) { + offerMessageNotifier.removeListener(listener) } /** * Adds a listener for WebRTC messages from the given session ID and room type. - *

+ * + * * A listener is expected to be added only once. If the same listener is added again it will no longer be notified * for the messages from the previous session ID or room type. * @@ -270,29 +283,29 @@ public void removeListener(OfferMessageListener listener) { * @param sessionId the ID of the session that messages come from * @param roomType the room type that messages come from */ - public void addListener(WebRtcMessageListener listener, String sessionId, String roomType) { - webRtcMessageNotifier.addListener(listener, sessionId, roomType); + fun addListener(listener: WebRtcMessageListener?, sessionId: String?, roomType: String?) { + webRtcMessageNotifier.addListener(listener, sessionId, roomType) } - public void removeListener(WebRtcMessageListener listener) { - webRtcMessageNotifier.removeListener(listener); + fun removeListener(listener: WebRtcMessageListener?) { + webRtcMessageNotifier.removeListener(listener) } - protected void processEvent(Map eventMap) { - if ("room".equals(eventMap.get("target")) && "switchto".equals(eventMap.get("type"))) { - processSwitchToEvent(eventMap); + fun processEvent(eventMap: Map?) { + if ("room" == eventMap?.get("target") && "switchto" == eventMap["type"]) { + processSwitchToEvent(eventMap) - return; + return } - if ("participants".equals(eventMap.get("target")) && "update".equals(eventMap.get("type"))) { - processUpdateEvent(eventMap); + if ("participants" == eventMap?.get("target") && "update" == eventMap["type"]) { + processUpdateEvent(eventMap) - return; + return } } - private void processSwitchToEvent(Map eventMap) { + private fun processSwitchToEvent(eventMap: Map?) { // Message schema: // { // "type": "event", @@ -305,58 +318,81 @@ private void processSwitchToEvent(Map eventMap) { // }, // } - Map switchToMap; + val switchToMap: Map? try { - switchToMap = (Map) eventMap.get("switchto"); - } catch (RuntimeException e) { + switchToMap = eventMap?.get("switchto") as Map? + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } if (switchToMap == null) { // Broken message, this should not happen. - return; + return } - String token; + val token: String? try { - token = switchToMap.get("roomid").toString(); - } catch (RuntimeException e) { + token = switchToMap["roomid"].toString() + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return + } + + localParticipantMessageNotifier.notifySwitchTo(token) + } + + protected fun processChatMessageWebSocketMessage(jsonString: String) { + fun parseChatMessage(jsonString: String): ChatMessageJson? { + return try { + val root = JSONObject(jsonString) + val eventObj = root.optJSONObject("event") ?: return null + val messageObj = eventObj.optJSONObject("message") ?: return null + val dataObj = messageObj.optJSONObject("data") ?: return null + val chatObj = dataObj.optJSONObject("chat") ?: return null + val commentObj = chatObj.optJSONObject("comment") ?: return null + + LoganSquare.parse(commentObj.toString(), ChatMessageJson::class.java) + } catch (e: Exception) { + null + } } - localParticipantMessageNotifier.notifySwitchTo(token); + val chatMessage = parseChatMessage(jsonString) + + chatMessage?.let { + conversationMessageNotifier.notifyMessageReceived(it) + } } - private void processUpdateEvent(Map eventMap) { - Map updateMap; + private fun processUpdateEvent(eventMap: Map?) { + val updateMap: Map? try { - updateMap = (Map) eventMap.get("update"); - } catch (RuntimeException e) { + updateMap = eventMap?.get("update") as Map? + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } if (updateMap == null) { // Broken message, this should not happen. - return; + return } - if (updateMap.get("all") != null && Boolean.parseBoolean(updateMap.get("all").toString())) { - processAllParticipantsUpdate(updateMap); + if (updateMap["all"] != null && updateMap["all"].toString().toBoolean()) { + processAllParticipantsUpdate(updateMap) - return; + return } - if (updateMap.get("users") != null) { - processParticipantsUpdate(updateMap); + if (updateMap["users"] != null) { + processParticipantsUpdate(updateMap) - return; + return } } - private void processAllParticipantsUpdate(Map updateMap) { + private fun processAllParticipantsUpdate(updateMap: Map) { // Message schema: // { // "type": "event", @@ -374,18 +410,18 @@ private void processAllParticipantsUpdate(Map updateMap) { // Note that "incall" in participants->update is all in lower case when the message applies to all participants, // even if it is "inCall" when the message provides separate properties for each participant. - long inCall; + val inCall: Long try { - inCall = Long.parseLong(updateMap.get("incall").toString()); - } catch (RuntimeException e) { + inCall = updateMap["incall"].toString().toLong() + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } - participantListMessageNotifier.notifyAllParticipantsUpdate(inCall); + participantListMessageNotifier.notifyAllParticipantsUpdate(inCall) } - private void processParticipantsUpdate(Map updateMap) { + private fun processParticipantsUpdate(updateMap: Map) { // Message schema: // { // "type": "event", @@ -416,34 +452,34 @@ private void processParticipantsUpdate(Map updateMap) { // Note that "userId" in participants->update comes from the Nextcloud server, so it is "userId"; in other // messages, like room->join, it comes directly from the external signaling server, so it is "userid" instead. - List> users; + val users: List>? try { - users = (List>) updateMap.get("users"); - } catch (RuntimeException e) { + users = updateMap["users"] as List>? + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } if (users == null) { // Broken message, this should not happen. - return; + return } - List participants = new ArrayList<>(users.size()); + val participants: MutableList = ArrayList(users.size) - for (Map user: users) { + for (user in users) { try { - participants.add(getParticipantFromMessageMap(user)); - } catch (RuntimeException e) { + participants.add(getParticipantFromMessageMap(user)) + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } } - participantListMessageNotifier.notifyParticipantsUpdate(participants); + participantListMessageNotifier.notifyParticipantsUpdate(participants) } - protected void processUsersInRoom(List> users) { + fun processUsersInRoom(users: List>) { // Message schema: // { // "type": "usersInRoom", @@ -462,23 +498,25 @@ protected void processUsersInRoom(List> users) { // ], // } - List participants = new ArrayList<>(users.size()); + val participants: MutableList = ArrayList(users.size) - for (Map user: users) { + for (user in users) { + val nullSafeUserMap = user as? Map ?: return try { - participants.add(getParticipantFromMessageMap(user)); - } catch (RuntimeException e) { + participants.add(getParticipantFromMessageMap(nullSafeUserMap)) + } catch (e: RuntimeException) { // Broken message, this should not happen. - return; + return } } - participantListMessageNotifier.notifyUsersInRoom(participants); + participantListMessageNotifier.notifyUsersInRoom(participants) } /** * Creates and initializes a Participant from the data in the given map. - *

+ * + * * Maps from internal and external signaling server messages can be used. Nevertheless, besides the differences * between the messages and the optional properties, it is expected that the message is correct and the given data * is parseable. Broken messages (for example, a string instead of an integer for "inCall" or a missing @@ -487,70 +525,73 @@ protected void processUsersInRoom(List> users) { * @param participantMap the map with the participant data * @return the Participant */ - private Participant getParticipantFromMessageMap(Map participantMap) { - Participant participant = new Participant(); + private fun getParticipantFromMessageMap(participantMap: Map): Participant { + val participant = Participant() - participant.setInCall(Long.parseLong(participantMap.get("inCall").toString())); - participant.setLastPing(Long.parseLong(participantMap.get("lastPing").toString())); - participant.setSessionId(participantMap.get("sessionId").toString()); + participant.inCall = participantMap["inCall"].toString().toLong() + participant.lastPing = participantMap["lastPing"].toString().toLong() + participant.sessionId = participantMap["sessionId"].toString() - if (participantMap.get("userId") != null && !participantMap.get("userId").toString().isEmpty()) { - participant.setUserId(participantMap.get("userId").toString()); + if (participantMap["userId"] != null && !participantMap["userId"].toString().isEmpty()) { + participant.userId = participantMap["userId"].toString() } - if (participantMap.get("internal") != null && Boolean.parseBoolean(participantMap.get("internal").toString())) { - participant.setInternal(Boolean.TRUE); + if (participantMap["internal"] != null && participantMap["internal"].toString().toBoolean()) { + participant.internal = true } - if (participantMap.get("actorType") != null && !participantMap.get("actorType").toString().isEmpty()) { - participant.setActorType(enumActorTypeConverter.getFromString(participantMap.get("actorType").toString())); + if (participantMap["actorType"] != null && !participantMap["actorType"].toString().isEmpty()) { + participant.actorType = enumActorTypeConverter.getFromString(participantMap["actorType"].toString()) } - if (participantMap.get("actorId") != null && !participantMap.get("actorId").toString().isEmpty()) { - participant.setActorId(participantMap.get("actorId").toString()); + if (participantMap["actorId"] != null && !participantMap["actorId"].toString().isEmpty()) { + participant.actorId = participantMap["actorId"].toString() } // Only in external signaling messages - if (participantMap.get("participantType") != null) { - int participantTypeInt = Integer.parseInt(participantMap.get("participantType").toString()); + if (participantMap["participantType"] != null) { + val participantTypeInt = participantMap["participantType"].toString().toInt() - EnumParticipantTypeConverter converter = new EnumParticipantTypeConverter(); - participant.setType(converter.getFromInt(participantTypeInt)); + val converter = EnumParticipantTypeConverter() + participant.type = converter.getFromInt(participantTypeInt) } - return participant; + return participant } - protected void processCallWebSocketMessage(CallWebSocketMessage callWebSocketMessage) { - - NCSignalingMessage signalingMessage = callWebSocketMessage.getNcSignalingMessage(); + protected fun processCallWebSocketMessage(callWebSocketMessage: CallWebSocketMessage) { + val signalingMessage = callWebSocketMessage.ncSignalingMessage - if (callWebSocketMessage.getSenderWebSocketMessage() != null && signalingMessage != null) { - String type = signalingMessage.getType(); + if (callWebSocketMessage.senderWebSocketMessage != null && signalingMessage != null) { + val type = signalingMessage.type - String userId = callWebSocketMessage.getSenderWebSocketMessage().getUserid(); - String sessionId = signalingMessage.getFrom(); + val userId = callWebSocketMessage.senderWebSocketMessage!!.userid + val sessionId = signalingMessage.from - if ("startedTyping".equals(type)) { - conversationMessageNotifier.notifyStartTyping(userId, sessionId); + if ("startedTyping" == type) { + conversationMessageNotifier.notifyStartTyping(userId, sessionId) } - if ("stoppedTyping".equals(type)) { - conversationMessageNotifier.notifyStopTyping(userId, sessionId); + if ("stoppedTyping" == type) { + conversationMessageNotifier.notifyStopTyping(userId, sessionId) } } } - protected void processSignalingMessage(NCSignalingMessage signalingMessage) { + fun processSignalingMessage(signalingMessage: NCSignalingMessage?) { + if (signalingMessage == null) { + return + } + // Note that in the internal signaling server message "data" is the String representation of a JSON // object, although it is already decoded when used here. - String type = signalingMessage.getType(); + val type = signalingMessage.type - String sessionId = signalingMessage.getFrom(); - String roomType = signalingMessage.getRoomType(); + val sessionId = signalingMessage.from + val roomType = signalingMessage.roomType - if ("raiseHand".equals(type)) { + if ("raiseHand" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -588,26 +629,16 @@ protected void processSignalingMessage(NCSignalingMessage signalingMessage) { // }, // } - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } - - Boolean state = payload.getState(); - Long timestamp = payload.getTimestamp(); + val payload = signalingMessage.payload ?: return + val state = payload.state ?: return + val timestamp = payload.timestamp ?: return - if (state == null || timestamp == null) { - // Broken message, this should not happen. - return; - } + callParticipantMessageNotifier.notifyRaiseHand(sessionId, state, timestamp) - callParticipantMessageNotifier.notifyRaiseHand(sessionId, state, timestamp); - - return; + return } - if ("reaction".equals(type)) { + if ("reaction" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -641,27 +672,19 @@ protected void processSignalingMessage(NCSignalingMessage signalingMessage) { // }, // } - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return - String reaction = payload.getReaction(); - if (reaction == null) { - // Broken message, this should not happen. - return; - } + val reaction = payload.reaction ?: return - callParticipantMessageNotifier.notifyReaction(sessionId, reaction); + callParticipantMessageNotifier.notifyReaction(sessionId, reaction) - return; + return } // "unshareScreen" messages are directly sent to the screen peer connection when the internal signaling // server is used, and to the room when the external signaling server is used. However, the (relevant) data // of the received message ("from" and "type") is the same in both cases. - if ("unshareScreen".equals(type)) { + if ("unshareScreen" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -690,12 +713,12 @@ protected void processSignalingMessage(NCSignalingMessage signalingMessage) { // }, // } - callParticipantMessageNotifier.notifyUnshareScreen(sessionId); + callParticipantMessageNotifier.notifyUnshareScreen(sessionId) - return; + return } - if ("offer".equals(type)) { + if ("offer" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -734,43 +757,35 @@ protected void processSignalingMessage(NCSignalingMessage signalingMessage) { // }, // } - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return - String sdp = payload.getSdp(); - String nick = payload.getNick(); + val sdp = payload.sdp + val nick = payload.nick // If "processSignalingMessage" is called with two offers from two different threads it is possible, // although extremely unlikely, that the WebRtcMessageListeners for the second offer are notified before the // WebRtcMessageListeners for the first offer. This should not be a problem, though, so for simplicity // the statements are not synchronized. - offerMessageNotifier.notifyOffer(sessionId, roomType, sdp, nick); - webRtcMessageNotifier.notifyOffer(sessionId, roomType, sdp, nick); + offerMessageNotifier.notifyOffer(sessionId, roomType, sdp, nick) + webRtcMessageNotifier.notifyOffer(sessionId, roomType, sdp, nick) - return; + return } - if ("answer".equals(type)) { + if ("answer" == type) { // Message schema: same as offers, but with type "answer". - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return - String sdp = payload.getSdp(); - String nick = payload.getNick(); + val sdp = payload.sdp + val nick = payload.nick - webRtcMessageNotifier.notifyAnswer(sessionId, roomType, sdp, nick); + webRtcMessageNotifier.notifyAnswer(sessionId, roomType, sdp, nick) - return; + return } - if ("candidate".equals(type)) { + if ("candidate" == type) { // Message schema (external signaling server): // { // "type": "message", @@ -814,31 +829,25 @@ protected void processSignalingMessage(NCSignalingMessage signalingMessage) { // }, // } - NCMessagePayload payload = signalingMessage.getPayload(); - if (payload == null) { - // Broken message, this should not happen. - return; - } + val payload = signalingMessage.payload ?: return - NCIceCandidate ncIceCandidate = payload.getIceCandidate(); - if (ncIceCandidate == null) { - // Broken message, this should not happen. - return; - } + val ncIceCandidate = payload.iceCandidate ?: return - webRtcMessageNotifier.notifyCandidate(sessionId, - roomType, - ncIceCandidate.getSdpMid(), - ncIceCandidate.getSdpMLineIndex(), - ncIceCandidate.getCandidate()); + webRtcMessageNotifier.notifyCandidate( + sessionId, + roomType, + ncIceCandidate.sdpMid, + ncIceCandidate.sdpMLineIndex, + ncIceCandidate.candidate + ) - return; + return } - if ("endOfCandidates".equals(type)) { - webRtcMessageNotifier.notifyEndOfCandidates(sessionId, roomType); + if ("endOfCandidates" == type) { + webRtcMessageNotifier.notifyEndOfCandidates(sessionId, roomType) - return; + return } } } diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt index 81b784726e3..9e6a3f2fec6 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt +++ b/app/src/main/java/com/nextcloud/talk/webrtc/WebSocketInstance.kt @@ -183,7 +183,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU ncSignalingMessage.from = callWebSocketMessage.senderWebSocketMessage!!.sessionId } - signalingMessageReceiver.process(callWebSocketMessage) + signalingMessageReceiver.processChatMessage(callWebSocketMessage) } } @@ -196,17 +196,17 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU when (target) { Globals.TARGET_ROOM -> { if ("message" == eventOverallWebSocketMessage.eventMap!!["type"]) { - processRoomMessageMessage(eventOverallWebSocketMessage) + processRoomMessageMessage(eventOverallWebSocketMessage, text) } else if ("join" == eventOverallWebSocketMessage.eventMap!!["type"]) { processRoomJoinMessage(eventOverallWebSocketMessage) } else if ("leave" == eventOverallWebSocketMessage.eventMap!!["type"]) { processRoomLeaveMessage(eventOverallWebSocketMessage) } - signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap) + signalingMessageReceiver.processChatMessage(eventOverallWebSocketMessage.eventMap) } Globals.TARGET_PARTICIPANTS -> - signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap) + signalingMessageReceiver.processChatMessage(eventOverallWebSocketMessage.eventMap) else -> Log.i(TAG, "Received unknown/ignored event target: $target") @@ -217,7 +217,7 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU } } - private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) { + private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage, text: String) { val messageHashMap = eventOverallWebSocketMessage.eventMap?.get("message") as Map<*, *>? if (messageHashMap != null && messageHashMap.containsKey("data")) { @@ -231,6 +231,10 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU refreshChatHashMap[BundleKeys.KEY_INTERNAL_USER_ID] = (conversationUser.id!!).toString() eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap)) } + + if (chatMap != null && chatMap.containsKey("comment")) { + signalingMessageReceiver.processChatMessage(text) + } } else if (dataHashMap != null && dataHashMap.containsKey("recording")) { val recordingMap = dataHashMap["recording"] as Map<*, *>? if (recordingMap != null && recordingMap.containsKey("status")) { @@ -468,11 +472,11 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU * stays connected, but it may change whenever it is connected again. */ private class ExternalSignalingMessageReceiver : SignalingMessageReceiver() { - fun process(eventMap: Map?) { + fun processChatMessage(eventMap: Map?) { processEvent(eventMap) } - fun process(message: CallWebSocketMessage?) { + fun processChatMessage(message: CallWebSocketMessage?) { if (message?.ncSignalingMessage?.type == "startedTyping" || message?.ncSignalingMessage?.type == "stoppedTyping" ) { @@ -481,6 +485,11 @@ class WebSocketInstance internal constructor(conversationUser: User, connectionU processSignalingMessage(message?.ncSignalingMessage) } } + + fun processChatMessage(jsonString: String) { + processChatMessageWebSocketMessage(jsonString) + Log.d(TAG, "processing Received chat message") + } } inner class ExternalSignalingMessageSender : SignalingMessageSender {