Android使用Flow封装一个FlowBus工具类
Android中使用Flow封装一个FlowBus工具类
做过Android的同学应该都使用过EvenutBus、Rxbus、LiveDataBus、LiveData等,这些第三方不仅要导入依赖包,而且还要注册和取消注册,使用起来非常麻烦,稍不注意就导致内存泄漏,自从接触了Flow、SharedFlow之后感觉使用起来方便多了,于是产生了一个封装通用事件工具类的想法,直接上代码.
1.FlowBus:
/*** @auth: njb* @date: 2024/7/18 10:17* @desc: 基于Flow封装的FlowBus*/
object FlowBus {private const val TAG = "FlowBus"private val busMap = mutableMapOf<String, FlowEventBus<*>>()private val busStickMap = mutableMapOf<String, FlowStickEventBus<*>>()@Synchronizedfun <T> with(key: String): FlowEventBus<T> {var flowEventBus = busMap[key]if (flowEventBus == null) {flowEventBus = FlowEventBus<T>(key)busMap[key] = flowEventBus}return flowEventBus as FlowEventBus<T>}@Synchronizedfun <T> withStick(key: String): FlowStickEventBus<T> {var stickEventBus = busStickMap[key]if (stickEventBus == null) {stickEventBus = FlowStickEventBus<T>(key)busStickMap[key] = stickEventBus}return stickEventBus as FlowStickEventBus<T>}open class FlowEventBus<T>(private val key: String) : DefaultLifecycleObserver {//私有对象用于发送消息private val _events: MutableSharedFlow<T> by lazy {obtainEvent()}//暴露的公有对象用于接收消息private val events = _events.asSharedFlow()open fun obtainEvent(): MutableSharedFlow<T> =MutableSharedFlow(0, 1, BufferOverflow.DROP_OLDEST)//在主线程中接收数据fun register(lifecycleOwner: LifecycleOwner,action: (t: T) -> Unit){lifecycleOwner.lifecycleScope.launch {events.collect {try {action(it)}catch (e:Exception){e.printStackTrace()Log.e(TAG, "FlowBus - Error:$e")}}}}//在协程中接收数据fun register(scope: CoroutineScope,action: (t: T) -> Unit){scope.launch {events.collect{try {action(it)}catch (e:Exception){e.printStackTrace()Log.e(TAG, "FlowBus - Error:$e")}}}}//在协程中发送数据suspend fun post(event: T){_events.emit(event)}//在主线程中发送数据fun post(scope: CoroutineScope,event: T){scope.launch {_events.emit(event)}}override fun onDestroy(owner: LifecycleOwner) {super.onDestroy(owner)Log.w(TAG, "FlowBus ==== 自动onDestroy")val subscriptCount = _events.subscriptionCount.valueif (subscriptCount <= 0)busMap.remove(key)}// 手动调用的销毁方法,用于Service、广播等fun destroy() {Log.w(TAG, "FlowBus ==== 手动销毁")val subscriptionCount = _events.subscriptionCount.valueif (subscriptionCount <= 0) {busMap.remove(key)}}}class FlowStickEventBus<T>(key: String) : FlowEventBus<T>(key) {override fun obtainEvent(): MutableSharedFlow<T> =MutableSharedFlow(1, 1, BufferOverflow.DROP_OLDEST)}}
2.在Activity中的使用:
2.1传递参数给主界面Activity:
/*** @auth: njb* @date: 2024/9/10 23:49* @desc: 描述*/
class TestActivity :AppCompatActivity(){private val textView:TextView by lazy { findViewById(R.id.tv_test) }override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_test)initFlowBus()}private fun initFlowBus() {val messageEvent = MessageEvent()messageEvent.message = "stop"messageEvent.state = falsetextView.setOnClickListener {lifecycleScope.launch {FlowBus.with<MessageEvent>("test").post(this, messageEvent)finish()}}}
}
2.2 MainActivity接收:
/*** 初始化*/
private fun initView() {binding.rvWallpaper.apply {layoutManager = GridLayoutManager(this@MainActivity, 2)adapter = wallPaperAdapter}binding.btnGetWallpaper.setOnClickListener {lifecycleScope.launch {mainViewModel.mainIntentChannel.send(MainIntent.GetWallpaper)}val intent = Intent(this@MainActivity,TestActivity::class.java)startActivity(intent)}FlowBus.with<MessageEvent>("test").register(this@MainActivity) {LogUtils.d(TAG,it.toString())if(it.message == "stop"){LogUtils.d(TAG,"===接收到的消息为==="+it.message)}}FlowBus.with<MessageEvent>("mineFragment").register(this@MainActivity) {LogUtils.d(TAG,it.toString())if(it.message == "onMine"){LogUtils.d(TAG,"===接收到的消息为1111==="+it.message)}}
}
3.在Fragment中的使用:
3.1 发送数据
package com.cloud.flowbusdemo.fragmentimport android.os.Bundle
import android.util.Log
import android.view.LayoutInflater
import android.view.View
import android.view.ViewGroup
import androidx.fragment.app.Fragment
import androidx.lifecycle.lifecycleScope
import com.cloud.flowbusdemo.databinding.FragmentMineBinding
import com.cloud.flowbusdemo.flow.FlowBus
import com.cloud.flowbusdemo.model.MessageEvent
import kotlinx.coroutines.launchprivate const val ARG_PARAM_NAME = "name"
private const val ARG_PARAM_AGE = "age"
/*** @auth: njb* @date: 2024/9/17 19:43* @desc: 描述*/
class MineFragment :Fragment(){private lateinit var binding: FragmentMineBindingprivate val TAG = "MineFragment"private var name: String? = nullprivate var age: Int? = nulloverride fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)arguments?.let {name = it.getString(ARG_PARAM_NAME)age = it.getInt(ARG_PARAM_AGE)}Log.i(TAG, "MainFragment 传递到 MineFragment 的参数为 name = $name , age = $age")Log.d(TAG, "姓名:" + name + "年龄:" + age)}override fun onCreateView(inflater: LayoutInflater,container: ViewGroup?,savedInstanceState: Bundle?): View {binding = FragmentMineBinding.inflate(layoutInflater)initView()return binding.root}private fun initView() {val messageEvent = MessageEvent()messageEvent.message = "onMine"messageEvent.state = falsebinding.let {it.tvTitle.text = nameit.tvAge.text = age.toString()it.tvTitle.setOnClickListener {lifecycleScope.launch {FlowBus.with<MessageEvent>("mineFragment").post(this, messageEvent)}}}}
}
3.2 接收数据:
private fun initView() {binding.rvWallpaper.apply {layoutManager = GridLayoutManager(this@MainActivity, 2)adapter = wallPaperAdapter}binding.btnGetWallpaper.setOnClickListener {lifecycleScope.launch {mainViewModel.mainIntentChannel.send(MainIntent.GetWallpaper)}val intent = Intent(this@MainActivity,TestActivity::class.java)startActivity(intent)}FlowBus.with<MessageEvent>("test").register(this@MainActivity) {LogUtils.d(TAG,it.toString())if(it.message == "stop"){LogUtils.d(TAG,"===接收到的消息为==="+it.message)}}FlowBus.with<MessageEvent>("mineFragment").register(this@MainActivity) {LogUtils.d(TAG,it.toString())if(it.message == "onMine"){LogUtils.d(TAG,"===接收到的消息为1111==="+it.message)}}
}
4.在Service中的使用:
4.1发送数据:
private fun initService() {val intent = Intent(this@MainActivity, FlowBusTestService::class.java)intent.putExtra("sockUrl","")startService(intent)
}
4.2接收数据:
/*** @auth: njb* @date: 2024/9/22 23:32* @desc: 描述*/
class FlowBusTestService:Service() {private var sock5Url:String ?= nullprivate val TAG = "FlowBusTestService"override fun onBind(intent: Intent?): IBinder? {return null}override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {intent?.let {this.sock5Url = intent.getStringExtra("sockUrl")LogUtils.d(TAG,"====收到的ip为==="+this.sock5Url)}return if (intent?.action == Constants.ACTION_DISCONNECT) {disconnect()START_NOT_STICKY} else {connect()START_STICKY}}private fun connect() {}private fun disconnect() {}
}
5.在Websock中的使用:
5.1发送数据:
private fun connectWebSocket() {LogUtils.e(TAG, "===connectUrl===$currentWebSocketUrl")try {if (mWebSocketManager == null) {return}mWebSocketManager?.addListener(object : SocketListener {override fun onConnected() {LogUtils.e(TAG, "===连接成功====")val messageEvent = MessageEvent()messageEvent.message = "socket连接成功"FloatWindowManager.log("socket连接成功")CoroutineScope(Dispatchers.Main).launch{FlowBus.with<MessageEvent>("onConnected").post(this,messageEvent)}}override fun onConnectFailed(throwable: Throwable) {LogUtils.e(TAG, "===连接失败====")val messageEvent = MessageEvent()messageEvent.message = "socket连接失败:$currentWebSocketUrl"FloatWindowManager.log("socket连接失败")}override fun onDisconnect() {LogUtils.e(TAG, "===断开连接====")val messageEvent = MessageEvent()messageEvent.message = "socket断开连接"FloatWindowManager.log("socket断开连接")}override fun onSendDataError(errorResponse: ErrorResponse) {LogUtils.e(TAG + "===发送数据失败====" + errorResponse.description)val messageEvent = MessageEvent()messageEvent.message = "发送数据失败--->" + errorResponse.descriptionFloatWindowManager.log("发送数据失败")}override fun <T> onMessage(msg: String, t: T) {LogUtils.e(TAG,"===接收到消息 String===$msg")val messageEvent = MessageEvent()messageEvent.message = msgFloatWindowManager.log("===接收到消息===$msg")taskManager?.onHandleMsg(msg)}override fun <T> onMessage(bytes: ByteBuffer, t: T) {LogUtils.e(TAG, "===接收到消息byteBuffer===="+GsonUtils.toJson(bytes))val rBuffer = ByteBuffer.allocate(1024)val charset = Charset.forName("UTF-8")try {val receiveText =charset.newDecoder().decode(rBuffer.asReadOnlyBuffer()).toString()LogUtils.e(TAG, "===接收到消息byteBuffer====$receiveText")val messageEvent = MessageEvent()messageEvent.message = receiveText// FloatWindowManager.log("===收到消息 byte===$receiveText")} catch (e: CharacterCodingException) {throw RuntimeException(e)}}override fun onPing(pingData: Framedata) {LogUtils.e(TAG, "===心跳onPing===$pingData")}override fun onPong(framedata: Framedata) {LogUtils.e(TAG, "===心跳onPong===$framedata")val messageEvent = MessageEvent()messageEvent.message = format.format(Date()) + " | 心跳onPong->"FloatWindowManager.log("===心跳onPong===${format.format(Date())}${"->"}$currentWebSocketUrl")}})mWebSocketManager?.start()} catch (e: Exception) {e.printStackTrace()}
}
5.2接收数据:
private fun initFlowBus() {FlowBus.with<MessageEvent>("onConnected").register(this@MainActivity) {LogUtils.d(TAG, "收到消息为:$it")}FlowBus.with<MessageEvent>("onStartVpn").register(this@MainActivity) {LogUtils.d(TAG, "收到vpn消息为:$it")CoroutineScope(Dispatchers.Main).launch {if (it.message == "start" && it.state && Constants.SWITCH_IP) {this@MainActivity.sockUrl = it.sockUrlLogUtils.d(TAG, "收到代理地址为:${it.sockUrl}")AppUtils.prepareVpn(this@MainActivity,it.sockUrl)// prepareVpn()}}}FlowBus.with<MessageEvent>("onStopVpn").register(this@MainActivity) {LogUtils.d(TAG, "收到vpn消息为:$it")if (it.message == "stop" && !it.state) {AppUtils.stopVpn(this@MainActivity)}}
}
6.实现的效果如下:
7.项目demo源码如下:
https://gitee.com/jackning_admin/flowbus-demo